Author: david.lloyd(a)jboss.com
Date: 2008-07-02 20:44:19 -0400 (Wed, 02 Jul 2008)
New Revision: 4342
Added:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/SpiUtils.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientContextImpl.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/EndpointImpl.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RemoteClientEndpointLocalImpl.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RemoteServiceEndpointLocalImpl.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RequestContextImpl.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ServiceContextImpl.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/util/TaggingExecutor.java
Removed:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/AbstractRealClient.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/AbstractRealClientSource.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientInitiator.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientMarker.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientResponder.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientSourceMarker.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreEndpoint.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreInboundClient.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreInboundRequest.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreInboundService.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundClient.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundRequest.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundService.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreSession.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreStream.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ProtocolClientInitiator.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ProtocolClientResponder.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RequestInitiator.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RequestResponder.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ServiceInitiator.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ServiceResponder.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/StreamMarker.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JBossSerializationObjectMessageInput.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JBossSerializationObjectMessageOutput.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/protocol/
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/stream/IteratorStreamSerializerFactory.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/stream/ObjectSinkStreamSerializerFactory.java
remoting3/trunk/samples/src/main/java/org/jboss/cx/remoting/samples/simple/JrppBasicExampleMain.java
remoting3/trunk/samples/src/main/java/org/jboss/cx/remoting/samples/simple/JrppStreamExampleMain.java
Modified:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Endpoint.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/RequestListener.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteClientEndpoint.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/ReplyHandler.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/EndpointWrapper.java
remoting3/trunk/build.xml
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JBossSerializationMarhsaller.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JBossSerializationMarshallerFactory.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/StreamResolver.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/service/ClassLoaderResourceListener.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/service/ServiceLocatorListener.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/stream/DefaultStreamDetector.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/stream/InputStreamStreamSerializerFactory.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/stream/ObjectSourceStreamSerializerFactory.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/util/DecodingBuilder.java
remoting3/trunk/mc-deployers/src/main/java/org/jboss/cx/remoting/beans/SessionBean.java
remoting3/trunk/samples/src/main/java/org/jboss/cx/remoting/samples/simple/LocalBasicExampleMain.java
remoting3/trunk/samples/src/main/java/org/jboss/cx/remoting/samples/simple/LocalStreamExampleMain.java
remoting3/trunk/samples/src/main/java/org/jboss/cx/remoting/samples/simple/StreamingRot13RequestListener.java
remoting3/trunk/samples/src/main/java/org/jboss/cx/remoting/samples/simple/StringRot13RequestListener.java
remoting3/trunk/standalone/src/main/java/org/jboss/cx/remoting/Remoting.java
Log:
Get r3 building again (amazingly)
Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Endpoint.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Endpoint.java 2008-07-02
22:54:29 UTC (rev 4341)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Endpoint.java 2008-07-03
00:44:19 UTC (rev 4342)
@@ -21,24 +21,6 @@
ConcurrentMap<Object, Object> getAttributes();
/**
- * Open an outbound session to another endpoint. The protocol used is determined by
the URI scheme. The URI user-info part
- * must be {@code null} unless the specific protocol has an additional authentication
scheme (e.g. HTTP BASIC). The
- * authority is used to locate the server (the exact interpretation is dependent upon
the protocol). The path may be
- * relative to a protocol-specific deployment path.
- *
- * You must have the TODO permission to invoke this method.
- *
- * @param remoteUri the URI of the server to connect to
- * @param attributeMap the attribute map to use to configure this session
- * @param rootListener the root request listener for this end of the session
- * @return a new session
- *
- * @throws RemotingException if there is a problem creating the session, or if the
request or reply type does not
- * match the remote service
- */
- Session openSession(URI remoteUri, AttributeMap attributeMap, RequestListener<?,
?> rootListener) throws RemotingException;
-
- /**
* Get the name of this endpoint.
*
* @return the endpoint name, or {@code null} if there is no name
Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/RequestListener.java
===================================================================
---
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/RequestListener.java 2008-07-02
22:54:29 UTC (rev 4341)
+++
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/RequestListener.java 2008-07-03
00:44:19 UTC (rev 4342)
@@ -23,8 +23,7 @@
/**
* Handle a request. If this method throws {@code RemoteExecutionException}, then
that exception is passed
- * back to the caller and the request is marked as complete. If this method throws
{@code InterruptedException},
- * the request is cancelled, and the interrupted status is propagated to the
executor.. Otherwise, the request
+ * back to the caller and the request is marked as complete. Otherwise, the request
* listener must send back either a reply (using the {@code sendReply()} method on
the {@code RequestContext}) or
* an exception (using the {@code sendException()} method on the {@code
RequestContext}). Failure to do so may
* cause the client to hang indefinitely.
@@ -33,9 +32,8 @@
* @param request the received request
*
* @throws RemoteExecutionException if the execution failed in some way
- * @throws InterruptedException if the thread is interrupted
*/
- void handleRequest(RequestContext<O> context, I request) throws
RemoteExecutionException, InterruptedException;
+ void handleRequest(RequestContext<O> context, I request) throws
RemoteExecutionException;
/**
* Handle the close of a service.
Added: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/SpiUtils.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/SpiUtils.java
(rev 0)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/SpiUtils.java 2008-07-03
00:44:19 UTC (rev 4342)
@@ -0,0 +1,98 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.cx.remoting.spi;
+
+import org.jboss.cx.remoting.spi.remote.ReplyHandler;
+import org.jboss.cx.remoting.RequestCancelHandler;
+import org.jboss.cx.remoting.RequestContext;
+import org.jboss.xnio.log.Logger;
+
+/**
+ * Utility methods for Remoting service providers.
+ */
+public final class SpiUtils {
+ private SpiUtils() {}
+
+ private static final Logger log = Logger.getLogger(SpiUtils.class);
+
+ /**
+ * Safely notify a reply handler of an exception.
+ *
+ * @param replyHandler the reply handler
+ * @param msg the message
+ * @param cause the cause
+ */
+ public static void safeHandleException(ReplyHandler<?> replyHandler, String
msg, Throwable cause) {
+ try {
+ replyHandler.handleException(msg, cause);
+ } catch (Throwable t) {
+ log.error(t, "Failed to properly handle exception");
+ }
+ }
+
+ /**
+ * Safely notify a reply handler of a reply.
+ *
+ * @param <O> the reply type
+ * @param replyHandler the reply handler
+ * @param reply the reply
+ */
+ public static <O> void safeHandleReply(ReplyHandler<O> replyHandler, O
reply) {
+ try {
+ replyHandler.handleReply(reply);
+ } catch (Throwable t) {
+ log.error(t, "Failed to properly handle reply");
+ }
+ }
+
+ /**
+ * Safely notify a reply handler of a cancellation.
+ *
+ * @param replyHandler the reply handler
+ */
+ public static void safeHandleCancellation(ReplyHandler<?> replyHandler) {
+ try {
+ replyHandler.handleCancellation();
+ } catch (Throwable t) {
+ log.error(t, "Failed to properly handle cancellation");
+ }
+ }
+
+ /**
+ * Safely notify a request listener's cancel handler of cancellation.
+ *
+ * @param handler the request cancel handler
+ * @param requestContext the request context
+ * @param mayInterrupt {@code true} if the request listener threads may be
interrupted
+ */
+ public static <O> void safeNotifyCancellation(RequestCancelHandler<O>
handler, RequestContext<O> requestContext, boolean mayInterrupt) {
+ try {
+ handler.notifyCancel(requestContext, mayInterrupt);
+ } catch (Throwable t) {
+ log.error(t, "Request cancel handler threw an exception when calling
notifyCancel()");
+ }
+ }
+
+
+}
+
Modified:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteClientEndpoint.java
===================================================================
---
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteClientEndpoint.java 2008-07-02
22:54:29 UTC (rev 4341)
+++
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteClientEndpoint.java 2008-07-03
00:44:19 UTC (rev 4342)
@@ -36,7 +36,7 @@
/**
* Receive a request from a remote system. This method is intended to be called by
protocol handlers. If the
* request cannot be accepted for some reason, the
- * {@link org.jboss.cx.remoting.spi.remote.ReplyHandler#handleException(Throwable)}
+ * {@link ReplyHandler#handleException(String, Throwable)}
* method is called immediately.
*
* @param request the request
Modified:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/ReplyHandler.java
===================================================================
---
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/ReplyHandler.java 2008-07-02
22:54:29 UTC (rev 4341)
+++
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/ReplyHandler.java 2008-07-03
00:44:19 UTC (rev 4342)
@@ -38,9 +38,10 @@
/**
* Handle a remote exception.
*
+ * @param msg the message
* @param cause the cause
*/
- void handleException(Throwable cause);
+ void handleException(final String msg, Throwable cause);
/**
* Handle a cancellation request.
Modified:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/EndpointWrapper.java
===================================================================
---
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/EndpointWrapper.java 2008-07-02
22:54:29 UTC (rev 4341)
+++
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/EndpointWrapper.java 2008-07-03
00:44:19 UTC (rev 4342)
@@ -9,6 +9,8 @@
import org.jboss.cx.remoting.RequestListener;
import org.jboss.cx.remoting.Session;
import org.jboss.cx.remoting.SessionListener;
+import org.jboss.cx.remoting.spi.remote.RemoteClientEndpoint;
+import org.jboss.cx.remoting.spi.remote.RemoteServiceEndpoint;
import org.jboss.cx.remoting.util.AttributeMap;
/**
@@ -36,13 +38,6 @@
/**
* {@inheritDoc} This implementation calls the same method on the delegate object.
*/
- public Session openSession(final URI remoteUri, final AttributeMap attributeMap,
final RequestListener<?, ?> rootListener) throws RemotingException {
- return delegate.openSession(remoteUri, attributeMap, rootListener);
- }
-
- /**
- * {@inheritDoc} This implementation calls the same method on the delegate object.
- */
public String getName() {
return delegate.getName();
}
@@ -50,14 +45,14 @@
/**
* {@inheritDoc} This implementation calls the same method on the delegate object.
*/
- public <I, O> Client<I, O> createClient(final RequestListener<I, O>
requestListener) {
+ public <I, O> RemoteClientEndpoint<I, O> createClient(final
RequestListener<I, O> requestListener) {
return delegate.createClient(requestListener);
}
/**
* {@inheritDoc} This implementation calls the same method on the delegate object.
*/
- public <I, O> ClientSource<I, O> createService(final
RequestListener<I, O> requestListener) {
+ public <I, O> RemoteServiceEndpoint<I,O> createService(final
RequestListener<I, O> requestListener) {
return delegate.createService(requestListener);
}
Modified: remoting3/trunk/build.xml
===================================================================
--- remoting3/trunk/build.xml 2008-07-02 22:54:29 UTC (rev 4341)
+++ remoting3/trunk/build.xml 2008-07-03 00:44:19 UTC (rev 4342)
@@ -285,6 +285,7 @@
<path refid="util.classpath"/>
<path refid="version.classpath"/>
<pathelement
location="${lib.jboss-serialization.local}"/>
+ <pathelement location="${lib.xnio-api.local}"/>
</classpath>
</javac>
<touch file="core/target/main/.lastcompile"
verbose="false"/>
@@ -561,6 +562,7 @@
<path refid="srp.classpath"/>
<path refid="standalone.classpath"/>
<path refid="util.classpath"/>
+ <pathelement location="${lib.xnio-api.local}"/>
</classpath>
</javac>
<touch file="samples/target/main/.lastcompile"
verbose="false"/>
@@ -636,6 +638,7 @@
<compilerarg value="-Xlint:unchecked"/>
<classpath>
<path refid="util.classpath"/>
+ <pathelement location="${lib.xnio-api.local}"/>
</classpath>
</javac>
<touch file="srp/target/main/.lastcompile"
verbose="false"/>
@@ -982,8 +985,10 @@
<doctitle><![CDATA[<h1>JBoss Remoting
3</h1>]]></doctitle>
<bottom><![CDATA[<i>Copyright © 2008 JBoss, a
division of Red Hat, Inc.</i>]]></bottom>
<link
href="http://java.sun.com/j2se/1.5.0/docs/api/"/>
+ <link
href="http://docs.jboss.org/xnio/1.0/api/"/>
<classpath>
<path refid="core.classpath"/>
+ <pathelement location="${lib.xnio-api.local}"/>
</classpath>
</javadoc>
</target>
Deleted:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/AbstractRealClient.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/AbstractRealClient.java 2008-07-02
22:54:29 UTC (rev 4341)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/AbstractRealClient.java 2008-07-03
00:44:19 UTC (rev 4342)
@@ -1,31 +0,0 @@
-package org.jboss.cx.remoting.core;
-
-import org.jboss.cx.remoting.Client;
-
-/**
- *
- */
-public abstract class AbstractRealClient<I, O> implements Client<I, O> {
-
- private ClientResponder<I,O> clientResponder;
- private ClassLoader classLoader;
-
- protected AbstractRealClient(final ClientResponder<I, O> clientResponder, final
ClassLoader classLoader) {
- if (clientResponder == null) {
- throw new NullPointerException("clientResponder is null");
- }
- if (classLoader == null) {
- throw new NullPointerException("classLoader is null");
- }
- this.clientResponder = clientResponder;
- this.classLoader = classLoader;
- }
-
- protected ClientResponder<I, O> getContextServer() {
- return clientResponder;
- }
-
- public ClassLoader getClassLoader() {
- return classLoader;
- }
-}
Deleted:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/AbstractRealClientSource.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/AbstractRealClientSource.java 2008-07-02
22:54:29 UTC (rev 4341)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/AbstractRealClientSource.java 2008-07-03
00:44:19 UTC (rev 4342)
@@ -1,21 +0,0 @@
-package org.jboss.cx.remoting.core;
-
-import org.jboss.cx.remoting.ClientSource;
-
-/**
- *
- */
-public abstract class AbstractRealClientSource<I, O> implements ClientSource<I,
O> {
- private ServiceResponder<I, O> serviceResponder;
-
- protected AbstractRealClientSource(final ServiceResponder<I, O>
serviceResponder) {
- if (serviceResponder == null) {
- throw new NullPointerException("serviceResponder is null");
- }
- this.serviceResponder = serviceResponder;
- }
-
- public ServiceResponder<I, O> getServiceServer() {
- return serviceResponder;
- }
-}
Added:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientContextImpl.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientContextImpl.java
(rev 0)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientContextImpl.java 2008-07-03
00:44:19 UTC (rev 4342)
@@ -0,0 +1,56 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.cx.remoting.core;
+
+import org.jboss.cx.remoting.ClientContext;
+import org.jboss.cx.remoting.ServiceContext;
+import org.jboss.cx.remoting.RemotingException;
+import org.jboss.cx.remoting.CloseHandler;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executor;
+
+/**
+ *
+ */
+public final class ClientContextImpl implements ClientContext {
+
+ private Executor executor;
+
+ public ConcurrentMap<Object, Object> getAttributes() {
+ return null;
+ }
+
+ public ServiceContext getServiceContext() {
+ return null;
+ }
+
+ public void close() throws RemotingException {
+ }
+
+ public void addCloseHandler(final CloseHandler<ClientContext>
clientContextCloseHandler) {
+ }
+
+ public Executor getExecutor() {
+ return executor;
+ }
+}
Deleted:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientInitiator.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientInitiator.java 2008-07-02
22:54:29 UTC (rev 4341)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientInitiator.java 2008-07-03
00:44:19 UTC (rev 4342)
@@ -1,10 +0,0 @@
-package org.jboss.cx.remoting.core;
-
-import org.jboss.cx.remoting.RemotingException;
-
-/**
- *
- */
-public interface ClientInitiator {
- void handleClosing(boolean done) throws RemotingException;
-}
Deleted: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientMarker.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientMarker.java 2008-07-02
22:54:29 UTC (rev 4341)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientMarker.java 2008-07-03
00:44:19 UTC (rev 4342)
@@ -1,27 +0,0 @@
-package org.jboss.cx.remoting.core;
-
-import java.io.Serializable;
-
-/**
- *
- */
-public final class ClientMarker implements Serializable {
- private static final long serialVersionUID = 1L;
-
- private ClientIdentifier clientIdentifer;
-
- public ClientMarker() {
- }
-
- public ClientMarker(final ClientIdentifier clientIdentifer) {
- this.clientIdentifer = clientIdentifer;
- }
-
- public ClientIdentifier getClientIdentifer() {
- return clientIdentifer;
- }
-
- public void setContextIdentifer(final ClientIdentifier clientIdentifer) {
- this.clientIdentifer = clientIdentifer;
- }
-}
Deleted:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientResponder.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientResponder.java 2008-07-02
22:54:29 UTC (rev 4341)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientResponder.java 2008-07-03
00:44:19 UTC (rev 4342)
@@ -1,12 +0,0 @@
-package org.jboss.cx.remoting.core;
-
-import org.jboss.cx.remoting.RemotingException;
-
-/**
- *
- */
-public interface ClientResponder<I, O> {
- RequestResponder<I> createNewRequest(RequestInitiator<O>
requestInitiator) throws RemotingException;
-
- void handleClose(boolean immediate, boolean cancel) throws RemotingException;
-}
Deleted:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientSourceMarker.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientSourceMarker.java 2008-07-02
22:54:29 UTC (rev 4341)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientSourceMarker.java 2008-07-03
00:44:19 UTC (rev 4342)
@@ -1,27 +0,0 @@
-package org.jboss.cx.remoting.core;
-
-import java.io.Serializable;
-
-/**
- *
- */
-public final class ClientSourceMarker implements Serializable {
- private static final long serialVersionUID = 1L;
-
- private ServiceIdentifier serviceIdentifier;
-
- public ClientSourceMarker() {
- }
-
- public ClientSourceMarker(final ServiceIdentifier serviceIdentifier) {
- this.serviceIdentifier = serviceIdentifier;
- }
-
- public ServiceIdentifier getServiceIdentifier() {
- return serviceIdentifier;
- }
-
- public void setServiceIdentifier(final ServiceIdentifier serviceIdentifier) {
- this.serviceIdentifier = serviceIdentifier;
- }
-}
\ No newline at end of file
Deleted: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreEndpoint.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreEndpoint.java 2008-07-02
22:54:29 UTC (rev 4341)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreEndpoint.java 2008-07-03
00:44:19 UTC (rev 4342)
@@ -1,276 +0,0 @@
-package org.jboss.cx.remoting.core;
-
-import java.io.IOException;
-import java.net.URI;
-import java.util.LinkedHashSet;
-import java.util.Set;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import org.jboss.cx.remoting.Client;
-import org.jboss.cx.remoting.ClientSource;
-import org.jboss.cx.remoting.Endpoint;
-import org.jboss.cx.remoting.RemotingException;
-import org.jboss.cx.remoting.RequestListener;
-import org.jboss.cx.remoting.Session;
-import org.jboss.cx.remoting.SessionListener;
-import org.jboss.cx.remoting.core.util.OrderedExecutorFactory;
-import org.jboss.xnio.log.Logger;
-import org.jboss.cx.remoting.util.AtomicStateMachine;
-import org.jboss.cx.remoting.util.AttributeMap;
-import org.jboss.cx.remoting.util.CollectionUtil;
-import org.jboss.cx.remoting.util.NamingThreadFactory;
-import org.jboss.cx.remoting.version.Version;
-
-import javax.security.auth.callback.Callback;
-import javax.security.auth.callback.CallbackHandler;
-import javax.security.auth.callback.NameCallback;
-import javax.security.auth.callback.PasswordCallback;
-import javax.security.auth.callback.UnsupportedCallbackException;
-
-/**
- *
- */
-public class CoreEndpoint implements Endpoint {
-
- static {
- // Print Remoting "greeting" message
- Logger.getLogger("org.jboss.cx.remoting").info("JBoss Remoting
version %s", Version.VERSION);
- }
-
- private enum State implements org.jboss.cx.remoting.util.State<State> {
- INITIAL,
- UP,
- DOWN;
-
- public boolean isReachable(final State dest) {
- return compareTo(dest) < 0;
- }
- }
-
- private String name;
-
- private final AtomicStateMachine<State> state =
AtomicStateMachine.start(State.INITIAL);
- private final Set<SessionListener> sessionListeners =
CollectionUtil.synchronizedSet(new LinkedHashSet<SessionListener>());
-
- private OrderedExecutorFactory orderedExecutorFactory;
- private ExecutorService executorService;
-
- private final ConcurrentMap<Object, Object> endpointMap =
CollectionUtil.concurrentMap();
- private final ConcurrentMap<String, CoreProtocolRegistration> protocolMap =
CollectionUtil.concurrentMap();
- private final Set<CoreSession> sessions =
CollectionUtil.synchronizedSet(CollectionUtil.<CoreSession>hashSet());
-
- public CoreEndpoint() {
- }
-
- // Dependencies
-
- private Executor executor;
-
- public Executor getExecutor() {
- return executor;
- }
-
- public Executor getOrderedExecutor() {
- return orderedExecutorFactory.getOrderedExecutor();
- }
-
- public void setExecutor(final Executor executor) {
- this.executor = executor;
- orderedExecutorFactory = new OrderedExecutorFactory(executor);
- }
-
- // Configuration
-
- public void setName(final String name) {
- this.name = name;
- }
-
- public String getName() {
- return name;
- }
-
- // Lifecycle
-
- public void create() {
- // todo security check
- }
-
- public void start() {
- // todo security check
- if (executor == null) {
- executorService = Executors.newCachedThreadPool(new
NamingThreadFactory(Executors.defaultThreadFactory(), "Remoting endpoint %s"));
- setExecutor(executorService);
- }
- state.requireTransition(State.INITIAL, State.UP);
- }
-
- public void stop() {
- // todo security check
- if (executorService != null) {
- executorService.shutdown();
- executorService = null;
- }
- // todo
- }
-
- public void destroy() {
- executor = null;
- }
-
-
- // Endpoint implementation
-
- public ConcurrentMap<Object, Object> getAttributes() {
- return endpointMap;
- }
-
- public Session openSession(final URI uri, final AttributeMap attributeMap, final
RequestListener<?, ?> rootListener) throws RemotingException {
- if (uri == null) {
- throw new NullPointerException("uri is null");
- }
- if (attributeMap == null) {
- throw new NullPointerException("attributeMap is null");
- }
- final String scheme = uri.getScheme();
- if (scheme == null) {
- throw new RemotingException("No scheme on remote endpoint URI");
- }
- state.requireHold(State.UP);
- try {
- final CoreProtocolRegistration registration = protocolMap.get(scheme);
- if (registration == null) {
- throw new RemotingException("No handler available for URI scheme
\"" + scheme + "\"");
- }
- final ProtocolHandlerFactory factory =
registration.getProtocolHandlerFactory();
- try {
- final CoreSession session = new CoreSession(CoreEndpoint.this);
- session.initializeClient(factory, uri, attributeMap,
createClient(rootListener));
- sessions.add(session);
- final Session userSession = session.getUserSession();
- for (final SessionListener listener : sessionListeners) {
- executor.execute(new Runnable() {
- public void run() {
- listener.handleSessionOpened(userSession);
- }
- });
- }
- return userSession;
- } catch (IOException e) {
- RemotingException rex = new RemotingException("Failed to create
protocol handler: " + e.getMessage());
- rex.setStackTrace(e.getStackTrace());
- throw rex;
- }
- } finally {
- state.release();
- }
- }
-
- public ProtocolContext openSession(final ProtocolHandler handler, final
RequestListener<?, ?> rootListener) throws RemotingException {
- state.requireHold(State.UP);
- try {
- final CoreSession session = new CoreSession(CoreEndpoint.this);
- session.initializeServer(handler, rootListener == null ? null :
createClient(rootListener));
- sessions.add(session);
- return session.getProtocolContext();
- } finally {
- state.release();
- }
- }
-
- public Registration registerProtocol(final String scheme, final
ProtocolHandlerFactory protocolHandlerFactory) throws RemotingException,
IllegalArgumentException {
- if (scheme == null) {
- throw new NullPointerException("scheme is null");
- }
- if (protocolHandlerFactory == null) {
- throw new NullPointerException("protocolHandlerFactory is null");
- }
- state.requireHold(State.UP);
- try {
- final CoreProtocolRegistration registration = new
CoreProtocolRegistration(protocolHandlerFactory);
- protocolMap.put(scheme, registration);
- return registration;
- } finally {
- state.release();
- }
- }
-
- public <I, O> Client<I, O> createClient(RequestListener<I, O>
requestListener) {
- final CoreInboundClient<I, O> inbound = new CoreInboundClient<I,
O>(requestListener, executor);
- final CoreOutboundClient<I, O> outbound = new CoreOutboundClient<I,
O>(executor);
- inbound.initialize(outbound.getClientInitiator());
- outbound.initialize(inbound.getClientResponder());
- return outbound.getUserContext();
- }
-
- public <I, O> ClientSource<I, O> createService(RequestListener<I,
O> requestListener) {
- final CoreInboundService<I, O> inbound = new CoreInboundService<I,
O>(requestListener, executor);
- final CoreOutboundService<I, O> outbound = new CoreOutboundService<I,
O>(executor);
- inbound.initialize(outbound.getServiceClient());
- outbound.initialize(inbound.getServiceResponder());
- return outbound.getUserContextSource();
- }
-
- public void addSessionListener(final SessionListener sessionListener) {
- // TODO security check
- sessionListeners.add(sessionListener);
- }
-
- public void removeSessionListener(final SessionListener sessionListener) {
- // TODO security check
- sessionListeners.remove(sessionListener);
- }
-
- void removeSession(CoreSession coreSession) {
- synchronized (sessions) {
- if (!sessions.remove(coreSession)) {
- return;
- }
- sessions.notifyAll();
- }
- }
-
- public final class CoreProtocolRegistration implements Registration {
- private final ProtocolHandlerFactory protocolHandlerFactory;
-
- private CoreProtocolRegistration(final ProtocolHandlerFactory
protocolHandlerFactory) {
- this.protocolHandlerFactory = protocolHandlerFactory;
- }
-
- public void start() {
- }
-
- public void stop() {
- }
-
- public void unregister() {
- }
-
- private ProtocolHandlerFactory getProtocolHandlerFactory() {
- return protocolHandlerFactory;
- }
- }
-
- public static final class SimpleClientCallbackHandler implements CallbackHandler {
- private final String userName;
- private final char[] password;
-
- public SimpleClientCallbackHandler(final String userName, final char[] password)
{
- this.userName = userName;
- this.password = password;
- }
-
- public void handle(Callback[] callbacks) throws IOException,
UnsupportedCallbackException {
- for (Callback callback : callbacks) {
- if (callback instanceof NameCallback) {
- ((NameCallback) callback).setName(userName);
- } else if (callback instanceof PasswordCallback) {
- ((PasswordCallback) callback).setPassword(password);
- } else {
- throw new UnsupportedCallbackException(callback, "This handler
only supports username/password callbacks");
- }
- }
- }
- }
-}
Deleted:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreInboundClient.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreInboundClient.java 2008-07-02
22:54:29 UTC (rev 4341)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreInboundClient.java 2008-07-03
00:44:19 UTC (rev 4342)
@@ -1,148 +0,0 @@
-package org.jboss.cx.remoting.core;
-
-import java.util.Set;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Executor;
-import org.jboss.cx.remoting.CloseHandler;
-import org.jboss.cx.remoting.ClientContext;
-import org.jboss.cx.remoting.RemotingException;
-import org.jboss.cx.remoting.RequestListener;
-import org.jboss.cx.remoting.ServiceContext;
-import org.jboss.xnio.log.Logger;
-import org.jboss.cx.remoting.util.AtomicStateMachine;
-import static org.jboss.cx.remoting.util.AtomicStateMachine.start;
-import org.jboss.cx.remoting.util.CollectionUtil;
-import static org.jboss.cx.remoting.util.CollectionUtil.synchronizedHashSet;
-
-/**
- *
- */
-public final class CoreInboundClient<I, O> {
- private static final Logger log =
org.jboss.xnio.log.Logger.getLogger(CoreInboundClient.class);
-
- private final RequestListener<I, O> requestListener;
- private final Executor executor;
- private final ServiceContext serviceContext;
- private final Set<CoreInboundRequest<I, O>> requests =
synchronizedHashSet();
- private final AtomicStateMachine<State> state = start(State.NEW);
- private final ClientContext clientContext = new UserClientContext();
-
- private ClientInitiator clientInitiator;
- private ConcurrentMap<Object, Object> attributes =
CollectionUtil.concurrentMap();
-
- private enum State implements org.jboss.cx.remoting.util.State<State> {
- NEW,
- UP,
- STOPPING,
- DOWN;
-
- public boolean isReachable(final State dest) {
- return compareTo(dest) < 0;
- }
- }
-
- public CoreInboundClient(final RequestListener<I, O> requestListener, final
Executor executor) {
- this.requestListener = requestListener;
- this.executor = executor;
- serviceContext = null;
- }
-
- public CoreInboundClient(final RequestListener<I, O> requestListener, final
Executor executor, final ServiceContext serviceContext) {
- this.requestListener = requestListener;
- this.executor = executor;
- this.serviceContext = serviceContext;
- }
-
- public ClientResponder<I, O> getClientResponder() {
- return new Responder();
- }
-
- public void initialize(final ClientInitiator clientInitiator) {
- state.requireTransitionExclusive(State.NEW, State.UP);
- this.clientInitiator = clientInitiator;
- state.releaseDowngrade();
- try {
- if (requestListener != null) {
- requestListener.handleClientOpen(clientContext);
- }
- } finally {
- state.release();
- }
- }
-
- public void remove(final CoreInboundRequest<I, O> request) {
- final State current = state.getStateHold();
- try {
- requests.remove(request);
- if (current != State.STOPPING) {
- return;
- }
- } finally {
- state.release();
- }
- if (requests.isEmpty()) {
- state.transition(State.STOPPING, State.DOWN);
- }
- }
-
- // Accessors
-
- public ClientContext getClientContext() {
- return clientContext;
- }
-
- // Support classes
-
- public final class Responder implements ClientResponder<I, O> {
- private Responder() {
- }
-
- public RequestResponder<I> createNewRequest(final RequestInitiator<O>
requestInitiator) throws RemotingException {
- if (state.inHold(State.UP)) try {
- final CoreInboundRequest<I, O> inboundRequest = new
CoreInboundRequest<I, O>(requestListener, executor, clientContext);
- inboundRequest.initialize(requestInitiator);
- requests.add(inboundRequest);
- return inboundRequest.getRequestResponder();
- } finally {
- state.release();
- } else {
- throw new RemotingException("Client is not up");
- }
- }
-
- public void handleClose(final boolean immediate, final boolean cancel) throws
RemotingException {
- if (state.transition(State.UP, State.STOPPING)) {
- clientInitiator.handleClosing(false);
- if (immediate || cancel) {
- for (CoreInboundRequest<I, O> inboundRequest : requests) {
- try {
-
inboundRequest.getRequestResponder().handleCancelRequest(immediate );
- } catch (Exception e) {
- log.trace("Failed to notify inbound request of
cancellation upon context close: %s", e);
- }
- }
- }
- }
- }
- }
-
- public final class UserClientContext implements ClientContext {
- private UserClientContext() {
- }
-
- public ConcurrentMap<Object, Object> getAttributes() {
- return attributes;
- }
-
- public ServiceContext getServiceContext() {
- return serviceContext;
- }
-
- public void close() throws RemotingException {
- clientInitiator.handleClosing(false);
- }
-
- public void addCloseHandler(final CloseHandler<ClientContext>
contextContextCloseHandler) {
- }
- }
-}
Deleted:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreInboundRequest.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreInboundRequest.java 2008-07-02
22:54:29 UTC (rev 4341)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreInboundRequest.java 2008-07-03
00:44:19 UTC (rev 4342)
@@ -1,231 +0,0 @@
-package org.jboss.cx.remoting.core;
-
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.Executor;
-import org.jboss.cx.remoting.RemoteExecutionException;
-import org.jboss.cx.remoting.RemotingException;
-import org.jboss.cx.remoting.RequestCancelHandler;
-import org.jboss.cx.remoting.RequestContext;
-import org.jboss.cx.remoting.RequestListener;
-import org.jboss.cx.remoting.ClientContext;
-import org.jboss.cx.remoting.util.AtomicStateMachine;
-
-/**
- *
- */
-public final class CoreInboundRequest<I, O> {
- private static final org.jboss.xnio.log.Logger log =
org.jboss.xnio.log.Logger.getLogger(CoreInboundRequest.class);
-
- private final RequestListener<I,O> requestListener;
- private final Executor executor;
- private final ClientContext clientContext;
-
- private final AtomicStateMachine<State> state =
AtomicStateMachine.start(State.INITIAL);
- private final UserRequestContext userRequestContext = new UserRequestContext();
- private final RequestResponder<I> request = new Request();
-
- private RequestInitiator<O> requestInitiator;
-
- /**
- * @protectedby {@code this}
- */
- private boolean mayInterrupt;
- /**
- * @protectedby {@code this}
- */
- private boolean cancel;
- /**
- * @protectedby {@code this}
- */
- private Set<Thread> tasks;
- /**
- * @protectedby {@code this}
- */
- private List<RequestCancelHandler<O>> cancelHandlers;
-
- public CoreInboundRequest(final RequestListener<I, O> requestListener, final
Executor executor, final ClientContext clientContext) {
- this.requestListener = requestListener;
- this.executor = executor;
- this.clientContext = clientContext;
- }
-
- private enum State implements org.jboss.cx.remoting.util.State<State> {
- INITIAL,
- UNSENT,
- SENT,
- TERMINATED;
-
- public boolean isReachable(final State dest) {
- return compareTo(dest) < 0;
- }
- }
-
- public void initialize(final RequestInitiator<O> requestInitiator) {
- state.requireTransitionExclusive(State.INITIAL, State.UNSENT);
- this.requestInitiator = requestInitiator;
- state.releaseExclusive();
- }
-
- public RequestResponder<I> getRequestResponder() {
- return request;
- }
-
- /**
- * Execute the given command. The command will be sensitive to interruption if the
request is cancelled.
- *
- * @param command the command to execute
- */
- private void executeTagged(final Runnable command) {
- executor.execute(new Runnable() {
- public void run() {
- final Thread thread = Thread.currentThread();
- synchronized(CoreInboundRequest.this) {
- if (tasks == null) {
- tasks = new HashSet<Thread>();
- }
- tasks.add(thread);
- }
- try {
- command.run();
- } finally {
- synchronized(CoreInboundRequest.this) {
- tasks.remove(thread);
- }
- }
- }
- });
- }
-
- public final class Request implements RequestResponder<I> {
- public void handleCancelRequest(final boolean mayInterrupt) {
- synchronized(CoreInboundRequest.this) {
- if (! cancel) {
- cancel = true;
- CoreInboundRequest.this.mayInterrupt |= mayInterrupt;
- if (mayInterrupt) {
- if (tasks != null) {
- for (Thread t : tasks) {
- t.interrupt();
- }
- }
- }
- if (cancelHandlers != null) {
- final Iterator<RequestCancelHandler<O>> i =
cancelHandlers.iterator();
- while (i.hasNext()) {
- final RequestCancelHandler<O> handler = i.next();
- i.remove();
- executor.execute(new Runnable() {
- public void run() {
- handler.notifyCancel(userRequestContext,
mayInterrupt);
- }
- });
- }
- }
- }
- }
- }
-
- public void handleRequest(final I request, final Executor streamExecutor) {
- executeTagged(new Runnable() {
- public void run() {
- try {
- requestListener.handleRequest(userRequestContext, request);
- } catch (InterruptedException e) {
- final boolean wasCancelled;
- synchronized(CoreInboundRequest.this) {
- wasCancelled = cancel;
- }
- if (state.transition(State.UNSENT, State.SENT)) {
- if (wasCancelled) {
- try {
- requestInitiator.handleCancelAcknowledge();
- } catch (RemotingException e1) {
- try {
- requestInitiator.handleException(new
RemoteExecutionException("Failed to send a cancel ack to client: " +
e1.toString(), e1));
- } catch (RemotingException e2) {
- log.debug("Tried and failed to send an
exception (%s): %s", e1, e2);
- }
- }
- } else {
- try {
- requestInitiator.handleException(new
RemoteExecutionException("Execution failed: " + e.toString(), e));
- } catch (RemotingException e1) {
- log.debug("Tried and failed to send an exception
(%s): %s", e, e1);
- }
- }
- log.trace(e, "Request listener %s recevied an
exception", requestListener);
- }
- } catch (Throwable e) {
- if (state.transition(State.UNSENT, State.SENT)) {
- try {
- if (e instanceof RemoteExecutionException) {
-
requestInitiator.handleException((RemoteExecutionException) e);
- } else {
- requestInitiator.handleException(new
RemoteExecutionException("Execution failed: " + e.toString(), e));
- }
- } catch (RemotingException e1) {
- log.debug("Tried and failed to send an exception
(%s): %s", e, e1);
- }
- }
- log.trace(e, "Request listener %s recevied an
exception", requestListener);
- }
- }
- });
- }
- }
-
- public final class UserRequestContext implements RequestContext<O> {
- private UserRequestContext() {}
-
- public ClientContext getContext() {
- return clientContext;
- }
-
- public boolean isCancelled() {
- synchronized(CoreInboundRequest.this) {
- return cancel;
- }
- }
-
- public void sendReply(final O reply) throws RemotingException,
IllegalStateException {
- state.requireTransition(State.UNSENT, State.SENT);
- requestInitiator.handleReply(reply);
- }
-
- public void sendFailure(final String msg, final Throwable cause) throws
RemotingException, IllegalStateException {
- state.requireTransition(State.UNSENT, State.SENT);
- final RemoteExecutionException rex = new RemoteExecutionException(msg,
cause);
- rex.setStackTrace(cause.getStackTrace());
- requestInitiator.handleException(rex);
- }
-
- public void sendCancelled() throws RemotingException, IllegalStateException {
- state.requireTransition(State.UNSENT, State.SENT);
- requestInitiator.handleCancelAcknowledge();
- }
-
- public void addCancelHandler(final RequestCancelHandler<O>
requestCancelHandler) {
- final boolean mayInterrupt;
- synchronized(CoreInboundRequest.this) {
- if (!cancel) {
- if (cancelHandlers == null) {
- cancelHandlers = new
LinkedList<RequestCancelHandler<O>>();
- }
- cancelHandlers.add(requestCancelHandler);
- return;
- }
- // otherwise, unlock and notify now
- mayInterrupt = CoreInboundRequest.this.mayInterrupt;
- }
- requestCancelHandler.notifyCancel(this, mayInterrupt);
- }
-
- public void execute(final Runnable command) {
- executeTagged(command);
- }
- }
-}
Deleted:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreInboundService.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreInboundService.java 2008-07-02
22:54:29 UTC (rev 4341)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreInboundService.java 2008-07-03
00:44:19 UTC (rev 4342)
@@ -1,114 +0,0 @@
-package org.jboss.cx.remoting.core;
-
-import java.util.LinkedHashSet;
-import java.util.Set;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Executor;
-import org.jboss.cx.remoting.CloseHandler;
-import org.jboss.cx.remoting.RemotingException;
-import org.jboss.cx.remoting.RequestListener;
-import org.jboss.cx.remoting.ServiceContext;
-import org.jboss.cx.remoting.util.AtomicStateMachine;
-import static org.jboss.cx.remoting.util.AtomicStateMachine.start;
-import org.jboss.cx.remoting.util.CollectionUtil;
-
-/**
- *
- */
-public final class CoreInboundService<I, O> {
-
- private final RequestListener<I, O> requestListener;
- private final Executor executor;
-
- private final ServiceContext serviceContext = new UserServiceContext();
- private final AtomicStateMachine<State> state = start(State.INITIAL);
- private final ConcurrentMap<Object, Object> attributes =
CollectionUtil.concurrentMap();
-
- private ServiceInitiator serviceInitiator;
- private final Set<CloseHandler<ServiceContext>> closeHandlers =
CollectionUtil.synchronizedSet(new
LinkedHashSet<CloseHandler<ServiceContext>>());
-
- private enum State implements org.jboss.cx.remoting.util.State<State> {
- INITIAL,
- UP,
- DOWN;
-
- public boolean isReachable(final State dest) {
- return compareTo(dest) < 0;
- }
- }
-
- public CoreInboundService(final RequestListener<I, O> requestListener, final
Executor executor) {
- this.requestListener = requestListener;
- this.executor = executor;
- }
-
- public void initialize(final ServiceInitiator serviceInitiator) {
- state.requireTransitionExclusive(State.INITIAL, State.UP);
- this.serviceInitiator = serviceInitiator;
- state.releaseDowngrade();
- try {
- requestListener.handleServiceOpen(serviceContext);
- } finally {
- state.release();
- }
- }
-
- private void doClose() {
- if (state.transition(State.DOWN)) {
- synchronized (closeHandlers) {
- for (final CloseHandler<ServiceContext> closeHandler :
closeHandlers) {
- executor.execute(new Runnable() {
- public void run() {
- closeHandler.handleClose(serviceContext);
- }
- });
- }
- closeHandlers.clear();
- }
- }
- }
-
- public ServiceResponder<I, O> getServiceResponder() {
- return new ServiceResponder<I, O>() {
- public void handleClose() throws RemotingException {
- doClose();
- }
-
- public ClientResponder<I, O> createNewClient(final ClientInitiator
clientInitiator) {
- final CoreInboundClient<I, O> client = new CoreInboundClient<I,
O>(requestListener, executor, serviceContext);
- client.initialize(clientInitiator);
- return client.getClientResponder();
- }
- };
- }
-
- public final class UserServiceContext implements ServiceContext {
- private UserServiceContext() {
- }
-
- public ConcurrentMap<Object, Object> getAttributes() {
- return attributes;
- }
-
- public void close() throws RemotingException {
- doClose();
- serviceInitiator.handleClosing();
- }
-
- public void addCloseHandler(final CloseHandler<ServiceContext>
closeHandler) {
- final State current = state.getStateHold();
- try {
- switch (current) {
- case DOWN:
- closeHandler.handleClose(this);
- break;
- default:
- closeHandlers.add(closeHandler);
- break;
- }
- } finally {
- state.release();
- }
- }
- }
-}
Deleted:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundClient.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundClient.java 2008-07-02
22:54:29 UTC (rev 4341)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundClient.java 2008-07-03
00:44:19 UTC (rev 4342)
@@ -1,182 +0,0 @@
-package org.jboss.cx.remoting.core;
-
-import java.util.LinkedHashSet;
-import java.util.Set;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Executor;
-import org.jboss.cx.remoting.CloseHandler;
-import org.jboss.cx.remoting.Client;
-import org.jboss.cx.remoting.FutureReply;
-import org.jboss.cx.remoting.RemoteExecutionException;
-import org.jboss.cx.remoting.RemotingException;
-import org.jboss.cx.remoting.RequestCompletionHandler;
-import org.jboss.cx.remoting.core.util.QueueExecutor;
-import org.jboss.cx.remoting.util.AtomicStateMachine;
-import org.jboss.cx.remoting.util.CollectionUtil;
-
-/**
- *
- */
-public final class CoreOutboundClient<I, O> {
- private static final org.jboss.xnio.log.Logger log =
org.jboss.xnio.log.Logger.getLogger(CoreOutboundClient.class);
-
- private final ConcurrentMap<Object, Object> contextMap =
CollectionUtil.concurrentMap();
- private final AtomicStateMachine<State> state =
AtomicStateMachine.start(State.INITIAL);
- private final ClientInitiator clientInitiator = new ClientInitiatorImpl();
- private final Set<CloseHandler<Client<I, O>>> closeHandlers =
CollectionUtil.synchronizedSet(new LinkedHashSet<CloseHandler<Client<I,
O>>>());
- private final Executor executor;
-
- private Client<I, O> userClient;
- private ClientResponder<I, O> clientResponder;
-
- public CoreOutboundClient(final Executor executor) {
- this.executor = executor;
- }
-
- public void initialize(final ClientResponder<I, O> clientResponder) {
- state.requireTransitionExclusive(State.INITIAL, State.UP);
- this.clientResponder = clientResponder;
- userClient = new UserClient();
- state.releaseExclusive();
- }
-
- private enum State implements org.jboss.cx.remoting.util.State<State> {
- INITIAL,
- UP,
- STOPPING,
- DOWN,;
-
- public boolean isReachable(final State dest) {
- return compareTo(dest) < 0;
- }
- }
-
- // Getters
-
- Client<I,O> getUserContext() {
- return userClient;
- }
-
- ClientInitiator getClientInitiator() {
- return clientInitiator;
- }
-
- // Other mgmt
-
- protected void finalize() throws Throwable {
- try {
- super.finalize();
- } finally {
- // todo close it
- log.trace("Leaked a context instance: %s", this);
- }
- }
-
- void invalidate() {
- // An extra instance was mistakenly created; we'll kill this one and quietly
step away
- state.transition(State.DOWN);
- userClient = null;
- clientResponder = null;
- }
-
- public final class UserClient extends AbstractRealClient<I, O> {
-
- private UserClient() {
- super(clientResponder, Thread.currentThread().getContextClassLoader() /* TODO
*/);
- }
-
- private void doClose(final boolean immediate, final boolean cancel) throws
RemotingException {
- state.waitForNot(State.INITIAL);
- if (state.transitionHold(State.UP, State.STOPPING)) try {
- synchronized (closeHandlers) {
- for (final CloseHandler<Client<I, O>> handler :
closeHandlers) {
- executor.execute(new Runnable() {
- public void run() {
- handler.handleClose(UserClient.this);
- }
- });
- }
- closeHandlers.clear();
- }
- clientResponder.handleClose(immediate, cancel);
- } finally {
- state.release();
- }
- }
-
- public void close() throws RemotingException {
- doClose(false, false);
- }
-
- public void addCloseHandler(final CloseHandler<Client<I, O>>
closeHandler) {
- final State current = state.getStateHold();
- try {
- switch (current) {
- case STOPPING:
- case DOWN:
- closeHandler.handleClose(this);
- break;
- default:
- closeHandlers.add(closeHandler);
- break;
- }
- } finally {
- state.release();
- }
- }
-
- public O invoke(final I request) throws RemotingException,
RemoteExecutionException {
- state.requireHold(State.UP);
- try {
- final QueueExecutor queueExecutor = new QueueExecutor();
- final CoreOutboundRequest<I, O> outboundRequest = new
CoreOutboundRequest<I, O>();
- final RequestResponder<I> requestResponder =
clientResponder.createNewRequest(outboundRequest.getReplier());
- outboundRequest.setRequestResponder(requestResponder);
- requestResponder.handleRequest(request, queueExecutor);
- final FutureReply<O> futureReply =
outboundRequest.getFutureReply();
- futureReply.addCompletionNotifier(new RequestCompletionHandler<O>()
{
- public void notifyComplete(final FutureReply<O> futureReply) {
- queueExecutor.shutdown();
- }
- });
- queueExecutor.runQueue();
- return futureReply.get();
- } finally {
- state.release();
- }
- }
-
- public FutureReply<O> send(final I request) throws RemotingException {
- state.requireHold(State.UP);
- try {
- final CoreOutboundRequest<I, O> outboundRequest = new
CoreOutboundRequest<I, O>();
- final RequestResponder<I> requestTerminus =
clientResponder.createNewRequest(outboundRequest.getReplier());
- outboundRequest.setRequestResponder(requestTerminus);
- requestTerminus.handleRequest(request, executor);
- return outboundRequest.getFutureReply();
- } finally {
- state.release();
- }
- }
-
- public void sendOneWay(final I request) throws RemotingException {
- state.requireHold(State.UP);
- try {
- final RequestResponder<I> requestResponder =
clientResponder.createNewRequest(null);
- requestResponder.handleRequest(request, executor);
- } finally {
- state.release();
- }
- }
-
- public ConcurrentMap<Object, Object> getAttributes() {
- return contextMap;
- }
- }
-
- public final class ClientInitiatorImpl implements ClientInitiator {
- public void handleClosing(boolean done) throws RemotingException {
- // todo - remote side is closing
- }
- }
-}
Deleted:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundRequest.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundRequest.java 2008-07-02
22:54:29 UTC (rev 4341)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundRequest.java 2008-07-03
00:44:19 UTC (rev 4342)
@@ -1,266 +0,0 @@
-package org.jboss.cx.remoting.core;
-
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.TimeUnit;
-import org.jboss.cx.remoting.FutureReply;
-import org.jboss.cx.remoting.IndeterminateOutcomeException;
-import org.jboss.cx.remoting.RemoteExecutionException;
-import org.jboss.cx.remoting.RemotingException;
-import org.jboss.cx.remoting.RequestCompletionHandler;
-import org.jboss.xnio.log.Logger;
-import org.jboss.cx.remoting.util.AtomicStateMachine;
-
-/**
- *
- */
-public final class CoreOutboundRequest<I, O> {
-
- private static final org.jboss.xnio.log.Logger log =
Logger.getLogger(CoreOutboundRequest.class);
-
- private RequestResponder<I> requestResponder;
-
- private final RequestInitiator<O> requestInitiator = new
RequestInitiatorImpl();
- private final AtomicStateMachine<State> state =
AtomicStateMachine.start(State.WAITING);
- private final FutureReply<O> futureReply = new FutureReplyImpl();
-
- /* Protected by {@code state} */
- private O reply;
- /* Protected by {@code state} */
- private RemoteExecutionException exception;
- /* Protected by {@code state} */
- private List<RequestCompletionHandler<O>> handlers =
Collections.synchronizedList(new LinkedList<RequestCompletionHandler<O>>());
-
- public CoreOutboundRequest() {
- }
-
- public RequestResponder<I> getRequester() {
- return requestResponder;
- }
-
- public void setRequestResponder(final RequestResponder<I> requestResponder) {
- this.requestResponder = requestResponder;
- }
-
- public FutureReply<O> getFutureReply() {
- return futureReply;
- }
-
- public RequestInitiator<O> getReplier() {
- return requestInitiator;
- }
-
- private enum State implements org.jboss.cx.remoting.util.State<State> {
- WAITING,
- DONE,
- EXCEPTION,
- CANCELLED,
- TERMINATED,;
-
- public boolean isReachable(final State dest) {
- switch (this) {
- case WAITING:
- case DONE:
- return compareTo(dest) < 0;
- default:
- return false;
- }
- }
- }
-
- /**
- * Complete the request. Call only with the monitor held, not in WAITING state.
- */
- private void complete() {
- final List<RequestCompletionHandler<O>> handlers = this.handlers;
- if (handlers != null) {
- this.handlers = null;
- final Iterator<RequestCompletionHandler<O>> iterator =
handlers.iterator();
- while (iterator.hasNext()) {
- final RequestCompletionHandler<O> handler = iterator.next();
- try {
- handler.notifyComplete(futureReply);
- } catch (Throwable t) {
- log.trace(t, "Request completion notifier failed for notifier
object %s", String.valueOf(handler));
- }
- iterator.remove();
- }
- }
- }
-
- public final class RequestInitiatorImpl implements RequestInitiator<O> {
- public void handleCancelAcknowledge() {
- if (state.transitionHold(State.WAITING, State.CANCELLED)) try {
- complete();
- } finally {
- state.release();
- }
- }
-
- public void handleReply(final O reply) {
- if (state.transitionExclusive(State.WAITING, State.DONE)) try {
- CoreOutboundRequest.this.reply = reply;
- } finally {
- state.releaseDowngrade();
- try {
- complete();
- } finally {
- state.release();
- }
- }
- }
-
- public void handleException(final RemoteExecutionException exception) {
- if (state.transitionExclusive(State.WAITING, State.EXCEPTION)) try {
- CoreOutboundRequest.this.exception = exception;
- } finally {
- state.releaseDowngrade();
- try {
- complete();
- } finally {
- state.release();
- }
- }
- }
- }
-
- public final class FutureReplyImpl implements FutureReply<O> {
-
- private FutureReplyImpl() {
- }
-
- public boolean cancel(final boolean mayInterruptIfRunning) {
- if (state.inHold(State.WAITING)) try {
- try {
- requestResponder.handleCancelRequest(mayInterruptIfRunning);
- } catch (RemotingException e) {
- return false;
- }
- } finally {
- state.release();
- }
- return state.waitForNot(State.WAITING) == State.CANCELLED;
- }
-
- public FutureReply<O> sendCancel(final boolean mayInterruptIfRunning) {
- if (state.inHold(State.WAITING)) try {
- try {
- requestResponder.handleCancelRequest(mayInterruptIfRunning);
- } catch (RemotingException e) {
- // do nothing
- }
- } finally {
- state.release();
- }
- return this;
- }
-
- public boolean isCancelled() {
- return state.in(State.CANCELLED);
- }
-
- public boolean isDone() {
- return state.in(State.DONE);
- }
-
- public O get() throws CancellationException, RemoteExecutionException {
- final State newState = state.waitForNotHold(State.WAITING);
- try {
- switch(newState) {
- case CANCELLED:
- throw new CancellationException("Request was
cancelled");
- case EXCEPTION:
- throw exception;
- case DONE:
- return reply;
- case TERMINATED:
- throw new IndeterminateOutcomeException("Request terminated
abruptly; outcome unknown");
- default:
- throw new IllegalStateException("Wrong state");
- }
- } finally {
- state.release();
- }
- }
-
- public O getInterruptibly() throws InterruptedException, CancellationException,
RemoteExecutionException {
- final State newState = state.waitInterruptiblyForNotHold(State.WAITING);
- try {
- switch(newState) {
- case CANCELLED:
- throw new CancellationException("Request was
cancelled");
- case EXCEPTION:
- throw exception;
- case DONE:
- return reply;
- case TERMINATED:
- throw new IndeterminateOutcomeException("Request terminated
abruptly; outcome unknown");
- default:
- throw new IllegalStateException("Wrong state");
- }
- } finally {
- state.release();
- }
- }
-
- public O get(long timeout, TimeUnit unit) throws CancellationException,
RemoteExecutionException {
- final State newState = state.waitForNotHold(State.WAITING, timeout, unit);
- try {
- switch (newState) {
- case CANCELLED:
- throw new CancellationException("Request was
cancelled");
- case EXCEPTION:
- throw exception;
- case DONE:
- return reply;
- case TERMINATED:
- throw new IndeterminateOutcomeException("Request terminated
abruptly; outcome unknown");
- }
- throw new IllegalStateException("Wrong state");
- } finally {
- state.release();
- }
- }
-
- public O getInterruptibly(final long timeout, final TimeUnit unit) throws
InterruptedException, CancellationException, RemoteExecutionException {
- final State newState = state.waitInterruptiblyForNotHold(State.WAITING,
timeout, unit);
- try {
- switch (newState) {
- case CANCELLED:
- throw new CancellationException("Request was
cancelled");
- case EXCEPTION:
- throw exception;
- case DONE:
- return reply;
- case TERMINATED:
- throw new IndeterminateOutcomeException("Request terminated
abruptly; outcome unknown");
- }
- throw new IllegalStateException("Wrong state");
- } finally {
- state.release();
- }
- }
-
- public FutureReply<O>
addCompletionNotifier(RequestCompletionHandler<O> handler) {
- final State currentState = state.getStateHold();
- try {
- switch (currentState) {
- case CANCELLED:
- case DONE:
- case EXCEPTION:
- handler.notifyComplete(this);
- break;
- case WAITING:
- handlers.add(handler);
- break;
- }
- } finally {
- state.release();
- }
- return this;
- }
- }
-}
Deleted:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundService.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundService.java 2008-07-02
22:54:29 UTC (rev 4341)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundService.java 2008-07-03
00:44:19 UTC (rev 4342)
@@ -1,117 +0,0 @@
-package org.jboss.cx.remoting.core;
-
-import java.util.LinkedHashSet;
-import java.util.Set;
-import java.util.concurrent.Executor;
-import org.jboss.cx.remoting.CloseHandler;
-import org.jboss.cx.remoting.Client;
-import org.jboss.cx.remoting.ClientSource;
-import org.jboss.cx.remoting.RemotingException;
-import org.jboss.xnio.log.Logger;
-import org.jboss.cx.remoting.util.AtomicStateMachine;
-import org.jboss.cx.remoting.util.CollectionUtil;
-
-/**
- *
- */
-public final class CoreOutboundService<I, O> {
- private static final Logger log = Logger.getLogger(CoreOutboundService.class);
-
- private final AtomicStateMachine<State> state =
AtomicStateMachine.start(State.INITIAL);
- private final ClientSource<I, O> userClientSource = new UserClientSource();
- private final ServiceInitiator serviceInitiator = new ServiceInitiatorImpl();
- private final Executor executor;
-
- private ServiceResponder<I,O> serviceResponder;
- private Set<CloseHandler<ClientSource<I,O>>> closeHandlers =
CollectionUtil.synchronizedSet(new LinkedHashSet<CloseHandler<ClientSource<I,
O>>>());
-
- public CoreOutboundService(final Executor executor) {
- this.executor = executor;
- }
-
- private enum State implements org.jboss.cx.remoting.util.State<State> {
- INITIAL,
- UP,
- CLOSING,
- DOWN;
-
- public boolean isReachable(final State dest) {
- return compareTo(dest) < 0;
- }
- }
-
- // Getters
-
- ClientSource<I, O> getUserContextSource() {
- return userClientSource;
- }
-
- public ServiceInitiator getServiceClient() {
- return serviceInitiator;
- }
-
- public void initialize(final ServiceResponder<I, O> serviceResponder) {
- state.requireTransitionExclusive(State.INITIAL, State.UP);
- this.serviceResponder = serviceResponder;
- state.releaseExclusive();
- }
-
- @SuppressWarnings
({"SerializableInnerClassWithNonSerializableOuterClass"})
- public final class UserClientSource extends AbstractRealClientSource<I, O> {
- protected UserClientSource() {
- super(serviceResponder);
- }
-
- private void doClose() throws RemotingException {
- state.waitForNot(State.INITIAL);
- if (state.transitionHold(State.UP, State.DOWN)) try {
- synchronized (closeHandlers) {
- for (final CloseHandler<ClientSource<I, O>> handler :
closeHandlers) {
- executor.execute(new Runnable() {
- public void run() {
- handler.handleClose(UserClientSource.this);
- }
- });
- }
- closeHandlers.clear();
- }
- serviceResponder.handleClose();
- } finally {
- state.release();
- }
- }
-
- public void close() throws RemotingException {
- doClose();
- }
-
- public void addCloseHandler(final CloseHandler<ClientSource<I, O>>
closeHandler) {
- final State current = state.getStateHold();
- try {
- switch (current) {
- case DOWN:
- closeHandler.handleClose(this);
- break;
- default:
- closeHandlers.add(closeHandler);
- break;
- }
- } finally {
- state.release();
- }
- }
-
- public Client<I, O> createContext() throws RemotingException {
- final CoreOutboundClient<I, O> client = new CoreOutboundClient<I,
O>(executor);
- final ClientResponder<I, O> clientResponder =
serviceResponder.createNewClient(client.getClientInitiator());
- client.initialize(clientResponder);
- return client.getUserContext();
- }
- }
-
- public final class ServiceInitiatorImpl implements ServiceInitiator {
- public void handleClosing() throws RemotingException {
- // todo - remote side is closing
- }
- }
-}
Deleted: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreSession.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreSession.java 2008-07-02
22:54:29 UTC (rev 4341)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreSession.java 2008-07-03
00:44:19 UTC (rev 4342)
@@ -1,819 +0,0 @@
-package org.jboss.cx.remoting.core;
-
-import java.io.IOException;
-import java.lang.ref.WeakReference;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Executor;
-import org.jboss.cx.remoting.CloseHandler;
-import org.jboss.cx.remoting.Client;
-import org.jboss.cx.remoting.RemoteExecutionException;
-import org.jboss.cx.remoting.RemotingException;
-import org.jboss.cx.remoting.Session;
-import org.jboss.cx.remoting.core.stream.DefaultStreamDetector;
-import org.jboss.xnio.log.Logger;
-import org.jboss.cx.remoting.util.ByteMessageInput;
-import org.jboss.cx.remoting.util.ByteMessageOutput;
-import org.jboss.cx.remoting.util.ObjectMessageInput;
-import org.jboss.cx.remoting.util.ObjectMessageOutput;
-import org.jboss.cx.remoting.spi.stream.StreamDetector;
-import org.jboss.cx.remoting.spi.marshal.ObjectResolver;
-import org.jboss.cx.remoting.spi.marshal.MarshallerFactory;
-import org.jboss.cx.remoting.util.AtomicStateMachine;
-import org.jboss.cx.remoting.util.AttributeMap;
-import org.jboss.cx.remoting.util.CollectionUtil;
-
-/**
- * Three execution contexts:
- *
- * - Inbound protocol handler - controlled by server/network handler
- * - Context client context - controlled by user/container
- * - Local work handler - ExecutorService provided to Endpoint
- */
-public final class CoreSession {
- private static final Logger log = Logger.getLogger(CoreSession.class);
-
- private final ProtocolContextImpl protocolContext = new ProtocolContextImpl();
- private final UserSession userSession = new UserSession();
-
- // stream serialization detectors - immutable (for now?)
- private final List<StreamDetector> streamDetectors;
- private List<ObjectResolver> resolvers = new
ArrayList<ObjectResolver>();
- private MarshallerFactory marshallerFactory;
-
- // Contexts and services that are available on the remote end of this session
- // In these paris, the Server points to the ProtocolHandler, and the Client points
to...whatever
- private final ConcurrentMap<ClientIdentifier, ClientContextPair> clientContexts
= CollectionUtil.concurrentMap();
- private final ConcurrentMap<ServiceIdentifier, ClientServicePair>
clientServices = CollectionUtil.concurrentMap();
-
- // Contexts and services that are available on this end of this session
- // In these pairs, the Client points to the ProtocolHandler, and the Server points
to... whatever
- private final ConcurrentMap<ClientIdentifier, ServerContextPair> serverContexts
= CollectionUtil.concurrentMap();
- private final ConcurrentMap<ServiceIdentifier, ServerServicePair>
serverServices = CollectionUtil.concurrentMap();
-
- // streams - strong references, only clean up if a close message is sent or received
- private final ConcurrentMap<StreamIdentifier, CoreStream> streams =
CollectionUtil.concurrentMap();
-
- // don't GC the endpoint while a session lives
- private final CoreEndpoint endpoint;
- private final Executor executor;
- private final Set<CloseHandler<Session>> closeHandlers =
CollectionUtil.synchronizedSet(new LinkedHashSet<CloseHandler<Session>>());
-
- /** The protocol handler. Set on NEW -> CONNECTING */
- private ProtocolHandler protocolHandler;
- /** The remote endpoint name. Set on CONNECTING -> UP */
- private String remoteEndpointName;
- /** The root context. Set on CONNECTING -> UP */
- private Client<?, ?> rootClient;
-
- private final AtomicStateMachine<State> state =
AtomicStateMachine.start(State.NEW);
- private ObjectResolver resolver; // todo - initialize to a composite resolver
-
- // Constructors
-
- CoreSession(final CoreEndpoint endpoint) {
- if (endpoint == null) {
- throw new NullPointerException("endpoint is null");
- }
- this.endpoint = endpoint;
- executor = endpoint.getExecutor();
- // todo - make stream detectors pluggable
- streamDetectors =
java.util.Collections.singletonList(DefaultStreamDetector.INSTANCE);
- }
-
- UserSession getUserSession() {
- state.waitForHold(State.UP);
- try {
- return userSession;
- } finally {
- state.release();
- }
- }
-
- public MarshallerFactory getMarshallerFactory() {
- return marshallerFactory;
- }
-
- public void setMarshallerFactory(final MarshallerFactory marshallerFactory) {
- this.marshallerFactory = marshallerFactory;
- }
-
- public void addFirstResolver(ObjectResolver resolver) {
- resolvers.add(0, resolver);
- }
-
- public void addLastResolver(ObjectResolver resolver) {
- resolvers.add(resolver);
- }
-
-
- // Initializers
-
- private <I, O> void doInitialize(final ProtocolHandler protocolHandler, final
Client<I, O> rootClient) {
- if (protocolHandler == null) {
- throw new NullPointerException("protocolHandler is null");
- }
- this.protocolHandler = protocolHandler;
- if (rootClient instanceof AbstractRealClient) {
- final AbstractRealClient<I, O> abstractRealContext =
(AbstractRealClient<I, O>) rootClient;
- // Forward local context
- final ClientIdentifier localIdentifier =
protocolHandler.getLocalRootClientIdentifier();
- if (localIdentifier == null) {
- throw new NullPointerException("localIdentifier is null");
- }
- final ProtocolClientInitiatorImpl<I, O> contextClient = new
ProtocolClientInitiatorImpl<I, O>(localIdentifier);
- serverContexts.put(localIdentifier, new ServerContextPair<I,
O>(contextClient, abstractRealContext.getContextServer()));
- log.trace("Initialized session with local context %s",
localIdentifier);
- }
- // Forward remote context
- final ClientIdentifier remoteIdentifier =
protocolHandler.getRemoteRootClientIdentifier();
- if (remoteIdentifier == null) {
- throw new NullPointerException("remoteIdentifier is null");
- }
- final ProtocolClientResponderImpl<I, O> contextServer = new
ProtocolClientResponderImpl<I,O>(remoteIdentifier);
- final CoreOutboundClient<I, O> coreOutboundClient = new
CoreOutboundClient<I, O>(executor);
- clientContexts.put(remoteIdentifier, new ClientContextPair<I,
O>(coreOutboundClient.getClientInitiator(), contextServer, remoteIdentifier));
- coreOutboundClient.initialize(contextServer);
- this.rootClient = coreOutboundClient.getUserContext();
- log.trace("Initialized session with remote context %s",
remoteIdentifier);
- }
-
- <I, O> void initializeServer(final ProtocolHandler protocolHandler, final
Client<I, O> rootClient) {
- if (protocolHandler == null) {
- throw new NullPointerException("protocolHandler is null");
- }
- boolean ok = false;
- state.requireTransitionExclusive(State.NEW, State.CONNECTING);
- try {
- doInitialize(protocolHandler, rootClient);
- ok = true;
- } finally {
- state.releaseExclusive();
- if (! ok) {
- state.transition(State.DOWN);
- }
- }
- }
-
- <I, O> void initializeClient(final ProtocolHandlerFactory
protocolHandlerFactory, final URI remoteUri, final AttributeMap attributeMap, final
Client<I, O> rootClient) throws IOException {
- if (protocolHandlerFactory == null) {
- throw new NullPointerException("protocolHandlerFactory is null");
- }
- boolean ok = false;
- state.requireTransitionExclusive(State.NEW, State.CONNECTING);
- try {
- doInitialize(protocolHandlerFactory.createHandler(protocolContext, remoteUri,
attributeMap), rootClient);
- ok = true;
- } finally {
- state.releaseExclusive();
- if (! ok) {
- state.transition(State.DOWN);
- }
- }
- }
-
- public ProtocolContext getProtocolContext() {
- return protocolContext;
- }
-
- public ProtocolHandler getProtocolHandler() {
- return protocolHandler;
- }
-
- // State mgmt
-
- private enum State implements org.jboss.cx.remoting.util.State<State> {
- NEW,
- CONNECTING,
- UP,
- STOPPING,
- DOWN;
-
- public boolean isReachable(final State dest) {
- return compareTo(dest) < 0;
- }
- }
-
- // Client mgmt
-
- // User session impl
-
- public final class UserSession implements Session {
- private UserSession() {}
-
- private final ConcurrentMap<Object, Object> sessionMap =
CollectionUtil.concurrentMap();
-
- public void close() throws RemotingException {
- // todo - maybe drain the session first?
- shutdown();
- state.waitFor(State.DOWN);
- }
-
- public void addCloseHandler(final CloseHandler<Session> closeHandler) {
- final State current = state.getStateHold();
- try {
- switch (current) {
- case DOWN:
- case STOPPING:
- closeHandler.handleClose(this);
- break;
- default:
- closeHandlers.add(closeHandler);
- }
- } finally {
- state.release();
- }
- }
-
- public ConcurrentMap<Object, Object> getAttributes() {
- return sessionMap;
- }
-
- public String getLocalEndpointName() {
- return endpoint.getName();
- }
-
- public String getRemoteEndpointName() {
- return remoteEndpointName;
- }
-
- @SuppressWarnings ({"unchecked"})
- public <I, O> Client<I, O> getRootClient() {
- return (Client<I, O>) rootClient;
- }
- }
-
- // Protocol context
-
- @SuppressWarnings ({"unchecked"})
- private static <O> void doSendReply(RequestInitiator<O> requestInitiator,
Object data) throws RemotingException {
- requestInitiator.handleReply((O)data);
- }
-
- // Lifecycle
-
- private void shutdown() {
- if (state.transition(State.UP, State.STOPPING)) {
- for (final CloseHandler<Session> closeHandler : closeHandlers) {
- executor.execute(new Runnable() {
- public void run() {
- closeHandler.handleClose(userSession);
- }
- });
- }
- closeHandlers.clear();
- try {
- log.trace("Initiating session shutdown");
- protocolHandler.closeSession();
- } catch (IOException e) {
- log.trace(e, "Protocol handler session close failed");
- } finally {
- endpoint.removeSession(this);
- }
- }
- }
-
- public final class ProtocolContextImpl implements ProtocolContext {
-
- public void closeSession() {
- shutdown();
- if (state.transition(State.STOPPING, State.DOWN)) {
- log.trace("Session shut down");
- }
- }
-
- public ObjectMessageOutput getMessageOutput(ByteMessageOutput target) throws
IOException {
- if (target == null) {
- throw new NullPointerException("target is null");
- }
- return marshallerFactory.createMarshaller(resolver,
getClass().getClassLoader() /* todo this is WRONG */).getMarshalingSink(target);
- }
-
- public ObjectMessageOutput getMessageOutput(ByteMessageOutput target, Executor
streamExecutor) throws IOException {
- if (target == null) {
- throw new NullPointerException("target is null");
- }
- if (streamExecutor == null) {
- throw new NullPointerException("streamExecutor is null");
- }
- return marshallerFactory.createMarshaller(resolver,
getClass().getClassLoader() /* todo this is WRONG */).getMarshalingSink(target);
- }
-
- public ObjectMessageInput getMessageInput(ByteMessageInput source) throws
IOException {
- if (source == null) {
- throw new NullPointerException("source is null");
- }
- return marshallerFactory.createMarshaller(resolver,
getClass().getClassLoader() /* todo this is WRONG */).getUnmarshalingSource(source);
- }
-
- public String getLocalEndpointName() {
- return endpoint.getName();
- }
-
- public void receiveClientClose(ClientIdentifier remoteClientIdentifier, final
boolean immediate, final boolean cancel, final boolean interrupt) {
- if (remoteClientIdentifier == null) {
- throw new NullPointerException("remoteClientIdentifier is
null");
- }
- final ServerContextPair contextPair =
serverContexts.remove(remoteClientIdentifier);
- // todo - do the whole close operation
- try {
- contextPair.clientResponder.handleClose(immediate, cancel);
- } catch (RemotingException e) {
- log.trace(e, "Failed to forward a context close");
- }
- }
-
- public void closeStream(StreamIdentifier streamIdentifier) {
- if (streamIdentifier == null) {
- throw new NullPointerException("streamIdentifier is null");
- }
- final CoreStream coreStream = streams.remove(streamIdentifier);
- try {
- coreStream.getStreamSerializer().handleClose();
- } catch (IOException e) {
- log.trace(e, "Failed to close the stream");
- }
- }
-
- public void receiveServiceClose(ServiceIdentifier serviceIdentifier) {
- if (serviceIdentifier == null) {
- throw new NullPointerException("serviceIdentifier is null");
- }
- final ServerServicePair servicePair =
serverServices.remove(serviceIdentifier);
- try {
- servicePair.serviceResponder.handleClose();
- } catch (RemotingException e) {
- log.trace(e, "Failed to forward a service close");
- }
- }
-
- @SuppressWarnings ({"unchecked"})
- public void receiveOpenedContext(ServiceIdentifier remoteServiceIdentifier,
ClientIdentifier remoteClientIdentifier) {
- if (remoteServiceIdentifier == null) {
- throw new NullPointerException("remoteServiceIdentifier is
null");
- }
- if (remoteClientIdentifier == null) {
- throw new NullPointerException("remoteClientIdentifier is
null");
- }
- try {
- // This operation needs to be idempotent
- if (! serverContexts.containsKey(remoteClientIdentifier)) {
- final ServerServicePair servicePair =
serverServices.get(remoteServiceIdentifier);
- final ProtocolClientInitiatorImpl contextClient = new
ProtocolClientInitiatorImpl(remoteClientIdentifier);
- final ClientResponder clientResponder =
servicePair.serviceResponder.createNewClient(contextClient);
- if (serverContexts.putIfAbsent(remoteClientIdentifier, new
ServerContextPair(contextClient, clientResponder)) != null) {
- clientResponder.handleClose(true, true);
- }
- }
- } catch (RemotingException e) {
- log.trace(e, "Failed to add a context to a service");
- }
- }
-
- public void receiveServiceClosing(ServiceIdentifier serviceIdentifier) {
- if (serviceIdentifier == null) {
- throw new NullPointerException("serviceIdentifier is null");
- }
- final ClientServicePair servicePair = clientServices.get(serviceIdentifier);
- try {
- servicePair.serviceInitiator.handleClosing();
- } catch (RemotingException e) {
- log.trace(e, "Failed to signal that a service was closing on the
remote side");
- }
- }
-
- public void receiveClientClosing(ClientIdentifier clientIdentifier, boolean done)
{
- if (clientIdentifier == null) {
- throw new NullPointerException("clientIdentifier is null");
- }
- final ClientContextPair contextPair = clientContexts.get(clientIdentifier);
- try {
- contextPair.clientInitiator.handleClosing(done);
- } catch (RemotingException e) {
- log.trace(e, "Failed to signal that a context was closing on the
remote side");
- }
- }
-
- public void receiveReply(ClientIdentifier clientIdentifier, RequestIdentifier
requestIdentifier, Object reply) {
- if (clientIdentifier == null) {
- throw new NullPointerException("clientIdentifier is null");
- }
- if (requestIdentifier == null) {
- throw new NullPointerException("requestIdentifier is null");
- }
- final ClientContextPair contextPair = clientContexts.get(clientIdentifier);
- if (contextPair == null) {
- log.trace("Got reply for request %s on unknown context %s",
requestIdentifier, clientIdentifier);
- } else {
- final ProtocolClientResponderImpl<?, ?> contextServer =
contextPair.contextServerRef.get();
- if (contextServer == null) {
- log.trace("Got reply for request %s on unknown recently leaked
context %s", requestIdentifier, clientIdentifier);
- } else {
- final RequestInitiator<?> requestInitiator =
(RequestInitiator<?>) contextServer.requests.get(requestIdentifier);
- if (requestInitiator == null) {
- log.trace("Got reply for unknown request %s on context
%s", requestIdentifier, clientIdentifier);
- } else try {
- doSendReply(requestInitiator, reply);
- } catch (RemotingException e) {
- log.trace(e, "Failed to receive a reply");
- }
- }
- }
- }
-
- public void receiveException(ClientIdentifier clientIdentifier, RequestIdentifier
requestIdentifier, RemoteExecutionException exception) {
- if (clientIdentifier == null) {
- throw new NullPointerException("clientIdentifier is null");
- }
- if (requestIdentifier == null) {
- throw new NullPointerException("requestIdentifier is null");
- }
- if (exception == null) {
- throw new NullPointerException("exception is null");
- }
- final ClientContextPair contextPair = clientContexts.get(clientIdentifier);
- if (contextPair == null) {
- log.trace("Got exception reply for request %s on unknown context
%s", requestIdentifier, clientIdentifier);
- } else {
- final ProtocolClientResponderImpl<?, ?> contextServer =
contextPair.contextServerRef.get();
- if (contextServer == null) {
- log.trace("Got exception reply for request %s on unknown
recently leaked context %s", requestIdentifier, clientIdentifier);
- } else {
- final RequestInitiator<?> requestInitiator =
(RequestInitiator<?>) contextServer.requests.get(requestIdentifier);
- if (requestInitiator == null) {
- log.trace("Got exception reply for unknown request %s on
context %s", requestIdentifier, clientIdentifier);
- } else try {
- requestInitiator.handleException(exception);
- } catch (RemotingException e) {
- log.trace(e, "Failed to receive an exception reply");
- }
- }
- }
- }
-
- public void receiveCancelAcknowledge(ClientIdentifier clientIdentifier,
RequestIdentifier requestIdentifier) {
- if (clientIdentifier == null) {
- throw new NullPointerException("clientIdentifier is null");
- }
- if (requestIdentifier == null) {
- throw new NullPointerException("requestIdentifier is null");
- }
- final ClientContextPair contextPair = clientContexts.get(clientIdentifier);
- if (contextPair == null) {
- log.trace("Got cancellation acknowledgement for request %s on
unknown context %s", requestIdentifier, clientIdentifier);
- } else {
- final ProtocolClientResponderImpl<?, ?> contextServer =
contextPair.contextServerRef.get();
- if (contextServer == null) {
- log.trace("Got cancellation acknowledgement for request %s on
unknown recently leaked context %s", requestIdentifier, clientIdentifier);
- } else {
- final RequestInitiator<?> requestInitiator =
(RequestInitiator<?>) contextServer.requests.get(requestIdentifier);
- if (requestInitiator == null) {
- log.trace("Got cancellation acknowledgement for unknown
request %s on context %s", requestIdentifier, clientIdentifier);
- } else try {
- requestInitiator.handleCancelAcknowledge();
- } catch (RemotingException e) {
- log.trace(e, "Failed to receive a cancellation
acknowledgement");
- }
- }
- }
- }
-
- public void receiveCancelRequest(ClientIdentifier remoteClientIdentifier,
RequestIdentifier requestIdentifier, boolean mayInterrupt) {
- if (remoteClientIdentifier == null) {
- throw new NullPointerException("remoteClientIdentifier is
null");
- }
- if (requestIdentifier == null) {
- throw new NullPointerException("requestIdentifier is null");
- }
- final ServerContextPair contextPair =
serverContexts.get(remoteClientIdentifier);
- final RequestResponder<?> requestResponder =
(RequestResponder<?>) contextPair.contextClient.requests.get(requestIdentifier);
- try {
- requestResponder.handleCancelRequest(mayInterrupt);
- } catch (RemotingException e) {
- log.trace(e, "Failed to receive a cancellation request");
- }
- }
-
- public void receiveStreamData(StreamIdentifier streamIdentifier,
ObjectMessageInput data) {
- if (streamIdentifier == null) {
- throw new NullPointerException("streamIdentifier is null");
- }
- if (data == null) {
- throw new NullPointerException("data is null");
- }
- final CoreStream coreStream = streams.get(streamIdentifier);
- if (coreStream == null) {
- log.trace("Received stream data on an unknown context %s",
streamIdentifier);
- } else {
- coreStream.receiveStreamData(data);
- }
- }
-
- @SuppressWarnings ({"unchecked"})
- public void receiveRemoteSideReady(String remoteEndpointName) {
- state.waitFor(State.CONNECTING);
- state.requireTransitionExclusive(State.CONNECTING, State.UP);
- try {
- CoreSession.this.remoteEndpointName = remoteEndpointName;
- } finally {
- state.releaseExclusive();
- }
- }
-
- @SuppressWarnings ({"unchecked"})
- public void receiveRequest(final ClientIdentifier remoteClientIdentifier, final
RequestIdentifier requestIdentifier, final Object request) {
- if (remoteClientIdentifier == null) {
- throw new NullPointerException("remoteClientIdentifier is
null");
- }
- if (requestIdentifier == null) {
- throw new NullPointerException("requestIdentifier is null");
- }
- final ServerContextPair contextPair =
serverContexts.get(remoteClientIdentifier);
- if (contextPair == null) {
- log.trace("Received a request on an unknown context %s",
remoteClientIdentifier);
- return;
- }
- try {
- final RequestInitiator requestInitiator =
contextPair.contextClient.addClient(requestIdentifier);
- final RequestResponder requestResponder =
contextPair.clientResponder.createNewRequest(requestInitiator);
- requestResponder.handleRequest(request, executor);
- } catch (RemotingException e) {
- e.printStackTrace();
- }
- }
- }
-
- private final class WeakProtocolContextServerReference<I, O> extends
WeakReference<ProtocolClientResponderImpl<I, O>> {
- private final ClientContextPair<I, O> contextPair;
-
- private WeakProtocolContextServerReference(ProtocolClientResponderImpl<I,
O> referent, ClientContextPair<I, O> contextPair) {
- super(referent);
- this.contextPair = contextPair;
- }
-
- public ProtocolClientResponderImpl<I, O> get() {
- return super.get();
- }
-
- public boolean enqueue() {
- try {
- clientContexts.remove(contextPair.clientIdentifier, contextPair);
- // todo close?
- } finally {
- return super.enqueue();
- }
- }
- }
-
- private final class ClientContextPair<I, O> {
- private final ClientInitiator clientInitiator;
- private final WeakProtocolContextServerReference<I, O> contextServerRef;
- private final ClientIdentifier clientIdentifier;
-
- private ClientContextPair(final ClientInitiator clientInitiator, final
ProtocolClientResponderImpl<I, O> contextServer, final ClientIdentifier
clientIdentifier) {
- this.clientInitiator = clientInitiator;
- this.clientIdentifier = clientIdentifier;
- contextServerRef = new WeakProtocolContextServerReference<I,
O>(contextServer, this);
- // todo - auto-cleanup
- }
- }
-
- private static final class ServerContextPair<I, O> {
- private final ProtocolClientInitiatorImpl<I, O> contextClient;
- private final ClientResponder<I, O> clientResponder;
-
- private ServerContextPair(final ProtocolClientInitiatorImpl<I, O>
contextClient, final ClientResponder<I, O> clientResponder) {
- if (contextClient == null) {
- throw new NullPointerException("clientInitiator is null");
- }
- if (clientResponder == null) {
- throw new NullPointerException("clientResponder is null");
- }
- this.contextClient = contextClient;
- this.clientResponder = clientResponder;
- }
- }
-
- private static final class ClientServicePair<I, O> {
- private final ServiceInitiator serviceInitiator;
- private final ProtocolServiceResponderImpl<I, O> serviceServer;
-
- private ClientServicePair(final ServiceInitiator serviceInitiator, final
ProtocolServiceResponderImpl<I, O> serviceServer) {
- if (serviceInitiator == null) {
- throw new NullPointerException("serviceInitiator is null");
- }
- if (serviceServer == null) {
- throw new NullPointerException("serviceResponder is null");
- }
- this.serviceInitiator = serviceInitiator;
- this.serviceServer = serviceServer;
- }
- }
-
- private static final class ServerServicePair<I, O> {
- private final ProtocolServiceInitiatorImpl serviceClient;
- private final ServiceResponder<I, O> serviceResponder;
-
- private ServerServicePair(final ProtocolServiceInitiatorImpl serviceClient, final
ServiceResponder<I, O> serviceResponder) {
- if (serviceClient == null) {
- throw new NullPointerException("serviceInitiator is null");
- }
- if (serviceResponder == null) {
- throw new NullPointerException("serviceResponder is null");
- }
- this.serviceClient = serviceClient;
- this.serviceResponder = serviceResponder;
- }
- }
-
- private final class ProtocolServiceInitiatorImpl implements ServiceInitiator {
- private final ServiceIdentifier serviceIdentifier;
-
- public ProtocolServiceInitiatorImpl(final ServiceIdentifier serviceIdentifier) {
- if (serviceIdentifier == null) {
- throw new NullPointerException("serviceIdentifier is null");
- }
- this.serviceIdentifier = serviceIdentifier;
- }
-
- public void handleClosing() throws RemotingException {
- try {
- protocolHandler.sendServiceClosing(serviceIdentifier);
- } catch (RemotingException e) {
- throw e;
- } catch (IOException e) {
- throw new RemotingException("Failed to send service closing message:
" + e.getMessage(), e);
- }
- }
- }
-
- private final class ProtocolServiceResponderImpl<I, O> implements
ServiceResponder<I, O> {
- private final ServiceIdentifier serviceIdentifier;
-
- public ProtocolServiceResponderImpl(final ServiceIdentifier serviceIdentifier) {
- this.serviceIdentifier = serviceIdentifier;
- }
-
- public void handleClose() throws RemotingException {
- try {
- protocolHandler.sendServiceClose(serviceIdentifier);
- } catch (RemotingException e) {
- throw e;
- } catch (IOException e) {
- throw new RemotingException("Failed to send service close message:
" + e.getMessage(), e);
- }
- }
-
- public ClientResponder<I, O> createNewClient(final ClientInitiator
clientInitiator) throws RemotingException {
- try {
- final ClientIdentifier clientIdentifier =
protocolHandler.openClient(serviceIdentifier);
- if (clientIdentifier == null) {
- throw new NullPointerException("clientIdentifier is
null");
- }
- clientContexts.put(clientIdentifier, new ClientContextPair<I,
O>(clientInitiator, new ProtocolClientResponderImpl<I, O>(clientIdentifier),
clientIdentifier));
- return new ProtocolClientResponderImpl<I, O>(clientIdentifier);
- } catch (RemotingException e) {
- throw e;
- } catch (IOException e) {
- throw new RemotingException("Failed to open a context: " +
e.getMessage(), e);
- }
- }
- }
-
- private final class ProtocolClientInitiatorImpl<I, O> implements
ClientInitiator {
- private final ClientIdentifier clientIdentifier;
- private final ConcurrentMap<RequestIdentifier, RequestResponder<I>>
requests = CollectionUtil.concurrentMap();
-
- public ProtocolClientInitiatorImpl(final ClientIdentifier clientIdentifier) {
- this.clientIdentifier = clientIdentifier;
- }
-
- public void handleClosing(boolean done) throws RemotingException {
- try {
- if (state.inHold(State.UP)) {
- protocolHandler.sendClientClosing(clientIdentifier, done);
- }
- } catch (RemotingException e) {
- throw e;
- } catch (IOException e) {
- throw new RemotingException("Failed to send context closing message:
" + e.getMessage(), e);
- }
- }
-
- private RequestInitiator<O> addClient(RequestIdentifier identifier) {
- return new ProtocolRequestInitiatorImpl<O>(clientIdentifier,
identifier);
- }
-
- private final class ProtocolRequestInitiatorImpl<O> implements
RequestInitiator<O> {
- private final ClientIdentifier clientIdentifer;
- private final RequestIdentifier requestIdentifer;
-
- public ProtocolRequestInitiatorImpl(final ClientIdentifier clientIdentifer,
final RequestIdentifier requestIdentifer) {
- this.clientIdentifer = clientIdentifer;
- this.requestIdentifer = requestIdentifer;
- }
-
- public void handleReply(final O reply) throws RemotingException {
- try {
- protocolHandler.sendReply(clientIdentifer, requestIdentifer, reply);
- } catch (RemotingException e) {
- throw e;
- } catch (IOException e) {
- throw new RemotingException("Failed to send a reply: " +
e.getMessage(), e);
- } finally {
- requests.remove(requestIdentifer);
- }
- }
-
- public void handleException(final RemoteExecutionException cause) throws
RemotingException {
- try {
- protocolHandler.sendException(clientIdentifer, requestIdentifer,
cause);
- } catch (RemotingException e) {
- throw e;
- } catch (IOException e) {
- throw new RemotingException("Failed to send an exception: "
+ e.getMessage(), e);
- } finally {
- requests.remove(requestIdentifer);
- }
- }
-
- public void handleCancelAcknowledge() throws RemotingException {
- try {
- protocolHandler.sendCancelAcknowledge(clientIdentifer,
requestIdentifer);
- } catch (RemotingException e) {
- throw e;
- } catch (IOException e) {
- throw new RemotingException("Failed to send a cancel
acknowledgement: " + e.getMessage(), e);
- } finally {
- requests.remove(requestIdentifer);
- }
- }
- }
- }
-
- private final class ProtocolClientResponderImpl<I, O> implements
ClientResponder<I, O> {
- private final ClientIdentifier clientIdentifier;
- private final ConcurrentMap<RequestIdentifier, RequestInitiator<O>>
requests = CollectionUtil.concurrentMap();
-
- public ProtocolClientResponderImpl(final ClientIdentifier clientIdentifier) {
- this.clientIdentifier = clientIdentifier;
- }
-
- public RequestResponder<I> createNewRequest(final RequestInitiator<O>
requestInitiator) throws RemotingException {
- try {
- final RequestIdentifier requestIdentifier =
protocolHandler.openRequest(clientIdentifier);
- if (requestIdentifier == null) {
- throw new NullPointerException("requestIdentifier is
null");
- }
- requests.put(requestIdentifier, requestInitiator);
- return new ProtocolRequestResponderImpl(requestIdentifier);
- } catch (RemotingException e) {
- throw e;
- } catch (IOException e) {
- throw new RemotingException("Failed to open a request: " +
e.getMessage(), e);
- }
- }
-
- public void handleClose(final boolean immediate, final boolean cancel) throws
RemotingException {
- try {
- protocolHandler.sendClientClose(clientIdentifier, immediate, cancel,
false);
- } catch (RemotingException e) {
- throw e;
- } catch (IOException e) {
- throw new RemotingException("Failed to send context close message:
" + e.getMessage(), e);
- }
- }
-
- private final class ProtocolRequestResponderImpl implements
RequestResponder<I> {
- private final RequestIdentifier requestIdentifier;
-
- public ProtocolRequestResponderImpl(final RequestIdentifier
requestIdentifier) {
- this.requestIdentifier = requestIdentifier;
- }
-
- public void handleRequest(final I request, final Executor streamExecutor)
throws RemotingException {
- try {
- protocolHandler.sendRequest(clientIdentifier, requestIdentifier,
request, streamExecutor);
- } catch (RemotingException e) {
- throw e;
- } catch (IOException e) {
- throw new RemotingException("Failed to send a request: " +
e.getMessage(), e);
- }
- }
-
- public void handleCancelRequest(final boolean mayInterrupt) throws
RemotingException {
- try {
- protocolHandler.sendCancelRequest(clientIdentifier,
requestIdentifier, mayInterrupt);
- } catch (RemotingException e) {
- throw e;
- } catch (IOException e) {
- throw new RemotingException("Failed to send a cancel request:
" + e.getMessage(), e);
- }
- }
- }
- }
-}
Deleted: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreStream.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreStream.java 2008-07-02
22:54:29 UTC (rev 4341)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreStream.java 2008-07-03
00:44:19 UTC (rev 4342)
@@ -1,100 +0,0 @@
-package org.jboss.cx.remoting.core;
-
-import java.io.IOException;
-import java.util.concurrent.Executor;
-import org.jboss.xnio.log.Logger;
-import org.jboss.cx.remoting.util.ObjectMessageInput;
-import org.jboss.cx.remoting.util.ObjectMessageOutput;
-import org.jboss.cx.remoting.spi.stream.StreamSerializerFactory;
-
-/**
- *
- */
-public final class CoreStream {
- private static final Logger log = Logger.getLogger(CoreStream.class);
-
- private final CoreSession coreSession;
- private final Executor executor;
- private final StreamIdentifier streamIdentifier;
- private final StreamSerializer streamSerializer;
-
- private final ProtocolHandler protocolHandler;
-
- private final StreamContext streamContext;
-
- /**
- * A new stream (local side). The {@code executor} must specify an executor that is
guaranteed to execute all tasks in order.
- *
- * @param coreSession the session
- * @param executor the executor to use to handle data
- * @param streamIdentifier the stream identifier
- * @param streamSerializerFactory the stream serializer
- * @param local the local side
- */
- CoreStream(final CoreSession coreSession, final Executor executor, final
StreamIdentifier streamIdentifier, final StreamSerializerFactory streamSerializerFactory,
final Object local) throws IOException {
- this.coreSession = coreSession;
- this.executor = executor;
- this.streamIdentifier = streamIdentifier;
- protocolHandler = coreSession.getProtocolHandler();
- streamContext = new StreamContextImpl();
- streamSerializer = streamSerializerFactory.getLocalSide(streamContext, local);
- }
-
- /**
- * A new stream (remote side). The {@code executor} must specify an executor that is
guaranteed to execute all tasks in order.
- *
- * @param coreSession the session
- * @param executor the executor to use to handle data
- * @param streamIdentifier the stream identifier
- * @param streamSerializerFactory the stream serializer
- */
- CoreStream(final CoreSession coreSession, final Executor executor, final
StreamIdentifier streamIdentifier, final StreamSerializerFactory streamSerializerFactory)
throws IOException {
- this.coreSession = coreSession;
- this.executor = executor;
- this.streamIdentifier = streamIdentifier;
- protocolHandler = coreSession.getProtocolHandler();
- streamContext = new StreamContextImpl();
- streamSerializer = streamSerializerFactory.getRemoteSide(streamContext);
- }
-
- public void receiveStreamData(final ObjectMessageInput data) {
- executor.execute(new Runnable() {
- public void run() {
- try {
- streamSerializer.handleData(data);
- } catch (Exception e) {
- log.trace(e, "Stream failed to handle incoming data (%s)",
data);
- }
- }
- });
- }
-
- public RemoteStreamSerializer getRemoteSerializer() {
- return (RemoteStreamSerializer) streamSerializer;
- }
-
- public StreamSerializer getStreamSerializer() {
- return streamSerializer;
- }
-
- // stream context
-
- private final class StreamContextImpl implements StreamContext {
-
- private StreamContextImpl() {
- }
-
- public ObjectMessageOutput writeMessage() throws IOException {
- return protocolHandler.sendStreamData(streamIdentifier, executor);
- }
-
- public void close() throws IOException {
- try {
- protocolHandler.closeStream(streamIdentifier);
- } finally {
- // todo clean up stream
-// coreSession.removeStream(streamIdentifier);
- }
- }
- }
-}
Copied: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/EndpointImpl.java
(from rev 4339,
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreEndpoint.java)
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/EndpointImpl.java
(rev 0)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/EndpointImpl.java 2008-07-03
00:44:19 UTC (rev 4342)
@@ -0,0 +1,132 @@
+package org.jboss.cx.remoting.core;
+
+import java.util.LinkedHashSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.jboss.cx.remoting.Endpoint;
+import org.jboss.cx.remoting.RequestListener;
+import org.jboss.cx.remoting.SessionListener;
+import org.jboss.cx.remoting.core.util.OrderedExecutorFactory;
+import org.jboss.cx.remoting.spi.remote.RemoteClientEndpoint;
+import org.jboss.cx.remoting.spi.remote.RemoteServiceEndpoint;
+import org.jboss.cx.remoting.util.AtomicStateMachine;
+import org.jboss.cx.remoting.util.CollectionUtil;
+import org.jboss.cx.remoting.util.NamingThreadFactory;
+import org.jboss.cx.remoting.version.Version;
+import org.jboss.xnio.log.Logger;
+
+/**
+ *
+ */
+public class EndpointImpl implements Endpoint {
+
+ static {
+ // Print Remoting "greeting" message
+ Logger.getLogger("org.jboss.cx.remoting").info("JBoss Remoting
version %s", Version.VERSION);
+ }
+
+ private enum State implements org.jboss.cx.remoting.util.State<State> {
+ INITIAL,
+ UP,
+ DOWN;
+
+ public boolean isReachable(final State dest) {
+ return compareTo(dest) < 0;
+ }
+ }
+
+ private String name;
+
+ private final AtomicStateMachine<State> state =
AtomicStateMachine.start(State.INITIAL);
+ private final Set<SessionListener> sessionListeners =
CollectionUtil.synchronizedSet(new LinkedHashSet<SessionListener>());
+
+ private OrderedExecutorFactory orderedExecutorFactory;
+ private ExecutorService executorService;
+
+ private final ConcurrentMap<Object, Object> endpointMap =
CollectionUtil.concurrentMap();
+
+ public EndpointImpl() {
+ }
+
+ // Dependencies
+
+ private Executor executor;
+
+ public Executor getExecutor() {
+ return executor;
+ }
+
+ public Executor getOrderedExecutor() {
+ return orderedExecutorFactory.getOrderedExecutor();
+ }
+
+ public void setExecutor(final Executor executor) {
+ this.executor = executor;
+ orderedExecutorFactory = new OrderedExecutorFactory(executor);
+ }
+
+ // Configuration
+
+ public void setName(final String name) {
+ this.name = name;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ // Lifecycle
+
+ public void create() {
+ // todo security check
+ }
+
+ public void start() {
+ // todo security check
+ if (executor == null) {
+ executorService = Executors.newCachedThreadPool(new
NamingThreadFactory(Executors.defaultThreadFactory(), "Remoting endpoint %s"));
+ setExecutor(executorService);
+ }
+ state.requireTransition(State.INITIAL, State.UP);
+ }
+
+ public void stop() {
+ // todo security check
+ if (executorService != null) {
+ executorService.shutdown();
+ executorService = null;
+ }
+ // todo
+ }
+
+ public void destroy() {
+ executor = null;
+ }
+
+ // Endpoint implementation
+
+ public ConcurrentMap<Object, Object> getAttributes() {
+ return endpointMap;
+ }
+
+ public <I, O> RemoteClientEndpoint<I, O> createClient(final
RequestListener<I, O> requestListener) {
+ return new RemoteClientEndpointLocalImpl<I, O>(this, requestListener);
+ }
+
+ public <I, O> RemoteServiceEndpoint<I, O> createService(final
RequestListener<I, O> requestListener) {
+ return new RemoteServiceEndpointLocalImpl<I, O>(this, requestListener);
+ }
+
+ public void addSessionListener(final SessionListener sessionListener) {
+ // TODO security check
+ sessionListeners.add(sessionListener);
+ }
+
+ public void removeSessionListener(final SessionListener sessionListener) {
+ // TODO security check
+ sessionListeners.remove(sessionListener);
+ }
+}
Deleted:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ProtocolClientInitiator.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ProtocolClientInitiator.java 2008-07-02
22:54:29 UTC (rev 4341)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ProtocolClientInitiator.java 2008-07-03
00:44:19 UTC (rev 4342)
@@ -1,9 +0,0 @@
-package org.jboss.cx.remoting.core;
-
-/**
- *
- */
-public interface ProtocolClientInitiator<I, O> extends ClientInitiator {
- RequestInitiator<O> addRequest(RequestIdentifier requestIdentifier);
-
-}
Deleted:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ProtocolClientResponder.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ProtocolClientResponder.java 2008-07-02
22:54:29 UTC (rev 4341)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ProtocolClientResponder.java 2008-07-03
00:44:19 UTC (rev 4342)
@@ -1,8 +0,0 @@
-package org.jboss.cx.remoting.core;
-
-/**
- *
- */
-public interface ProtocolClientResponder<I, O> extends ClientResponder<I, O>
{
- RequestInitiator<O> getRequestClient(RequestIdentifier requestIdentifier);
-}
Added:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RemoteClientEndpointLocalImpl.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RemoteClientEndpointLocalImpl.java
(rev 0)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RemoteClientEndpointLocalImpl.java 2008-07-03
00:44:19 UTC (rev 4342)
@@ -0,0 +1,98 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.cx.remoting.core;
+
+import org.jboss.cx.remoting.spi.remote.RemoteClientEndpoint;
+import org.jboss.cx.remoting.spi.remote.RemoteRequestContext;
+import org.jboss.cx.remoting.spi.remote.ReplyHandler;
+import org.jboss.cx.remoting.spi.remote.Handle;
+import org.jboss.cx.remoting.spi.SpiUtils;
+import org.jboss.cx.remoting.RemotingException;
+import org.jboss.cx.remoting.Client;
+import org.jboss.cx.remoting.CloseHandler;
+import org.jboss.cx.remoting.RequestListener;
+import org.jboss.cx.remoting.RemoteExecutionException;
+import org.jboss.xnio.log.Logger;
+import java.util.concurrent.Executor;
+
+/**
+ *
+ */
+public final class RemoteClientEndpointLocalImpl<I, O> implements
RemoteClientEndpoint<I, O> {
+
+ private final EndpointImpl endpointImpl;
+ private final RequestListener<I, O> requestListener;
+ private final Executor executor;
+ private final ClientContextImpl clientContext = new ClientContextImpl();
+
+ private static final Logger log =
Logger.getLogger(RemoteClientEndpointLocalImpl.class);
+
+ public RemoteClientEndpointLocalImpl(final EndpointImpl endpointImpl, final
RequestListener<I, O> requestListener) {
+ this.endpointImpl = endpointImpl;
+ this.requestListener = requestListener;
+ executor = endpointImpl.getExecutor();
+ }
+
+ public RemoteClientEndpointLocalImpl(final EndpointImpl endpointImpl, final
RemoteServiceEndpointLocalImpl<I, O> service, final RequestListener<I, O>
requestListener) {
+ this.endpointImpl = endpointImpl;
+ this.requestListener = requestListener;
+ executor = endpointImpl.getExecutor();
+ }
+
+ public RemoteRequestContext receiveRequest(final I request, final
ReplyHandler<O> replyHandler) {
+ final RequestContextImpl<O> context = new
RequestContextImpl<O>(replyHandler, clientContext);
+ executor.execute(new Runnable() {
+ public void run() {
+ try {
+ requestListener.handleRequest(context, request);
+ } catch (RemoteExecutionException e) {
+ SpiUtils.safeHandleException(replyHandler, e.getMessage(),
e.getCause());
+ } catch (Throwable t) {
+ SpiUtils.safeHandleException(replyHandler, "Unexpected exception
in request listener", t);
+ }
+ }
+ });
+ return new RemoteRequestContext() {
+ public void cancel() {
+ context.cancel();
+ }
+ };
+ }
+
+ public Handle<RemoteClientEndpoint<I, O>> getHandle() throws
RemotingException {
+ return null;
+ }
+
+ public Client<I, O> getClient() throws RemotingException {
+ return null;
+ }
+
+ public void autoClose() {
+ }
+
+ public void close() throws RemotingException {
+ }
+
+ public void addCloseHandler(final CloseHandler<RemoteClientEndpoint<I,
O>> handler) {
+ }
+}
Added:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RemoteServiceEndpointLocalImpl.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RemoteServiceEndpointLocalImpl.java
(rev 0)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RemoteServiceEndpointLocalImpl.java 2008-07-03
00:44:19 UTC (rev 4342)
@@ -0,0 +1,66 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.cx.remoting.core;
+
+import org.jboss.cx.remoting.spi.remote.RemoteServiceEndpoint;
+import org.jboss.cx.remoting.spi.remote.RemoteClientEndpoint;
+import org.jboss.cx.remoting.spi.remote.Handle;
+import org.jboss.cx.remoting.RequestListener;
+import org.jboss.cx.remoting.RemotingException;
+import org.jboss.cx.remoting.ClientSource;
+import org.jboss.cx.remoting.CloseHandler;
+
+/**
+ *
+ */
+public final class RemoteServiceEndpointLocalImpl<I, O> implements
RemoteServiceEndpoint<I, O> {
+
+ private final EndpointImpl endpointImpl;
+ private final RequestListener<I, O> requestListener;
+
+ public RemoteServiceEndpointLocalImpl(final EndpointImpl endpointImpl, final
RequestListener<I, O> requestListener) {
+ this.endpointImpl = endpointImpl;
+ this.requestListener = requestListener;
+ }
+
+ public RemoteClientEndpoint<I, O> openClient() throws RemotingException {
+ return new RemoteClientEndpointLocalImpl<I, O>(endpointImpl, this,
requestListener);
+ }
+
+ public Handle<RemoteServiceEndpoint<I, O>> getHandle() throws
RemotingException {
+ return null;
+ }
+
+ public ClientSource<I, O> getClientSource() throws RemotingException {
+ return null;
+ }
+
+ public void autoClose() {
+ }
+
+ public void close() throws RemotingException {
+ }
+
+ public void addCloseHandler(final CloseHandler<RemoteServiceEndpoint<I,
O>> handler) {
+ }
+}
Added:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RequestContextImpl.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RequestContextImpl.java
(rev 0)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RequestContextImpl.java 2008-07-03
00:44:19 UTC (rev 4342)
@@ -0,0 +1,119 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.cx.remoting.core;
+
+import org.jboss.cx.remoting.RequestContext;
+import org.jboss.cx.remoting.ClientContext;
+import org.jboss.cx.remoting.RemotingException;
+import org.jboss.cx.remoting.RequestCancelHandler;
+import org.jboss.cx.remoting.core.util.TaggingExecutor;
+import org.jboss.cx.remoting.spi.remote.ReplyHandler;
+import org.jboss.cx.remoting.spi.SpiUtils;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.Set;
+import java.util.HashSet;
+
+/**
+ *
+ */
+public final class RequestContextImpl<O> implements RequestContext<O> {
+
+ private final AtomicBoolean closed = new AtomicBoolean();
+ private final Object cancelLock = new Object();
+ private final ReplyHandler<O> replyHandler;
+ private final ClientContextImpl clientContext;
+
+ private final AtomicBoolean cancelled = new AtomicBoolean();
+ // @protectedby cancelLock
+ private Set<RequestCancelHandler<O>> cancelHandlers;
+ private final TaggingExecutor executor;
+
+ RequestContextImpl(final ReplyHandler<O> replyHandler, final ClientContextImpl
clientContext) {
+ this.replyHandler = replyHandler;
+ this.clientContext = clientContext;
+ executor = new TaggingExecutor(clientContext.getExecutor());
+ }
+
+ public ClientContext getContext() {
+ return clientContext;
+ }
+
+ public boolean isCancelled() {
+ return cancelled.get();
+ }
+
+ public void sendReply(final O reply) throws RemotingException, IllegalStateException
{
+ if (! closed.getAndSet(true)) {
+ replyHandler.handleReply(reply);
+ } else {
+ throw new IllegalStateException("Reply already sent");
+ }
+ }
+
+ public void sendFailure(final String msg, final Throwable cause) throws
RemotingException, IllegalStateException {
+ if (! closed.getAndSet(true)) {
+ replyHandler.handleException(msg, cause);
+ } else {
+ throw new IllegalStateException("Reply already sent");
+ }
+ }
+
+ public void sendCancelled() throws RemotingException, IllegalStateException {
+ if (! closed.getAndSet(true)) {
+ replyHandler.handleCancellation();
+ } else {
+ throw new IllegalStateException("Reply already sent");
+ }
+ }
+
+ public void addCancelHandler(final RequestCancelHandler<O> handler) {
+ synchronized (cancelLock) {
+ if (cancelled.get()) {
+ SpiUtils.safeNotifyCancellation(handler, this, false);
+ } else {
+ if (cancelHandlers == null) {
+ cancelHandlers = new HashSet<RequestCancelHandler<O>>();
+ }
+ cancelHandlers.add(handler);
+ }
+ }
+ }
+
+ public void execute(final Runnable command) {
+ executor.execute(command);
+ }
+
+ protected void cancel() {
+ if (! cancelled.getAndSet(true)) {
+ synchronized (cancelLock) {
+ if (cancelHandlers != null) {
+ for (RequestCancelHandler<O> handler : cancelHandlers) {
+ SpiUtils.safeNotifyCancellation(handler, this, false);
+ }
+ cancelHandlers = null;
+ }
+ }
+ executor.interruptAll();
+ }
+ }
+}
Deleted:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RequestInitiator.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RequestInitiator.java 2008-07-02
22:54:29 UTC (rev 4341)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RequestInitiator.java 2008-07-03
00:44:19 UTC (rev 4342)
@@ -1,17 +0,0 @@
-package org.jboss.cx.remoting.core;
-
-import org.jboss.cx.remoting.RemoteExecutionException;
-import org.jboss.cx.remoting.RemotingException;
-
-/**
- *
- */
-public interface RequestInitiator<O> {
- // Outbound protocol messages
-
- void handleReply(final O reply) throws RemotingException;
-
- void handleException(final RemoteExecutionException cause) throws RemotingException;
-
- void handleCancelAcknowledge() throws RemotingException;
-}
Deleted:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RequestResponder.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RequestResponder.java 2008-07-02
22:54:29 UTC (rev 4341)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RequestResponder.java 2008-07-03
00:44:19 UTC (rev 4342)
@@ -1,16 +0,0 @@
-package org.jboss.cx.remoting.core;
-
-import java.util.concurrent.Executor;
-import org.jboss.cx.remoting.RemotingException;
-
-/**
- *
- */
-public interface RequestResponder<I> {
-
- // Outbound protocol messages
-
- void handleRequest(I request, final Executor streamExecutor) throws
RemotingException;
-
- void handleCancelRequest(boolean mayInterrupt) throws RemotingException;
-}
Added:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ServiceContextImpl.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ServiceContextImpl.java
(rev 0)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ServiceContextImpl.java 2008-07-03
00:44:19 UTC (rev 4342)
@@ -0,0 +1,44 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.cx.remoting.core;
+
+import org.jboss.cx.remoting.ServiceContext;
+import org.jboss.cx.remoting.RemotingException;
+import org.jboss.cx.remoting.CloseHandler;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ *
+ */
+public final class ServiceContextImpl implements ServiceContext {
+
+ public ConcurrentMap<Object, Object> getAttributes() {
+ return null;
+ }
+
+ public void close() throws RemotingException {
+ }
+
+ public void addCloseHandler(final CloseHandler<ServiceContext>
serviceContextCloseHandler) {
+ }
+}
Deleted:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ServiceInitiator.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ServiceInitiator.java 2008-07-02
22:54:29 UTC (rev 4341)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ServiceInitiator.java 2008-07-03
00:44:19 UTC (rev 4342)
@@ -1,10 +0,0 @@
-package org.jboss.cx.remoting.core;
-
-import org.jboss.cx.remoting.RemotingException;
-
-/**
- *
- */
-public interface ServiceInitiator {
- void handleClosing() throws RemotingException;
-}
Deleted:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ServiceResponder.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ServiceResponder.java 2008-07-02
22:54:29 UTC (rev 4341)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ServiceResponder.java 2008-07-03
00:44:19 UTC (rev 4342)
@@ -1,12 +0,0 @@
-package org.jboss.cx.remoting.core;
-
-import org.jboss.cx.remoting.RemotingException;
-
-/**
- *
- */
-public interface ServiceResponder<I, O> {
- void handleClose() throws RemotingException;
-
- ClientResponder<I, O> createNewClient(ClientInitiator clientInitiator) throws
RemotingException;
-}
Deleted: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/StreamMarker.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/StreamMarker.java 2008-07-02
22:54:29 UTC (rev 4341)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/StreamMarker.java 2008-07-03
00:44:19 UTC (rev 4342)
@@ -1,51 +0,0 @@
-package org.jboss.cx.remoting.core;
-
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import org.jboss.cx.remoting.spi.stream.StreamSerializerFactory;
-
-/**
- *
- */
-public final class StreamMarker implements Externalizable {
-
- private static final long serialVersionUID = 1L;
-
- private Class<? extends StreamSerializerFactory> factoryClass;
- private StreamIdentifier streamIdentifier;
-
- public StreamMarker(final Class<? extends StreamSerializerFactory>
factoryClass, final StreamIdentifier streamIdentifier) {
- if (factoryClass == null) {
- throw new NullPointerException("factoryClass is null");
- }
- if (streamIdentifier == null) {
- throw new NullPointerException("streamIdentifier is null");
- }
- this.factoryClass = factoryClass;
- this.streamIdentifier = streamIdentifier;
- }
-
- public StreamMarker() {
- }
-
- public Class<? extends StreamSerializerFactory> getFactoryClass() {
- return factoryClass;
- }
-
- public StreamIdentifier getStreamIdentifier() {
- return streamIdentifier;
- }
-
- public void writeExternal(ObjectOutput out) throws IOException {
- out.writeObject(factoryClass);
- out.writeObject(streamIdentifier);
- }
-
- @SuppressWarnings ({"unchecked"})
- public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException
{
- factoryClass = (Class<? extends StreamSerializerFactory>) in.readObject();
- streamIdentifier = (StreamIdentifier) in.readObject();
- }
-}
Modified:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JBossSerializationMarhsaller.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JBossSerializationMarhsaller.java 2008-07-02
22:54:29 UTC (rev 4341)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JBossSerializationMarhsaller.java 2008-07-03
00:44:19 UTC (rev 4342)
@@ -1,43 +1,155 @@
package org.jboss.cx.remoting.core.marshal;
import java.io.IOException;
+import java.io.ObjectStreamClass;
+import java.nio.ByteBuffer;
+import java.lang.reflect.Proxy;
+import java.util.Map;
+import java.util.HashMap;
import org.jboss.cx.remoting.spi.marshal.Marshaller;
import org.jboss.cx.remoting.spi.marshal.ObjectResolver;
-import org.jboss.cx.remoting.util.ByteMessageInput;
-import org.jboss.cx.remoting.util.ByteMessageOutput;
-import org.jboss.cx.remoting.util.ObjectMessageInput;
-import org.jboss.cx.remoting.util.ObjectMessageOutput;
import org.jboss.cx.remoting.stream.ObjectSink;
import org.jboss.cx.remoting.stream.ObjectSource;
+import org.jboss.cx.remoting.stream.ByteBufferOutputStream;
+import org.jboss.cx.remoting.stream.ByteBufferInputStream;
+import org.jboss.cx.remoting.RemotingException;
+import org.jboss.xnio.BufferAllocator;
+import org.jboss.serial.io.JBossObjectOutputStream;
+import org.jboss.serial.io.JBossObjectInputStream;
/**
*
*/
-public class JBossSerializationMarhsaller implements Marshaller {
+public class JBossSerializationMarhsaller implements Marshaller<ByteBuffer> {
private static final long serialVersionUID = -8197192536466706414L;
+ private final BufferAllocator<ByteBuffer> allocator;
private final ObjectResolver resolver;
private final ClassLoader classLoader;
- public JBossSerializationMarhsaller(final ObjectResolver resolver, final ClassLoader
classLoader) {
+ public JBossSerializationMarhsaller(final BufferAllocator<ByteBuffer>
allocator, final ObjectResolver resolver, final ClassLoader classLoader) {
+ this.allocator = allocator;
this.resolver = resolver;
this.classLoader = classLoader;
}
- public ObjectMessageOutput getMessageOutput(final ByteMessageOutput
byteMessageOutput) throws IOException {
- return new JBossSerializationObjectMessageOutput(resolver, byteMessageOutput);
+ public ObjectSink<Object> getMarshalingSink(final ObjectSink<ByteBuffer>
bufferSink) throws IOException {
+ return new MarshalingSink(bufferSink, allocator, resolver);
}
- public ObjectMessageInput getMessageInput(final ByteMessageInput byteMessageInput)
throws IOException {
- return new JBossSerializationObjectMessageInput(resolver, byteMessageInput,
classLoader);
+ public ObjectSource<Object> getUnmarshalingSource(final
ObjectSource<ByteBuffer> bufferSource) throws IOException {
+ return new MarshalingSource(bufferSource, allocator, resolver, classLoader);
}
- public ObjectSink getMarshalingSink(final ObjectSink bufferSink) throws IOException
{
- return null;
+ public static final class MarshalingSink implements ObjectSink<Object> {
+ private final OurObjectOutputStream stream;
+
+ private MarshalingSink(final ObjectSink<ByteBuffer> bufferSink, final
BufferAllocator<ByteBuffer> allocator, final ObjectResolver resolver) throws
IOException {
+ stream = new OurObjectOutputStream(bufferSink, allocator, resolver);
+ }
+
+ public void accept(final Object instance) throws IOException {
+ stream.writeObject(instance);
+ }
+
+ public void flush() throws IOException {
+ stream.flush();
+ }
+
+ public void close() throws IOException {
+ stream.close();
+ }
}
- public ObjectSource getUnmarshalingSource(final ObjectSource bufferSource) throws
IOException {
- return null;
+ private static final class OurObjectOutputStream extends JBossObjectOutputStream {
+ private final ObjectResolver resolver;
+
+ private OurObjectOutputStream(final ObjectSink<ByteBuffer> sink, final
BufferAllocator<ByteBuffer> allocator, final ObjectResolver resolver) throws
IOException {
+ super(new ByteBufferOutputStream(sink, allocator));
+ enableReplaceObject(true);
+ this.resolver = resolver;
+ }
+
+ protected Object replaceObject(final Object obj) throws IOException {
+ return resolver.writeReplace(obj);
+ }
}
+
+ public static final class MarshalingSource implements ObjectSource<Object> {
+ private final OurObjectInputStream stream;
+
+ private MarshalingSource(final ObjectSource<ByteBuffer> bufferSource, final
BufferAllocator<ByteBuffer> allocator, final ObjectResolver resolver, final
ClassLoader classLoader) throws IOException {
+ stream = new OurObjectInputStream(bufferSource, allocator, resolver,
classLoader);
+ }
+
+ public boolean hasNext() throws IOException {
+ return true;
+ }
+
+ public Object next() throws IOException {
+ try {
+ return stream.readObject();
+ } catch (ClassNotFoundException e) {
+ throw new RemotingException("No class found for next object in
stream", e);
+ }
+ }
+
+ public void close() throws IOException {
+ stream.close();
+ }
+ }
+
+ private static final class OurObjectInputStream extends JBossObjectInputStream {
+ private final ClassLoader classLoader;
+ private final ObjectResolver resolver;
+
+ private OurObjectInputStream(final ObjectSource<ByteBuffer> bufferSource,
final BufferAllocator<ByteBuffer> allocator, final ObjectResolver resolver, final
ClassLoader classLoader) throws IOException {
+ super(new ByteBufferInputStream(bufferSource, allocator), classLoader);
+ this.classLoader = classLoader;
+ this.resolver = resolver;
+ }
+
+ protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException,
ClassNotFoundException {
+ final String name = desc.getName();
+ if (primitiveTypes.containsKey(name)) {
+ return primitiveTypes.get(name);
+ } else {
+ return Class.forName(name, false, classLoader);
+ }
+ }
+
+ protected Class<?> resolveProxyClass(final String[] interfaceNames) throws
IOException, ClassNotFoundException {
+ final int length = interfaceNames.length;
+ final Class<?>[] interfaces = new Class[length];
+ for (int i = 0; i < length; i ++) {
+ interfaces[i] = Class.forName(interfaceNames[i], false, classLoader);
+ }
+ return Proxy.getProxyClass(classLoader, interfaces);
+ }
+
+ protected Object resolveObject(final Object obj) throws IOException {
+ return resolver.readResolve(obj);
+ }
+
+ private static final Map<String, Class<?>> primitiveTypes = new
HashMap<String, Class<?>>();
+
+ private static <T> void add(Class<T> type) {
+ primitiveTypes.put(type.getName(), type);
+ }
+
+ static {
+ add(void.class);
+ add(boolean.class);
+ add(byte.class);
+ add(short.class);
+ add(int.class);
+ add(long.class);
+ add(float.class);
+ add(double.class);
+ add(char.class);
+ }
+ }
+
+
}
Modified:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JBossSerializationMarshallerFactory.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JBossSerializationMarshallerFactory.java 2008-07-02
22:54:29 UTC (rev 4341)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JBossSerializationMarshallerFactory.java 2008-07-03
00:44:19 UTC (rev 4342)
@@ -1,16 +1,18 @@
package org.jboss.cx.remoting.core.marshal;
import java.io.IOException;
+import java.nio.ByteBuffer;
import org.jboss.cx.remoting.spi.marshal.Marshaller;
import org.jboss.cx.remoting.spi.marshal.MarshallerFactory;
import org.jboss.cx.remoting.spi.marshal.ObjectResolver;
+import org.jboss.xnio.BufferAllocator;
/**
*
*/
-public class JBossSerializationMarshallerFactory implements MarshallerFactory {
+public class JBossSerializationMarshallerFactory implements
MarshallerFactory<ByteBuffer> {
- public Marshaller createRootMarshaller(final ObjectResolver resolver, final
ClassLoader classLoader) throws IOException {
- return new JBossSerializationMarhsaller(resolver, classLoader);
+ public Marshaller<ByteBuffer> createMarshaller(final
BufferAllocator<ByteBuffer> allocator, final ObjectResolver resolver, final
ClassLoader classLoader) throws IOException {
+ return new JBossSerializationMarhsaller(allocator, resolver, classLoader);
}
}
Deleted:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JBossSerializationObjectMessageInput.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JBossSerializationObjectMessageInput.java 2008-07-02
22:54:29 UTC (rev 4341)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JBossSerializationObjectMessageInput.java 2008-07-03
00:44:19 UTC (rev 4342)
@@ -1,108 +0,0 @@
-package org.jboss.cx.remoting.core.marshal;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.ObjectStreamClass;
-import java.lang.reflect.Proxy;
-import java.util.HashMap;
-import java.util.Map;
-import org.jboss.cx.remoting.spi.marshal.ObjectResolver;
-import org.jboss.cx.remoting.util.ByteMessageInput;
-import org.jboss.cx.remoting.util.ObjectMessageInput;
-import org.jboss.serial.io.JBossObjectInputStream;
-
-/**
- *
- */
-public class JBossSerializationObjectMessageInput extends JBossObjectInputStream
implements ObjectMessageInput {
-
- private final ObjectResolver resolver;
- private final ByteMessageInput dataMessageInput;
-
- public JBossSerializationObjectMessageInput(final ObjectResolver resolver, final
ByteMessageInput dataMessageInput, final ClassLoader classLoader) throws IOException {
- super(new InputStream() {
-
- public int read(final byte b[]) throws IOException {
- return dataMessageInput.read(b);
- }
-
- public int read(final byte b[], final int off, final int len) throws
IOException {
- return dataMessageInput.read(b, off, len);
- }
-
- public int available() throws IOException {
- return dataMessageInput.remaining();
- }
-
- public void close() throws IOException {
- dataMessageInput.close();
- }
-
- public boolean markSupported() {
- return false;
- }
-
- public int read() throws IOException {
- return dataMessageInput.read();
- }
- }, classLoader);
- if (resolver == null) {
- throw new NullPointerException("resolver is null");
- }
- if (dataMessageInput == null) {
- throw new NullPointerException("dataMessageInput is null");
- }
- if (classLoader == null) {
- throw new NullPointerException("classLoader is null");
- }
- enableResolveObject(true);
- this.resolver = resolver;
- this.dataMessageInput = dataMessageInput;
- }
-
- public int remaining() {
- return dataMessageInput.remaining();
- }
-
- protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException,
ClassNotFoundException {
- final String name = desc.getName();
- ClassLoader classLoader = getClassLoader();
- if (primitiveTypes.containsKey(name)) {
- return primitiveTypes.get(name);
- } else {
- return Class.forName(name, false, classLoader);
- }
- }
-
- protected Class<?> resolveProxyClass(final String[] interfaceNames) throws
IOException, ClassNotFoundException {
- final ClassLoader classLoader = getClassLoader();
- final int length = interfaceNames.length;
- final Class<?>[] interfaces = new Class[length];
- for (int i = 0; i < length; i ++) {
- interfaces[i] = Class.forName(interfaceNames[i], false, classLoader);
- }
- return Proxy.getProxyClass(classLoader, interfaces);
- }
-
- protected Object resolveObject(final Object obj) throws IOException {
- return resolver.readResolve(obj);
- }
-
- private static final Map<String, Class<?>> primitiveTypes = new
HashMap<String, Class<?>>();
-
- private static <T> void add(Class<T> type) {
- primitiveTypes.put(type.getName(), type);
- }
-
- static {
- add(void.class);
- add(boolean.class);
- add(byte.class);
- add(short.class);
- add(int.class);
- add(long.class);
- add(float.class);
- add(double.class);
- add(char.class);
- }
-}
Deleted:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JBossSerializationObjectMessageOutput.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JBossSerializationObjectMessageOutput.java 2008-07-02
22:54:29 UTC (rev 4341)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JBossSerializationObjectMessageOutput.java 2008-07-03
00:44:19 UTC (rev 4342)
@@ -1,58 +0,0 @@
-package org.jboss.cx.remoting.core.marshal;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import org.jboss.cx.remoting.spi.marshal.ObjectResolver;
-import org.jboss.cx.remoting.util.ByteMessageOutput;
-import org.jboss.cx.remoting.util.ObjectMessageOutput;
-import org.jboss.serial.io.JBossObjectOutputStream;
-
-/**
- *
- */
-public class JBossSerializationObjectMessageOutput extends JBossObjectOutputStream
implements ObjectMessageOutput {
-
- private final ObjectResolver resolver;
- private final ByteMessageOutput dataMessageOutput;
-
- public JBossSerializationObjectMessageOutput(final ObjectResolver resolver, final
ByteMessageOutput dataMessageOutput) throws IOException {
- super(new OutputStream() {
- public void write(final int b) throws IOException {
- dataMessageOutput.write(b);
- }
-
- public void write(final byte b[]) throws IOException {
- dataMessageOutput.write(b);
- }
-
- public void write(final byte b[], final int off, final int len) throws
IOException {
- dataMessageOutput.write(b, off, len);
- }
-
- public void flush() throws IOException {
- dataMessageOutput.flush();
- }
-
- public void close() throws IOException {
- dataMessageOutput.close();
- }
- });
- enableReplaceObject(true);
- this.resolver = resolver;
- this.dataMessageOutput = dataMessageOutput;
- }
-
- public void commit() throws IOException {
- flush();
- dataMessageOutput.commit();
- }
-
- public int getBytesWritten() throws IOException {
- flush();
- return dataMessageOutput.getBytesWritten();
- }
-
- protected Object replaceObject(final Object obj) throws IOException {
- return resolver.writeReplace(obj);
- }
-}
Modified:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/StreamResolver.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/StreamResolver.java 2008-07-02
22:54:29 UTC (rev 4341)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/StreamResolver.java 2008-07-03
00:44:19 UTC (rev 4342)
@@ -2,7 +2,6 @@
import java.io.IOException;
import java.util.concurrent.Executor;
-import org.jboss.cx.remoting.core.StreamMarker;
import org.jboss.cx.remoting.core.util.OrderedExecutorFactory;
import org.jboss.cx.remoting.spi.marshal.ObjectResolver;
@@ -29,11 +28,7 @@
}
public Object readResolve(final Object original) throws IOException {
- if (original instanceof StreamMarker) {
- StreamMarker streamMarker = (StreamMarker) original;
- return null;
- } else {
- return original;
- }
+ // todo
+ return null;
}
}
Modified:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/service/ClassLoaderResourceListener.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/service/ClassLoaderResourceListener.java 2008-07-02
22:54:29 UTC (rev 4341)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/service/ClassLoaderResourceListener.java 2008-07-03
00:44:19 UTC (rev 4342)
@@ -31,7 +31,7 @@
this.classLoader = classLoader;
}
- public void handleRequest(final RequestContext<ClassLoaderResourceReply>
requestContext, final ClassLoaderResourceRequest request) throws RemoteExecutionException,
InterruptedException {
+ public void handleRequest(final RequestContext<ClassLoaderResourceReply>
requestContext, final ClassLoaderResourceRequest request) throws RemoteExecutionException
{
try {
final Enumeration<URL> urlResources =
classLoader.getResources(request.getName());
final Enumeration<RemoteResource> actualResources =
CollectionUtil.translate(urlResources, new Translator<URL, RemoteResource>() {
Modified:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/service/ServiceLocatorListener.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/service/ServiceLocatorListener.java 2008-07-02
22:54:29 UTC (rev 4341)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/service/ServiceLocatorListener.java 2008-07-03
00:44:19 UTC (rev 4342)
@@ -45,7 +45,7 @@
private final ConcurrentMap<String, ConcurrentMap<String, ClientSource<?,
?>>> deployments = syncMap();
- public void handleRequest(final RequestContext<ServiceReply<I, O>>
requestContext, final ServiceRequest<I, O> request) throws RemoteExecutionException,
InterruptedException {
+ public void handleRequest(final RequestContext<ServiceReply<I, O>>
requestContext, final ServiceRequest<I, O> request) throws RemoteExecutionException
{
final URI uri = request.getUri();
final ServiceURI serviceURI = new ServiceURI(uri);
final String endpointName = serviceURI.getEndpointName();
Modified:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/stream/DefaultStreamDetector.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/stream/DefaultStreamDetector.java 2008-07-02
22:54:29 UTC (rev 4341)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/stream/DefaultStreamDetector.java 2008-07-03
00:44:19 UTC (rev 4342)
@@ -22,9 +22,9 @@
} else if (candidate instanceof ObjectSource) {
return new ObjectSourceStreamSerializerFactory();
} else if (candidate instanceof ObjectSink) {
- return new ObjectSinkStreamSerializerFactory();
+ return null;
} else if (candidate instanceof Iterator) {
- return new IteratorStreamSerializerFactory();
+ return null;
} else {
return null;
}
Modified:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/stream/InputStreamStreamSerializerFactory.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/stream/InputStreamStreamSerializerFactory.java 2008-07-02
22:54:29 UTC (rev 4341)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/stream/InputStreamStreamSerializerFactory.java 2008-07-03
00:44:19 UTC (rev 4342)
@@ -44,7 +44,8 @@
}
public Object getRemoteSide(final Client<StreamChannel> remoteClient) throws
IOException {
- return new RemoteInputStream(taskList, futureChannel);
+// return new RemoteInputStream(taskList, futureChannel);
+ return null;
}
public BufferAllocator<ByteBuffer> getAllocator() {
Deleted:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/stream/IteratorStreamSerializerFactory.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/stream/IteratorStreamSerializerFactory.java 2008-07-02
22:54:29 UTC (rev 4341)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/stream/IteratorStreamSerializerFactory.java 2008-07-03
00:44:19 UTC (rev 4342)
@@ -1,29 +0,0 @@
-package org.jboss.cx.remoting.core.stream;
-
-import java.io.IOException;
-import java.util.Iterator;
-import org.jboss.cx.remoting.util.ObjectMessageInput;
-import org.jboss.cx.remoting.spi.stream.StreamSerializerFactory;
-import org.jboss.cx.remoting.stream.ObjectSource;
-import org.jboss.cx.remoting.stream.Streams;
-import org.jboss.xnio.channels.StreamChannel;
-import org.jboss.xnio.IoHandler;
-import org.jboss.xnio.Client;
-
-/**
- *
- */
-public final class IteratorStreamSerializerFactory implements StreamSerializerFactory {
-
- private static final long serialVersionUID = 5106872230130868988L;
-
- private
-
- public IoHandler<? super StreamChannel> getLocalSide(final Object localSide)
throws IOException {
- return null;
- }
-
- public Object getRemoteSide(final Client<StreamChannel> remoteClient) throws
IOException {
- return null;
- }
-}
Deleted:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/stream/ObjectSinkStreamSerializerFactory.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/stream/ObjectSinkStreamSerializerFactory.java 2008-07-02
22:54:29 UTC (rev 4341)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/stream/ObjectSinkStreamSerializerFactory.java 2008-07-03
00:44:19 UTC (rev 4342)
@@ -1,100 +0,0 @@
-package org.jboss.cx.remoting.core.stream;
-
-import java.io.IOException;
-import org.jboss.cx.remoting.util.ObjectMessageInput;
-import org.jboss.cx.remoting.util.ObjectMessageOutput;
-import org.jboss.cx.remoting.spi.stream.StreamSerializerFactory;
-import org.jboss.cx.remoting.stream.ObjectSink;
-
-/**
- *
- */
-public final class ObjectSinkStreamSerializerFactory implements StreamSerializerFactory
{
- public StreamSerializer getLocalSide(StreamContext context, Object local) throws
IOException {
- return new StreamSerializerImpl(context, (ObjectSink<?>)local);
- }
-
- public RemoteStreamSerializer getRemoteSide(StreamContext context) throws IOException
{
- return new RemoteStreamSerializerImpl(context);
- }
-
- /**
- * KEEP IN ORDER.
- */
- private enum MessageType {
- DATA,
- FLUSH,
- }
-
- public static final class StreamSerializerImpl implements StreamSerializer {
- private final StreamContext context;
- private final ObjectSink<?> objectSink;
-
- public StreamSerializerImpl(final StreamContext context, final
ObjectSink<?> objectSink) {
- this.context = context;
- this.objectSink = objectSink;
- }
-
- public void handleOpen() throws IOException {
- }
-
- @SuppressWarnings ({"unchecked"})
- public void handleData(ObjectMessageInput data) throws IOException {
- MessageType messageType = MessageType.values()[data.read()];
- switch (messageType) {
- case DATA:
- try {
- ((ObjectSink)objectSink).accept(data.readObject());
- } catch (ClassNotFoundException e) {
- throw new IOException("Cannot deserialize object from
message (class not found): " + e.toString());
- }
- break;
- case FLUSH:
- objectSink.flush();
- break;
- }
- }
-
- public void handleClose() throws IOException {
- objectSink.flush();
- }
- }
-
- public static final class RemoteStreamSerializerImpl implements
RemoteStreamSerializer {
- private final StreamContext context;
-
- public RemoteStreamSerializerImpl(final StreamContext context) {
- this.context = context;
- }
-
- public ObjectSink<?> getRemoteInstance() {
- return new ObjectSink<Object>() {
- public void accept(final Object instance) throws IOException {
- final ObjectMessageOutput msg = context.writeMessage();
- msg.write(MessageType.DATA.ordinal());
- msg.writeObject(instance);
- msg.commit();
- }
-
- public void flush() throws IOException {
- final ObjectMessageOutput msg = context.writeMessage();
- msg.write(MessageType.FLUSH.ordinal());
- msg.commit();
- }
-
- public void close() throws IOException {
- flush();
- }
- };
- }
-
- public void handleOpen() throws IOException {
- }
-
- public void handleData(ObjectMessageInput data) throws IOException {
- }
-
- public void handleClose() throws IOException {
- }
- }
-}
Modified:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/stream/ObjectSourceStreamSerializerFactory.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/stream/ObjectSourceStreamSerializerFactory.java 2008-07-02
22:54:29 UTC (rev 4341)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/stream/ObjectSourceStreamSerializerFactory.java 2008-07-03
00:44:19 UTC (rev 4342)
@@ -50,6 +50,10 @@
public static class LocalHandler implements IoHandler<StreamSinkChannel> {
private final ObjectSource objectSource;
+ public LocalHandler(final ObjectSource source) {
+ objectSource = source;
+ }
+
public void handleOpened(final StreamSinkChannel channel) {
if (channel.getOptions().contains(CommonOptions.TCP_NODELAY)) try {
channel.setOption(CommonOptions.TCP_NODELAY, Boolean.TRUE);
Modified:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/util/DecodingBuilder.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/util/DecodingBuilder.java 2008-07-02
22:54:29 UTC (rev 4341)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/util/DecodingBuilder.java 2008-07-03
00:44:19 UTC (rev 4342)
@@ -73,6 +73,7 @@
stringBuilder.append(flip(holder));
holder.clear();
} while (oflow);
+ return this;
}
public static final ByteBuffer EMPTY = ByteBuffer.allocate(0);
@@ -84,6 +85,7 @@
stringBuilder.append(flip(holder));
holder.clear();
} while (oflow);
+ return this;
}
public String toString() {
Added:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/util/TaggingExecutor.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/util/TaggingExecutor.java
(rev 0)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/util/TaggingExecutor.java 2008-07-03
00:44:19 UTC (rev 4342)
@@ -0,0 +1,75 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.cx.remoting.core.util;
+
+import java.util.concurrent.Executor;
+import java.util.Set;
+import org.jboss.cx.remoting.util.CollectionUtil;
+
+/**
+ *
+ */
+public final class TaggingExecutor implements Executor {
+
+ private final Set<Task> tasks = CollectionUtil.synchronizedHashSet();
+ private final Executor executor;
+
+ public TaggingExecutor(final Executor executor) {
+ this.executor = executor;
+ }
+
+ private final class Task implements Runnable {
+ private volatile Thread thread;
+ private final Runnable runnable;
+
+ private Task(final Runnable runnable) {
+ this.runnable = runnable;
+ }
+
+ public void run() {
+ thread = Thread.currentThread();
+ tasks.add(this);
+ try {
+ runnable.run();
+ } finally {
+ tasks.remove(this);
+ thread = null;
+ }
+ }
+ }
+
+ public void execute(final Runnable command) {
+ executor.execute(new Task(command));
+ }
+
+ public void interruptAll() {
+ synchronized (tasks) {
+ for (Task task : tasks) {
+ final Thread thread = task.thread;
+ if (thread != null) {
+ thread.interrupt();
+ }
+ }
+ }
+ }
+}
Modified:
remoting3/trunk/mc-deployers/src/main/java/org/jboss/cx/remoting/beans/SessionBean.java
===================================================================
---
remoting3/trunk/mc-deployers/src/main/java/org/jboss/cx/remoting/beans/SessionBean.java 2008-07-02
22:54:29 UTC (rev 4341)
+++
remoting3/trunk/mc-deployers/src/main/java/org/jboss/cx/remoting/beans/SessionBean.java 2008-07-03
00:44:19 UTC (rev 4342)
@@ -47,7 +47,7 @@
}
public void start() throws RemotingException {
- session = endpoint.openSession(destination, attributeMap, null);
+// session = endpoint.openSession(destination, attributeMap, null);
}
public void stop() throws RemotingException {
Deleted:
remoting3/trunk/samples/src/main/java/org/jboss/cx/remoting/samples/simple/JrppBasicExampleMain.java
===================================================================
---
remoting3/trunk/samples/src/main/java/org/jboss/cx/remoting/samples/simple/JrppBasicExampleMain.java 2008-07-02
22:54:29 UTC (rev 4341)
+++
remoting3/trunk/samples/src/main/java/org/jboss/cx/remoting/samples/simple/JrppBasicExampleMain.java 2008-07-03
00:44:19 UTC (rev 4342)
@@ -1,51 +0,0 @@
-package org.jboss.cx.remoting.samples.simple;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.security.Security;
-import org.jboss.cx.remoting.Client;
-import org.jboss.cx.remoting.Endpoint;
-import org.jboss.cx.remoting.RemoteExecutionException;
-import org.jboss.cx.remoting.Remoting;
-import org.jboss.cx.remoting.Session;
-import org.jboss.cx.remoting.core.security.sasl.Provider;
-import org.jboss.cx.remoting.jrpp.JrppServer;
-import org.jboss.cx.remoting.util.AttributeMap;
-import org.jboss.xnio.IoUtils;
-
-/**
- *
- */
-public final class JrppBasicExampleMain {
-
- public static void main(String[] args) throws IOException, RemoteExecutionException,
URISyntaxException {
- Security.addProvider(new Provider());
- final StringRot13RequestListener listener = new StringRot13RequestListener();
- final Endpoint endpoint = Remoting.createEndpoint("simple");
- try {
- final JrppServer jrppServer = Remoting.addJrppServer(endpoint, new
InetSocketAddress(12345), listener, AttributeMap.EMPTY);
- try {
- Session session = endpoint.openSession(new
URI("jrpp://localhost:12345"), AttributeMap.EMPTY, null);
- try {
- final Client<String,String> client = session.getRootClient();
- try {
- final String original = "The Secret Message\n";
- final String result = client.invoke(original);
- System.out.printf("The secret message \"%s\"
became \"%s\"!\n", original.trim(), result.trim());
- } finally {
- IoUtils.safeClose(client);
- }
- } finally {
- IoUtils.safeClose(session);
- }
- } finally {
- jrppServer.stop();
- jrppServer.destroy();
- }
- } finally {
- Remoting.closeEndpoint(endpoint);
- }
- }
-}
\ No newline at end of file
Deleted:
remoting3/trunk/samples/src/main/java/org/jboss/cx/remoting/samples/simple/JrppStreamExampleMain.java
===================================================================
---
remoting3/trunk/samples/src/main/java/org/jboss/cx/remoting/samples/simple/JrppStreamExampleMain.java 2008-07-02
22:54:29 UTC (rev 4341)
+++
remoting3/trunk/samples/src/main/java/org/jboss/cx/remoting/samples/simple/JrppStreamExampleMain.java 2008-07-03
00:44:19 UTC (rev 4342)
@@ -1,70 +0,0 @@
-package org.jboss.cx.remoting.samples.simple;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.Reader;
-import java.io.StringReader;
-import java.net.InetSocketAddress;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.security.Security;
-import org.jboss.cx.remoting.Client;
-import org.jboss.cx.remoting.Endpoint;
-import org.jboss.cx.remoting.RemoteExecutionException;
-import org.jboss.cx.remoting.Remoting;
-import org.jboss.cx.remoting.Session;
-import org.jboss.cx.remoting.core.security.sasl.Provider;
-import org.jboss.cx.remoting.jrpp.JrppServer;
-import org.jboss.cx.remoting.util.AttributeMap;
-import org.jboss.xnio.IoUtils;
-
-/**
- *
- */
-public final class JrppStreamExampleMain {
-
- public static void main(String[] args) throws IOException, RemoteExecutionException,
URISyntaxException {
- Security.addProvider(new Provider());
- final StreamingRot13RequestListener listener = new
StreamingRot13RequestListener();
- final Endpoint endpoint = Remoting.createEndpoint("simple");
- try {
- final JrppServer jrppServer = Remoting.addJrppServer(endpoint, new
InetSocketAddress(12345), listener, AttributeMap.EMPTY);
- try {
- Session session = endpoint.openSession(new
URI("jrpp://localhost:12345"), AttributeMap.EMPTY, listener);
- try {
- final Client<Reader,Reader> client = session.getRootClient();
- try {
- final String original = "The Secret Message\n";
- final StringReader originalReader = new StringReader(original);
- try {
- final Reader reader = client.send(originalReader).get();
- try {
- final BufferedReader bufferedReader = new
BufferedReader(reader);
- try {
- final String secretLine = bufferedReader.readLine();
- System.out.printf("The secret message
\"%s\" became \"%s\"!\n", original.trim(), secretLine);
- } finally {
- IoUtils.safeClose(bufferedReader);
- }
- } finally {
- IoUtils.safeClose(reader);
- }
- } finally {
- IoUtils.safeClose(originalReader);
- }
- } finally {
- IoUtils.safeClose(client);
- }
- } finally {
- IoUtils.safeClose(session);
- }
- } finally {
- jrppServer.stop();
- jrppServer.destroy();
- }
- } finally {
- Remoting.closeEndpoint(endpoint);
- }
-
- }
-}
Modified:
remoting3/trunk/samples/src/main/java/org/jboss/cx/remoting/samples/simple/LocalBasicExampleMain.java
===================================================================
---
remoting3/trunk/samples/src/main/java/org/jboss/cx/remoting/samples/simple/LocalBasicExampleMain.java 2008-07-02
22:54:29 UTC (rev 4341)
+++
remoting3/trunk/samples/src/main/java/org/jboss/cx/remoting/samples/simple/LocalBasicExampleMain.java 2008-07-03
00:44:19 UTC (rev 4342)
@@ -19,7 +19,7 @@
final StringRot13RequestListener listener = new StringRot13RequestListener();
final Endpoint endpoint = Remoting.createEndpoint("simple");
try {
- final Client<String,String> client = endpoint.createClient(listener);
+ final Client<String,String> client =
endpoint.createClient(listener).getClient();
try {
final String original = "The Secret Message\n";
final String result = client.invoke(original);
Modified:
remoting3/trunk/samples/src/main/java/org/jboss/cx/remoting/samples/simple/LocalStreamExampleMain.java
===================================================================
---
remoting3/trunk/samples/src/main/java/org/jboss/cx/remoting/samples/simple/LocalStreamExampleMain.java 2008-07-02
22:54:29 UTC (rev 4341)
+++
remoting3/trunk/samples/src/main/java/org/jboss/cx/remoting/samples/simple/LocalStreamExampleMain.java 2008-07-03
00:44:19 UTC (rev 4342)
@@ -17,12 +17,15 @@
*/
public final class LocalStreamExampleMain {
+ private LocalStreamExampleMain() {
+ }
+
public static void main(String[] args) throws IOException, RemoteExecutionException
{
Security.addProvider(new Provider());
final StreamingRot13RequestListener listener = new
StreamingRot13RequestListener();
final Endpoint endpoint = Remoting.createEndpoint("simple");
try {
- final Client<Reader,Reader> client = endpoint.createClient(listener);
+ final Client<Reader,Reader> client =
endpoint.createClient(listener).getClient();
try {
final String original = "The Secret Message\n";
final StringReader originalReader = new StringReader(original);
Modified:
remoting3/trunk/samples/src/main/java/org/jboss/cx/remoting/samples/simple/StreamingRot13RequestListener.java
===================================================================
---
remoting3/trunk/samples/src/main/java/org/jboss/cx/remoting/samples/simple/StreamingRot13RequestListener.java 2008-07-02
22:54:29 UTC (rev 4341)
+++
remoting3/trunk/samples/src/main/java/org/jboss/cx/remoting/samples/simple/StreamingRot13RequestListener.java 2008-07-03
00:44:19 UTC (rev 4342)
@@ -12,7 +12,7 @@
*/
public final class StreamingRot13RequestListener extends
AbstractRequestListener<Reader, Reader> {
- public void handleRequest(final RequestContext<Reader> readerRequestContext,
final Reader request) throws RemoteExecutionException, InterruptedException {
+ public void handleRequest(final RequestContext<Reader> readerRequestContext,
final Reader request) throws RemoteExecutionException {
try {
readerRequestContext.sendReply(new Reader() {
Modified:
remoting3/trunk/samples/src/main/java/org/jboss/cx/remoting/samples/simple/StringRot13RequestListener.java
===================================================================
---
remoting3/trunk/samples/src/main/java/org/jboss/cx/remoting/samples/simple/StringRot13RequestListener.java 2008-07-02
22:54:29 UTC (rev 4341)
+++
remoting3/trunk/samples/src/main/java/org/jboss/cx/remoting/samples/simple/StringRot13RequestListener.java 2008-07-03
00:44:19 UTC (rev 4342)
@@ -10,7 +10,7 @@
*/
public final class StringRot13RequestListener extends AbstractRequestListener<String,
String> {
- public void handleRequest(final RequestContext<String> readerRequestContext,
final String request) throws RemoteExecutionException, InterruptedException {
+ public void handleRequest(final RequestContext<String> readerRequestContext,
final String request) throws RemoteExecutionException {
try {
StringBuilder b = new StringBuilder(request.length());
for (int i = 0; i < request.length(); i ++) {
Modified: remoting3/trunk/standalone/src/main/java/org/jboss/cx/remoting/Remoting.java
===================================================================
---
remoting3/trunk/standalone/src/main/java/org/jboss/cx/remoting/Remoting.java 2008-07-02
22:54:29 UTC (rev 4341)
+++
remoting3/trunk/standalone/src/main/java/org/jboss/cx/remoting/Remoting.java 2008-07-03
00:44:19 UTC (rev 4342)
@@ -1,61 +1,28 @@
package org.jboss.cx.remoting;
import java.io.IOException;
-import java.net.SocketAddress;
import java.util.concurrent.ConcurrentMap;
-import org.jboss.cx.remoting.core.CoreEndpoint;
-import org.jboss.cx.remoting.core.protocol.LocalProtocolHandlerFactory;
-import org.jboss.cx.remoting.jrpp.JrppProtocolSupport;
-import org.jboss.cx.remoting.jrpp.JrppServer;
-import org.jboss.cx.remoting.util.AttributeMap;
+import org.jboss.cx.remoting.core.EndpointImpl;
/**
*
*/
public final class Remoting {
- private static final String JRPP_SUPPORT_KEY =
"org.jboss.cx.remoting.standalone.jrpp.support";
-
// lifecycle lock
private static final Object lifecycle = new Object();
public static Endpoint createEndpoint(String name) throws IOException {
synchronized (lifecycle) {
boolean ok = false;
- final CoreEndpoint coreEndpoint = new CoreEndpoint();
- coreEndpoint.setName(name);
- coreEndpoint.create();
+ final EndpointImpl endpointImpl = new EndpointImpl();
+ endpointImpl.setName(name);
+ endpointImpl.create();
try {
- coreEndpoint.start();
- try {
- LocalProtocolHandlerFactory.addTo(coreEndpoint);
- final JrppProtocolSupport jrppProtocolSupport = new
JrppProtocolSupport();
- jrppProtocolSupport.setEndpoint(coreEndpoint);
- jrppProtocolSupport.create();
- try {
- jrppProtocolSupport.start();
- try {
- final ConcurrentMap<Object, Object> attributes =
coreEndpoint.getAttributes();
- attributes.put(JRPP_SUPPORT_KEY, jrppProtocolSupport);
- ok = true;
- return coreEndpoint;
- } finally {
- if (! ok) {
- jrppProtocolSupport.stop();
- }
- }
- } finally {
- if (! ok) {
- jrppProtocolSupport.destroy();
- }
- }
- } finally {
- if (! ok) {
- coreEndpoint.stop();
- }
- }
+ endpointImpl.start();
+ return endpointImpl;
} finally {
if (! ok) {
- coreEndpoint.destroy();
+ endpointImpl.destroy();
}
}
}
@@ -63,42 +30,15 @@
public static void closeEndpoint(Endpoint endpoint) {
synchronized (lifecycle) {
- if (endpoint instanceof CoreEndpoint) {
- final CoreEndpoint coreEndpoint = (CoreEndpoint) endpoint;
- final ConcurrentMap<Object, Object> attributes =
coreEndpoint.getAttributes();
- final JrppProtocolSupport jrppProtocolSupport = (JrppProtocolSupport)
attributes.remove(JRPP_SUPPORT_KEY);
- coreEndpoint.stop();
- coreEndpoint.destroy();
- if (jrppProtocolSupport != null) {
- jrppProtocolSupport.stop();
- jrppProtocolSupport.destroy();
- }
+ if (endpoint instanceof EndpointImpl) {
+ final EndpointImpl endpointImpl = (EndpointImpl) endpoint;
+ final ConcurrentMap<Object, Object> attributes =
endpointImpl.getAttributes();
+ endpointImpl.stop();
+ endpointImpl.destroy();
}
}
}
- public static JrppServer addJrppServer(Endpoint endpoint, SocketAddress address,
RequestListener<?, ?> rootRequestListener, AttributeMap attributeMap) throws
IOException {
- synchronized (lifecycle) {
- boolean ok = false;
- final JrppServer jrppServer = new JrppServer();
- jrppServer.setProtocolSupport((JrppProtocolSupport)
endpoint.getAttributes().get(JRPP_SUPPORT_KEY));
- jrppServer.setSocketAddress(address);
- jrppServer.setAttributeMap(attributeMap);
- jrppServer.setEndpoint(endpoint);
- jrppServer.setRootListener(rootRequestListener);
- jrppServer.create();
- try {
- jrppServer.start();
- ok = true;
- return jrppServer;
- } finally {
- if (! ok) {
- jrppServer.destroy();
- }
- }
- }
- }
-
// privates
private Remoting() { /* empty */ }