JBoss Remoting SVN: r3605 - remoting2/branches/2.x/src/main/org/jboss/remoting/detection.
by jboss-remoting-commits@lists.jboss.org
Author: ron.sigal(a)jboss.com
Date: 2008-03-12 03:54:08 -0400 (Wed, 12 Mar 2008)
New Revision: 3605
Modified:
remoting2/branches/2.x/src/main/org/jboss/remoting/detection/AbstractDetector.java
Log:
JBREM-826: Removed some log.error() calls.
Modified: remoting2/branches/2.x/src/main/org/jboss/remoting/detection/AbstractDetector.java
===================================================================
--- remoting2/branches/2.x/src/main/org/jboss/remoting/detection/AbstractDetector.java 2008-03-12 07:53:05 UTC (rev 3604)
+++ remoting2/branches/2.x/src/main/org/jboss/remoting/detection/AbstractDetector.java 2008-03-12 07:54:08 UTC (rev 3605)
@@ -499,7 +499,8 @@
}
catch (Exception e)
{
- log.error("Error during detection of: " + detection, e);
+ log.warn("Error during detection of: " + detection);
+ log.debug("Error during detection of: " + detection, e);
}
}
else if (log.isTraceEnabled())
16 years, 9 months
JBoss Remoting SVN: r3604 - remoting2/branches/2.x/src/main/org/jboss/remoting/callback.
by jboss-remoting-commits@lists.jboss.org
Author: ron.sigal(a)jboss.com
Date: 2008-03-12 03:53:05 -0400 (Wed, 12 Mar 2008)
New Revision: 3604
Modified:
remoting2/branches/2.x/src/main/org/jboss/remoting/callback/ServerInvokerCallbackHandler.java
Log:
JBREM-826: Removed some log.error() calls.
Modified: remoting2/branches/2.x/src/main/org/jboss/remoting/callback/ServerInvokerCallbackHandler.java
===================================================================
--- remoting2/branches/2.x/src/main/org/jboss/remoting/callback/ServerInvokerCallbackHandler.java 2008-03-12 07:52:33 UTC (rev 3603)
+++ remoting2/branches/2.x/src/main/org/jboss/remoting/callback/ServerInvokerCallbackHandler.java 2008-03-12 07:53:05 UTC (rev 3604)
@@ -634,7 +634,7 @@
}
catch(IOException e)
{
- log.error("Can not get persisted callbacks.", e);
+ log.debug("Can not get persisted callbacks.", e);
throw new RuntimeException("Error getting callbacks", e);
}
callbackList.addAll(persistedCallbacks);
@@ -765,7 +765,7 @@
}
catch(IOException e)
{
- log.error("Unable to persist callback", e);
+ log.debug("Unable to persist callback", e);
throw new HandleCallbackException("Unable to persist callback and will not " +
"be able to deliver.", e);
}
@@ -852,7 +852,7 @@
}
catch(Throwable t)
{
- log.error("Error handling callback", t);
+ log.debug("Error handling callback", t);
throw new HandleCallbackException("Error handling callback", t);
}
}
16 years, 9 months
JBoss Remoting SVN: r3603 - remoting2/branches/2.x/src/main/org/jboss/remoting/callback.
by jboss-remoting-commits@lists.jboss.org
Author: ron.sigal(a)jboss.com
Date: 2008-03-12 03:52:33 -0400 (Wed, 12 Mar 2008)
New Revision: 3603
Modified:
remoting2/branches/2.x/src/main/org/jboss/remoting/callback/CallbackPoller.java
Log:
JBREM-826: Removed some log.error() calls.
Modified: remoting2/branches/2.x/src/main/org/jboss/remoting/callback/CallbackPoller.java
===================================================================
--- remoting2/branches/2.x/src/main/org/jboss/remoting/callback/CallbackPoller.java 2008-03-12 06:08:24 UTC (rev 3602)
+++ remoting2/branches/2.x/src/main/org/jboss/remoting/callback/CallbackPoller.java 2008-03-12 07:52:33 UTC (rev 3603)
@@ -209,7 +209,8 @@
return;
}
- log.error(this + " Error getting callbacks from server.", throwable);
+ log.info(this + " Error getting callbacks from server.");
+ log.debug(this + " Error getting callbacks from server.", throwable);
String errorMessage = throwable.getMessage();
if (errorMessage != null)
{
@@ -362,7 +363,7 @@
}
catch (InterruptedException e)
{
- log.warn("unexpected interrupt");
+ log.debug("unexpected interrupt");
continue;
}
}
@@ -462,7 +463,7 @@
}
catch (InterruptedException e)
{
- log.warn("unexpected interrupt");
+ log.debug("unexpected interrupt");
continue;
}
}
16 years, 9 months
JBoss Remoting SVN: r3602 - remoting2/branches/2.x/src/main/org/jboss/remoting.
by jboss-remoting-commits@lists.jboss.org
Author: ron.sigal(a)jboss.com
Date: 2008-03-12 02:08:24 -0400 (Wed, 12 Mar 2008)
New Revision: 3602
Modified:
remoting2/branches/2.x/src/main/org/jboss/remoting/ServerInvoker.java
Log:
JBREM-826: Removed some log.error() calls.
Modified: remoting2/branches/2.x/src/main/org/jboss/remoting/ServerInvoker.java
===================================================================
--- remoting2/branches/2.x/src/main/org/jboss/remoting/ServerInvoker.java 2008-03-12 06:05:46 UTC (rev 3601)
+++ remoting2/branches/2.x/src/main/org/jboss/remoting/ServerInvoker.java 2008-03-12 06:08:24 UTC (rev 3602)
@@ -1556,13 +1556,13 @@
}
catch (Exception e)
{
- log.error("unable to instantiate class: " + o, e);
+ log.warn("unable to instantiate class: " + o, e);
return ssf;
}
}
else
{
- log.error("unrecognized type for socket creation server listener: " + o);
+ log.warn("unrecognized type for socket creation server listener: " + o);
return ssf;
}
}
16 years, 9 months
JBoss Remoting SVN: r3601 - remoting2/branches/2.x/src/main/org/jboss/remoting.
by jboss-remoting-commits@lists.jboss.org
Author: ron.sigal(a)jboss.com
Date: 2008-03-12 02:05:46 -0400 (Wed, 12 Mar 2008)
New Revision: 3601
Modified:
remoting2/branches/2.x/src/main/org/jboss/remoting/AbstractInvoker.java
Log:
JBREM-826: Removed some log.error() calls.
Modified: remoting2/branches/2.x/src/main/org/jboss/remoting/AbstractInvoker.java
===================================================================
--- remoting2/branches/2.x/src/main/org/jboss/remoting/AbstractInvoker.java 2008-03-11 23:54:16 UTC (rev 3600)
+++ remoting2/branches/2.x/src/main/org/jboss/remoting/AbstractInvoker.java 2008-03-12 06:05:46 UTC (rev 3601)
@@ -89,7 +89,7 @@
}
catch (PrivilegedActionException e)
{
- log.error(e.toString(), e);
+ log.debug(e.toString(), e);
throw new RuntimeException("Can't create a ClassLoader", e);
}
this.locator = locator;
@@ -277,7 +277,7 @@
}
catch (PrivilegedActionException e)
{
- log.error(e.toString(), e);
+ log.debug(e.toString(), e);
throw new RuntimeException("Can't create a ClassLoader", e);
}
}
@@ -413,13 +413,13 @@
}
catch (Exception e)
{
- log.error("unable to instantiate class: " + o, e);
+ log.warn("unable to instantiate class: " + o, e);
return socketFactory;
}
}
else
{
- log.error("unrecognized type for socket creation client listener: " + o);
+ log.warn("unrecognized type for socket creation client listener: " + o);
return socketFactory;
}
}
16 years, 9 months
JBoss Remoting SVN: r3600 - remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util.
by jboss-remoting-commits@lists.jboss.org
Author: david.lloyd(a)jboss.com
Date: 2008-03-11 19:54:16 -0400 (Tue, 11 Mar 2008)
New Revision: 3600
Modified:
remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/AtomicStateMachine.java
Log:
Quiet a generics warning by providing a non-varargs variation of in*()
Modified: remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/AtomicStateMachine.java
===================================================================
--- remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/AtomicStateMachine.java 2008-03-11 23:46:24 UTC (rev 3599)
+++ remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/AtomicStateMachine.java 2008-03-11 23:54:16 UTC (rev 3600)
@@ -488,6 +488,19 @@
return state;
}
+ public boolean inHoldExclusive(T state) {
+ writeLock.lock();
+ boolean ok = false;
+ try {
+ ok = this.state == state;
+ return ok;
+ } finally {
+ if (! ok) {
+ writeLock.unlock();
+ }
+ }
+ }
+
public boolean inHoldExclusive(T... states) {
if (states == null) {
throw new NullPointerException("states is null");
@@ -502,6 +515,19 @@
return false;
}
+ public boolean inHold(T state) {
+ readLock.lock();
+ boolean ok = false;
+ try {
+ ok = this.state == state;
+ return ok;
+ } finally {
+ if (! ok) {
+ readLock.unlock();
+ }
+ }
+ }
+
public boolean inHold(T... states) {
if (states == null) {
throw new NullPointerException("states is null");
@@ -516,6 +542,15 @@
return false;
}
+ public boolean in(T state) {
+ readLock.lock();
+ try {
+ return this.state == state;
+ } finally {
+ readLock.unlock();
+ }
+ }
+
public boolean in(T... states) {
if (states == null) {
throw new NullPointerException("states is null");
16 years, 9 months
JBoss Remoting SVN: r3599 - in remoting3/trunk: api/src/main/java/org/jboss/cx/remoting/service and 17 other directories.
by jboss-remoting-commits@lists.jboss.org
Author: david.lloyd(a)jboss.com
Date: 2008-03-11 19:46:24 -0400 (Tue, 11 Mar 2008)
New Revision: 3599
Added:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/package-info.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/service/package-info.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/ByteMessageInput.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/ByteMessageOutput.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/DataMessageInput.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/DataMessageOutput.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/ObjectMessageInput.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/ObjectMessageOutput.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/marshal/
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/marshal/Marshaller.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/marshal/MarshallerFactory.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/marshal/ObjectResolver.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/stream/ObjectSinkWrapper.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/stream/ObjectSourceWrapper.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/AbstractRealContext.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/BaseContextClient.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ContextClient.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ContextMarker.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ContextServer.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ProtocolContextClient.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ProtocolContextServer.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RequestClient.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RequestServer.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ServiceClient.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ServiceServer.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/protocol/
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/service/
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/service/ClassLoaderResourceListener.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/service/ServiceLocatorListener.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/util/
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/util/DelegatingObjectInput.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/util/OrderedExecutorFactory.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/util/QueueExecutor.java
remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/mina/IoBufferByteMessageInput.java
remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/mina/IoBufferByteMessageOutput.java
Removed:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/Interceptors.png
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/stream/ProgressStream.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreDeployedService.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/DelegatingObjectInput.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/LocalProtocol.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/OrderedExecutorFactory.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/QueueExecutor.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ServiceLocatorListener.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/stream/ProgressStreamStreamSerializerFactory.java
remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/mina/IoBufferByteInput.java
remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/mina/IoBufferByteOutput.java
remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/ByteInput.java
remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/ByteOutput.java
remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/MessageInput.java
remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/MessageOutput.java
Modified:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Closeable.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Context.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Endpoint.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/FutureReply.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/IndeterminateOutcomeException.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/RequestListener.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/protocol/ProtocolContext.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/protocol/ProtocolHandler.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/protocol/ProtocolServerContext.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/stream/StreamContext.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/stream/StreamSerializer.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/ContextSourceWrapper.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/ContextWrapper.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/EndpointWrapper.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/SessionWrapper.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/stream/Streams.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreEndpoint.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreInboundContext.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreInboundRequest.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreInboundService.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundContext.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundRequest.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundService.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreSession.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreStream.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RemoteClassLoader.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/StreamMarker.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/stream/DefaultStreamDetector.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/stream/InputStreamStreamSerializerFactory.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/stream/IteratorStreamSerializerFactory.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/stream/ObjectSinkStreamSerializerFactory.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/stream/ObjectSourceStreamSerializerFactory.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/stream/OutputStreamStreamSerailizerFactory.java
remoting3/trunk/http-se6/src/main/java/org/jboss/cx/remoting/http/se6/ServerInstance.java
remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/RemotingHttpSessionImpl.java
remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/spi/IncomingHttpMessage.java
remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/spi/OutgoingHttpMessage.java
remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/JrppConnection.java
remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/id/JrppRequestIdentifier.java
remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/id/JrppServiceIdentifier.java
remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/AbstractTypeMap.java
remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/AtomicMap.java
remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/CollectionUtil.java
remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/ServiceURI.java
remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/TypeMap.java
Log:
Context forwarding revamp - now context objects themselves are forwarded over a connection. Compiles, does not run yet
Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Closeable.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Closeable.java 2008-03-09 03:56:14 UTC (rev 3598)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Closeable.java 2008-03-11 23:46:24 UTC (rev 3599)
@@ -4,7 +4,27 @@
*
*/
public interface Closeable<T> extends java.io.Closeable {
+ // TODO - do we need an async close method?
+
+ /**
+ * Close, waiting for any outstanding processing to finish.
+ *
+ * @throws RemotingException if the close failed
+ */
void close() throws RemotingException;
+ /**
+ * Close immediately. Any outstanding processing is immediately aborted.
+ *
+ * @throws RemotingException if the close failed
+ */
+ void closeImmediate() throws RemotingException;
+
+ /**
+ * Add a handler that will be called upon close. The handler may be called before or after the close acutally
+ * takes place.
+ *
+ * @param handler the close handler
+ */
void addCloseHandler(CloseHandler<T> handler);
}
Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Context.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Context.java 2008-03-09 03:56:14 UTC (rev 3598)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Context.java 2008-03-11 23:46:24 UTC (rev 3599)
@@ -7,9 +7,6 @@
* state, as well as other state maintained by the remote side.
*/
public interface Context<I, O> extends Closeable<Context<I, O>> {
-
- void close() throws RemotingException;
-
/**
* Send a request and block until a reply is received.
* <p/>
@@ -24,42 +21,37 @@
*
* @throws RemotingException if the request could not be sent
* @throws RemoteExecutionException if the remote handler threw an exception
- * @throws InterruptedException if the request was interrupted (and thereby cancelled)
*/
- O invokeInterruptibly(I request) throws RemotingException, RemoteExecutionException, InterruptedException;
+ O invoke(I request) throws RemotingException, RemoteExecutionException;
/**
- * Send a request and block until a reply is received.
+ * Send a request asynchronously.
* <p/>
- * Uses the default invocation policy for handling remote invocations. If the remote side manipulates a stream, the
- * current thread MAY be used to handle it.
+ * Uses the default invocation policy for handling remote invocations. If the remote side manipulates a stream, it
+ * MAY fail with an exception (e.g. if this method is called on a client with no threads to handle streaming).
* <p/>
- * If the remote session cannot handle the request, a {@code RemotingException} will be thrown.
+ * Returns immediately.
*
* @param request the request to send
*
- * @return the result of the request
+ * @return a future representing the result of the request
*
* @throws RemotingException if the request could not be sent
- * @throws RemoteExecutionException if the remote handler threw an exception
*/
- O invoke(I request) throws RemotingException, RemoteExecutionException;
+ FutureReply<O> send(I request) throws RemotingException;
/**
- * Send a request asynchronously.
- * <p/>
+ * Send a request asynchronously, ignoring the reply.
+ * </p>
* Uses the default invocation policy for handling remote invocations. If the remote side manipulates a stream, it
* MAY fail with an exception (e.g. if this method is called on a client with no threads to handle streaming).
* <p/>
* Returns immediately.
*
* @param request the request to send
- *
- * @return a future representing the result of the request
- *
* @throws RemotingException if the request could not be sent
*/
- FutureReply<O> send(I request) throws RemotingException;
+ void sendOneWay(I request) throws RemotingException;
/**
* Get the context map. This map holds metadata about the current context.
@@ -68,4 +60,11 @@
*/
ConcurrentMap<Object, Object> getAttributes();
+ void close() throws RemotingException;
+
+ void closeCancelling(boolean mayInterrupt) throws RemotingException;
+
+ void closeImmediate() throws RemotingException;
+
+ void addCloseHandler(final CloseHandler<Context<I, O>> closeHandler);
}
Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Endpoint.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Endpoint.java 2008-03-09 03:56:14 UTC (rev 3598)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Endpoint.java 2008-03-11 23:46:24 UTC (rev 3599)
@@ -26,12 +26,13 @@
*
* @param remoteUri the URI of the server to connect to
* @param attributeMap the attribute map to use to configure this session
+ * @param rootContext the root context for the new session
* @return a new session
*
* @throws RemotingException if there is a problem creating the session, or if the request or reply type does not
* match the remote service
*/
- Session openSession(URI remoteUri, AttributeMap attributeMap) throws RemotingException;
+ <I, O> Session openSession(URI remoteUri, AttributeMap attributeMap, Context<I, O> rootContext) throws RemotingException;
/**
* Get the name of this endpoint.
Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/FutureReply.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/FutureReply.java 2008-03-09 03:56:14 UTC (rev 3598)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/FutureReply.java 2008-03-11 23:46:24 UTC (rev 3599)
@@ -28,6 +28,14 @@
boolean cancel(boolean mayInterruptIfRunning);
/**
+ * Asynchronously send a request to cancel this request. Does not block the current method. Use the
+ * {@link #addCompletionNotifier(RequestCompletionHandler)} method to add a notifier to be called upon completion.
+ *
+ * @param mayInterruptIfRunning
+ */
+ FutureReply<T> sendCancel(boolean mayInterruptIfRunning);
+
+ /**
* Returns {@code true} if this task was cancelled before it completed normally.
*
* @return {@code true} if task was cancelled before it completed
Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/IndeterminateOutcomeException.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/IndeterminateOutcomeException.java 2008-03-09 03:56:14 UTC (rev 3598)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/IndeterminateOutcomeException.java 2008-03-11 23:46:24 UTC (rev 3599)
@@ -3,7 +3,7 @@
/**
*
*/
-public final class IndeterminateOutcomeException extends RemoteExecutionException {
+public class IndeterminateOutcomeException extends RemoteExecutionException {
public IndeterminateOutcomeException() {
}
Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/RequestListener.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/RequestListener.java 2008-03-09 03:56:14 UTC (rev 3598)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/RequestListener.java 2008-03-11 23:46:24 UTC (rev 3599)
@@ -5,7 +5,12 @@
*/
public interface RequestListener<I, O> {
/**
- * Handle the request. If this method throws {@code RemoteExecutionException}, then that exception is passed
+ * Handle the opening of a context.
+ */
+ void handleOpen();
+
+ /**
+ * Handle a request. If this method throws {@code RemoteExecutionException}, then that exception is passed
* back to the caller and the request is marked as complete. If this method throws {@code InterruptedException},
* the request is cancelled, and the interrupted status is propagated to the executor.. Otherwise, the request
* listener must send back either a reply (using the {@code sendReply()} method on the {@code RequestContext}) or
@@ -19,4 +24,9 @@
* @throws InterruptedException if the thread is interrupted
*/
void handleRequest(RequestContext<O> context, I request) throws RemoteExecutionException, InterruptedException;
+
+ /**
+ * Handle the close of a context.
+ */
+ void handleClose();
}
Added: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/package-info.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/package-info.java (rev 0)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/package-info.java 2008-03-11 23:46:24 UTC (rev 3599)
@@ -0,0 +1,4 @@
+/**
+ * The base Remoting 3 API package.
+ */
+package org.jboss.cx.remoting;
\ No newline at end of file
Added: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/service/package-info.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/service/package-info.java (rev 0)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/service/package-info.java 2008-03-11 23:46:24 UTC (rev 3599)
@@ -0,0 +1,4 @@
+/**
+ *
+ */
+package org.jboss.cx.remoting.service;
\ No newline at end of file
Copied: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/ByteMessageInput.java (from rev 3517, remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/ByteInput.java)
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/ByteMessageInput.java (rev 0)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/ByteMessageInput.java 2008-03-11 23:46:24 UTC (rev 3599)
@@ -0,0 +1,44 @@
+package org.jboss.cx.remoting.spi;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * A readable source of byte data.
+ */
+public interface ByteMessageInput extends Closeable {
+ /**
+ * Read one byte.
+ *
+ * @return the byte, or -1 if the end of the stream has been reached.
+ * @throws IOException if an I/O error occurs
+ */
+ int read() throws IOException;
+
+ /**
+ * Read a series of bytes into an array.
+ *
+ * @param data the array into which data is to be read
+ * @return the total number of bytes read, or -1 if there are no bytes remaining to read
+ * @throws IOException if an I/O error occurs
+ */
+ int read(byte[] data) throws IOException;
+
+ /**
+ * Read a series of bytes into an array.
+ *
+ * @param data the array into which data is to be read
+ * @param offs the start offset in the {@code data} array at which the data is written
+ * @param len the maximum number of bytes to read
+ * @return the total number of bytes read, or -1 if there are no bytes remaining to read
+ * @throws IOException if an I/O error occurs
+ */
+ int read(byte[] data, int offs, int len) throws IOException;
+
+ /**
+ * Return the number of bytes remaining.
+ *
+ * @return the number of bytes, or -1 if the byte count cannot be determined
+ */
+ int remaining();
+}
Copied: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/ByteMessageOutput.java (from rev 3517, remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/ByteOutput.java)
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/ByteMessageOutput.java (rev 0)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/ByteMessageOutput.java 2008-03-11 23:46:24 UTC (rev 3599)
@@ -0,0 +1,52 @@
+package org.jboss.cx.remoting.spi;
+
+import java.io.Closeable;
+import java.io.Flushable;
+import java.io.IOException;
+
+/**
+ * A writable destination for byte data.
+ */
+public interface ByteMessageOutput extends Closeable, Flushable {
+ /**
+ * Write a single byte of data. The input argument is truncated to 8 bits.
+ *
+ * @param b the byte to write
+ * @throws IOException if an I/O error occurs
+ */
+ void write(int b) throws IOException;
+
+ /**
+ * Write many bytes of data.
+ *
+ * @param b the bytes to write
+ * @throws IOException if an I/O error occurs
+ */
+ void write(byte[] b) throws IOException;
+
+ /**
+ * Write many bytes of data.
+ *
+ * @param b the bytes to write
+ * @param offs the offset in {@code b} to start reading bytes from
+ * @param len the number of bytes to write
+ * @throws IOException if an I/O error occurs
+ */
+ void write(byte[] b, int offs, int len) throws IOException;
+
+ /**
+ * Commit the written data. This causes the accumulated data to be sent as a message on the underlying
+ * channel.
+ *
+ * @throws IOException if an I/O error occurs
+ */
+ void commit() throws IOException;
+
+ /**
+ * Get a count of the number of bytes written to this message.
+ *
+ * @return the count
+ * @throws IOException if an I/O error occurs
+ */
+ int getBytesWritten() throws IOException;
+}
Added: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/DataMessageInput.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/DataMessageInput.java (rev 0)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/DataMessageInput.java 2008-03-11 23:46:24 UTC (rev 3599)
@@ -0,0 +1,9 @@
+package org.jboss.cx.remoting.spi;
+
+import java.io.DataInput;
+
+/**
+ *
+ */
+public interface DataMessageInput extends ByteMessageInput, DataInput {
+}
Added: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/DataMessageOutput.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/DataMessageOutput.java (rev 0)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/DataMessageOutput.java 2008-03-11 23:46:24 UTC (rev 3599)
@@ -0,0 +1,9 @@
+package org.jboss.cx.remoting.spi;
+
+import java.io.DataOutput;
+
+/**
+ *
+ */
+public interface DataMessageOutput extends ByteMessageOutput, DataOutput {
+}
Deleted: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/Interceptors.png
===================================================================
(Binary files differ)
Copied: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/ObjectMessageInput.java (from rev 3517, remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/MessageInput.java)
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/ObjectMessageInput.java (rev 0)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/ObjectMessageInput.java 2008-03-11 23:46:24 UTC (rev 3599)
@@ -0,0 +1,29 @@
+package org.jboss.cx.remoting.spi;
+
+import java.io.IOException;
+import java.io.ObjectInput;
+
+/**
+ * A readable message.
+ */
+public interface ObjectMessageInput extends DataMessageInput, ObjectInput {
+ /**
+ * Read an object using the current context classloader, or, if there is no such classloader, the classloader
+ * which loaded this interface.
+ *
+ * @return the object from the message
+ * @throws ClassNotFoundException if the class of the object could not be resolved by the classloader
+ * @throws IOException if an I/O error occurs
+ */
+ Object readObject() throws ClassNotFoundException, IOException;
+
+ /**
+ * Read an object using the given classloader.
+ *
+ * @param loader the classloader to use
+ * @return the object from the message
+ * @throws ClassNotFoundException if the class of the object could not be resolved by the classloader
+ * @throws IOException if an I/O error occurs
+ */
+ Object readObject(ClassLoader loader) throws ClassNotFoundException, IOException;
+}
Copied: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/ObjectMessageOutput.java (from rev 3517, remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/MessageOutput.java)
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/ObjectMessageOutput.java (rev 0)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/ObjectMessageOutput.java 2008-03-11 23:46:24 UTC (rev 3599)
@@ -0,0 +1,9 @@
+package org.jboss.cx.remoting.spi;
+
+import java.io.ObjectOutput;
+
+/**
+ * A writable message.
+ */
+public interface ObjectMessageOutput extends DataMessageOutput, ObjectOutput {
+}
Added: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/marshal/Marshaller.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/marshal/Marshaller.java (rev 0)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/marshal/Marshaller.java 2008-03-11 23:46:24 UTC (rev 3599)
@@ -0,0 +1,24 @@
+package org.jboss.cx.remoting.spi.marshal;
+
+import org.jboss.cx.remoting.spi.ObjectMessageOutput;
+import org.jboss.cx.remoting.spi.DataMessageOutput;
+import org.jboss.cx.remoting.spi.ObjectMessageInput;
+import org.jboss.cx.remoting.spi.DataMessageInput;
+import java.io.IOException;
+
+/**
+ *
+ */
+public interface Marshaller {
+ ObjectMessageOutput getMessageOutput(DataMessageOutput dataMessageOutput) throws IOException;
+
+ ObjectMessageInput getMessageInput(DataMessageInput dataMessageInput) throws IOException;
+
+ Marshaller createChild() throws IOException;
+
+ Marshaller createChild(ClassLoader classLoader) throws IOException;
+
+ void addFirstObjectResolver(ObjectResolver resolver);
+
+ void addLastObjectResolver(ObjectResolver resolver);
+}
Added: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/marshal/MarshallerFactory.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/marshal/MarshallerFactory.java (rev 0)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/marshal/MarshallerFactory.java 2008-03-11 23:46:24 UTC (rev 3599)
@@ -0,0 +1,10 @@
+package org.jboss.cx.remoting.spi.marshal;
+
+import java.io.IOException;
+
+/**
+ *
+ */
+public interface MarshallerFactory {
+ Marshaller createRootMarshaller(ClassLoader classLoader) throws IOException;
+}
Added: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/marshal/ObjectResolver.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/marshal/ObjectResolver.java (rev 0)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/marshal/ObjectResolver.java 2008-03-11 23:46:24 UTC (rev 3599)
@@ -0,0 +1,12 @@
+package org.jboss.cx.remoting.spi.marshal;
+
+import java.io.IOException;
+
+/**
+ *
+ */
+public interface ObjectResolver {
+ Object readResolve(Object original) throws IOException;
+
+ Object writeReplace(Object original) throws IOException;
+}
Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/protocol/ProtocolContext.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/protocol/ProtocolContext.java 2008-03-09 03:56:14 UTC (rev 3598)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/protocol/ProtocolContext.java 2008-03-11 23:46:24 UTC (rev 3599)
@@ -3,10 +3,10 @@
import java.io.IOException;
import java.util.concurrent.Executor;
import org.jboss.cx.remoting.RemoteExecutionException;
-import org.jboss.cx.remoting.util.ByteInput;
-import org.jboss.cx.remoting.util.ByteOutput;
-import org.jboss.cx.remoting.util.MessageInput;
-import org.jboss.cx.remoting.util.MessageOutput;
+import org.jboss.cx.remoting.spi.ByteMessageInput;
+import org.jboss.cx.remoting.spi.ByteMessageOutput;
+import org.jboss.cx.remoting.spi.ObjectMessageInput;
+import org.jboss.cx.remoting.spi.ObjectMessageOutput;
/**
*
@@ -21,13 +21,15 @@
void receiveCancelAcknowledge(ContextIdentifier contextIdentifier, RequestIdentifier requestIdentifier);
- void receiveServiceTerminate(ServiceIdentifier serviceIdentifier);
+ void receiveServiceClosing(ServiceIdentifier serviceIdentifier);
+ void receiveContextClosing(ContextIdentifier contextIdentifier, boolean done);
+
/* SERVER methods */
- void closeContext(ContextIdentifier remoteContextIdentifier);
+ void receiveServiceClose(ServiceIdentifier remoteServiceIdentifier);
- void closeService(ServiceIdentifier remoteServiceIdentifier);
+ void receiveContextClose(ContextIdentifier remoteContextIdentifier, boolean immediate, boolean cancel, boolean interrupt);
void receiveOpenedContext(ServiceIdentifier remoteServiceIdentifier, ContextIdentifier remoteContextIdentifier);
@@ -39,7 +41,7 @@
void closeStream(StreamIdentifier streamIdentifier);
- void receiveStreamData(StreamIdentifier streamIdentifier, MessageInput data);
+ void receiveStreamData(StreamIdentifier streamIdentifier, ObjectMessageInput data);
void openSession(String remoteEndpointName);
@@ -47,11 +49,11 @@
/* CLIENT OR SERVER methods */
- MessageOutput getMessageOutput(ByteOutput target) throws IOException;
+ ObjectMessageOutput getMessageOutput(ByteMessageOutput target) throws IOException;
- MessageOutput getMessageOutput(ByteOutput target, Executor streamExecutor) throws IOException;
+ ObjectMessageOutput getMessageOutput(ByteMessageOutput target, Executor streamExecutor) throws IOException;
- MessageInput getMessageInput(ByteInput source) throws IOException;
+ ObjectMessageInput getMessageInput(ByteMessageInput source) throws IOException;
String getLocalEndpointName();
}
Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/protocol/ProtocolHandler.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/protocol/ProtocolHandler.java 2008-03-09 03:56:14 UTC (rev 3598)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/protocol/ProtocolHandler.java 2008-03-11 23:46:24 UTC (rev 3599)
@@ -3,7 +3,7 @@
import java.io.IOException;
import java.util.concurrent.Executor;
import org.jboss.cx.remoting.RemoteExecutionException;
-import org.jboss.cx.remoting.util.MessageOutput;
+import org.jboss.cx.remoting.spi.ObjectMessageOutput;
/**
* A protocol handler.
@@ -59,14 +59,23 @@
void sendCancelAcknowledge(ContextIdentifier remoteContextIdentifier, RequestIdentifier requestIdentifier) throws IOException;
/**
- * Notify the client side that the service has been abruptly terminated on the server. A service may be terminated
- * if it is undeployed, the session is being shut down, or if the activation request was not accepted.
+ * Notify the client side that the service has been terminated on the server. A service may be terminated
+ * if it is undeployed, the session is being shut down, or if the client side requested the service to close.
*
* @param remoteServiceIdentifier the remote service identifier
* @throws IOException if an I/O error occurs
*/
- void sendServiceTerminate(ServiceIdentifier remoteServiceIdentifier) throws IOException;
+ void sendServiceClosing(ServiceIdentifier remoteServiceIdentifier) throws IOException;
+ /**
+ * Notify the client side that a context is closing and is no longer available for new requests.
+ *
+ * @param remoteContextIdentifier the remote context identifier
+ * @param done {@code true} if the context is closed and no more replies will arrive for it
+ * @throws IOException if an I/O error occurs
+ */
+ void sendContextClosing(ContextIdentifier remoteContextIdentifier, boolean done) throws IOException;
+
/* CLIENT methods */
/**
@@ -101,13 +110,16 @@
/**
* Close a previously opened context. The protocol handler should cause the
- * {@link org.jboss.cx.remoting.spi.protocol.ProtocolContext#closeContext(ContextIdentifier)} method to be called
+ * {@link ProtocolContext#receiveContextClose(ContextIdentifier,boolean,boolean,boolean)} method to be called
* on the remote side for this context identifier.
*
- * @param contextIdentifier
+ * @param contextIdentifier the context identifier
+ * @param immediate {@code true} to immediately shut down the context and throw out any executing requests (implies {@code cancel} and {@code interrupt})
+ * @param cancel {@code true} to cancel any outstanding requests
+ * @param interrupt {@code true} to interrupt tasks that are cancelled (ignored unless {@code immediate} or {@code cancel} are {@code true})
* @throws IOException if an I/O error occurs
*/
- void sendContextClose(ContextIdentifier contextIdentifier) throws IOException;
+ void sendContextClose(ContextIdentifier contextIdentifier, boolean immediate, boolean cancel, boolean interrupt) throws IOException;
/**
* Acquire a new request identifier that will be used to send a request.
@@ -124,7 +136,7 @@
* @param serviceIdentifier the service identifier
* @throws IOException if an I/O error occurs
*/
- void closeService(ServiceIdentifier serviceIdentifier) throws IOException;
+ void sendServiceClose(ServiceIdentifier serviceIdentifier) throws IOException;
/**
* Send a request to the remote side.
@@ -187,9 +199,9 @@
/**
* Send data over a stream. Returns a message output buffer that the message is written into. When the message
- * is fully written, the {@link org.jboss.cx.remoting.util.MessageOutput#commit()} method will be called to perform
+ * is fully written, the {@link org.jboss.cx.remoting.spi.ObjectMessageOutput#commit()} method will be called to perform
* the transmission. The supplied executor should be passed in to
- * {@link org.jboss.cx.remoting.spi.protocol.ProtocolContext#getMessageOutput(org.jboss.cx.remoting.util.ByteOutput, java.util.concurrent.Executor)},
+ * {@link org.jboss.cx.remoting.spi.protocol.ProtocolContext#getMessageOutput(org.jboss.cx.remoting.spi.ByteMessageOutput , java.util.concurrent.Executor)},
* if that method is used for serialization.
*
* @param streamIdentifier the stream to send data on
@@ -198,7 +210,7 @@
*
* @throws IOException if an I/O error occurs
*/
- MessageOutput sendStreamData(StreamIdentifier streamIdentifier, Executor streamExecutor) throws IOException;
+ ObjectMessageOutput sendStreamData(StreamIdentifier streamIdentifier, Executor streamExecutor) throws IOException;
/**
* Close the session.
Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/protocol/ProtocolServerContext.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/protocol/ProtocolServerContext.java 2008-03-09 03:56:14 UTC (rev 3598)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/protocol/ProtocolServerContext.java 2008-03-11 23:46:24 UTC (rev 3599)
@@ -1,9 +1,11 @@
package org.jboss.cx.remoting.spi.protocol;
+import org.jboss.cx.remoting.Context;
+
/**
*
*/
public interface ProtocolServerContext {
- ProtocolContext establishSession(ProtocolHandler handler);
+ <I, O> ProtocolContext establishSession(ProtocolHandler handler, Context<I, O> rootContext);
}
Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/stream/StreamContext.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/stream/StreamContext.java 2008-03-09 03:56:14 UTC (rev 3598)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/stream/StreamContext.java 2008-03-11 23:46:24 UTC (rev 3599)
@@ -2,7 +2,7 @@
import java.io.Closeable;
import java.io.IOException;
-import org.jboss.cx.remoting.util.MessageOutput;
+import org.jboss.cx.remoting.spi.ObjectMessageOutput;
/**
*
@@ -15,7 +15,7 @@
* @return the message output instance
* @throws IOException if an error occurs
*/
- MessageOutput writeMessage() throws IOException;
+ ObjectMessageOutput writeMessage() throws IOException;
/**
* Indicate that this stream is exhausted.
Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/stream/StreamSerializer.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/stream/StreamSerializer.java 2008-03-09 03:56:14 UTC (rev 3598)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/stream/StreamSerializer.java 2008-03-11 23:46:24 UTC (rev 3599)
@@ -1,7 +1,7 @@
package org.jboss.cx.remoting.spi.stream;
import java.io.IOException;
-import org.jboss.cx.remoting.util.MessageInput;
+import org.jboss.cx.remoting.spi.ObjectMessageInput;
/**
*
@@ -20,7 +20,7 @@
* @param data the message
* @throws IOException if the stream data cannot be handled
*/
- void handleData(MessageInput data) throws IOException;
+ void handleData(ObjectMessageInput data) throws IOException;
/**
* Handle a close from the remote side.
Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/ContextSourceWrapper.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/ContextSourceWrapper.java 2008-03-09 03:56:14 UTC (rev 3598)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/ContextSourceWrapper.java 2008-03-11 23:46:24 UTC (rev 3599)
@@ -19,6 +19,10 @@
delegate.close();
}
+ public void closeImmediate() throws RemotingException {
+ delegate.closeImmediate();
+ }
+
public void addCloseHandler(final CloseHandler<ContextSource<I, O>> closeHandler) {
delegate.addCloseHandler(new CloseHandler<ContextSource<I, O>>() {
public void handleClose(final ContextSource<I, O> closed) {
Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/ContextWrapper.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/ContextWrapper.java 2008-03-09 03:56:14 UTC (rev 3598)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/ContextWrapper.java 2008-03-11 23:46:24 UTC (rev 3599)
@@ -21,6 +21,14 @@
delegate.close();
}
+ public void closeCancelling(final boolean mayInterrupt) throws RemotingException {
+ delegate.closeCancelling(mayInterrupt);
+ }
+
+ public void closeImmediate() throws RemotingException {
+ delegate.closeImmediate();
+ }
+
public void addCloseHandler(final CloseHandler<Context<I, O>> closeHandler) {
delegate.addCloseHandler(new CloseHandler<Context<I, O>>() {
public void handleClose(final Context<I, O> closed) {
@@ -29,10 +37,6 @@
});
}
- public O invokeInterruptibly(final I request) throws RemotingException, RemoteExecutionException, InterruptedException {
- return delegate.invokeInterruptibly(request);
- }
-
public O invoke(final I request) throws RemotingException, RemoteExecutionException {
return delegate.invoke(request);
}
@@ -41,6 +45,10 @@
return delegate.send(request);
}
+ public void sendOneWay(final I request) throws RemotingException {
+ delegate.sendOneWay(request);
+ }
+
public ConcurrentMap<Object, Object> getAttributes() {
return delegate.getAttributes();
}
Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/EndpointWrapper.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/EndpointWrapper.java 2008-03-09 03:56:14 UTC (rev 3598)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/EndpointWrapper.java 2008-03-11 23:46:24 UTC (rev 3599)
@@ -27,8 +27,8 @@
return delegate.getAttributes();
}
- public Session openSession(final URI remoteUri, final AttributeMap attributeMap) throws RemotingException {
- return delegate.openSession(remoteUri, attributeMap);
+ public <I, O> Session openSession(final URI remoteUri, final AttributeMap attributeMap, final Context<I, O> rootContext) throws RemotingException {
+ return delegate.openSession(remoteUri, attributeMap, rootContext);
}
public String getName() {
@@ -51,6 +51,10 @@
delegate.close();
}
+ public void closeImmediate() throws RemotingException {
+ delegate.closeImmediate();
+ }
+
public void addCloseHandler(final CloseHandler<Endpoint> closeHandler) {
delegate.addCloseHandler(new CloseHandler<Endpoint>() {
public void handleClose(final Endpoint closed) {
Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/SessionWrapper.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/SessionWrapper.java 2008-03-09 03:56:14 UTC (rev 3598)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/SessionWrapper.java 2008-03-11 23:46:24 UTC (rev 3599)
@@ -20,6 +20,10 @@
delegate.close();
}
+ public void closeImmediate() throws RemotingException {
+ delegate.closeImmediate();
+ }
+
public void addCloseHandler(final CloseHandler<Session> closeHandler) {
delegate.addCloseHandler(new CloseHandler<Session>() {
public void handleClose(final Session closed) {
Added: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/stream/ObjectSinkWrapper.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/stream/ObjectSinkWrapper.java (rev 0)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/stream/ObjectSinkWrapper.java 2008-03-11 23:46:24 UTC (rev 3599)
@@ -0,0 +1,37 @@
+package org.jboss.cx.remoting.stream;
+
+import java.io.IOException;
+
+/**
+ *
+ */
+public class ObjectSinkWrapper<T> implements ObjectSink<T> {
+ private ObjectSink<T> target;
+
+ protected ObjectSinkWrapper() {
+ }
+
+ public ObjectSinkWrapper(final ObjectSink<T> target) {
+ this.target = target;
+ }
+
+ protected final ObjectSink<T> getTarget() {
+ return target;
+ }
+
+ protected final void setTarget(final ObjectSink<T> target) {
+ this.target = target;
+ }
+
+ public void accept(final T instance) throws IOException {
+ target.accept(instance);
+ }
+
+ public void flush() throws IOException {
+ target.flush();
+ }
+
+ public void close() throws IOException {
+ target.close();
+ }
+}
Added: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/stream/ObjectSourceWrapper.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/stream/ObjectSourceWrapper.java (rev 0)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/stream/ObjectSourceWrapper.java 2008-03-11 23:46:24 UTC (rev 3599)
@@ -0,0 +1,37 @@
+package org.jboss.cx.remoting.stream;
+
+import java.io.IOException;
+
+/**
+ *
+ */
+public class ObjectSourceWrapper<T> implements ObjectSource<T> {
+ private ObjectSource<T> target;
+
+ protected ObjectSourceWrapper() {
+ }
+
+ public ObjectSourceWrapper(final ObjectSource<T> target) {
+ this.target = target;
+ }
+
+ protected ObjectSource<T> getTarget() {
+ return target;
+ }
+
+ protected void setTarget(final ObjectSource<T> target) {
+ this.target = target;
+ }
+
+ public boolean hasNext() throws IOException {
+ return target.hasNext();
+ }
+
+ public T next() throws IOException {
+ return target.next();
+ }
+
+ public void close() throws IOException {
+ target.close();
+ }
+}
Deleted: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/stream/ProgressStream.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/stream/ProgressStream.java 2008-03-09 03:56:14 UTC (rev 3598)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/stream/ProgressStream.java 2008-03-11 23:46:24 UTC (rev 3599)
@@ -1,17 +0,0 @@
-package org.jboss.cx.remoting.stream;
-
-/**
- * A streamable object that represents the progress of a long-running operation. The {@code update} method is called
- * periodically with informational data that can be displayed to a user.
- */
-public interface ProgressStream {
- /**
- * Update progress information.
- *
- * @param operationTitle a string holding the title of the current operation
- * @param unitsDone the number of completed units of work
- * @param totalUnits the total number of units of work; -1 if unknown
- * @param approx {@code true} if the value of {@code totalUnits} is approximate (i.e. not exact)
- */
- void update(String operationTitle, int unitsDone, int totalUnits, boolean approx);
-}
Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/stream/Streams.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/stream/Streams.java 2008-03-09 03:56:14 UTC (rev 3598)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/stream/Streams.java 2008-03-11 23:46:24 UTC (rev 3599)
@@ -5,6 +5,7 @@
import java.util.Collection;
import java.util.Iterator;
import java.util.NoSuchElementException;
+import java.util.Enumeration;
/**
*
@@ -21,6 +22,10 @@
return new IteratorObjectSource<T>(iterator);
}
+ public static <T> ObjectSource<T> getEnumerationObjectSource(Enumeration<T> enumeration) {
+ return new EnumerationObjectSource<T>(enumeration);
+ }
+
private static final class CollectionObjectSink<T> implements ObjectSink<T> {
private final Collection<T> target;
@@ -64,4 +69,30 @@
//empty
}
}
+
+ private static final class EnumerationObjectSource<T> implements ObjectSource<T> {
+ private final Enumeration<T> src;
+
+ public EnumerationObjectSource(final Enumeration<T> src) {
+ this.src = src;
+ }
+
+ public boolean hasNext() throws IOException {
+ return src.hasMoreElements();
+ }
+
+ public T next() throws IOException {
+ try {
+ return src.nextElement();
+ } catch (NoSuchElementException ex) {
+ EOFException eex = new EOFException("Read past end of enumeration");
+ eex.setStackTrace(ex.getStackTrace());
+ throw eex;
+ }
+ }
+
+ public void close() throws IOException {
+ // empty
+ }
+ }
}
Added: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/AbstractRealContext.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/AbstractRealContext.java (rev 0)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/AbstractRealContext.java 2008-03-11 23:46:24 UTC (rev 3599)
@@ -0,0 +1,25 @@
+package org.jboss.cx.remoting.core;
+
+import org.jboss.cx.remoting.Context;
+import java.io.Serializable;
+
+/**
+ *
+ */
+public abstract class AbstractRealContext<I, O> implements Context<I, O>, Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private ContextServer<I,O> contextServer;
+
+ protected AbstractRealContext(final ContextServer<I, O> contextServer) {
+ this.contextServer = contextServer;
+ }
+
+ private Object writeReplace() {
+ return contextServer;
+ }
+
+ protected ContextServer<I, O> getContextServer() {
+ return contextServer;
+ }
+}
Added: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/BaseContextClient.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/BaseContextClient.java (rev 0)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/BaseContextClient.java 2008-03-11 23:46:24 UTC (rev 3599)
@@ -0,0 +1,11 @@
+package org.jboss.cx.remoting.core;
+
+import org.jboss.cx.remoting.RemotingException;
+
+/**
+ *
+ */
+public class BaseContextClient implements ContextClient {
+ public void handleClosing(final boolean done) throws RemotingException {
+ }
+}
Added: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ContextClient.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ContextClient.java (rev 0)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ContextClient.java 2008-03-11 23:46:24 UTC (rev 3599)
@@ -0,0 +1,10 @@
+package org.jboss.cx.remoting.core;
+
+import org.jboss.cx.remoting.RemotingException;
+
+/**
+ *
+ */
+public interface ContextClient {
+ void handleClosing(boolean done) throws RemotingException;
+}
Added: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ContextMarker.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ContextMarker.java (rev 0)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ContextMarker.java 2008-03-11 23:46:24 UTC (rev 3599)
@@ -0,0 +1,28 @@
+package org.jboss.cx.remoting.core;
+
+import org.jboss.cx.remoting.spi.protocol.ContextIdentifier;
+import java.io.Serializable;
+
+/**
+ *
+ */
+public final class ContextMarker implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private ContextIdentifier contextIdentifer;
+
+ public ContextMarker() {
+ }
+
+ public ContextMarker(final ContextIdentifier contextIdentifer) {
+ this.contextIdentifer = contextIdentifer;
+ }
+
+ public ContextIdentifier getContextIdentifer() {
+ return contextIdentifer;
+ }
+
+ public void setContextIdentifer(final ContextIdentifier contextIdentifer) {
+ this.contextIdentifer = contextIdentifer;
+ }
+}
Added: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ContextServer.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ContextServer.java (rev 0)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ContextServer.java 2008-03-11 23:46:24 UTC (rev 3599)
@@ -0,0 +1,12 @@
+package org.jboss.cx.remoting.core;
+
+import org.jboss.cx.remoting.RemotingException;
+
+/**
+ *
+ */
+public interface ContextServer<I, O> {
+ RequestServer<I> createNewRequest(RequestClient<O> requestClient) throws RemotingException;
+
+ void handleClose(boolean immediate, boolean cancel, boolean interrupt) throws RemotingException;
+}
Deleted: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreDeployedService.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreDeployedService.java 2008-03-09 03:56:14 UTC (rev 3598)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreDeployedService.java 2008-03-11 23:46:24 UTC (rev 3599)
@@ -1,30 +0,0 @@
-package org.jboss.cx.remoting.core;
-
-import org.jboss.cx.remoting.RequestListener;
-
-/**
- *
- */
-public final class CoreDeployedService<I, O> {
- private final String name;
- private final String type;
- private final RequestListener<I, O> requestListener;
-
- CoreDeployedService(final String name, final String type, final RequestListener<I, O> requestListener) {
- this.name = name;
- this.type = type;
- this.requestListener = requestListener;
- }
-
- public String getName() {
- return name;
- }
-
- public String getType() {
- return type;
- }
-
- public RequestListener<I, O> getRequestListener() {
- return requestListener;
- }
-}
Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreEndpoint.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreEndpoint.java 2008-03-09 03:56:14 UTC (rev 3598)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreEndpoint.java 2008-03-11 23:46:24 UTC (rev 3599)
@@ -14,6 +14,7 @@
import org.jboss.cx.remoting.RequestListener;
import org.jboss.cx.remoting.ContextSource;
import org.jboss.cx.remoting.CloseHandler;
+import org.jboss.cx.remoting.core.util.OrderedExecutorFactory;
import org.jboss.cx.remoting.util.CollectionUtil;
import org.jboss.cx.remoting.version.Version;
import org.jboss.cx.remoting.log.Logger;
@@ -88,17 +89,13 @@
sessions.notifyAll();
}
- RequestListener<?, ?> getRootRequestListener() {
- return rootRequestListener;
- }
-
public final class CoreProtocolServerContext implements ProtocolServerContext {
private CoreProtocolServerContext() {
}
- public ProtocolContext establishSession(ProtocolHandler handler) {
+ public <I, O> ProtocolContext establishSession(final ProtocolHandler handler, final Context<I, O> rootContext) {
final CoreSession session = new CoreSession(CoreEndpoint.this);
- session.initializeServer(handler);
+ session.initializeServer(handler, rootContext);
return session.getProtocolContext();
}
}
@@ -157,7 +154,7 @@
return endpointMap;
}
- public Session openSession(final URI uri, final AttributeMap attributeMap) throws RemotingException {
+ public <I, O> Session openSession(final URI uri, final AttributeMap attributeMap, final Context<I, O> rootContext) throws RemotingException {
final String scheme = uri.getScheme();
if (scheme == null) {
throw new RemotingException("No scheme on remote endpoint URI");
@@ -171,7 +168,7 @@
final ProtocolHandlerFactory factory = registration.getProtocolHandlerFactory();
try {
final CoreSession session = new CoreSession(CoreEndpoint.this);
- session.initializeClient(factory, uri, attributeMap);
+ session.initializeClient(factory, uri, attributeMap, rootContext);
sessions.add(session);
return session.getUserSession();
} catch (IOException e) {
@@ -206,17 +203,29 @@
}
public <I, O> Context<I, O> createContext(RequestListener<I, O> requestListener) {
- return null;
+ final CoreInboundContext<I, O> inbound = new CoreInboundContext<I, O>(requestListener, executor);
+ final CoreOutboundContext<I, O> outbound = new CoreOutboundContext<I, O>(executor);
+ inbound.initialize(outbound.getContextClient());
+ outbound.initialize(inbound.getContextServer());
+ return outbound.getUserContext();
}
public <I, O> ContextSource<I, O> createService(RequestListener<I, O> requestListener) {
- return null;
+ final CoreInboundService<I, O> inbound = new CoreInboundService<I, O>(requestListener, executor);
+ final CoreOutboundService<I, O> outbound = new CoreOutboundService<I, O>(executor);
+ inbound.initialize(outbound.getServiceClient());
+ outbound.initialize(inbound.getServiceServer());
+ return outbound.getUserContextSource();
}
public void close() throws RemotingException {
// todo ...
}
+ public void closeImmediate() throws RemotingException {
+ // todo ...
+ }
+
public void addCloseHandler(final CloseHandler<Endpoint> closeHandler) {
// todo ...
}
Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreInboundContext.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreInboundContext.java 2008-03-09 03:56:14 UTC (rev 3598)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreInboundContext.java 2008-03-11 23:46:24 UTC (rev 3599)
@@ -1,71 +1,92 @@
package org.jboss.cx.remoting.core;
-import java.util.concurrent.ConcurrentMap;
-import org.jboss.cx.remoting.RemoteExecutionException;
+import java.util.concurrent.Executor;
+import java.util.Set;
+import org.jboss.cx.remoting.RequestListener;
import org.jboss.cx.remoting.RemotingException;
-import org.jboss.cx.remoting.RequestListener;
-import org.jboss.cx.remoting.util.CollectionUtil;
-import org.jboss.cx.remoting.spi.protocol.ContextIdentifier;
-import org.jboss.cx.remoting.spi.protocol.RequestIdentifier;
+import org.jboss.cx.remoting.log.Logger;
+import org.jboss.cx.remoting.util.AtomicStateMachine;
+import static org.jboss.cx.remoting.util.CollectionUtil.synchronizedHashSet;
+import static org.jboss.cx.remoting.util.AtomicStateMachine.start;
+
/**
*
*/
public final class CoreInboundContext<I, O> {
+ private static final Logger log = Logger.getLogger(CoreInboundContext.class);
- private final ContextIdentifier contextIdentifier;
- private final CoreSession coreSession;
private final RequestListener<I, O> requestListener;
+ private final Executor executor;
+ private final Set<CoreInboundRequest<I, O>> requests = synchronizedHashSet();
+ private final AtomicStateMachine<State> state = start(State.NEW);
- private final ConcurrentMap<RequestIdentifier,CoreInboundRequest<I, O>> requests = CollectionUtil.concurrentMap();
+ private ContextClient contextClient;
- public CoreInboundContext(final ContextIdentifier contextIdentifier, final CoreSession coreSession, final RequestListener<I, O> requestListener) {
- this.contextIdentifier = contextIdentifier;
- this.coreSession = coreSession;
- this.requestListener = requestListener;
+ private enum State {
+ NEW,
+ UP,
+ STOPPING,
+ DOWN,
}
- // Outbound protocol messages
-
- void sendReply(final RequestIdentifier requestIdentifier, final O reply) throws RemotingException {
- coreSession.sendReply(contextIdentifier, requestIdentifier, reply);
+ public CoreInboundContext(final RequestListener<I, O> requestListener, final Executor executor) {
+ this.requestListener = requestListener;
+ this.executor = executor;
}
- void sendException(final RequestIdentifier requestIdentifier, final RemoteExecutionException cause) throws RemotingException {
- coreSession.sendException(contextIdentifier, requestIdentifier, cause);
+ public ContextServer<I, O> getContextServer() {
+ return new Server();
}
- void sendCancelAcknowledge(final RequestIdentifier requestIdentifier) throws RemotingException {
- coreSession.sendCancelAcknowledge(contextIdentifier, requestIdentifier);
+ public void initialize(final ContextClient contextClient) {
+ state.requireTransitionExclusive(State.NEW, State.UP);
+ this.contextClient = contextClient;
+ state.releaseExclusive();
}
- // Inbound protocol messages
-
- void receiveCancelRequest(final RequestIdentifier requestIdentifier, final boolean mayInterrupt) {
- final CoreInboundRequest<I, O> inboundRequest = getInboundRequest(requestIdentifier);
- if (inboundRequest != null) {
- inboundRequest.receiveCancelRequest(mayInterrupt);
+ public void remove(final CoreInboundRequest<I, O> request) {
+ final State current = state.getStateHold();
+ try {
+ requests.remove(request);
+ if (current != State.STOPPING) {
+ return;
+ }
+ } finally {
+ state.release();
}
+ if (requests.isEmpty()) {
+ state.transition(State.STOPPING, State.DOWN);
+ }
}
- void receiveRequest(final RequestIdentifier requestIdentifier, final I request) {
- final CoreInboundRequest<I, O> inboundRequest = createInboundRequest(requestIdentifier, request);
- inboundRequest.receiveRequest(request);
- }
+ public final class Server implements ContextServer<I, O> {
+ public RequestServer<I> createNewRequest(final RequestClient<O> requestClient) throws RemotingException {
+ if (state.inHold(State.UP)) try {
+ final CoreInboundRequest<I, O> inboundRequest = new CoreInboundRequest<I, O>(requestListener, executor);
+ inboundRequest.initialize(requestClient);
+ requests.add(inboundRequest);
+ return inboundRequest.getRequester();
+ } finally {
+ state.release();
+ } else {
+ throw new RemotingException("Context is not up");
+ }
+ }
- // Other protocol-related
-
- protected void shutdown() {
-
+ public void handleClose(final boolean immediate, final boolean cancel, final boolean interrupt) throws RemotingException {
+ if (state.transition(State.UP, State.STOPPING)) {
+ contextClient.handleClosing(false);
+ if (immediate || cancel) {
+ for (CoreInboundRequest<I, O> inboundRequest : requests) {
+ try {
+ inboundRequest.getRequester().handleCancelRequest(immediate || interrupt);
+ } catch (Exception e) {
+ log.trace("Failed to notify inbound request of cancellation upon context close: %s", e);
+ }
+ }
+ }
+ }
+ }
}
-
- // Request mgmt
-
- CoreInboundRequest<I, O> createInboundRequest(final RequestIdentifier requestIdentifier, final I request) {
- return new CoreInboundRequest<I, O>(requestIdentifier, this, requestListener, coreSession.getExecutor());
- }
-
- CoreInboundRequest<I, O> getInboundRequest(RequestIdentifier requestIdentifier) {
- return requests.get(requestIdentifier);
- }
}
Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreInboundRequest.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreInboundRequest.java 2008-03-09 03:56:14 UTC (rev 3598)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreInboundRequest.java 2008-03-11 23:46:24 UTC (rev 3599)
@@ -7,7 +7,6 @@
import org.jboss.cx.remoting.RequestListener;
import org.jboss.cx.remoting.util.AtomicStateMachine;
import org.jboss.cx.remoting.log.Logger;
-import org.jboss.cx.remoting.spi.protocol.RequestIdentifier;
import java.util.concurrent.Executor;
import java.util.Set;
import java.util.HashSet;
@@ -21,22 +20,33 @@
public final class CoreInboundRequest<I, O> {
private static final Logger log = Logger.getLogger(CoreInboundRequest.class);
- private final RequestIdentifier requestIdentifier;
- private final CoreInboundContext<I, O> context;
private final RequestListener<I,O> requestListener;
private final Executor executor;
private final AtomicStateMachine<State> state = AtomicStateMachine.start(State.INITIAL);
private final UserRequestContext userRequestContext = new UserRequestContext();
+ private final RequestServer<I> request = new Request();
+ private RequestClient<O> requestClient;
+
+ /**
+ * @protectedby {@code this}
+ */
private boolean mayInterrupt;
+ /**
+ * @protectedby {@code this}
+ */
private boolean cancel;
+ /**
+ * @protectedby {@code this}
+ */
private Set<Thread> tasks;
+ /**
+ * @protectedby {@code this}
+ */
private List<RequestCancelHandler<O>> cancelHandlers;
- public CoreInboundRequest(final RequestIdentifier requestIdentifier, final CoreInboundContext<I, O> context, final RequestListener<I, O> requestListener, final Executor executor) {
- this.requestIdentifier = requestIdentifier;
- this.context = context;
+ public CoreInboundRequest(final RequestListener<I, O> requestListener, final Executor executor) {
this.requestListener = requestListener;
this.executor = executor;
}
@@ -45,80 +55,148 @@
INITIAL,
UNSENT,
SENT,
+ TERMINATED,
}
- void receiveRequest(final I request) {
- try {
- state.requireTransition(State.INITIAL, State.UNSENT);
- requestListener.handleRequest(userRequestContext, request);
- } catch (Throwable e) {
- if (state.transition(State.UNSENT, State.SENT)) {
+ public void initialize(final RequestClient<O> requestClient) {
+ state.requireTransitionExclusive(State.INITIAL, State.UNSENT);
+ this.requestClient = requestClient;
+ state.releaseExclusive();
+ }
+
+ public RequestServer<I> getRequester() {
+ return request;
+ }
+
+ /**
+ * Execute the given command. The command will be sensitive to interruption if the request is cancelled.
+ *
+ * @param command the command to execute
+ */
+ private void executeTagged(final Runnable command) {
+ executor.execute(new Runnable() {
+ public void run() {
+ final Thread thread = Thread.currentThread();
+ synchronized(CoreInboundRequest.this) {
+ if (tasks == null) {
+ tasks = new HashSet<Thread>();
+ }
+ tasks.add(thread);
+ }
try {
- if (e instanceof RemoteExecutionException) {
- sendException((RemoteExecutionException) e);
- } else {
- sendException(new RemoteExecutionException("Execution failed: " + e.toString(), e));
+ command.run();
+ } finally {
+ synchronized(CoreInboundRequest.this) {
+ tasks.remove(thread);
}
- } catch (RemotingException e1) {
- log.trace("Tried and failed to send an exception (%s) for a request (%s): %s", e, requestIdentifier, e1);
}
}
- log.trace(e, "Request listener %s recevied an exception for request (%s)", requestListener, requestIdentifier);
- }
+ });
}
- void sendException(final RemoteExecutionException rex) throws RemotingException {
- context.sendException(requestIdentifier, rex);
- }
-
- public void receiveCancelRequest(final boolean mayInterrupt) {
- synchronized(this) {
- if (! cancel) {
- cancel = true;
- this.mayInterrupt = mayInterrupt;
- if (mayInterrupt) {
- for (Thread t : tasks) {
- t.interrupt();
+ public final class Request implements RequestServer<I> {
+ public void handleCancelRequest(final boolean mayInterrupt) {
+ synchronized(CoreInboundRequest.this) {
+ if (! cancel) {
+ cancel = true;
+ CoreInboundRequest.this.mayInterrupt |= mayInterrupt;
+ if (mayInterrupt) {
+ if (tasks != null) {
+ for (Thread t : tasks) {
+ t.interrupt();
+ }
+ }
}
+ if (cancelHandlers != null) {
+ final Iterator<RequestCancelHandler<O>> i = cancelHandlers.iterator();
+ while (i.hasNext()) {
+ final RequestCancelHandler<O> handler = i.next();
+ i.remove();
+ executor.execute(new Runnable() {
+ public void run() {
+ handler.notifyCancel(userRequestContext, mayInterrupt);
+ }
+ });
+ }
+ }
}
- if (cancelHandlers != null) {
- final Iterator<RequestCancelHandler<O>> i = cancelHandlers.iterator();
- while (i.hasNext()) {
- final RequestCancelHandler<O> handler = i.next();
- i.remove();
- executor.execute(new Runnable() {
- public void run() {
- handler.notifyCancel(userRequestContext, mayInterrupt);
+ }
+ }
+
+ public void handleRequest(final I request, final Executor streamExecutor) {
+ state.requireTransition(State.INITIAL, State.UNSENT);
+ executeTagged(new Runnable() {
+ public void run() {
+ try {
+ requestListener.handleRequest(userRequestContext, request);
+ } catch (InterruptedException e) {
+ final boolean wasCancelled;
+ synchronized(CoreInboundRequest.this) {
+ wasCancelled = cancel;
+ }
+ if (state.transition(State.UNSENT, State.SENT)) {
+ if (wasCancelled) {
+ try {
+ requestClient.handleCancelAcknowledge();
+ } catch (RemotingException e1) {
+ try {
+ requestClient.handleException(new RemoteExecutionException("Failed to send a cancel ack to client: " + e1.toString(), e1));
+ } catch (RemotingException e2) {
+ log.debug("Tried and failed to send an exception (%s): %s", e1, e2);
+ }
+ }
+ } else {
+ try {
+ requestClient.handleException(new RemoteExecutionException("Execution failed: " + e.toString(), e));
+ } catch (RemotingException e1) {
+ log.debug("Tried and failed to send an exception (%s): %s", e, e1);
+ }
}
- });
+ log.trace(e, "Request listener %s recevied an exception", requestListener);
+ }
+ } catch (Throwable e) {
+ if (state.transition(State.UNSENT, State.SENT)) {
+ try {
+ if (e instanceof RemoteExecutionException) {
+ requestClient.handleException((RemoteExecutionException) e);
+ } else {
+ requestClient.handleException(new RemoteExecutionException("Execution failed: " + e.toString(), e));
+ }
+ } catch (RemotingException e1) {
+ log.debug("Tried and failed to send an exception (%s): %s", e, e1);
+ }
+ }
+ log.trace(e, "Request listener %s recevied an exception", requestListener);
}
}
- }
+ });
}
}
public final class UserRequestContext implements RequestContext<O> {
+ private UserRequestContext() {}
public boolean isCancelled() {
- // todo...
- return false;
+ synchronized(CoreInboundRequest.this) {
+ return cancel;
+ }
}
public void sendReply(final O reply) throws RemotingException, IllegalStateException {
state.requireTransition(State.UNSENT, State.SENT);
- context.sendReply(requestIdentifier, reply);
+ requestClient.handleReply(reply);
}
public void sendFailure(final String msg, final Throwable cause) throws RemotingException, IllegalStateException {
state.requireTransition(State.UNSENT, State.SENT);
final RemoteExecutionException rex = new RemoteExecutionException(msg, cause);
rex.setStackTrace(cause.getStackTrace());
- sendException(rex);
+ requestClient.handleException(rex);
}
public void sendCancelled() throws RemotingException, IllegalStateException {
state.requireTransition(State.UNSENT, State.SENT);
- context.sendCancelAcknowledge(requestIdentifier);
+ requestClient.handleCancelAcknowledge();
}
public void addCancelHandler(final RequestCancelHandler<O> requestCancelHandler) {
@@ -138,24 +216,7 @@
}
public void execute(final Runnable command) {
- executor.execute(new Runnable() {
- public void run() {
- final Thread thread = Thread.currentThread();
- synchronized(CoreInboundRequest.this) {
- if (tasks == null) {
- tasks = new HashSet<Thread>();
- }
- tasks.add(thread);
- }
- try {
- command.run();
- } finally {
- synchronized(CoreInboundRequest.this) {
- tasks.remove(thread);
- }
- }
- }
- });
+ executeTagged(command);
}
}
}
Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreInboundService.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreInboundService.java 2008-03-09 03:56:14 UTC (rev 3598)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreInboundService.java 2008-03-11 23:46:24 UTC (rev 3599)
@@ -1,25 +1,48 @@
package org.jboss.cx.remoting.core;
+import static org.jboss.cx.remoting.util.AtomicStateMachine.start;
+import org.jboss.cx.remoting.RequestListener;
import org.jboss.cx.remoting.RemotingException;
-import org.jboss.cx.remoting.RequestListener;
-import org.jboss.cx.remoting.spi.protocol.ContextIdentifier;
-import org.jboss.cx.remoting.spi.protocol.ServiceIdentifier;
+import org.jboss.cx.remoting.util.AtomicStateMachine;
+import java.util.concurrent.Executor;
/**
*
*/
public final class CoreInboundService<I, O> {
- private final CoreSession coreSession;
- private final ServiceIdentifier serviceIdentifier;
private final RequestListener<I, O> requestListener;
+ private final Executor executor;
+ private ServiceClient serviceClient;
- public CoreInboundService(final CoreSession coreSession, final ServiceIdentifier serviceIdentifier, final RequestListener<I, O> requestListener) throws RemotingException {
- this.coreSession = coreSession;
- this.serviceIdentifier = serviceIdentifier;
+ private final AtomicStateMachine<State> state = start(State.INITIAL);
+
+ private enum State {
+ INITIAL,
+ UP,
+ }
+
+ public CoreInboundService(final RequestListener<I, O> requestListener, final Executor executor) {
this.requestListener = requestListener;
+ this.executor = executor;
}
- void receivedOpenedContext(final ContextIdentifier remoteContextIdentifier) {
- coreSession.createServerContext(serviceIdentifier, remoteContextIdentifier, requestListener);
+ public void initialize(final ServiceClient serviceClient) {
+ state.requireTransitionExclusive(State.INITIAL, State.UP);
+ this.serviceClient = serviceClient;
+ state.releaseExclusive();
}
+
+ public ServiceServer<I, O> getServiceServer() {
+ return new ServiceServer<I, O>() {
+ public void handleClose() throws RemotingException {
+ // todo - prevent new context creation?
+ }
+
+ public ContextServer<I, O> createNewContext(final ContextClient client) {
+ final CoreInboundContext<I, O> context = new CoreInboundContext<I, O>(requestListener, executor);
+ context.initialize(client);
+ return context.getContextServer();
+ }
+ };
+ }
}
Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundContext.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundContext.java 2008-03-09 03:56:14 UTC (rev 3598)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundContext.java 2008-03-11 23:46:24 UTC (rev 3599)
@@ -2,17 +2,17 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
+import java.io.Serializable;
import org.jboss.cx.remoting.Context;
import org.jboss.cx.remoting.FutureReply;
import org.jboss.cx.remoting.RemoteExecutionException;
import org.jboss.cx.remoting.RemotingException;
import org.jboss.cx.remoting.RequestCompletionHandler;
import org.jboss.cx.remoting.CloseHandler;
+import org.jboss.cx.remoting.core.util.QueueExecutor;
import org.jboss.cx.remoting.util.AtomicStateMachine;
import org.jboss.cx.remoting.util.CollectionUtil;
import org.jboss.cx.remoting.log.Logger;
-import org.jboss.cx.remoting.spi.protocol.ContextIdentifier;
-import org.jboss.cx.remoting.spi.protocol.RequestIdentifier;
/**
*
@@ -20,174 +20,118 @@
public final class CoreOutboundContext<I, O> {
private static final Logger log = Logger.getLogger(CoreOutboundContext.class);
- private final CoreEndpoint endpoint;
- private final CoreSession session;
- private final ContextIdentifier contextIdentifier;
-
private final ConcurrentMap<Object, Object> contextMap = CollectionUtil.concurrentMap();
- private final ConcurrentMap<RequestIdentifier, CoreOutboundRequest<I, O>> requests = CollectionUtil.concurrentMap();
private final AtomicStateMachine<State> state = AtomicStateMachine.start(State.UP);
private final Context<I, O> userContext = new UserContext();
+ private final ContextClient contextClient = new ContextClientImpl();
+ private final Executor executor;
- public CoreOutboundContext(final CoreSession session, final ContextIdentifier contextIdentifier) {
- this.session = session;
- this.contextIdentifier = contextIdentifier;
- endpoint = session.getEndpoint();
+ private ContextServer<I, O> contextServer;
+
+ public CoreOutboundContext(final Executor executor) {
+ this.executor = executor;
}
+ public void initialize(final ContextServer<I, O> contextServer) {
+ state.requireTransitionExclusive(State.INITIAL, State.UP);
+ this.contextServer = contextServer;
+ state.releaseExclusive();
+ }
+
private enum State {
+ INITIAL,
UP,
STOPPING,
DOWN,
}
- // Request management
-
- void dropRequest(final RequestIdentifier requestIdentifier, final CoreOutboundRequest<I, O> coreOutboundRequest) {
- requests.remove(requestIdentifier, coreOutboundRequest);
- }
-
- // Outbound protocol messages
-
- boolean sendCancelRequest(final RequestIdentifier requestIdentifier, final boolean mayInterrupt) {
- if (state.inHold(State.UP)) try {
- return session.sendCancelRequest(contextIdentifier, requestIdentifier, mayInterrupt);
- } finally {
- state.release();
- } else {
- return false;
- }
- }
-
- void sendRequest(final RequestIdentifier requestIdentifier, final I request, final Executor streamExecutor) throws RemotingException {
- session.sendRequest(contextIdentifier, requestIdentifier, request, streamExecutor);
- }
-
- // Inbound protocol messages
-
- @SuppressWarnings ({"unchecked"})
- void receiveCloseContext() {
- final CoreOutboundRequest[] requestArray;
- if (! state.transition(State.UP, State.STOPPING)) {
- return;
- }
- requestArray = requests.values().toArray(empty);
- for (CoreOutboundRequest<I, O> request : requestArray) {
- request.receiveClose();
- }
- session.removeContext(contextIdentifier);
- }
-
- void receiveCancelAcknowledge(RequestIdentifier requestIdentifier) {
- final CoreOutboundRequest<I, O> request = requests.get(requestIdentifier);
- if (request != null) {
- request.receiveCancelAcknowledge();
- }
- }
-
- void receiveReply(RequestIdentifier requestIdentifier, O reply) {
- final CoreOutboundRequest<I, O> request = requests.get(requestIdentifier);
- if (request != null) {
- request.receiveReply(reply);
- }
- }
-
- void receiveException(RequestIdentifier requestIdentifier, RemoteExecutionException exception) {
- final CoreOutboundRequest<I, O> request = requests.get(requestIdentifier);
- if (request != null) {
- request.receiveException(exception);
- } else {
- log.trace("Received an exception for an unknown request (%s)", requestIdentifier);
- }
- }
-
- // Other protocol-related
-
- RequestIdentifier openRequest() throws RemotingException {
- return session.openRequest(contextIdentifier);
- }
-
// Getters
Context<I,O> getUserContext() {
return userContext;
}
+ ContextClient getContextClient() {
+ return contextClient;
+ }
+
// Other mgmt
protected void finalize() throws Throwable {
try {
super.finalize();
} finally {
- receiveCloseContext();
+ // todo close it
log.trace("Leaked a context instance: %s", this);
}
}
- public final class UserContext implements Context<I, O> {
+ @SuppressWarnings ({"SerializableInnerClassWithNonSerializableOuterClass"})
+ public final class UserContext implements Context<I, O>, Serializable {
+ private static final long serialVersionUID = 1L;
private UserContext() {
}
+ private Object writeReplace() {
+ return contextServer;
+ }
+
public void close() throws RemotingException {
- receiveCloseContext();
+ contextServer.handleClose(false, false, false);
}
- public void addCloseHandler(final CloseHandler<Context<I, O>> closeHandler) {
- // todo ...
+ public void closeCancelling(final boolean mayInterrupt) throws RemotingException {
+ contextServer.handleClose(false, true, mayInterrupt);
}
- private FutureReply<O> doSend(final I request, final QueueExecutor queueExecutor) throws RemotingException {
- final RequestIdentifier requestIdentifier;
- requestIdentifier = openRequest();
- final CoreOutboundRequest<I, O> outboundRequest = new CoreOutboundRequest<I, O>(CoreOutboundContext.this, requestIdentifier);
- requests.put(requestIdentifier, outboundRequest);
- // Request must be sent *after* the identifier is registered in the map
- sendRequest(requestIdentifier, request, queueExecutor);
- final FutureReply<O> futureReply = outboundRequest.getFutureReply();
- futureReply.addCompletionNotifier(new RequestCompletionHandler<O>() {
- public void notifyComplete(final FutureReply<O> futureReply) {
- queueExecutor.shutdown();
- }
- });
- return futureReply;
+ public void closeImmediate() throws RemotingException {
+ contextServer.handleClose(true, true, true);
}
- public O invokeInterruptibly(final I request) throws RemotingException, RemoteExecutionException, InterruptedException {
+ public void addCloseHandler(final CloseHandler<Context<I, O>> closeHandler) {
+ // todo ...
+ }
+
+ public O invoke(final I request) throws RemotingException, RemoteExecutionException {
state.requireHold(State.UP);
try {
final QueueExecutor queueExecutor = new QueueExecutor();
- final FutureReply<O> futureReply = doSend(request, queueExecutor);
- // todo - find a safe way to make this interruptible
+ final CoreOutboundRequest<I, O> outboundRequest = new CoreOutboundRequest<I, O>();
+ final RequestServer<I> requestTerminus = contextServer.createNewRequest(outboundRequest.getReplier());
+ outboundRequest.setRequester(requestTerminus);
+ requestTerminus.handleRequest(request, queueExecutor);
+ final FutureReply<O> futureReply = outboundRequest.getFutureReply();
+ futureReply.addCompletionNotifier(new RequestCompletionHandler<O>() {
+ public void notifyComplete(final FutureReply<O> futureReply) {
+ queueExecutor.shutdown();
+ }
+ });
queueExecutor.runQueue();
- return futureReply.getInterruptibly();
+ return futureReply.get();
} finally {
state.release();
}
}
- public O invoke(final I request) throws RemotingException, RemoteExecutionException {
+ public FutureReply<O> send(final I request) throws RemotingException {
state.requireHold(State.UP);
try {
- final QueueExecutor queueExecutor = new QueueExecutor();
- final FutureReply<O> futureReply = doSend(request, queueExecutor);
- queueExecutor.runQueue();
- return futureReply.get();
+ final CoreOutboundRequest<I, O> outboundRequest = new CoreOutboundRequest<I, O>();
+ final RequestServer<I> requestTerminus = contextServer.createNewRequest(outboundRequest.getReplier());
+ outboundRequest.setRequester(requestTerminus);
+ requestTerminus.handleRequest(request, executor);
+ return outboundRequest.getFutureReply();
} finally {
state.release();
}
}
- public FutureReply<O> send(final I request) throws RemotingException {
+ public void sendOneWay(final I request) throws RemotingException {
state.requireHold(State.UP);
try {
- final RequestIdentifier requestIdentifier;
- requestIdentifier = openRequest();
- final CoreOutboundRequest<I, O> outboundRequest = new CoreOutboundRequest<I, O>(CoreOutboundContext.this, requestIdentifier);
- requests.put(requestIdentifier, outboundRequest);
- // Request must be sent *after* the identifier is registered in the map
- sendRequest(requestIdentifier, request, endpoint.getExecutor());
- return outboundRequest.getFutureReply();
+ final RequestServer<I> requestServer = contextServer.createNewRequest(null);
+ requestServer.handleRequest(request, executor);
} finally {
state.release();
}
@@ -198,5 +142,9 @@
}
}
- private static final CoreOutboundRequest[] empty = new CoreOutboundRequest[0];
+ public final class ContextClientImpl implements ContextClient {
+ public void handleClosing(boolean done) throws RemotingException {
+ // todo - remote side is closing
+ }
+ }
}
Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundRequest.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundRequest.java 2008-03-09 03:56:14 UTC (rev 3598)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundRequest.java 2008-03-11 23:46:24 UTC (rev 3599)
@@ -10,9 +10,9 @@
import org.jboss.cx.remoting.IndeterminateOutcomeException;
import org.jboss.cx.remoting.RemoteExecutionException;
import org.jboss.cx.remoting.RequestCompletionHandler;
+import org.jboss.cx.remoting.RemotingException;
import org.jboss.cx.remoting.util.AtomicStateMachine;
import org.jboss.cx.remoting.log.Logger;
-import org.jboss.cx.remoting.spi.protocol.RequestIdentifier;
/**
*
@@ -21,9 +21,9 @@
private static final Logger log = Logger.getLogger(CoreOutboundRequest.class);
- private final CoreOutboundContext<I, O> context;
- private final RequestIdentifier requestIdentifier;
+ private RequestServer<I> requestServer;
+ private final RequestClient<O> requestClient = new RequestClientImpl();
private final AtomicStateMachine<State> state = AtomicStateMachine.start(State.WAITING);
private final FutureReply<O> futureReply = new FutureReplyImpl();
@@ -34,15 +34,25 @@
/* Protected by {@code state} */
private List<RequestCompletionHandler<O>> handlers = Collections.synchronizedList(new LinkedList<RequestCompletionHandler<O>>());
- public CoreOutboundRequest(final CoreOutboundContext<I, O> context, final RequestIdentifier requestIdentifier) {
- this.context = context;
- this.requestIdentifier = requestIdentifier;
+ public CoreOutboundRequest() {
}
+ public RequestServer<I> getRequester() {
+ return requestServer;
+ }
+
+ public void setRequester(final RequestServer<I> requestServer) {
+ this.requestServer = requestServer;
+ }
+
public FutureReply<O> getFutureReply() {
return futureReply;
}
+ public RequestClient<O> getReplier() {
+ return requestClient;
+ }
+
private enum State {
WAITING,
DONE,
@@ -55,7 +65,6 @@
* Complete the request. Call only with the monitor held, not in WAITING state.
*/
private void complete() {
- drop();
final List<RequestCompletionHandler<O>> handlers = this.handlers;
if (handlers != null) {
this.handlers = null;
@@ -72,67 +81,42 @@
}
}
- private void drop() {
- context.dropRequest(requestIdentifier, this);
- }
-
- // Incoming protocol messages
-
- /**
- * Request was possibly abruptly terminated.
- */
- void receiveClose() {
- if (state.transitionHold(State.WAITING, State.TERMINATED)) try {
- drop();
- } finally {
- state.release();
+ public final class RequestClientImpl implements RequestClient<O> {
+ public void handleCancelAcknowledge() {
+ if (state.transitionHold(State.WAITING, State.CANCELLED)) try {
+ complete();
+ } finally {
+ state.release();
+ }
}
- }
- /**
- * Receive a cancel acknowledge for this request.
- */
- void receiveCancelAcknowledge() {
- state.requireTransitionHold(State.WAITING, State.CANCELLED);
- try {
- complete();
- } finally {
- state.release();
+ public void handleReply(final O reply) {
+ if (state.transitionExclusive(State.WAITING, State.DONE)) try {
+ CoreOutboundRequest.this.reply = reply;
+ } finally {
+ state.releaseDowngrade();
+ try {
+ complete();
+ } finally {
+ state.release();
+ }
+ }
}
- }
- /**
- * Receive a reply for this request.
- *
- * @param reply the reply
- */
- void receiveReply(final O reply) {
- state.requireTransitionExclusive(State.WAITING, State.DONE);
- this.reply = reply;
- state.releaseDowngrade();
- try {
- complete();
- } finally {
- state.release();
+ public void handleException(final RemoteExecutionException exception) {
+ if (state.transitionExclusive(State.WAITING, State.DONE)) try {
+ CoreOutboundRequest.this.exception = exception;
+ } finally {
+ state.releaseDowngrade();
+ try {
+ complete();
+ } finally {
+ state.release();
+ }
+ }
}
}
- /**
- * Receive an exception for this request.
- *
- * @param exception the exception
- */
- void receiveException(final RemoteExecutionException exception) {
- state.requireTransitionExclusive(State.WAITING, State.EXCEPTION);
- this.exception = exception;
- state.releaseDowngrade();
- try {
- complete();
- } finally {
- state.release();
- }
- }
-
public final class FutureReplyImpl implements FutureReply<O> {
private FutureReplyImpl() {
@@ -140,8 +124,9 @@
public boolean cancel(final boolean mayInterruptIfRunning) {
if (state.inHold(State.WAITING)) try {
- if (! context.sendCancelRequest(requestIdentifier, mayInterruptIfRunning)) {
- // the cancel request could not be sent at all
+ try {
+ requestServer.handleCancelRequest(mayInterruptIfRunning);
+ } catch (RemotingException e) {
return false;
}
} finally {
@@ -150,6 +135,19 @@
return state.waitForNot(State.WAITING) == State.CANCELLED;
}
+ public FutureReply<O> sendCancel(final boolean mayInterruptIfRunning) {
+ if (state.inHold(State.WAITING)) try {
+ try {
+ requestServer.handleCancelRequest(mayInterruptIfRunning);
+ } catch (RemotingException e) {
+ // do nothing
+ }
+ } finally {
+ state.release();
+ }
+ return this;
+ }
+
public boolean isCancelled() {
return state.in(State.CANCELLED);
}
@@ -220,7 +218,24 @@
}
public O getInterruptibly(final long timeout, final TimeUnit unit) throws InterruptedException, CancellationException, RemoteExecutionException {
- return null;
+ final State newState = state.waitInterruptiblyForNotHold(State.WAITING, timeout, unit);
+ try {
+ switch (newState) {
+ case CANCELLED:
+ throw new CancellationException("Request was cancelled");
+ case EXCEPTION:
+ throw exception;
+ case DONE:
+ return reply;
+ case WAITING:
+ return null;
+ case TERMINATED:
+ throw new IndeterminateOutcomeException("Request terminated abruptly; outcome unknown");
+ }
+ throw new IllegalStateException("Wrong state");
+ } finally {
+ state.release();
+ }
}
public FutureReply<O> addCompletionNotifier(RequestCompletionHandler<O> handler) {
Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundService.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundService.java 2008-03-09 03:56:14 UTC (rev 3598)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundService.java 2008-03-11 23:46:24 UTC (rev 3599)
@@ -6,7 +6,8 @@
import org.jboss.cx.remoting.CloseHandler;
import org.jboss.cx.remoting.util.AtomicStateMachine;
import org.jboss.cx.remoting.log.Logger;
-import org.jboss.cx.remoting.spi.protocol.ServiceIdentifier;
+import java.util.concurrent.Executor;
+import java.io.Serializable;
/**
*
@@ -14,70 +15,56 @@
public final class CoreOutboundService<I, O> {
private static final Logger log = Logger.getLogger(CoreOutboundService.class);
- private final ServiceIdentifier serviceIdentifier;
- private CoreSession coreSession;
- private final AtomicStateMachine<State> state = AtomicStateMachine.start(State.WAITING_FOR_REPLY);
+ private final AtomicStateMachine<State> state = AtomicStateMachine.start(State.INITIAL);
private final ContextSource<I, O> userContextSource = new UserContextSource();
+ private final ServiceClient serviceClient = new ServiceClientImpl();
+ private final Executor executor;
+ private ServiceServer<I,O> serviceServer;
+
+ public CoreOutboundService(final Executor executor) {
+ this.executor = executor;
+ }
+
private enum State {
- WAITING_FOR_REPLY,
+ INITIAL,
UP,
- FAILED,
+ CLOSING,
DOWN
}
- protected CoreOutboundService(final CoreSession coreSession, final ServiceIdentifier serviceIdentifier) {
- this.coreSession = coreSession;
- this.serviceIdentifier = serviceIdentifier;
+ // Getters
+
+ ContextSource<I, O> getUserContextSource() {
+ return userContextSource;
}
- // State mgmt
-
- void await() throws RemotingException {
- if (state.waitForNot(State.WAITING_FOR_REPLY) == State.FAILED) {
- throw new RemotingException("Failed to open service");
- }
+ public ServiceClient getServiceClient() {
+ return serviceClient;
}
- // Outbound protocol messages
-
- void sendServiceRequest() throws RemotingException {
+ public void initialize(final ServiceServer<I, O> serviceServer) {
+ state.requireTransitionExclusive(State.INITIAL, State.UP);
+ this.serviceServer = serviceServer;
+ state.releaseExclusive();
}
- // Inbound protocol messages
+ @SuppressWarnings ({"SerializableInnerClassWithNonSerializableOuterClass"})
+ public final class UserContextSource implements ContextSource<I, O>, Serializable {
+ private static final long serialVersionUID = 1L;
- void receiveServiceActivate() {
- if (! state.transition(State.WAITING_FOR_REPLY, State.UP)) {
- log.trace("Received unsolicited service activation for service (%s)", serviceIdentifier);
+ private Object writeReplace() {
+ return serviceServer;
}
- }
- void receiveServiceTerminate() {
- if (state.transition(State.UP, State.DOWN) || state.transition(State.WAITING_FOR_REPLY, State.FAILED)) {
- closeService();
- }
- }
+ public void close() {
+ // todo ...
- // Other protocol-related
-
- void closeService() {
- try {
- coreSession.closeService(serviceIdentifier);
- } catch (RemotingException e) {
- log.trace("Failed to close service (%s): %s", serviceIdentifier, e.getMessage());
+ // todo: is it better to close all child contexts, or let them continue on independently?
}
- }
- // Getters
-
- ContextSource<I, O> getUserContextSource() {
- return userContextSource;
- }
-
- public final class UserContextSource implements ContextSource<I, O> {
-
- public void close() {
- receiveServiceTerminate();
+ public void closeImmediate() throws RemotingException {
+ // todo ...
}
public void addCloseHandler(final CloseHandler<ContextSource<I, O>> closeHandler) {
@@ -85,16 +72,16 @@
}
public Context<I, O> createContext() throws RemotingException {
- // Don't need waitForNotHold here since the state can't change again
- final State currentState = state.waitForNot(State.WAITING_FOR_REPLY);
- switch (currentState) {
- case UP: break;
- case FAILED: throw new RemotingException("Context source open failed");
- default:
- throw new IllegalStateException("Context source is not open");
- }
- final CoreOutboundContext<I, O> context = coreSession.createContext(serviceIdentifier);
+ final CoreOutboundContext<I, O> context = new CoreOutboundContext<I, O>(executor);
+ final ContextServer<I, O> contextServer = serviceServer.createNewContext(context.getContextClient());
+ context.initialize(contextServer);
return context.getUserContext();
}
}
+
+ public final class ServiceClientImpl implements ServiceClient {
+ public void handleClosing() throws RemotingException {
+ // todo - remote side is closing
+ }
+ }
}
Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreSession.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreSession.java 2008-03-09 03:56:14 UTC (rev 3598)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreSession.java 2008-03-11 23:46:24 UTC (rev 3599)
@@ -21,13 +21,14 @@
import org.jboss.cx.remoting.Context;
import org.jboss.cx.remoting.CloseHandler;
import org.jboss.cx.remoting.core.stream.DefaultStreamDetector;
+import org.jboss.cx.remoting.core.util.DelegatingObjectInput;
import org.jboss.cx.remoting.util.AtomicStateMachine;
import org.jboss.cx.remoting.util.AttributeMap;
-import org.jboss.cx.remoting.util.ByteInput;
-import org.jboss.cx.remoting.util.ByteOutput;
+import org.jboss.cx.remoting.spi.ByteMessageInput;
+import org.jboss.cx.remoting.spi.ByteMessageOutput;
+import org.jboss.cx.remoting.spi.ObjectMessageInput;
import org.jboss.cx.remoting.util.CollectionUtil;
-import org.jboss.cx.remoting.util.MessageInput;
-import org.jboss.cx.remoting.util.MessageOutput;
+import org.jboss.cx.remoting.spi.ObjectMessageOutput;
import org.jboss.cx.remoting.log.Logger;
import org.jboss.cx.remoting.spi.protocol.ContextIdentifier;
import org.jboss.cx.remoting.spi.protocol.ProtocolContext;
@@ -39,7 +40,6 @@
import org.jboss.cx.remoting.spi.stream.StreamDetector;
import org.jboss.cx.remoting.spi.stream.StreamSerializer;
import org.jboss.cx.remoting.spi.stream.StreamSerializerFactory;
-import org.jboss.cx.remoting.spi.wrapper.ContextWrapper;
/**
@@ -58,25 +58,28 @@
// stream serialization detectors - immutable (for now?)
private final List<StreamDetector> streamDetectors;
- // clients - weak reference, to clean up if the user leaks
- private final ConcurrentMap<ContextIdentifier, WeakReference<CoreOutboundContext>> contexts = CollectionUtil.concurrentMap();
- private final ConcurrentMap<ServiceIdentifier, WeakReference<CoreOutboundService>> services = CollectionUtil.concurrentMap();
+ // Contexts and services that are available on the remote end of this session
+ // In these paris, the Server points to the ProtocolHandler, and the Client points to...whatever
+ private final ConcurrentMap<ContextIdentifier, WeakReference<ServerContextPair>> clientContexts = CollectionUtil.concurrentMap();
+ private final ConcurrentMap<ServiceIdentifier, WeakReference<ServerServicePair>> clientServices = CollectionUtil.concurrentMap();
- // servers - strong refereces, only clean up if we hear it from the other end
- private final ConcurrentMap<ContextIdentifier, CoreInboundContext> serverContexts = CollectionUtil.concurrentMap();
- private final ConcurrentMap<ServiceIdentifier, CoreInboundService> serverServices = CollectionUtil.concurrentMap();
+ // Contexts and services that are available on this end of this session
+ // In these pairs, the Client points to the ProtocolHandler, and the Server points to... whatever
+ private final ConcurrentMap<ContextIdentifier, ClientContextPair> serverContexts = CollectionUtil.concurrentMap();
+ private final ConcurrentMap<ServiceIdentifier, ClientServicePair> serverServices = CollectionUtil.concurrentMap();
// streams - strong references, only clean up if a close message is sent or received
private final ConcurrentMap<StreamIdentifier, CoreStream> streams = CollectionUtil.concurrentMap();
// don't GC the endpoint while a session lives
private final CoreEndpoint endpoint;
+ private final Executor executor;
/** The protocol handler. Set on NEW -> CONNECTING */
private ProtocolHandler protocolHandler;
/** The remote endpoint name. Set on CONNECTING -> UP */
private String remoteEndpointName;
- /** The root client context. Set on CONNECTING -> UP */
+ /** The root context. Set on CONNECTING -> UP */
private Context<?, ?> rootContext;
private final AtomicStateMachine<State> state = AtomicStateMachine.start(State.NEW);
@@ -88,150 +91,64 @@
throw new NullPointerException("endpoint is null");
}
this.endpoint = endpoint;
+ executor = endpoint.getExecutor();
// todo - make stream detectors pluggable
streamDetectors = java.util.Collections.<StreamDetector>singletonList(new DefaultStreamDetector());
}
+ UserSession getUserSession() {
+ return userSession;
+ }
+
// Initializers
- @SuppressWarnings ({"unchecked"})
- void initializeServer(final ProtocolHandler protocolHandler) {
+ private <I, O> void doInitialize(final ProtocolHandler protocolHandler, final Context<I, O> rootContext) {
+ this.protocolHandler = protocolHandler;
+ if (rootContext instanceof AbstractRealContext) {
+ final AbstractRealContext<I, O> abstractRealContext = (AbstractRealContext<I, O>) rootContext;
+ // Forward local context
+ final ContextIdentifier localIdentifier = protocolHandler.getLocalRootContextIdentifier();
+ final ProtocolContextClientImpl<I, O> contextClient = new ProtocolContextClientImpl<I, O>(localIdentifier);
+ serverContexts.put(localIdentifier, new ClientContextPair<I, O>(contextClient, abstractRealContext.getContextServer()));
+ }
+ // Forward remote context
+ final ContextIdentifier remoteIdentifier = protocolHandler.getRemoteRootContextIdentifier();
+ final ProtocolContextServerImpl<I, O> contextServer = new ProtocolContextServerImpl<I,O>(remoteIdentifier);
+ clientContexts.put(remoteIdentifier, new WeakReference<ServerContextPair>(new ServerContextPair<I, O>(new BaseContextClient(), contextServer)));
+ }
+
+ <I, O> void initializeServer(final ProtocolHandler protocolHandler, final Context<I, O> rootContext) {
if (protocolHandler == null) {
throw new NullPointerException("protocolHandler is null");
}
state.requireTransitionExclusive(State.NEW, State.CONNECTING);
try {
- this.protocolHandler = protocolHandler;
- final RequestListener<?, ?> listener = endpoint.getRootRequestListener();
- if (listener != null) {
- final ContextIdentifier contextIdentifier = protocolHandler.getRemoteRootContextIdentifier();
- serverContexts.put(contextIdentifier, new CoreInboundContext(contextIdentifier, this, listener));
- }
+ doInitialize(protocolHandler, rootContext);
} finally {
state.releaseExclusive();
}
}
- @SuppressWarnings ({"unchecked"})
- void initializeClient(final ProtocolHandlerFactory protocolHandlerFactory, final URI remoteUri, final AttributeMap attributeMap) throws IOException {
+ <I, O> void initializeClient(final ProtocolHandlerFactory protocolHandlerFactory, final URI remoteUri, final AttributeMap attributeMap, final Context<I, O> rootContext) throws IOException {
if (protocolHandlerFactory == null) {
throw new NullPointerException("protocolHandlerFactory is null");
}
state.requireTransitionExclusive(State.NEW, State.CONNECTING);
try {
- protocolHandler = protocolHandlerFactory.createHandler(protocolContext, remoteUri, attributeMap);
- final RequestListener<?, ?> listener = endpoint.getRootRequestListener();
- if (listener != null) {
- final ContextIdentifier contextIdentifier = protocolHandler.getRemoteRootContextIdentifier();
- serverContexts.put(contextIdentifier, new CoreInboundContext(contextIdentifier, this, listener));
- }
+ doInitialize(protocolHandlerFactory.createHandler(protocolContext, remoteUri, attributeMap), rootContext);
} finally {
state.releaseExclusive();
}
}
- // Outbound protocol messages
-
- void sendRequest(final ContextIdentifier contextIdentifier, final RequestIdentifier requestIdentifier, final Object request, final Executor streamExecutor) throws RemotingException {
- try {
- protocolHandler.sendRequest(contextIdentifier, requestIdentifier, request, streamExecutor);
- } catch (IOException e) {
- throw new RemotingException("Failed to send the request: " + e);
- }
- }
-
- boolean sendCancelRequest(final ContextIdentifier contextIdentifier, final RequestIdentifier requestIdentifier, final boolean mayInterrupt) {
- try {
- protocolHandler.sendCancelRequest(contextIdentifier, requestIdentifier, mayInterrupt);
- } catch (IOException e) {
- log.trace("Failed to send a cancel request: %s", e);
- return false;
- }
- return true;
- }
-
- void sendReply(final ContextIdentifier contextIdentifier, final RequestIdentifier requestIdentifier, final Object reply) throws RemotingException {
- try {
- protocolHandler.sendReply(contextIdentifier, requestIdentifier, reply);
- } catch (IOException e) {
- throw new RemotingException("Failed to send the reply: " + e);
- }
- }
-
- void sendException(final ContextIdentifier contextIdentifier, final RequestIdentifier requestIdentifier, final RemoteExecutionException exception) throws RemotingException {
- try {
- protocolHandler.sendException(contextIdentifier, requestIdentifier, exception);
- } catch (IOException e) {
- throw new RemotingException("Failed to send the exception: " + e);
- }
- }
-
- void sendCancelAcknowledge(final ContextIdentifier contextIdentifier, final RequestIdentifier requestIdentifier) throws RemotingException {
- try {
- protocolHandler.sendCancelAcknowledge(contextIdentifier, requestIdentifier);
- } catch (IOException e) {
- throw new RemotingException("Failed to send cancel acknowledgement: " + e);
- }
- }
-
- // Inbound protocol messages are in the ProtocolContextImpl
-
- // Other protocol-related
-
- RequestIdentifier openRequest(final ContextIdentifier contextIdentifier) throws RemotingException {
- try {
- return protocolHandler.openRequest(contextIdentifier);
- } catch (IOException e) {
- throw new RemotingException("Failed to open a request: " + e);
- }
- }
-
- void closeService(final ServiceIdentifier serviceIdentifier) throws RemotingException {
- try {
- protocolHandler.closeService(serviceIdentifier);
- } catch (IOException e) {
- throw new RemotingException("Failed to close service: " + e);
- }
- }
-
- // Getters
-
- ProtocolContext getProtocolContext() {
+ public ProtocolContext getProtocolContext() {
return protocolContext;
}
- Session getUserSession() {
- return userSession;
- }
-
- CoreEndpoint getEndpoint() {
- return endpoint;
- }
-
- ProtocolHandler getProtocolHandler() {
+ public ProtocolHandler getProtocolHandler() {
return protocolHandler;
}
- Executor getExecutor() {
- return endpoint.getExecutor();
- }
-
- // Thread-local instance
-
- private static final ThreadLocal<CoreSession> instance = new ThreadLocal<CoreSession>();
-
- static CoreSession getInstance() {
- return instance.get();
- }
-
- private void setInstance() {
- instance.set(this);
- }
-
- private void clearInstance() {
- instance.remove();
- }
-
// State mgmt
private enum State {
@@ -242,124 +159,8 @@
DOWN,
}
- void shutdown() {
- if (state.transition(State.UP, State.STOPPING)) {
- for (Map.Entry<ContextIdentifier,WeakReference<CoreOutboundContext>> entry : contexts.entrySet()) {
- final CoreOutboundContext context = entry.getValue().get();
- if (context != null) {
- context.receiveCloseContext();
- }
- }
- for (Map.Entry<ContextIdentifier,CoreInboundContext> entry : serverContexts.entrySet()) {
- entry.getValue().shutdown();
- }
- state.requireTransition(State.STOPPING, State.DOWN);
- }
- }
-
// Context mgmt
- <I, O> CoreOutboundContext<I, O> createContext(final ServiceIdentifier serviceIdentifier) throws RemotingException {
- if (serviceIdentifier == null) {
- throw new NullPointerException("serviceIdentifier is null");
- }
- state.requireHold(State.UP);
- try {
- final ContextIdentifier contextIdentifier;
- try {
- contextIdentifier = protocolHandler.openContext(serviceIdentifier);
- } catch (IOException e) {
- RemotingException rex = new RemotingException("Failed to open context: " + e.getMessage());
- rex.setStackTrace(e.getStackTrace());
- throw rex;
- }
- final CoreOutboundContext<I, O> context = new CoreOutboundContext<I, O>(this, contextIdentifier);
- log.trace("Adding new context, ID = %s", contextIdentifier);
- contexts.put(contextIdentifier, new WeakReference<CoreOutboundContext>(context));
- return context;
- } finally {
- state.release();
- }
- }
-
- <I, O> CoreInboundContext<I, O> createServerContext(final ServiceIdentifier remoteServiceIdentifier, final ContextIdentifier remoteContextIdentifier, final RequestListener<I, O> requestListener) {
- if (remoteServiceIdentifier == null) {
- throw new NullPointerException("remoteServiceIdentifier is null");
- }
- if (remoteContextIdentifier == null) {
- throw new NullPointerException("remoteContextIdentifier is null");
- }
- state.requireHold(State.UP);
- try {
- final CoreInboundContext<I, O> context = new CoreInboundContext<I, O>(remoteContextIdentifier, this, requestListener);
- log.trace("Adding new server (inbound) context, ID = %s", remoteContextIdentifier);
- serverContexts.put(remoteContextIdentifier, context);
- return context;
- } finally {
- state.release();
- }
- }
-
- CoreOutboundContext getContext(final ContextIdentifier contextIdentifier) {
- if (contextIdentifier == null) {
- throw new NullPointerException("contextIdentifier is null");
- }
- final WeakReference<CoreOutboundContext> weakReference = contexts.get(contextIdentifier);
- return weakReference == null ? null : weakReference.get();
- }
-
- CoreInboundContext getServerContext(final ContextIdentifier remoteContextIdentifier) {
- if (remoteContextIdentifier == null) {
- throw new NullPointerException("remoteContextIdentifier is null");
- }
- final CoreInboundContext context = serverContexts.get(remoteContextIdentifier);
- return context;
- }
-
- void removeContext(final ContextIdentifier identifier) {
- if (identifier == null) {
- throw new NullPointerException("identifier is null");
- }
- contexts.remove(identifier);
- }
-
- void removeServerContext(final ContextIdentifier identifier) {
- if (identifier == null) {
- throw new NullPointerException("identifier is null");
- }
- serverContexts.remove(identifier);
- }
-
- // Service mgmt
-
- CoreOutboundService getService(final ServiceIdentifier serviceIdentifier) {
- if (serviceIdentifier == null) {
- throw new NullPointerException("serviceIdentifier is null");
- }
- final WeakReference<CoreOutboundService> weakReference = services.get(serviceIdentifier);
- return weakReference == null ? null : weakReference.get();
- }
-
- CoreInboundService getServerService(final ServiceIdentifier serviceIdentifier) {
- if (serviceIdentifier == null) {
- throw new NullPointerException("serviceIdentifier is null");
- }
- return serverServices.get(serviceIdentifier);
- }
-
- void removeServerService(final ServiceIdentifier serviceIdentifier) {
- if (serviceIdentifier == null) {
- throw new NullPointerException("serviceIdentifier is null");
- }
- serverServices.remove(serviceIdentifier);
- }
-
- // Stream mgmt
-
- void removeStream(final StreamIdentifier streamIdentifier) {
- streams.remove(streamIdentifier);
- }
-
// User session impl
public final class UserSession implements Session {
@@ -368,14 +169,20 @@
private final ConcurrentMap<Object, Object> sessionMap = CollectionUtil.concurrentMap();
public void close() throws RemotingException {
- shutdown();
+ // todo -
try {
protocolHandler.closeSession();
} catch (IOException e) {
throw new RemotingException("Unable to close session: " + e.toString());
}
+ // todo - should this be non-blocking?
+ state.waitFor(State.DOWN);
}
+ public void closeImmediate() throws RemotingException {
+ // todo ...
+ }
+
public void addCloseHandler(final CloseHandler<Session> closeHandler) {
// todo ...
}
@@ -400,93 +207,134 @@
// Protocol context
+ @SuppressWarnings ({"unchecked"})
+ private static <O> void doSendReply(RequestClient<O> requestClient, Object data) throws RemotingException {
+ requestClient.handleReply((O)data);
+ }
+
public final class ProtocolContextImpl implements ProtocolContext {
public void closeSession() {
- shutdown();
+ // todo ...
}
- public MessageOutput getMessageOutput(ByteOutput target) throws IOException {
- return new MessageOutputImpl(target, streamDetectors, endpoint.getOrderedExecutor());
+ public ObjectMessageOutput getMessageOutput(ByteMessageOutput target) throws IOException {
+ return new ObjectMessageOutputImpl(target, streamDetectors, endpoint.getOrderedExecutor());
}
- public MessageOutput getMessageOutput(ByteOutput target, Executor streamExecutor) throws IOException {
- return new MessageOutputImpl(target, streamDetectors, streamExecutor);
+ public ObjectMessageOutput getMessageOutput(ByteMessageOutput target, Executor streamExecutor) throws IOException {
+ return new ObjectMessageOutputImpl(target, streamDetectors, streamExecutor);
}
- public MessageInput getMessageInput(ByteInput source) throws IOException {
- return new MessageInputImpl(source);
+ public ObjectMessageInput getMessageInput(ByteMessageInput source) throws IOException {
+ return new ObjectMessageInputImpl(source);
}
public String getLocalEndpointName() {
return endpoint.getUserEndpoint().getName();
}
- public void closeContext(ContextIdentifier remoteContextIdentifier) {
- final CoreInboundContext context = getServerContext(remoteContextIdentifier);
- if (context != null) {
- context.shutdown();
+ public void receiveContextClose(ContextIdentifier remoteContextIdentifier, final boolean immediate, final boolean cancel, final boolean interrupt) {
+ final ClientContextPair contextPair = serverContexts.remove(remoteContextIdentifier);
+ // todo - do the whole close operation
+ try {
+ contextPair.contextServer.handleClose(immediate, cancel, interrupt);
+ } catch (RemotingException e) {
+ log.trace(e, "Failed to forward a context close");
}
}
public void closeStream(StreamIdentifier streamIdentifier) {
- streams.remove(streamIdentifier);
+ final CoreStream coreStream = streams.remove(streamIdentifier);
+ // todo - shut down stream
}
- public void closeService(ServiceIdentifier serviceIdentifier) {
- // todo
+ public void receiveServiceClose(ServiceIdentifier serviceIdentifier) {
+ final ClientServicePair servicePair = serverServices.remove(serviceIdentifier);
+ try {
+ servicePair.serviceServer.handleClose();
+ } catch (RemotingException e) {
+ log.trace(e, "Failed to forward a service close");
+ }
}
+ @SuppressWarnings ({"unchecked"})
public void receiveOpenedContext(ServiceIdentifier remoteServiceIdentifier, ContextIdentifier remoteContextIdentifier) {
- final CoreInboundService service = getServerService(remoteServiceIdentifier);
- if (service != null) {
- service.receivedOpenedContext(remoteContextIdentifier);
+ try {
+ final ClientServicePair servicePair = serverServices.get(remoteServiceIdentifier);
+ final ProtocolContextClientImpl contextClient = new ProtocolContextClientImpl(remoteContextIdentifier);
+ final ContextServer contextServer = servicePair.serviceServer.createNewContext(contextClient);
+ // todo - who puts it in the map?
+ serverContexts.put(remoteContextIdentifier, new ClientContextPair(contextClient, contextServer));
+ } catch (RemotingException e) {
+ log.trace(e, "Failed to add a context to a service");
}
}
- public void receiveServiceTerminate(ServiceIdentifier serviceIdentifier) {
- final CoreOutboundService service = getService(serviceIdentifier);
- if (service != null) {
- service.receiveServiceTerminate();
- } else {
- log.trace("Got service terminate for an unknown service (%s)", serviceIdentifier);
+ public void receiveServiceClosing(ServiceIdentifier serviceIdentifier) {
+ final WeakReference<ServerServicePair> ref = clientServices.get(serviceIdentifier);
+ final ServerServicePair servicePair = ref.get();
+ try {
+ servicePair.serviceClient.handleClosing();
+ } catch (RemotingException e) {
+ log.trace(e, "Failed to signal that a service was closing on the remote side");
}
}
- @SuppressWarnings ({"unchecked"})
+ public void receiveContextClosing(ContextIdentifier contextIdentifier, boolean done) {
+ final WeakReference<ServerContextPair> ref = clientContexts.get(contextIdentifier);
+ final ServerContextPair contextPair = ref.get();
+ try {
+ contextPair.contextClient.handleClosing(done);
+ } catch (RemotingException e) {
+ log.trace(e, "Failed to signal that a context was closing on the remote side");
+ }
+ }
+
public void receiveReply(ContextIdentifier contextIdentifier, RequestIdentifier requestIdentifier, Object reply) {
- final CoreOutboundContext context = getContext(contextIdentifier);
- if (context != null) {
- context.receiveReply(requestIdentifier, reply);
- } else {
- log.trace("Got a reply for an unknown context (%s)", contextIdentifier);
+ final WeakReference<ServerContextPair> ref = clientContexts.get(contextIdentifier);
+ final ServerContextPair contextPair = ref.get();
+ final RequestClient<?> requestClient = (RequestClient<?>) contextPair.contextServer.requests.get(requestIdentifier);
+ try {
+ doSendReply(requestClient, reply);
+ } catch (RemotingException e) {
+ log.trace(e, "Failed to receive a reply");
}
}
public void receiveException(ContextIdentifier contextIdentifier, RequestIdentifier requestIdentifier, RemoteExecutionException exception) {
- final CoreOutboundContext context = getContext(contextIdentifier);
- if (context != null) {
- context.receiveException(requestIdentifier, exception);
- } else {
- log.trace("Got a request exception for an unknown context (%s)", contextIdentifier);
+ final WeakReference<ServerContextPair> ref = clientContexts.get(contextIdentifier);
+ final ServerContextPair contextPair = ref.get();
+ final RequestClient<?> requestClient = (RequestClient<?>) contextPair.contextServer.requests.get(requestIdentifier);
+ try {
+ requestClient.handleException(exception);
+ } catch (RemotingException e) {
+ log.trace(e, "Failed to receive an exception reply");
}
}
public void receiveCancelAcknowledge(ContextIdentifier contextIdentifier, RequestIdentifier requestIdentifier) {
- final CoreOutboundContext context = getContext(contextIdentifier);
- if (context != null) {
- context.receiveCancelAcknowledge(requestIdentifier);
- } else {
- log.trace("Got a cancel acknowledge for an unknown context (%s)", contextIdentifier);
+ final WeakReference<ServerContextPair> ref = clientContexts.get(contextIdentifier);
+ final ServerContextPair contextPair = ref.get();
+ final RequestClient<?> requestClient = (RequestClient<?>) contextPair.contextServer.requests.get(requestIdentifier);
+ try {
+ requestClient.handleCancelAcknowledge();
+ } catch (RemotingException e) {
+ log.trace(e, "Failed to receive a cancellation acknowledgement");
}
}
public void receiveCancelRequest(ContextIdentifier remoteContextIdentifier, RequestIdentifier requestIdentifier, boolean mayInterrupt) {
- final CoreInboundContext context = getServerContext(remoteContextIdentifier);
- context.receiveCancelRequest(requestIdentifier, mayInterrupt);
+ final ClientContextPair contextPair = serverContexts.get(remoteContextIdentifier);
+ final RequestServer<?> requestServer = (RequestServer<?>) contextPair.contextClient.requests.get(requestIdentifier);
+ try {
+ requestServer.handleCancelRequest(mayInterrupt);
+ } catch (RemotingException e) {
+ log.trace(e, "Failed to receive a cancellation request");
+ }
}
- public void receiveStreamData(StreamIdentifier streamIdentifier, MessageInput data) {
+ public void receiveStreamData(StreamIdentifier streamIdentifier, ObjectMessageInput data) {
final CoreStream coreStream = streams.get(streamIdentifier);
coreStream.receiveStreamData(data);
}
@@ -497,49 +345,41 @@
try {
state.requireTransition(State.CONNECTING, State.UP);
CoreSession.this.remoteEndpointName = remoteEndpointName;
- final ContextIdentifier rootContextIdentifier = protocolHandler.getLocalRootContextIdentifier();
- final CoreOutboundContext outboundContext = new CoreOutboundContext(CoreSession.this, rootContextIdentifier);
- rootContext = new ContextWrapper(outboundContext.getUserContext()) {
- public void close() throws RemotingException {
- throw new RemotingException("close() not allowed on root context");
- }
- };
- contexts.put(rootContextIdentifier, new WeakReference<CoreOutboundContext>(outboundContext));
} finally {
state.releaseExclusive();
}
}
+ @SuppressWarnings ({"unchecked"})
public void receiveRequest(final ContextIdentifier remoteContextIdentifier, final RequestIdentifier requestIdentifier, final Object request) {
- final CoreInboundContext context = getServerContext(remoteContextIdentifier);
- if (context != null) {
- endpoint.getExecutor().execute(new Runnable() {
- @SuppressWarnings ({"unchecked"})
- public void run() {
- context.receiveRequest(requestIdentifier, request);
+ final ClientContextPair contextPair = serverContexts.get(remoteContextIdentifier);
+ final RequestServer requestServer = (RequestServer) contextPair.contextClient.requests.get(requestIdentifier);
+ try {
+ if (requestServer != null) {
+ requestServer.handleRequest(request, executor);
+ } else {
+ log.trace("Got a request on an unknown context identifier (%s)", remoteContextIdentifier);
+ try {
+ protocolHandler.sendException(remoteContextIdentifier, requestIdentifier, new RemoteExecutionException("Received a request on an invalid context"));
+ } catch (IOException e) {
+ log.trace("Failed to send exception: %s", e.getMessage());
}
- });
- } else {
- log.trace("Got a request on an unknown context identifier (%s)", remoteContextIdentifier);
- try {
- protocolHandler.sendException(remoteContextIdentifier, requestIdentifier, new RemoteExecutionException("Received a request on an invalid context"));
- } catch (IOException e) {
- log.trace("Failed to send exception: %s", e.getMessage());
}
+ } catch (RemotingException e) {
+ e.printStackTrace();
}
}
-
}
// message output
- private final class MessageOutputImpl extends ObjectOutputStream implements MessageOutput {
- private final ByteOutput target;
+ private final class ObjectMessageOutputImpl extends ObjectOutputStream implements ObjectMessageOutput {
+ private final ByteMessageOutput target;
private final List<StreamDetector> streamDetectors;
private final List<StreamSerializer> streamSerializers = new ArrayList<StreamSerializer>();
private final Executor streamExecutor;
- private MessageOutputImpl(final ByteOutput target, final List<StreamDetector> streamDetectors, final Executor streamExecutor) throws IOException {
+ private ObjectMessageOutputImpl(final ByteMessageOutput target, final List<StreamDetector> streamDetectors, final Executor streamExecutor) throws IOException {
super(new OutputStream() {
public void write(int b) throws IOException {
target.write(b);
@@ -595,9 +435,7 @@
}
protected void writeObjectOverride(Object obj) throws IOException {
- setInstance();
super.writeObjectOverride(obj);
- clearInstance();
}
protected Object replaceObject(Object obj) throws IOException {
@@ -611,7 +449,7 @@
throw new IOException("Duplicate stream identifier encountered: " + streamIdentifier);
}
streamSerializers.add(stream.getStreamSerializer());
- return new StreamMarker(CoreSession.this, factory.getClass(), streamIdentifier);
+ return new StreamMarker(factory.getClass(), streamIdentifier);
}
}
return testObject;
@@ -682,15 +520,15 @@
}
}
- private final class MessageInputImpl extends DelegatingObjectInput implements MessageInput {
+ private final class ObjectMessageInputImpl extends DelegatingObjectInput implements ObjectMessageInput {
private CoreSession.ObjectInputImpl objectInput;
- private MessageInputImpl(final ObjectInputImpl objectInput) throws IOException {
+ private ObjectMessageInputImpl(final ObjectInputImpl objectInput) throws IOException {
super(objectInput);
this.objectInput = objectInput;
}
- private MessageInputImpl(final ByteInput source) throws IOException {
+ private ObjectMessageInputImpl(final ByteMessageInput source) throws IOException {
this(new ObjectInputImpl(new InputStream() {
public int read(byte b[]) throws IOException {
return source.read(b);
@@ -715,21 +553,11 @@
}
public Object readObject() throws ClassNotFoundException, IOException {
- setInstance();
- try {
- return objectInput.readObject();
- } finally {
- clearInstance();
- }
+ return objectInput.readObject();
}
public Object readObject(ClassLoader loader) throws ClassNotFoundException, IOException {
- setInstance();
- try {
- return objectInput.readObject(loader);
- } finally {
- clearInstance();
- }
+ return objectInput.readObject(loader);
}
public int remaining() {
@@ -741,6 +569,224 @@
}
}
+ private static final class ServerContextPair<I, O> {
+ private final ContextClient contextClient;
+ private final ProtocolContextServerImpl<I, O> contextServer;
+
+ private ServerContextPair(final ContextClient contextClient, final ProtocolContextServerImpl<I, O> contextServer) {
+ this.contextClient = contextClient;
+ this.contextServer = contextServer;
+ }
+ }
+
+ private static final class ClientContextPair<I, O> {
+ private final ProtocolContextClientImpl<I, O> contextClient;
+ private final ContextServer<I, O> contextServer;
+
+ private ClientContextPair(final ProtocolContextClientImpl<I, O> contextClient, final ContextServer<I, O> contextServer) {
+ this.contextClient = contextClient;
+ this.contextServer = contextServer;
+ }
+ }
+
+ private static final class ServerServicePair<I, O> {
+ private final ServiceClient serviceClient;
+ private final ProtocolServiceServerImpl<I, O> serviceServer;
+
+ private ServerServicePair(final ServiceClient serviceClient, final ProtocolServiceServerImpl<I, O> serviceServer) {
+ this.serviceClient = serviceClient;
+ this.serviceServer = serviceServer;
+ }
+ }
+
+ private static final class ClientServicePair<I, O> {
+ private final ProtocolServiceClientImpl serviceClient;
+ private final ServiceServer<I, O> serviceServer;
+
+ private ClientServicePair(final ProtocolServiceClientImpl serviceClient, final ServiceServer<I, O> serviceServer) {
+ this.serviceClient = serviceClient;
+ this.serviceServer = serviceServer;
+ }
+ }
+
+ private final class ProtocolServiceClientImpl implements ServiceClient {
+ private final ServiceIdentifier serviceIdentifier;
+
+ public ProtocolServiceClientImpl(final ServiceIdentifier serviceIdentifier) {
+ this.serviceIdentifier = serviceIdentifier;
+ }
+
+ public void handleClosing() throws RemotingException {
+ try {
+ protocolHandler.sendServiceClosing(serviceIdentifier);
+ } catch (RemotingException e) {
+ throw e;
+ } catch (IOException e) {
+ throw new RemotingException("Failed to send service closing message: " + e.getMessage(), e);
+ }
+ }
+ }
+
+ private final class ProtocolServiceServerImpl<I, O> implements ServiceServer<I, O> {
+ private final ServiceIdentifier serviceIdentifier;
+
+ public ProtocolServiceServerImpl(final ServiceIdentifier serviceIdentifier) {
+ this.serviceIdentifier = serviceIdentifier;
+ }
+
+ public void handleClose() throws RemotingException {
+ try {
+ protocolHandler.sendServiceClose(serviceIdentifier);
+ } catch (RemotingException e) {
+ throw e;
+ } catch (IOException e) {
+ throw new RemotingException("Failed to send service close message: " + e.getMessage(), e);
+ }
+ }
+
+ public ContextServer<I, O> createNewContext(final ContextClient client) throws RemotingException {
+ try {
+ final ContextIdentifier contextIdentifier = protocolHandler.openContext(serviceIdentifier);
+ clientContexts.put(contextIdentifier, new WeakReference<ServerContextPair>(new ServerContextPair<I, O>(client, new ProtocolContextServerImpl<I, O>(contextIdentifier))));
+ return new ProtocolContextServerImpl<I, O>(contextIdentifier);
+ } catch (RemotingException e) {
+ throw e;
+ } catch (IOException e) {
+ throw new RemotingException("Failed to open a context: " + e.getMessage(), e);
+ }
+ }
+ }
+
+ private final class ProtocolContextClientImpl<I, O> implements ContextClient {
+ private final ContextIdentifier contextIdentifier;
+ private final ConcurrentMap<RequestIdentifier, RequestServer<I>> requests = CollectionUtil.concurrentMap();
+
+ public ProtocolContextClientImpl(final ContextIdentifier contextIdentifier) {
+ this.contextIdentifier = contextIdentifier;
+ }
+
+ public void handleClosing(boolean done) throws RemotingException {
+ try {
+ if (state.inHold(State.UP)) {
+ protocolHandler.sendContextClosing(contextIdentifier, done);
+ }
+ } catch (RemotingException e) {
+ throw e;
+ } catch (IOException e) {
+ throw new RemotingException("Failed to send context closing message: " + e.getMessage(), e);
+ }
+ }
+
+ private RequestClient<O> addClient(RequestIdentifier identifier) {
+ return new ProtocolRequestClientImpl<O>(contextIdentifier, identifier);
+ }
+
+ private final class ProtocolRequestClientImpl<O> implements RequestClient<O> {
+ private final ContextIdentifier contextIdentifer;
+ private final RequestIdentifier requestIdentifer;
+
+ public ProtocolRequestClientImpl(final ContextIdentifier contextIdentifer, final RequestIdentifier requestIdentifer) {
+ this.contextIdentifer = contextIdentifer;
+ this.requestIdentifer = requestIdentifer;
+ }
+
+ public void handleReply(final O reply) throws RemotingException {
+ try {
+ protocolHandler.sendReply(contextIdentifer, requestIdentifer, reply);
+ } catch (RemotingException e) {
+ throw e;
+ } catch (IOException e) {
+ throw new RemotingException("Failed to send a reply: " + e.getMessage(), e);
+ } finally {
+ requests.remove(requestIdentifer);
+ }
+ }
+
+ public void handleException(final RemoteExecutionException cause) throws RemotingException {
+ try {
+ protocolHandler.sendException(contextIdentifer, requestIdentifer, cause);
+ } catch (RemotingException e) {
+ throw e;
+ } catch (IOException e) {
+ throw new RemotingException("Failed to send an exception: " + e.getMessage(), e);
+ } finally {
+ requests.remove(requestIdentifer);
+ }
+ }
+
+ public void handleCancelAcknowledge() throws RemotingException {
+ try {
+ protocolHandler.sendCancelAcknowledge(contextIdentifer, requestIdentifer);
+ } catch (RemotingException e) {
+ throw e;
+ } catch (IOException e) {
+ throw new RemotingException("Failed to send a cancel acknowledgement: " + e.getMessage(), e);
+ } finally {
+ requests.remove(requestIdentifer);
+ }
+ }
+ }
+ }
+
+ private final class ProtocolContextServerImpl<I, O> implements ContextServer<I, O> {
+ private final ContextIdentifier contextIdentifier;
+ private final ConcurrentMap<RequestIdentifier, RequestClient<O>> requests = CollectionUtil.concurrentMap();
+
+ public ProtocolContextServerImpl(final ContextIdentifier contextIdentifier) {
+ this.contextIdentifier = contextIdentifier;
+ }
+
+ public RequestServer<I> createNewRequest(final RequestClient<O> requestClient) throws RemotingException {
+ try {
+ final RequestIdentifier requestIdentifier = protocolHandler.openRequest(contextIdentifier);
+ requests.put(requestIdentifier, requestClient);
+ return new ProtocolRequestServerImpl(requestIdentifier);
+ } catch (RemotingException e) {
+ throw e;
+ } catch (IOException e) {
+ throw new RemotingException("Failed to open a request: " + e.getMessage(), e);
+ }
+ }
+
+ public void handleClose(final boolean immediate, final boolean cancel, final boolean interrupt) throws RemotingException {
+ try {
+ protocolHandler.sendContextClose(contextIdentifier, immediate, cancel, interrupt);
+ } catch (RemotingException e) {
+ throw e;
+ } catch (IOException e) {
+ throw new RemotingException("Failed to send context close message: " + e.getMessage(), e);
+ }
+ }
+
+ private final class ProtocolRequestServerImpl implements RequestServer<I> {
+ private final RequestIdentifier requestIdentifier;
+
+ public ProtocolRequestServerImpl(final RequestIdentifier requestIdentifier) {
+ this.requestIdentifier = requestIdentifier;
+ }
+
+ public void handleRequest(final I request, final Executor streamExecutor) throws RemotingException {
+ try {
+ protocolHandler.sendRequest(contextIdentifier, requestIdentifier, request, streamExecutor);
+ } catch (RemotingException e) {
+ throw e;
+ } catch (IOException e) {
+ throw new RemotingException("Failed to send a request: " + e.getMessage(), e);
+ }
+ }
+
+ public void handleCancelRequest(final boolean mayInterrupt) throws RemotingException {
+ try {
+ protocolHandler.sendCancelRequest(contextIdentifier, requestIdentifier, mayInterrupt);
+ } catch (RemotingException e) {
+ throw e;
+ } catch (IOException e) {
+ throw new RemotingException("Failed to send a cancel request: " + e.getMessage(), e);
+ }
+ }
+ }
+ }
+
private static final Map<String, Class<?>> primitiveTypes = new HashMap<String, Class<?>>();
private static <T> void add(Class<T> type) {
Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreStream.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreStream.java 2008-03-09 03:56:14 UTC (rev 3598)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreStream.java 2008-03-11 23:46:24 UTC (rev 3599)
@@ -2,8 +2,8 @@
import java.io.IOException;
import java.util.concurrent.Executor;
-import org.jboss.cx.remoting.util.MessageInput;
-import org.jboss.cx.remoting.util.MessageOutput;
+import org.jboss.cx.remoting.spi.ObjectMessageInput;
+import org.jboss.cx.remoting.spi.ObjectMessageOutput;
import org.jboss.cx.remoting.log.Logger;
import org.jboss.cx.remoting.spi.protocol.ProtocolHandler;
import org.jboss.cx.remoting.spi.protocol.StreamIdentifier;
@@ -62,7 +62,7 @@
streamSerializer = streamSerializerFactory.getRemoteSide(streamContext);
}
- public void receiveStreamData(final MessageInput data) {
+ public void receiveStreamData(final ObjectMessageInput data) {
executor.execute(new Runnable() {
public void run() {
try {
@@ -89,7 +89,7 @@
private StreamContextImpl() {
}
- public MessageOutput writeMessage() throws IOException {
+ public ObjectMessageOutput writeMessage() throws IOException {
return protocolHandler.sendStreamData(streamIdentifier, executor);
}
@@ -97,7 +97,8 @@
try {
protocolHandler.closeStream(streamIdentifier);
} finally {
- coreSession.removeStream(streamIdentifier);
+ // todo clean up stream
+// coreSession.removeStream(streamIdentifier);
}
}
}
Deleted: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/DelegatingObjectInput.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/DelegatingObjectInput.java 2008-03-09 03:56:14 UTC (rev 3598)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/DelegatingObjectInput.java 2008-03-11 23:46:24 UTC (rev 3599)
@@ -1,103 +0,0 @@
-package org.jboss.cx.remoting.core;
-
-import java.io.IOException;
-import java.io.ObjectInput;
-
-/**
- *
- */
-public class DelegatingObjectInput implements ObjectInput {
- private final ObjectInput delegate;
-
- public DelegatingObjectInput(final ObjectInput delegate) {
- this.delegate = delegate;
- }
-
- public int read() throws IOException {
- return delegate.read();
- }
-
- public int read(final byte[] data) throws IOException {
- return delegate.read(data);
- }
-
- public int read(final byte[] data, final int offs, final int len) throws IOException {
- return delegate.read(data, offs, len);
- }
-
- public void close() throws IOException {
- delegate.close();
- }
-
- public Object readObject() throws ClassNotFoundException, IOException {
- return delegate.readObject();
- }
-
- public long skip(final long n) throws IOException {
- return delegate.skip(n);
- }
-
- public int available() throws IOException {
- return delegate.available();
- }
-
- public void readFully(final byte[] b) throws IOException {
- delegate.readFully(b);
- }
-
- public void readFully(final byte[] b, final int off, final int len) throws IOException {
- delegate.readFully(b, off, len);
- }
-
- public int skipBytes(final int n) throws IOException {
- return delegate.skipBytes(n);
- }
-
- public boolean readBoolean() throws IOException {
- return delegate.readBoolean();
- }
-
- public byte readByte() throws IOException {
- return delegate.readByte();
- }
-
- public int readUnsignedByte() throws IOException {
- return delegate.readUnsignedByte();
- }
-
- public short readShort() throws IOException {
- return delegate.readShort();
- }
-
- public int readUnsignedShort() throws IOException {
- return delegate.readUnsignedShort();
- }
-
- public char readChar() throws IOException {
- return delegate.readChar();
- }
-
- public int readInt() throws IOException {
- return delegate.readInt();
- }
-
- public long readLong() throws IOException {
- return delegate.readLong();
- }
-
- public float readFloat() throws IOException {
- return delegate.readFloat();
- }
-
- public double readDouble() throws IOException {
- return delegate.readDouble();
- }
-
- public String readLine() throws IOException {
- return delegate.readLine();
- }
-
- public String readUTF() throws IOException {
- return delegate.readUTF();
- }
-}
Deleted: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/LocalProtocol.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/LocalProtocol.java 2008-03-09 03:56:14 UTC (rev 3598)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/LocalProtocol.java 2008-03-11 23:46:24 UTC (rev 3599)
@@ -1,164 +0,0 @@
-package org.jboss.cx.remoting.core;
-
-import java.io.IOException;
-import java.net.URI;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Executor;
-import org.jboss.cx.remoting.Endpoint;
-import org.jboss.cx.remoting.RemoteExecutionException;
-import org.jboss.cx.remoting.RemotingException;
-import org.jboss.cx.remoting.util.MessageOutput;
-import org.jboss.cx.remoting.util.AttributeMap;
-import org.jboss.cx.remoting.util.CollectionUtil;
-import org.jboss.cx.remoting.log.Logger;
-import org.jboss.cx.remoting.spi.protocol.ContextIdentifier;
-import org.jboss.cx.remoting.spi.protocol.ProtocolContext;
-import org.jboss.cx.remoting.spi.protocol.ProtocolHandler;
-import org.jboss.cx.remoting.spi.protocol.ProtocolHandlerFactory;
-import org.jboss.cx.remoting.spi.protocol.ProtocolRegistration;
-import org.jboss.cx.remoting.spi.protocol.ProtocolRegistrationSpec;
-import org.jboss.cx.remoting.spi.protocol.ProtocolServerContext;
-import org.jboss.cx.remoting.spi.protocol.RequestIdentifier;
-import org.jboss.cx.remoting.spi.protocol.ServiceIdentifier;
-import org.jboss.cx.remoting.spi.protocol.SimpleContextIdentifier;
-import org.jboss.cx.remoting.spi.protocol.SimpleRequestIdentifier;
-import org.jboss.cx.remoting.spi.protocol.SimpleServiceIdentifier;
-import org.jboss.cx.remoting.spi.protocol.SimpleStreamIdentifier;
-import org.jboss.cx.remoting.spi.protocol.StreamIdentifier;
-
-/**
- *
- */
-public final class LocalProtocol {
-
- private static final Logger log = Logger.getLogger(LocalProtocol.class);
-
- private final ConcurrentMap<String, ProtocolServerContext> endpoints = CollectionUtil.concurrentMap();
-
- public void addToEndpoint(Endpoint endpoint) throws RemotingException {
- final ProtocolRegistration registration = endpoint.registerProtocol(ProtocolRegistrationSpec.DEFAULT.setScheme("local").setProtocolHandlerFactory(new Factory(endpoint)));
- final ProtocolServerContext serverContext = registration.getProtocolServerContext();
- if (endpoints.putIfAbsent(endpoint.getName(), serverContext) != null) {
- throw new IllegalArgumentException("Attempt to register duplicate endpoint \"" + endpoint.getName() + "\"");
- }
- }
-
- public final class Factory implements ProtocolHandlerFactory {
- private final String endpointName;
-
- private Factory(final Endpoint endpoint) {
- endpointName = endpoint.getName();
- }
-
- public boolean isLocal(URI uri) {
- return true;
- }
-
- public ProtocolHandler createHandler(ProtocolContext context, URI remoteUri, final AttributeMap attributeMap) throws IOException {
- final String remoteName = remoteUri.getSchemeSpecificPart();
- final ProtocolServerContext serverContext = endpoints.get(remoteName);
- if (serverContext == null) {
- throw new IOException("No local endpoint named \"" + remoteName + "\" could be found");
- }
- return new Handler(serverContext.establishSession(new Handler(context)));
- }
-
- public void close() {
- endpoints.remove(endpointName);
- }
- }
-
- public final class Handler implements ProtocolHandler {
- private final ProtocolContext remoteContext;
-
- private Handler(final ProtocolContext remoteContext) {
- this.remoteContext = remoteContext;
- }
-
- public ContextIdentifier openContext(final ServiceIdentifier serviceIdentifier) throws IOException {
- log.trace("Opening context for local protocol");
- final SimpleContextIdentifier contextIdentifier = new SimpleContextIdentifier();
- remoteContext.receiveOpenedContext(serviceIdentifier, contextIdentifier);
- return contextIdentifier;
- }
-
- public RequestIdentifier openRequest(ContextIdentifier contextIdentifier) throws IOException {
- log.trace("Opening request for local protocol");
- return new SimpleRequestIdentifier();
- }
-
- public StreamIdentifier openStream() throws IOException {
- log.trace("Opening stream for local protocol");
- return new SimpleStreamIdentifier();
- }
-
- public ServiceIdentifier openService() throws IOException {
- return new SimpleServiceIdentifier();
- }
-
- public void closeSession() throws IOException {
- log.trace("Closing session for local protocol");
- remoteContext.closeSession();
- }
-
- public String getRemoteEndpointName() {
- return remoteContext.getLocalEndpointName();
- }
-
- public void closeService(ServiceIdentifier serviceIdentifier) throws IOException {
- }
-
- public void sendContextClose(ContextIdentifier contextIdentifier) throws IOException {
- log.trace("Closing context for local protocol");
- remoteContext.closeContext(contextIdentifier);
- }
-
- public void closeStream(StreamIdentifier streamIdentifier) throws IOException {
- log.trace("Closing stream for local protocol");
- }
-
- public void sendReply(ContextIdentifier remoteContextIdentifier, RequestIdentifier requestIdentifier, Object reply) throws IOException {
- log.trace("Sending stream for local protocol");
- remoteContext.receiveReply(remoteContextIdentifier, requestIdentifier, reply);
- }
-
- public void sendException(ContextIdentifier remoteContextIdentifier, RequestIdentifier requestIdentifier, RemoteExecutionException exception) throws IOException {
- log.trace("Sending exception for local protocol");
- remoteContext.receiveException(remoteContextIdentifier, requestIdentifier, exception);
- }
-
- public void sendRequest(ContextIdentifier contextIdentifier, RequestIdentifier requestIdentifier, Object request, final Executor streamExecutor) throws IOException {
- log.trace("Sending request for local protocol");
- remoteContext.receiveRequest(contextIdentifier, requestIdentifier, request);
- }
-
- public void sendCancelAcknowledge(ContextIdentifier remoteContextIdentifier, RequestIdentifier requestIdentifier) throws IOException {
- log.trace("Sending cancel acknowledge for local protocol");
- remoteContext.receiveCancelAcknowledge(remoteContextIdentifier, requestIdentifier);
- }
-
- public void sendServiceTerminate(ServiceIdentifier remoteServiceIdentifier) throws IOException {
- }
-
- public ContextIdentifier getLocalRootContextIdentifier() {
- return null;
- }
-
- public ContextIdentifier getRemoteRootContextIdentifier() {
- return null;
- }
-
- public void sendCancelRequest(ContextIdentifier contextIdentifier, RequestIdentifier requestIdentifier, boolean mayInterrupt) throws IOException {
- log.trace("Sending cancel request for local protocol");
- remoteContext.receiveCancelRequest(contextIdentifier, requestIdentifier, mayInterrupt);
- }
-
- public ContextIdentifier openContext() throws IOException {
- return null;
- }
-
- public MessageOutput sendStreamData(StreamIdentifier streamIdentifier, final Executor streamExeceutor) throws IOException {
- throw new UnsupportedOperationException("streams");
- }
- }
-}
Deleted: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/OrderedExecutorFactory.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/OrderedExecutorFactory.java 2008-03-09 03:56:14 UTC (rev 3598)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/OrderedExecutorFactory.java 2008-03-11 23:46:24 UTC (rev 3599)
@@ -1,50 +0,0 @@
-package org.jboss.cx.remoting.core;
-
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.Set;
-import java.util.concurrent.Executor;
-
-/**
- *
- */
-public final class OrderedExecutorFactory {
- private final Executor parent;
- private final Set<ChildExecutor> runningChildren = Collections.synchronizedSet(new HashSet<ChildExecutor>());
-
- public OrderedExecutorFactory(final Executor parent) {
- this.parent = parent;
- }
-
- public Executor getOrderedExecutor() {
- return new ChildExecutor();
- }
-
- private final class ChildExecutor implements Executor, Runnable {
- private final LinkedList<Runnable> tasks = new LinkedList<Runnable>();
-
- public void execute(Runnable command) {
- synchronized(tasks) {
- tasks.add(command);
- if (tasks.size() == 1 && runningChildren.add(this)) {
- parent.execute(this);
- }
- }
- }
-
- public void run() {
- for (;;) {
- final Runnable task;
- synchronized(tasks) {
- task = tasks.poll();
- if (task == null) {
- runningChildren.remove(this);
- return;
- }
- }
- task.run();
- }
- }
- }
-}
Added: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ProtocolContextClient.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ProtocolContextClient.java (rev 0)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ProtocolContextClient.java 2008-03-11 23:46:24 UTC (rev 3599)
@@ -0,0 +1,11 @@
+package org.jboss.cx.remoting.core;
+
+import org.jboss.cx.remoting.spi.protocol.RequestIdentifier;
+
+/**
+ *
+ */
+public interface ProtocolContextClient<I, O> extends ContextClient {
+ RequestClient<O> addRequest(RequestIdentifier requestIdentifier);
+
+}
Added: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ProtocolContextServer.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ProtocolContextServer.java (rev 0)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ProtocolContextServer.java 2008-03-11 23:46:24 UTC (rev 3599)
@@ -0,0 +1,10 @@
+package org.jboss.cx.remoting.core;
+
+import org.jboss.cx.remoting.spi.protocol.RequestIdentifier;
+
+/**
+ *
+ */
+public interface ProtocolContextServer<I, O> extends ContextServer<I, O> {
+ RequestClient<O> getRequestClient(RequestIdentifier requestIdentifier);
+}
Deleted: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/QueueExecutor.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/QueueExecutor.java 2008-03-09 03:56:14 UTC (rev 3598)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/QueueExecutor.java 2008-03-11 23:46:24 UTC (rev 3599)
@@ -1,100 +0,0 @@
-package org.jboss.cx.remoting.core;
-
-import java.util.LinkedList;
-import java.util.Queue;
-import java.util.concurrent.Executor;
-import org.jboss.cx.remoting.log.Logger;
-
-/**
- * An executor designed to run all submitted tasks in the current thread. The queue is run continuously
- * until the {@code shutdown()} method is invoked. Jobs may be submitted to the queue from any thread.
- * Only one thread should invoke the {@code runQueue()} method, which will run until the executor is
- * shut down.
- */
-public final class QueueExecutor implements Executor {
- private static final Logger log = Logger.getLogger(QueueExecutor.class);
-
- private final Queue<Runnable> queue = new LinkedList<Runnable>();
-
- private State state = State.WAITING;
-
- private enum State {
- RUNNING,
- WAITING,
- STOPPING,
- DOWN,
- }
-
- public void execute(Runnable command) {
- synchronized(queue) {
- switch (state) {
- case WAITING:
- state = State.RUNNING;
- queue.notify();
- // fall thru
- case RUNNING:
- queue.add(command);
- break;
- default:
- throw new IllegalStateException("Executor is no longer available");
- }
- }
- }
-
- public void runQueue() {
- boolean intr = Thread.interrupted();
- try {
- for (;;) {
- final State newState;
- synchronized(queue) {
- while (state == State.WAITING) {
- try {
- queue.wait();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- if (state == State.DOWN) {
- throw new IllegalStateException("DOWN");
- }
- newState = state;
- }
- for (;;) {
- final Runnable runnable;
- synchronized(queue) {
- runnable = queue.poll();
- if (runnable == null) {
- break;
- }
- }
- try {
- runnable.run();
- } catch (Throwable t) {
- log.trace(t, "Error occurred while processing run queue");
- }
- }
- if (newState == State.STOPPING) {
- synchronized(queue) {
- state = State.DOWN;
- return;
- }
- }
- }
- } finally {
- if (intr) {
- Thread.currentThread().interrupt();
- }
- }
- }
-
- public void shutdown() {
- synchronized(queue) {
- switch (state) {
- case WAITING:
- queue.notify();
- case RUNNING:
- state = State.STOPPING;
- }
- }
- }
-}
Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RemoteClassLoader.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RemoteClassLoader.java 2008-03-09 03:56:14 UTC (rev 3598)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RemoteClassLoader.java 2008-03-11 23:46:24 UTC (rev 3599)
@@ -26,6 +26,11 @@
protected Class<?> findClass(String name) throws ClassNotFoundException {
try {
+ return super.findClass(name);
+ } catch (ClassNotFoundException e) {
+ // continue on...
+ }
+ try {
final ClassLoaderResourceReply reply = loaderContext.invoke(new ClassLoaderResourceRequest(name + ".class"));
final ObjectSource<RemoteResource> source = reply.getResources();
try {
@@ -56,9 +61,16 @@
} catch (RemotingException e) {
throw new ClassNotFoundException("Cannot load class " + name + " due to an invocation failure", e);
} catch (RemoteExecutionException e) {
- throw new ClassNotFoundException("Cannot load class " + name + " due to a remote invocation failure", e.getCause());
+ final Throwable cause = e.getCause();
+ if (cause instanceof ClassNotFoundException) {
+ throw (ClassNotFoundException)cause;
+ }
+ throw new ClassNotFoundException("Cannot load class " + name + " due to a remote invocation failure", cause);
} catch (IOException e) {
- throw new ClassNotFoundException("Cannot load class " + name + " due to an input/output error", e);
+ throw new ClassNotFoundException("Cannot load class " + name + " due to an I/O error", e);
}
}
+
+ // todo - support arbitrary resources
+ // todo - cache fetched resources locally for forwarding
}
Added: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RequestClient.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RequestClient.java (rev 0)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RequestClient.java 2008-03-11 23:46:24 UTC (rev 3599)
@@ -0,0 +1,17 @@
+package org.jboss.cx.remoting.core;
+
+import org.jboss.cx.remoting.RemotingException;
+import org.jboss.cx.remoting.RemoteExecutionException;
+
+/**
+ *
+ */
+public interface RequestClient<O> {
+ // Outbound protocol messages
+
+ void handleReply(final O reply) throws RemotingException;
+
+ void handleException(final RemoteExecutionException cause) throws RemotingException;
+
+ void handleCancelAcknowledge() throws RemotingException;
+}
Added: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RequestServer.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RequestServer.java (rev 0)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RequestServer.java 2008-03-11 23:46:24 UTC (rev 3599)
@@ -0,0 +1,16 @@
+package org.jboss.cx.remoting.core;
+
+import org.jboss.cx.remoting.RemotingException;
+import java.util.concurrent.Executor;
+
+/**
+ *
+ */
+public interface RequestServer<I> {
+
+ // Outbound protocol messages
+
+ void handleRequest(I request, final Executor streamExecutor) throws RemotingException;
+
+ void handleCancelRequest(boolean mayInterrupt) throws RemotingException;
+}
Added: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ServiceClient.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ServiceClient.java (rev 0)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ServiceClient.java 2008-03-11 23:46:24 UTC (rev 3599)
@@ -0,0 +1,10 @@
+package org.jboss.cx.remoting.core;
+
+import org.jboss.cx.remoting.RemotingException;
+
+/**
+ *
+ */
+public interface ServiceClient {
+ void handleClosing() throws RemotingException;
+}
Deleted: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ServiceLocatorListener.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ServiceLocatorListener.java 2008-03-09 03:56:14 UTC (rev 3598)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ServiceLocatorListener.java 2008-03-11 23:46:24 UTC (rev 3599)
@@ -1,59 +0,0 @@
-package org.jboss.cx.remoting.core;
-
-import org.jboss.cx.remoting.RequestListener;
-import org.jboss.cx.remoting.RequestContext;
-import org.jboss.cx.remoting.RemoteExecutionException;
-import org.jboss.cx.remoting.ContextSource;
-import org.jboss.cx.remoting.Context;
-import org.jboss.cx.remoting.util.ServiceURI;
-import org.jboss.cx.remoting.util.CollectionUtil;
-import org.jboss.cx.remoting.service.ServiceRequest;
-import org.jboss.cx.remoting.service.ServiceReply;
-import java.net.URI;
-import java.util.concurrent.ConcurrentMap;
-import java.util.SortedMap;
-
-/**
- *
- */
-public final class ServiceLocatorListener<I, O> implements RequestListener<ServiceRequest<I, O>, ServiceReply<I, O>> {
-
- private interface Service {
- String getGroupName();
-
- String getType();
-
- // todo - add in whatever negotation to the request object (security?)
- <X, Y> Context<Void, ServiceReply<X, Y>> getServiceChannel();
- }
-
- private interface Peer {
- String getName();
-
- int getCost();
-
- <X, Y> Context<ServiceRequest<X, Y>, ServiceReply<X, Y>> getLocatorContext();
-
- SortedMap<String, Service> getServicesByGroupName();
-
- SortedMap<String, Service> getServicesByType();
- }
-
- private static <K, V> ConcurrentMap<K, V> syncMap() {
- return CollectionUtil.concurrentMap(CollectionUtil.<K, V>hashMap());
- }
-
- private final ConcurrentMap<String, ConcurrentMap<String, ContextSource<?, ?>>> deployments = syncMap();
-
- public void handleRequest(final RequestContext<ServiceReply<I, O>> requestContext, final ServiceRequest<I, O> request) throws RemoteExecutionException, InterruptedException {
- final URI uri = request.getUri();
- final ServiceURI serviceURI = new ServiceURI(uri);
- final String endpointName = serviceURI.getEndpointName();
- final String groupName = serviceURI.getGroupName();
- final String serviceType = serviceURI.getServiceType();
-
-
- }
-
-
-}
Added: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ServiceServer.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ServiceServer.java (rev 0)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ServiceServer.java 2008-03-11 23:46:24 UTC (rev 3599)
@@ -0,0 +1,12 @@
+package org.jboss.cx.remoting.core;
+
+import org.jboss.cx.remoting.RemotingException;
+
+/**
+ *
+ */
+public interface ServiceServer<I, O> {
+ void handleClose() throws RemotingException;
+
+ ContextServer<I, O> createNewContext(ContextClient client) throws RemotingException;
+}
Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/StreamMarker.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/StreamMarker.java 2008-03-09 03:56:14 UTC (rev 3598)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/StreamMarker.java 2008-03-11 23:46:24 UTC (rev 3599)
@@ -12,21 +12,18 @@
*/
public final class StreamMarker implements Externalizable {
+ private static final long serialVersionUID = 1L;
+
private Class<? extends StreamSerializerFactory> factoryClass;
private StreamIdentifier streamIdentifier;
- private CoreSession coreSession;
- public StreamMarker(final CoreSession coreSession, final Class<? extends StreamSerializerFactory> factoryClass, final StreamIdentifier streamIdentifier) {
- if (coreSession == null) {
- throw new NullPointerException("coreSession is null");
- }
+ public StreamMarker(final Class<? extends StreamSerializerFactory> factoryClass, final StreamIdentifier streamIdentifier) {
if (factoryClass == null) {
throw new NullPointerException("factoryClass is null");
}
if (streamIdentifier == null) {
throw new NullPointerException("streamIdentifier is null");
}
- this.coreSession = coreSession;
this.factoryClass = factoryClass;
this.streamIdentifier = streamIdentifier;
}
@@ -49,7 +46,6 @@
@SuppressWarnings ({"unchecked"})
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- coreSession = CoreSession.getInstance();
factoryClass = (Class<? extends StreamSerializerFactory>) in.readObject();
streamIdentifier = (StreamIdentifier) in.readObject();
}
Added: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/service/ClassLoaderResourceListener.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/service/ClassLoaderResourceListener.java (rev 0)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/service/ClassLoaderResourceListener.java 2008-03-11 23:46:24 UTC (rev 3599)
@@ -0,0 +1,73 @@
+package org.jboss.cx.remoting.core.service;
+
+import org.jboss.cx.remoting.RequestListener;
+import org.jboss.cx.remoting.RequestContext;
+import org.jboss.cx.remoting.RemoteExecutionException;
+import org.jboss.cx.remoting.IOExceptionCarrier;
+import org.jboss.cx.remoting.stream.Streams;
+import org.jboss.cx.remoting.stream.ObjectSource;
+import org.jboss.cx.remoting.stream.ObjectSourceWrapper;
+import org.jboss.cx.remoting.util.CollectionUtil;
+import org.jboss.cx.remoting.util.Translator;
+import org.jboss.cx.remoting.service.ClassLoaderResourceRequest;
+import org.jboss.cx.remoting.service.ClassLoaderResourceReply;
+import org.jboss.cx.remoting.service.RemoteResource;
+import java.net.URL;
+import java.net.URLConnection;
+import java.util.Enumeration;
+import java.io.IOException;
+
+/**
+ *
+ */
+public final class ClassLoaderResourceListener implements RequestListener<ClassLoaderResourceRequest, ClassLoaderResourceReply> {
+ private ClassLoader classLoader;
+
+ public ClassLoader getClassLoader() {
+ return classLoader;
+ }
+
+ public void setClassLoader(final ClassLoader classLoader) {
+ this.classLoader = classLoader;
+ }
+
+ public void handleOpen() {
+ }
+
+ public void handleRequest(final RequestContext<ClassLoaderResourceReply> requestContext, final ClassLoaderResourceRequest request) throws RemoteExecutionException, InterruptedException {
+ try {
+ final Enumeration<URL> urlResources = classLoader.getResources(request.getName());
+ final Enumeration<RemoteResource> actualResources = CollectionUtil.translate(urlResources, new Translator<URL, RemoteResource>() {
+ public RemoteResource translate(final URL input) {
+ try {
+ final RemoteResource resource = new RemoteResource();
+ final URLConnection urlConnection = input.openConnection();
+ final int size = urlConnection.getContentLength();
+ resource.setInputStream(urlConnection.getInputStream());
+ resource.setSize(size);
+ return resource;
+ } catch (IOException ex) {
+ throw new IOExceptionCarrier(ex);
+ }
+ }
+ });
+ final ObjectSource<RemoteResource> resourceSequence = new ObjectSourceWrapper<RemoteResource>(Streams.getEnumerationObjectSource(actualResources)) {
+ public RemoteResource next() throws IOException {
+ try {
+ return super.next();
+ } catch (IOExceptionCarrier ex) {
+ throw ex.getCause();
+ }
+ }
+ };
+ ClassLoaderResourceReply reply = new ClassLoaderResourceReply();
+ reply.setResources(resourceSequence);
+ requestContext.sendReply(reply);
+ } catch (IOException e) {
+ throw new RemoteExecutionException("Unable to get resources: " + e.getMessage(), e);
+ }
+ }
+
+ public void handleClose() {
+ }
+}
Copied: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/service/ServiceLocatorListener.java (from rev 3517, remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ServiceLocatorListener.java)
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/service/ServiceLocatorListener.java (rev 0)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/service/ServiceLocatorListener.java 2008-03-11 23:46:24 UTC (rev 3599)
@@ -0,0 +1,65 @@
+package org.jboss.cx.remoting.core.service;
+
+import org.jboss.cx.remoting.RequestListener;
+import org.jboss.cx.remoting.RequestContext;
+import org.jboss.cx.remoting.RemoteExecutionException;
+import org.jboss.cx.remoting.ContextSource;
+import org.jboss.cx.remoting.Context;
+import org.jboss.cx.remoting.util.ServiceURI;
+import org.jboss.cx.remoting.util.CollectionUtil;
+import org.jboss.cx.remoting.service.ServiceRequest;
+import org.jboss.cx.remoting.service.ServiceReply;
+import java.net.URI;
+import java.util.concurrent.ConcurrentMap;
+import java.util.SortedMap;
+
+/**
+ *
+ */
+public final class ServiceLocatorListener<I, O> implements RequestListener<ServiceRequest<I, O>, ServiceReply<I, O>> {
+
+ private interface Service {
+ String getGroupName();
+
+ String getType();
+
+ // todo - add in whatever negotation to the request object (security?)
+ <X, Y> Context<Void, ServiceReply<X, Y>> getServiceChannel();
+ }
+
+ private interface Peer {
+ String getName();
+
+ int getCost();
+
+ <X, Y> Context<ServiceRequest<X, Y>, ServiceReply<X, Y>> getLocatorContext();
+
+ SortedMap<String, Service> getServicesByGroupName();
+
+ SortedMap<String, Service> getServicesByType();
+ }
+
+ private static <K, V> ConcurrentMap<K, V> syncMap() {
+ return CollectionUtil.synchronizedMap(CollectionUtil.<K, V>hashMap());
+ }
+
+ private final ConcurrentMap<String, ConcurrentMap<String, ContextSource<?, ?>>> deployments = syncMap();
+
+ public void handleOpen() {
+ }
+
+ public void handleRequest(final RequestContext<ServiceReply<I, O>> requestContext, final ServiceRequest<I, O> request) throws RemoteExecutionException, InterruptedException {
+ final URI uri = request.getUri();
+ final ServiceURI serviceURI = new ServiceURI(uri);
+ final String endpointName = serviceURI.getEndpointName();
+ final String groupName = serviceURI.getGroupName();
+ final String serviceType = serviceURI.getServiceType();
+
+
+ }
+
+ public void handleClose() {
+ }
+
+
+}
Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/stream/DefaultStreamDetector.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/stream/DefaultStreamDetector.java 2008-03-09 03:56:14 UTC (rev 3598)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/stream/DefaultStreamDetector.java 2008-03-11 23:46:24 UTC (rev 3599)
@@ -7,7 +7,6 @@
import org.jboss.cx.remoting.spi.stream.StreamSerializerFactory;
import org.jboss.cx.remoting.stream.ObjectSink;
import org.jboss.cx.remoting.stream.ObjectSource;
-import org.jboss.cx.remoting.stream.ProgressStream;
/**
*
@@ -22,8 +21,6 @@
return new ObjectSourceStreamSerializerFactory();
} else if (candidate instanceof ObjectSink) {
return new ObjectSinkStreamSerializerFactory();
- } else if (candidate instanceof ProgressStream) {
- return new ProgressStreamStreamSerializerFactory();
} else if (candidate instanceof Iterator) {
return new IteratorStreamSerializerFactory();
} else {
Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/stream/InputStreamStreamSerializerFactory.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/stream/InputStreamStreamSerializerFactory.java 2008-03-09 03:56:14 UTC (rev 3598)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/stream/InputStreamStreamSerializerFactory.java 2008-03-11 23:46:24 UTC (rev 3599)
@@ -3,8 +3,8 @@
import java.io.IOException;
import java.io.InputStream;
import java.util.LinkedList;
-import org.jboss.cx.remoting.util.MessageInput;
-import org.jboss.cx.remoting.util.MessageOutput;
+import org.jboss.cx.remoting.spi.ObjectMessageInput;
+import org.jboss.cx.remoting.spi.ObjectMessageOutput;
import org.jboss.cx.remoting.log.Logger;
import org.jboss.cx.remoting.spi.stream.RemoteStreamSerializer;
import org.jboss.cx.remoting.spi.stream.StreamContext;
@@ -50,12 +50,12 @@
sendNext();
}
- public void handleData(MessageInput data) throws IOException {
+ public void handleData(ObjectMessageInput data) throws IOException {
sendNext();
}
private void sendNext() throws IOException {
- final MessageOutput output = context.writeMessage();
+ final ObjectMessageOutput output = context.writeMessage();
final byte[] bytes = new byte[BUF_LEN];
int i, t;
boolean end = false;
@@ -151,7 +151,7 @@
public void handleOpen() throws IOException {
}
- public void handleData(MessageInput data) throws IOException {
+ public void handleData(ObjectMessageInput data) throws IOException {
synchronized(messageQueue) {
for (;;) {
final int d = data.read();
Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/stream/IteratorStreamSerializerFactory.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/stream/IteratorStreamSerializerFactory.java 2008-03-09 03:56:14 UTC (rev 3598)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/stream/IteratorStreamSerializerFactory.java 2008-03-11 23:46:24 UTC (rev 3599)
@@ -2,7 +2,7 @@
import java.io.IOException;
import java.util.Iterator;
-import org.jboss.cx.remoting.util.MessageInput;
+import org.jboss.cx.remoting.spi.ObjectMessageInput;
import org.jboss.cx.remoting.spi.stream.RemoteStreamSerializer;
import org.jboss.cx.remoting.spi.stream.StreamContext;
import org.jboss.cx.remoting.spi.stream.StreamSerializer;
@@ -61,7 +61,7 @@
public void handleOpen() throws IOException {
}
- public void handleData(MessageInput data) throws IOException {
+ public void handleData(ObjectMessageInput data) throws IOException {
}
public void handleClose() throws IOException {
Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/stream/ObjectSinkStreamSerializerFactory.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/stream/ObjectSinkStreamSerializerFactory.java 2008-03-09 03:56:14 UTC (rev 3598)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/stream/ObjectSinkStreamSerializerFactory.java 2008-03-11 23:46:24 UTC (rev 3599)
@@ -1,8 +1,8 @@
package org.jboss.cx.remoting.core.stream;
import java.io.IOException;
-import org.jboss.cx.remoting.util.MessageInput;
-import org.jboss.cx.remoting.util.MessageOutput;
+import org.jboss.cx.remoting.spi.ObjectMessageInput;
+import org.jboss.cx.remoting.spi.ObjectMessageOutput;
import org.jboss.cx.remoting.spi.stream.RemoteStreamSerializer;
import org.jboss.cx.remoting.spi.stream.StreamContext;
import org.jboss.cx.remoting.spi.stream.StreamSerializer;
@@ -42,7 +42,7 @@
}
@SuppressWarnings ({"unchecked"})
- public void handleData(MessageInput data) throws IOException {
+ public void handleData(ObjectMessageInput data) throws IOException {
MessageType messageType = MessageType.values()[data.read()];
switch (messageType) {
case DATA:
@@ -73,14 +73,14 @@
public ObjectSink<?> getRemoteInstance() {
return new ObjectSink<Object>() {
public void accept(final Object instance) throws IOException {
- final MessageOutput msg = context.writeMessage();
+ final ObjectMessageOutput msg = context.writeMessage();
msg.write(MessageType.DATA.ordinal());
msg.writeObject(instance);
msg.commit();
}
public void flush() throws IOException {
- final MessageOutput msg = context.writeMessage();
+ final ObjectMessageOutput msg = context.writeMessage();
msg.write(MessageType.FLUSH.ordinal());
msg.commit();
}
@@ -94,7 +94,7 @@
public void handleOpen() throws IOException {
}
- public void handleData(MessageInput data) throws IOException {
+ public void handleData(ObjectMessageInput data) throws IOException {
}
public void handleClose() throws IOException {
Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/stream/ObjectSourceStreamSerializerFactory.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/stream/ObjectSourceStreamSerializerFactory.java 2008-03-09 03:56:14 UTC (rev 3598)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/stream/ObjectSourceStreamSerializerFactory.java 2008-03-11 23:46:24 UTC (rev 3599)
@@ -4,8 +4,8 @@
import java.util.LinkedList;
import java.util.NoSuchElementException;
import java.util.Queue;
-import org.jboss.cx.remoting.util.MessageInput;
-import org.jboss.cx.remoting.util.MessageOutput;
+import org.jboss.cx.remoting.spi.ObjectMessageInput;
+import org.jboss.cx.remoting.spi.ObjectMessageOutput;
import org.jboss.cx.remoting.spi.stream.RemoteStreamSerializer;
import org.jboss.cx.remoting.spi.stream.StreamContext;
import org.jboss.cx.remoting.spi.stream.StreamSerializer;
@@ -37,7 +37,7 @@
transmitNext();
}
- public void handleData(MessageInput data) throws IOException {
+ public void handleData(ObjectMessageInput data) throws IOException {
transmitNext();
}
@@ -46,7 +46,7 @@
}
private void transmitNext() throws IOException {
- final MessageOutput msg = streamContext.writeMessage();
+ final ObjectMessageOutput msg = streamContext.writeMessage();
final boolean hasNext = objectSource.hasNext();
msg.writeBoolean(hasNext);
if (hasNext) {
@@ -121,7 +121,7 @@
}
}
final Message msg = messageQueue.remove();
- final MessageOutput omsg;
+ final ObjectMessageOutput omsg;
switch (msg.type) {
case ITEM:
omsg = context.writeMessage();
@@ -161,7 +161,7 @@
}
@SuppressWarnings ({"unchecked"})
- public void handleData(MessageInput data) throws IOException {
+ public void handleData(ObjectMessageInput data) throws IOException {
synchronized(messageQueue) {
if (! data.readBoolean()) {
messageQueue.add(new Message(Type.END, null));
Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/stream/OutputStreamStreamSerailizerFactory.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/stream/OutputStreamStreamSerailizerFactory.java 2008-03-09 03:56:14 UTC (rev 3598)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/stream/OutputStreamStreamSerailizerFactory.java 2008-03-11 23:46:24 UTC (rev 3599)
@@ -2,8 +2,8 @@
import java.io.IOException;
import java.io.OutputStream;
-import org.jboss.cx.remoting.util.MessageInput;
-import org.jboss.cx.remoting.util.MessageOutput;
+import org.jboss.cx.remoting.spi.ObjectMessageInput;
+import org.jboss.cx.remoting.spi.ObjectMessageOutput;
import org.jboss.cx.remoting.spi.stream.RemoteStreamSerializer;
import org.jboss.cx.remoting.spi.stream.StreamContext;
import org.jboss.cx.remoting.spi.stream.StreamSerializer;
@@ -39,7 +39,7 @@
public void handleOpen() throws IOException {
}
- public void handleData(MessageInput data) throws IOException {
+ public void handleData(ObjectMessageInput data) throws IOException {
MessageType messageType = MessageType.values()[data.read()];
switch (messageType) {
case DATA:
@@ -64,7 +64,7 @@
private static final class RemoteStreamSerializerImpl implements RemoteStreamSerializer {
private final StreamContext context;
private final Object monitor = new Object();
- private MessageOutput current;
+ private ObjectMessageOutput current;
public RemoteStreamSerializerImpl(final StreamContext context) {
this.context = context;
@@ -116,7 +116,7 @@
public void handleOpen() throws IOException {
}
- public void handleData(MessageInput data) throws IOException {
+ public void handleData(ObjectMessageInput data) throws IOException {
// ignore
}
Deleted: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/stream/ProgressStreamStreamSerializerFactory.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/stream/ProgressStreamStreamSerializerFactory.java 2008-03-09 03:56:14 UTC (rev 3598)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/stream/ProgressStreamStreamSerializerFactory.java 2008-03-11 23:46:24 UTC (rev 3599)
@@ -1,79 +0,0 @@
-package org.jboss.cx.remoting.core.stream;
-
-import java.io.IOException;
-import org.jboss.cx.remoting.util.MessageInput;
-import org.jboss.cx.remoting.util.MessageOutput;
-import org.jboss.cx.remoting.spi.stream.RemoteStreamSerializer;
-import org.jboss.cx.remoting.spi.stream.StreamContext;
-import org.jboss.cx.remoting.spi.stream.StreamSerializer;
-import org.jboss.cx.remoting.spi.stream.StreamSerializerFactory;
-import org.jboss.cx.remoting.stream.ProgressStream;
-
-/**
- *
- */
-public final class ProgressStreamStreamSerializerFactory implements StreamSerializerFactory {
- public StreamSerializer getLocalSide(StreamContext context, Object local) throws IOException {
- return new StreamSerializerImpl((ProgressStream)local);
- }
-
- public RemoteStreamSerializer getRemoteSide(StreamContext context) throws IOException {
- return new RemoteStreamSerializerImpl(context);
- }
-
- public static final class StreamSerializerImpl implements StreamSerializer {
- private final ProgressStream progressStream;
-
- public StreamSerializerImpl(final ProgressStream progressStream) {
- this.progressStream = progressStream;
- }
-
- public void handleOpen() throws IOException {
- }
-
- public void handleData(MessageInput data) throws IOException {
- final String operationTitle = data.readUTF();
- final int unitsDone = data.readInt();
- final int totalUnits = data.readInt();
- final boolean approx = data.readBoolean();
- progressStream.update(operationTitle, unitsDone, totalUnits, approx);
- }
-
- public void handleClose() throws IOException {
- }
- }
-
- public static final class RemoteStreamSerializerImpl implements RemoteStreamSerializer {
- private final StreamContext context;
-
- public RemoteStreamSerializerImpl(final StreamContext context) {
- this.context = context;
- }
-
- public ProgressStream getRemoteInstance() {
- return new ProgressStream() {
- public void update(String operationTitle, int unitsDone, int totalUnits, boolean approx) {
- try {
- final MessageOutput msg = context.writeMessage();
- msg.writeUTF(operationTitle);
- msg.writeInt(unitsDone);
- msg.writeInt(totalUnits);
- msg.writeBoolean(approx);
- msg.commit();
- } catch (IOException e) {
- // todo - log?
- }
- }
- };
- }
-
- public void handleOpen() throws IOException {
- }
-
- public void handleData(MessageInput data) throws IOException {
- }
-
- public void handleClose() throws IOException {
- }
- }
-}
Copied: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/util/DelegatingObjectInput.java (from rev 3517, remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/DelegatingObjectInput.java)
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/util/DelegatingObjectInput.java (rev 0)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/util/DelegatingObjectInput.java 2008-03-11 23:46:24 UTC (rev 3599)
@@ -0,0 +1,103 @@
+package org.jboss.cx.remoting.core.util;
+
+import java.io.IOException;
+import java.io.ObjectInput;
+
+/**
+ *
+ */
+public class DelegatingObjectInput implements ObjectInput {
+ private final ObjectInput delegate;
+
+ public DelegatingObjectInput(final ObjectInput delegate) {
+ this.delegate = delegate;
+ }
+
+ public int read() throws IOException {
+ return delegate.read();
+ }
+
+ public int read(final byte[] data) throws IOException {
+ return delegate.read(data);
+ }
+
+ public int read(final byte[] data, final int offs, final int len) throws IOException {
+ return delegate.read(data, offs, len);
+ }
+
+ public void close() throws IOException {
+ delegate.close();
+ }
+
+ public Object readObject() throws ClassNotFoundException, IOException {
+ return delegate.readObject();
+ }
+
+ public long skip(final long n) throws IOException {
+ return delegate.skip(n);
+ }
+
+ public int available() throws IOException {
+ return delegate.available();
+ }
+
+ public void readFully(final byte[] b) throws IOException {
+ delegate.readFully(b);
+ }
+
+ public void readFully(final byte[] b, final int off, final int len) throws IOException {
+ delegate.readFully(b, off, len);
+ }
+
+ public int skipBytes(final int n) throws IOException {
+ return delegate.skipBytes(n);
+ }
+
+ public boolean readBoolean() throws IOException {
+ return delegate.readBoolean();
+ }
+
+ public byte readByte() throws IOException {
+ return delegate.readByte();
+ }
+
+ public int readUnsignedByte() throws IOException {
+ return delegate.readUnsignedByte();
+ }
+
+ public short readShort() throws IOException {
+ return delegate.readShort();
+ }
+
+ public int readUnsignedShort() throws IOException {
+ return delegate.readUnsignedShort();
+ }
+
+ public char readChar() throws IOException {
+ return delegate.readChar();
+ }
+
+ public int readInt() throws IOException {
+ return delegate.readInt();
+ }
+
+ public long readLong() throws IOException {
+ return delegate.readLong();
+ }
+
+ public float readFloat() throws IOException {
+ return delegate.readFloat();
+ }
+
+ public double readDouble() throws IOException {
+ return delegate.readDouble();
+ }
+
+ public String readLine() throws IOException {
+ return delegate.readLine();
+ }
+
+ public String readUTF() throws IOException {
+ return delegate.readUTF();
+ }
+}
Copied: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/util/OrderedExecutorFactory.java (from rev 3517, remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/OrderedExecutorFactory.java)
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/util/OrderedExecutorFactory.java (rev 0)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/util/OrderedExecutorFactory.java 2008-03-11 23:46:24 UTC (rev 3599)
@@ -0,0 +1,50 @@
+package org.jboss.cx.remoting.core.util;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.Set;
+import java.util.concurrent.Executor;
+
+/**
+ *
+ */
+public final class OrderedExecutorFactory {
+ private final Executor parent;
+ private final Set<ChildExecutor> runningChildren = Collections.synchronizedSet(new HashSet<ChildExecutor>());
+
+ public OrderedExecutorFactory(final Executor parent) {
+ this.parent = parent;
+ }
+
+ public Executor getOrderedExecutor() {
+ return new ChildExecutor();
+ }
+
+ private final class ChildExecutor implements Executor, Runnable {
+ private final LinkedList<Runnable> tasks = new LinkedList<Runnable>();
+
+ public void execute(Runnable command) {
+ synchronized(tasks) {
+ tasks.add(command);
+ if (tasks.size() == 1 && runningChildren.add(this)) {
+ parent.execute(this);
+ }
+ }
+ }
+
+ public void run() {
+ for (;;) {
+ final Runnable task;
+ synchronized(tasks) {
+ task = tasks.poll();
+ if (task == null) {
+ runningChildren.remove(this);
+ return;
+ }
+ }
+ task.run();
+ }
+ }
+ }
+}
Copied: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/util/QueueExecutor.java (from rev 3517, remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/QueueExecutor.java)
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/util/QueueExecutor.java (rev 0)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/util/QueueExecutor.java 2008-03-11 23:46:24 UTC (rev 3599)
@@ -0,0 +1,100 @@
+package org.jboss.cx.remoting.core.util;
+
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.concurrent.Executor;
+import org.jboss.cx.remoting.log.Logger;
+
+/**
+ * An executor designed to run all submitted tasks in the current thread. The queue is run continuously
+ * until the {@code shutdown()} method is invoked. Jobs may be submitted to the queue from any thread.
+ * Only one thread should invoke the {@code runQueue()} method, which will run until the executor is
+ * shut down.
+ */
+public final class QueueExecutor implements Executor {
+ private static final Logger log = Logger.getLogger(QueueExecutor.class);
+
+ private final Queue<Runnable> queue = new LinkedList<Runnable>();
+
+ private State state = State.WAITING;
+
+ private enum State {
+ RUNNING,
+ WAITING,
+ STOPPING,
+ DOWN,
+ }
+
+ public void execute(Runnable command) {
+ synchronized(queue) {
+ switch (state) {
+ case WAITING:
+ state = State.RUNNING;
+ queue.notify();
+ // fall thru
+ case RUNNING:
+ queue.add(command);
+ break;
+ default:
+ throw new IllegalStateException("Executor is no longer available");
+ }
+ }
+ }
+
+ public void runQueue() {
+ boolean intr = Thread.interrupted();
+ try {
+ for (;;) {
+ final State newState;
+ synchronized(queue) {
+ while (state == State.WAITING) {
+ try {
+ queue.wait();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ if (state == State.DOWN) {
+ throw new IllegalStateException("DOWN");
+ }
+ newState = state;
+ }
+ for (;;) {
+ final Runnable runnable;
+ synchronized(queue) {
+ runnable = queue.poll();
+ if (runnable == null) {
+ break;
+ }
+ }
+ try {
+ runnable.run();
+ } catch (Throwable t) {
+ log.trace(t, "Error occurred while processing run queue");
+ }
+ }
+ if (newState == State.STOPPING) {
+ synchronized(queue) {
+ state = State.DOWN;
+ return;
+ }
+ }
+ }
+ } finally {
+ if (intr) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ public void shutdown() {
+ synchronized(queue) {
+ switch (state) {
+ case WAITING:
+ queue.notify();
+ case RUNNING:
+ state = State.STOPPING;
+ }
+ }
+ }
+}
Modified: remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/RemotingHttpSessionImpl.java
===================================================================
--- remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/RemotingHttpSessionImpl.java 2008-03-09 03:56:14 UTC (rev 3598)
+++ remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/RemotingHttpSessionImpl.java 2008-03-11 23:46:24 UTC (rev 3599)
@@ -6,8 +6,8 @@
import org.jboss.cx.remoting.http.spi.OutgoingHttpMessage;
import org.jboss.cx.remoting.http.spi.AbstractOutgoingHttpMessage;
import org.jboss.cx.remoting.util.CollectionUtil;
-import org.jboss.cx.remoting.util.MessageOutput;
-import org.jboss.cx.remoting.util.ByteOutput;
+import org.jboss.cx.remoting.spi.ObjectMessageOutput;
+import org.jboss.cx.remoting.spi.ByteMessageOutput;
import org.jboss.cx.remoting.spi.protocol.ProtocolContext;
import org.jboss.cx.remoting.spi.protocol.ProtocolHandler;
import org.jboss.cx.remoting.spi.protocol.ServiceIdentifier;
@@ -130,8 +130,8 @@
public void sendServiceActivate(final ServiceIdentifier remoteServiceIdentifier) throws IOException {
outgoingQueue.add(new OutputAction() {
- public void run(ByteOutput target) throws IOException {
- final MessageOutput msgOutput = protocolContext.getMessageOutput(target);
+ public void run(ByteMessageOutput target) throws IOException {
+ final ObjectMessageOutput msgOutput = protocolContext.getMessageOutput(target);
write(msgOutput, MsgType.SERVICE_ACTIVATE);
write(msgOutput, remoteServiceIdentifier);
msgOutput.commit();
@@ -141,8 +141,8 @@
public void sendReply(final ContextIdentifier remoteContextIdentifier, final RequestIdentifier requestIdentifier, final Object reply) throws IOException {
// we have to buffer because reply might be mutable!
- final BufferedByteOutput output = new BufferedByteOutput(256);
- final MessageOutput msgOutput = protocolContext.getMessageOutput(output);
+ final BufferedByteMessageOutput output = new BufferedByteMessageOutput(256);
+ final ObjectMessageOutput msgOutput = protocolContext.getMessageOutput(output);
write(msgOutput, MsgType.REPLY);
write(msgOutput, remoteContextIdentifier);
write(msgOutput, requestIdentifier);
@@ -152,8 +152,8 @@
public void sendException(final ContextIdentifier remoteContextIdentifier, final RequestIdentifier requestIdentifier, final RemoteExecutionException exception) throws IOException {
// we have to buffer because exception might contain mutable elements
- final BufferedByteOutput output = new BufferedByteOutput(256);
- final MessageOutput msgOutput = protocolContext.getMessageOutput(output);
+ final BufferedByteMessageOutput output = new BufferedByteMessageOutput(256);
+ final ObjectMessageOutput msgOutput = protocolContext.getMessageOutput(output);
write(msgOutput, MsgType.EXCEPTION);
write(msgOutput, remoteContextIdentifier);
write(msgOutput, requestIdentifier);
@@ -163,8 +163,8 @@
public void sendCancelAcknowledge(final ContextIdentifier remoteContextIdentifier, final RequestIdentifier requestIdentifier) throws IOException {
outgoingQueue.add(new OutputAction() {
- public void run(ByteOutput target) throws IOException {
- final MessageOutput msgOutput = protocolContext.getMessageOutput(target);
+ public void run(ByteMessageOutput target) throws IOException {
+ final ObjectMessageOutput msgOutput = protocolContext.getMessageOutput(target);
write(msgOutput, MsgType.CANCEL_ACK);
write(msgOutput, remoteContextIdentifier);
write(msgOutput, requestIdentifier);
@@ -173,10 +173,10 @@
});
}
- public void sendServiceTerminate(final ServiceIdentifier remoteServiceIdentifier) throws IOException {
+ public void sendServiceClosing(final ServiceIdentifier remoteServiceIdentifier) throws IOException {
outgoingQueue.add(new OutputAction() {
- public void run(ByteOutput target) throws IOException {
- final MessageOutput msgOutput = protocolContext.getMessageOutput(target);
+ public void run(ByteMessageOutput target) throws IOException {
+ final ObjectMessageOutput msgOutput = protocolContext.getMessageOutput(target);
write(msgOutput, MsgType.SERVICE_TERMINATE);
write(msgOutput, remoteServiceIdentifier);
msgOutput.commit();
@@ -184,6 +184,9 @@
});
}
+ public void sendContextClosing(final ContextIdentifier remoteContextIdentifier, final boolean done) throws IOException {
+ }
+
public ContextIdentifier getLocalRootContextIdentifier() {
return null;
}
@@ -195,8 +198,8 @@
public ContextIdentifier openContext(final ServiceIdentifier serviceIdentifier) throws IOException {
final ContextIdentifier contextIdentifier = null;
outgoingQueue.add(new OutputAction() {
- public void run(ByteOutput target) throws IOException {
- final MessageOutput msgOutput = protocolContext.getMessageOutput(target);
+ public void run(ByteMessageOutput target) throws IOException {
+ final ObjectMessageOutput msgOutput = protocolContext.getMessageOutput(target);
write(msgOutput, MsgType.CONTEXT_OPENED);
write(msgOutput, serviceIdentifier);
write(msgOutput, contextIdentifier);
@@ -206,10 +209,10 @@
return contextIdentifier;
}
- public void sendContextClose(final ContextIdentifier contextIdentifier) throws IOException {
+ public void sendContextClose(final ContextIdentifier contextIdentifier, final boolean immediate, final boolean cancel, final boolean interrupt) throws IOException {
outgoingQueue.add(new OutputAction() {
- public void run(ByteOutput target) throws IOException {
- final MessageOutput msgOutput = protocolContext.getMessageOutput(target);
+ public void run(ByteMessageOutput target) throws IOException {
+ final ObjectMessageOutput msgOutput = protocolContext.getMessageOutput(target);
write(msgOutput, MsgType.CLOSE_CONTEXT);
write(msgOutput, contextIdentifier);
msgOutput.commit();
@@ -225,10 +228,10 @@
return null;
}
- public void closeService(final ServiceIdentifier serviceIdentifier) throws IOException {
+ public void sendServiceClose(final ServiceIdentifier serviceIdentifier) throws IOException {
outgoingQueue.add(new OutputAction() {
- public void run(ByteOutput target) throws IOException {
- final MessageOutput msgOutput = protocolContext.getMessageOutput(target);
+ public void run(ByteMessageOutput target) throws IOException {
+ final ObjectMessageOutput msgOutput = protocolContext.getMessageOutput(target);
write(msgOutput, MsgType.CLOSE_SERVICE);
write(msgOutput, serviceIdentifier);
msgOutput.commit();
@@ -238,8 +241,8 @@
public void sendRequest(final ContextIdentifier contextIdentifier, final RequestIdentifier requestIdentifier, final Object request, final Executor streamExecutor) throws IOException {
// we have to buffer because request might be mutable!
- final BufferedByteOutput output = new BufferedByteOutput(256);
- final MessageOutput msgOutput = protocolContext.getMessageOutput(output, streamExecutor);
+ final BufferedByteMessageOutput output = new BufferedByteMessageOutput(256);
+ final ObjectMessageOutput msgOutput = protocolContext.getMessageOutput(output, streamExecutor);
write(msgOutput, MsgType.REQUEST);
write(msgOutput, contextIdentifier);
write(msgOutput, requestIdentifier);
@@ -249,8 +252,8 @@
public void sendCancelRequest(final ContextIdentifier contextIdentifier, final RequestIdentifier requestIdentifier, final boolean mayInterrupt) throws IOException {
outgoingQueue.add(new OutputAction() {
- public void run(ByteOutput target) throws IOException {
- final MessageOutput msgOutput = protocolContext.getMessageOutput(target);
+ public void run(ByteMessageOutput target) throws IOException {
+ final ObjectMessageOutput msgOutput = protocolContext.getMessageOutput(target);
write(msgOutput, MsgType.CANCEL_REQUEST);
write(msgOutput, contextIdentifier);
write(msgOutput, requestIdentifier);
@@ -270,8 +273,8 @@
public void closeStream(final StreamIdentifier streamIdentifier) throws IOException {
outgoingQueue.add(new OutputAction() {
- public void run(ByteOutput target) throws IOException {
- final MessageOutput msgOutput = protocolContext.getMessageOutput(target);
+ public void run(ByteMessageOutput target) throws IOException {
+ final ObjectMessageOutput msgOutput = protocolContext.getMessageOutput(target);
write(msgOutput, MsgType.CLOSE_STREAM);
write(msgOutput, streamIdentifier);
msgOutput.commit();
@@ -287,8 +290,8 @@
write(output, identifier);
}
- public MessageOutput sendStreamData(StreamIdentifier streamIdentifier, Executor streamExecutor) throws IOException {
- return protocolContext.getMessageOutput(new BufferedByteOutput(256), streamExecutor);
+ public ObjectMessageOutput sendStreamData(StreamIdentifier streamIdentifier, Executor streamExecutor) throws IOException {
+ return protocolContext.getMessageOutput(new BufferedByteMessageOutput(256), streamExecutor);
}
public void closeSession() throws IOException {
@@ -299,12 +302,12 @@
}
}
- public class BufferedByteOutput implements ByteOutput, OutputAction {
+ public class BufferedByteMessageOutput implements ByteMessageOutput, OutputAction {
private final int bufsize;
private final List<byte[]> bufferList = new ArrayList<byte[]>();
private int sizeOfLast;
- public BufferedByteOutput(final int bufsize) {
+ public BufferedByteMessageOutput(final int bufsize) {
this.bufsize = bufsize;
}
@@ -370,7 +373,7 @@
public void flush() throws IOException {
}
- public void run(ByteOutput output) throws IOException {
+ public void run(ByteMessageOutput output) throws IOException {
final Iterator<byte[]> iterator = bufferList.iterator();
if (! iterator.hasNext()) {
return;
@@ -392,7 +395,7 @@
}
private interface OutputAction {
- void run(ByteOutput target) throws IOException;
+ void run(ByteMessageOutput target) throws IOException;
}
private final class OutgoingActionHttpMessage extends AbstractOutgoingHttpMessage {
@@ -406,8 +409,8 @@
addHeader(Http.HEADER_SEQ, Long.toString(sequenceValue, 16));
}
- public void writeMessageData(ByteOutput byteOutput) throws IOException {
- final MessageOutput msgOut = protocolContext.getMessageOutput(byteOutput);
+ public void writeMessageData(ByteMessageOutput byteOutput) throws IOException {
+ final ObjectMessageOutput msgOut = protocolContext.getMessageOutput(byteOutput);
msgOut.writeInt(PROTOCOL_VERSION);
write(msgOut, MsgType.DATA_START);
msgOut.writeLong(sequenceValue);
Modified: remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/spi/IncomingHttpMessage.java
===================================================================
--- remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/spi/IncomingHttpMessage.java 2008-03-09 03:56:14 UTC (rev 3598)
+++ remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/spi/IncomingHttpMessage.java 2008-03-11 23:46:24 UTC (rev 3599)
@@ -2,13 +2,13 @@
import java.io.IOException;
import java.net.InetAddress;
-import org.jboss.cx.remoting.util.ByteInput;
+import org.jboss.cx.remoting.spi.ByteMessageInput;
/**
*
*/
public interface IncomingHttpMessage extends HttpMessage {
- ByteInput getMessageData() throws IOException;
+ ByteMessageInput getMessageData() throws IOException;
InetAddress getRemoteAddress();
Modified: remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/spi/OutgoingHttpMessage.java
===================================================================
--- remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/spi/OutgoingHttpMessage.java 2008-03-09 03:56:14 UTC (rev 3598)
+++ remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/spi/OutgoingHttpMessage.java 2008-03-11 23:46:24 UTC (rev 3599)
@@ -1,11 +1,11 @@
package org.jboss.cx.remoting.http.spi;
import java.io.IOException;
-import org.jboss.cx.remoting.util.ByteOutput;
+import org.jboss.cx.remoting.spi.ByteMessageOutput;
/**
*
*/
public interface OutgoingHttpMessage extends HttpMessage {
- void writeMessageData(ByteOutput byteOutput) throws IOException;
+ void writeMessageData(ByteMessageOutput byteOutput) throws IOException;
}
Modified: remoting3/trunk/http-se6/src/main/java/org/jboss/cx/remoting/http/se6/ServerInstance.java
===================================================================
--- remoting3/trunk/http-se6/src/main/java/org/jboss/cx/remoting/http/se6/ServerInstance.java 2008-03-09 03:56:14 UTC (rev 3598)
+++ remoting3/trunk/http-se6/src/main/java/org/jboss/cx/remoting/http/se6/ServerInstance.java 2008-03-11 23:46:24 UTC (rev 3599)
@@ -10,8 +10,8 @@
import org.jboss.cx.remoting.http.spi.RemotingHttpSessionContext;
import org.jboss.cx.remoting.http.spi.OutgoingHttpMessage;
import org.jboss.cx.remoting.http.spi.AbstractIncomingHttpMessage;
-import org.jboss.cx.remoting.util.ByteInput;
-import org.jboss.cx.remoting.util.ByteOutput;
+import org.jboss.cx.remoting.spi.ByteMessageInput;
+import org.jboss.cx.remoting.spi.ByteMessageOutput;
import com.sun.net.httpserver.BasicAuthenticator;
import com.sun.net.httpserver.Headers;
@@ -67,9 +67,9 @@
final int remotePort = inetSocketAddress.getPort();
RemotingHttpSessionContext httpSessionContext = null; // todo locate
httpSessionContext.queueMessage(new AbstractIncomingHttpMessage(localAddress, localPort, remoteAddress, remotePort) {
- public ByteInput getMessageData() {
+ public ByteMessageInput getMessageData() {
final InputStream inputStream = httpExchange.getRequestBody();
- return new ByteInput() {
+ return new ByteMessageInput() {
public int read() throws IOException {
return inputStream.read();
}
@@ -114,7 +114,7 @@
}
httpExchange.sendResponseHeaders(200, 0); // todo - preset response size?
final OutputStream outputStream = httpExchange.getResponseBody();
- httpReply.writeMessageData(new ByteOutput() {
+ httpReply.writeMessageData(new ByteMessageOutput() {
public void write(int b) throws IOException {
outputStream.write(b);
}
Modified: remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/JrppConnection.java
===================================================================
--- remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/JrppConnection.java 2008-03-09 03:56:14 UTC (rev 3598)
+++ remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/JrppConnection.java 2008-03-11 23:46:24 UTC (rev 3599)
@@ -22,16 +22,16 @@
import org.jboss.cx.remoting.RemoteExecutionException;
import org.jboss.cx.remoting.util.AtomicStateMachine;
import org.jboss.cx.remoting.util.AttributeMap;
-import org.jboss.cx.remoting.util.MessageOutput;
-import org.jboss.cx.remoting.util.MessageInput;
+import org.jboss.cx.remoting.spi.ObjectMessageOutput;
+import org.jboss.cx.remoting.spi.ObjectMessageInput;
import org.jboss.cx.remoting.util.CollectionUtil;
import org.jboss.cx.remoting.util.WeakHashSet;
import org.jboss.cx.remoting.jrpp.id.JrppContextIdentifier;
import org.jboss.cx.remoting.jrpp.id.JrppRequestIdentifier;
import org.jboss.cx.remoting.jrpp.id.JrppServiceIdentifier;
import org.jboss.cx.remoting.jrpp.id.JrppStreamIdentifier;
-import org.jboss.cx.remoting.jrpp.mina.IoBufferByteInput;
-import org.jboss.cx.remoting.jrpp.mina.IoBufferByteOutput;
+import org.jboss.cx.remoting.jrpp.mina.IoBufferByteMessageInput;
+import org.jboss.cx.remoting.jrpp.mina.IoBufferByteMessageOutput;
import org.jboss.cx.remoting.log.Logger;
import org.jboss.cx.remoting.spi.protocol.ContextIdentifier;
import org.jboss.cx.remoting.spi.protocol.ProtocolContext;
@@ -148,7 +148,7 @@
try {
ioSession.setAttribute(JRPP_CONNECTION, this);
this.ioSession = ioSession;
- final ProtocolContext protocolContext = protocolServerContext.establishSession(protocolHandler);
+ final ProtocolContext protocolContext = protocolServerContext.establishSession(protocolHandler, null /* todo */);
this.protocolContext = protocolContext;
client = false;
} finally {
@@ -308,7 +308,7 @@
public void sendResponse(byte[] rawMsgData) throws IOException {
final IoBuffer buffer = newBuffer(rawMsgData.length + 100, false);
- final MessageOutput output = protocolContext.getMessageOutput(new IoBufferByteOutput(buffer, ioSession));
+ final ObjectMessageOutput output = protocolContext.getMessageOutput(new IoBufferByteMessageOutput(buffer, ioSession));
write(output, MessageType.SASL_RESPONSE);
output.write(rawMsgData);
output.commit();
@@ -316,7 +316,7 @@
public void sendChallenge(byte[] rawMsgData) throws IOException {
final IoBuffer buffer = newBuffer(rawMsgData.length + 100, false);
- final MessageOutput output = protocolContext.getMessageOutput(new IoBufferByteOutput(buffer, ioSession));
+ final ObjectMessageOutput output = protocolContext.getMessageOutput(new IoBufferByteMessageOutput(buffer, ioSession));
write(output, MessageType.SASL_CHALLENGE);
output.write(rawMsgData);
output.commit();
@@ -325,7 +325,7 @@
private void sendVersionMessage() throws IOException {
// send version info
final IoBuffer buffer = newBuffer(60, false);
- final MessageOutput output = protocolContext.getMessageOutput(new IoBufferByteOutput(buffer, ioSession));
+ final ObjectMessageOutput output = protocolContext.getMessageOutput(new IoBufferByteMessageOutput(buffer, ioSession));
write(output, MessageType.VERSION);
output.writeShort(PROTOCOL_VERSION);
output.writeUTF(protocolContext.getLocalEndpointName());
@@ -341,7 +341,7 @@
final SaslClientFilter saslClientFilter = getSaslClientFilter();
saslClientFilter.setSaslClient(ioSession, saslClient);
final IoBuffer buffer = newBuffer(600, true);
- MessageOutput output = protocolContext.getMessageOutput(new IoBufferByteOutput(buffer, ioSession));
+ ObjectMessageOutput output = protocolContext.getMessageOutput(new IoBufferByteMessageOutput(buffer, ioSession));
write(output, MessageType.AUTH_REQUEST);
output.writeInt(clientMechs.length);
for (String mech : clientMechs) {
@@ -413,11 +413,14 @@
}
public final class RemotingProtocolHandler implements ProtocolHandler {
+ public void sendContextClosing(final ContextIdentifier remoteContextIdentifier, final boolean done) throws IOException {
+ // todo
+ }
public ContextIdentifier openContext(ServiceIdentifier serviceIdentifier) throws IOException {
final ContextIdentifier contextIdentifier = getNewContextIdentifier();
final IoBuffer buffer = newBuffer(60, false);
- final MessageOutput output = protocolContext.getMessageOutput(new IoBufferByteOutput(buffer, ioSession));
+ final ObjectMessageOutput output = protocolContext.getMessageOutput(new IoBufferByteMessageOutput(buffer, ioSession));
write(output, MessageType.OPEN_CONTEXT);
write(output, serviceIdentifier);
write(output, contextIdentifier);
@@ -448,23 +451,23 @@
return remoteName;
}
- public void closeService(ServiceIdentifier serviceIdentifier) throws IOException {
+ public void sendServiceClose(ServiceIdentifier serviceIdentifier) throws IOException {
if (! state.in(State.UP)) {
return;
}
final IoBuffer buffer = newBuffer(60, false);
- final MessageOutput output = protocolContext.getMessageOutput(new IoBufferByteOutput(buffer, ioSession));
+ final ObjectMessageOutput output = protocolContext.getMessageOutput(new IoBufferByteMessageOutput(buffer, ioSession));
write(output, MessageType.CLOSE_SERVICE);
write(output, serviceIdentifier);
output.commit();
}
- public void sendContextClose(ContextIdentifier contextIdentifier) throws IOException {
+ public void sendContextClose(ContextIdentifier contextIdentifier, final boolean immediate, final boolean cancel, final boolean interrupt) throws IOException {
if (! state.in(State.UP)) {
return;
}
final IoBuffer buffer = newBuffer(60, false);
- final MessageOutput output = protocolContext.getMessageOutput(new IoBufferByteOutput(buffer, ioSession));
+ final ObjectMessageOutput output = protocolContext.getMessageOutput(new IoBufferByteMessageOutput(buffer, ioSession));
write(output, MessageType.CLOSE_CONTEXT);
write(output, contextIdentifier);
output.commit();
@@ -477,7 +480,7 @@
if (true /* todo if close not already sent */) {
// todo mark as sent or remove from table
final IoBuffer buffer = newBuffer(60, false);
- final MessageOutput output = protocolContext.getMessageOutput(new IoBufferByteOutput(buffer, ioSession));
+ final ObjectMessageOutput output = protocolContext.getMessageOutput(new IoBufferByteMessageOutput(buffer, ioSession));
write(output, MessageType.CLOSE_STREAM);
write(output, streamIdentifier);
output.commit();
@@ -492,7 +495,7 @@
throw new NullPointerException("requestIdentifier is null");
}
final IoBuffer buffer = newBuffer(500, true);
- final MessageOutput output = protocolContext.getMessageOutput(new IoBufferByteOutput(buffer, ioSession));
+ final ObjectMessageOutput output = protocolContext.getMessageOutput(new IoBufferByteMessageOutput(buffer, ioSession));
write(output, MessageType.REPLY);
write(output, remoteContextIdentifier);
write(output, requestIdentifier);
@@ -511,7 +514,7 @@
throw new NullPointerException("exception is null");
}
final IoBuffer buffer = newBuffer(500, true);
- final MessageOutput output = protocolContext.getMessageOutput(new IoBufferByteOutput(buffer, ioSession));
+ final ObjectMessageOutput output = protocolContext.getMessageOutput(new IoBufferByteMessageOutput(buffer, ioSession));
write(output, MessageType.EXCEPTION);
write(output, remoteContextIdentifier);
write(output, requestIdentifier);
@@ -527,7 +530,7 @@
throw new NullPointerException("requestIdentifier is null");
}
final IoBuffer buffer = newBuffer(500, true);
- final MessageOutput output = protocolContext.getMessageOutput(new IoBufferByteOutput(buffer, ioSession), streamExecutor);
+ final ObjectMessageOutput output = protocolContext.getMessageOutput(new IoBufferByteMessageOutput(buffer, ioSession), streamExecutor);
write(output, MessageType.REQUEST);
write(output, contextIdentifier);
write(output, requestIdentifier);
@@ -543,19 +546,19 @@
throw new NullPointerException("requestIdentifier is null");
}
final IoBuffer buffer = newBuffer(60, false);
- final MessageOutput output = protocolContext.getMessageOutput(new IoBufferByteOutput(buffer, ioSession));
+ final ObjectMessageOutput output = protocolContext.getMessageOutput(new IoBufferByteMessageOutput(buffer, ioSession));
write(output, MessageType.CANCEL_ACK);
write(output, remoteContextIdentifier);
write(output, requestIdentifier);
output.commit();
}
- public void sendServiceTerminate(ServiceIdentifier remoteServiceIdentifier) throws IOException {
+ public void sendServiceClosing(ServiceIdentifier remoteServiceIdentifier) throws IOException {
if (remoteServiceIdentifier == null) {
throw new NullPointerException("remoteServiceIdentifier is null");
}
final IoBuffer buffer = newBuffer(60, false);
- final MessageOutput output = protocolContext.getMessageOutput(new IoBufferByteOutput(buffer, ioSession));
+ final ObjectMessageOutput output = protocolContext.getMessageOutput(new IoBufferByteMessageOutput(buffer, ioSession));
write(output, MessageType.SERVICE_TERMINATE);
write(output, remoteServiceIdentifier);
output.commit();
@@ -577,7 +580,7 @@
throw new NullPointerException("requestIdentifier is null");
}
final IoBuffer buffer = newBuffer(60, false);
- final MessageOutput output = protocolContext.getMessageOutput(new IoBufferByteOutput(buffer, ioSession));
+ final ObjectMessageOutput output = protocolContext.getMessageOutput(new IoBufferByteMessageOutput(buffer, ioSession));
write(output, MessageType.CANCEL_REQ);
write(output, contextIdentifier);
write(output, requestIdentifier);
@@ -589,7 +592,7 @@
return null;
}
- public MessageOutput sendStreamData(StreamIdentifier streamIdentifier, Executor streamExecutor) throws IOException {
+ public ObjectMessageOutput sendStreamData(StreamIdentifier streamIdentifier, Executor streamExecutor) throws IOException {
if (streamIdentifier == null) {
throw new NullPointerException("streamIdentifier is null");
}
@@ -597,7 +600,7 @@
throw new NullPointerException("streamExeceutor is null");
}
final IoBuffer buffer = newBuffer(500, true);
- final MessageOutput output = protocolContext.getMessageOutput(new IoBufferByteOutput(buffer, ioSession), streamExecutor);
+ final ObjectMessageOutput output = protocolContext.getMessageOutput(new IoBufferByteMessageOutput(buffer, ioSession), streamExecutor);
write(output, MessageType.STREAM_DATA);
write(output, streamIdentifier);
return output;
@@ -648,7 +651,7 @@
public void messageReceived(Object message) throws Exception {
final boolean trace = log.isTrace();
- final MessageInput input = protocolContext.getMessageInput(new IoBufferByteInput((IoBuffer) message));
+ final ObjectMessageInput input = protocolContext.getMessageInput(new IoBufferByteMessageInput((IoBuffer) message));
final MessageType type = MessageType.values()[input.readByte() & 0xff];
if (trace) {
log.trace("Received message of type %s in state %s", type, state.getState());
@@ -684,7 +687,7 @@
try {
if (saslServerFilter.handleSaslResponse(ioSession, bytes)) {
final IoBuffer buffer = newBuffer(60, false);
- final MessageOutput output = protocolContext.getMessageOutput(new IoBufferByteOutput(buffer, ioSession));
+ final ObjectMessageOutput output = protocolContext.getMessageOutput(new IoBufferByteMessageOutput(buffer, ioSession));
write(output, MessageType.AUTH_SUCCESS);
output.commit();
saslServerFilter.startEncryption(ioSession);
@@ -693,7 +696,7 @@
}
} catch (SaslException ex) {
final IoBuffer buffer = newBuffer(100, true);
- final MessageOutput output = protocolContext.getMessageOutput(new IoBufferByteOutput(buffer, ioSession));
+ final ObjectMessageOutput output = protocolContext.getMessageOutput(new IoBufferByteMessageOutput(buffer, ioSession));
write(output, MessageType.AUTH_FAILED);
output.writeUTF("Authentication failed: " + ex.getMessage());
output.commit();
@@ -732,7 +735,7 @@
if (saslServerFilter.sendInitialChallenge(ioSession)) {
// complete (that was quick!)
final IoBuffer buffer = newBuffer(60, false);
- final MessageOutput output = protocolContext.getMessageOutput(new IoBufferByteOutput(buffer, ioSession));
+ final ObjectMessageOutput output = protocolContext.getMessageOutput(new IoBufferByteMessageOutput(buffer, ioSession));
write(output, MessageType.AUTH_SUCCESS);
output.commit();
state.requireTransition(State.UP);
@@ -742,7 +745,7 @@
}
} catch (SaslException ex) {
final IoBuffer buffer = newBuffer(100, true);
- final MessageOutput output = protocolContext.getMessageOutput(new IoBufferByteOutput(buffer, ioSession));
+ final ObjectMessageOutput output = protocolContext.getMessageOutput(new IoBufferByteMessageOutput(buffer, ioSession));
write(output, MessageType.AUTH_FAILED);
output.writeUTF("Unable to initiate SASL authentication: " + ex.getMessage());
output.commit();
@@ -841,12 +844,12 @@
}
case CLOSE_CONTEXT: {
final ContextIdentifier contextIdentifier = (ContextIdentifier) input.readObject();
- protocolContext.closeContext(contextIdentifier);
+ protocolContext.receiveContextClose(contextIdentifier, false, false, false);
return;
}
case CLOSE_SERVICE: {
final ServiceIdentifier serviceIdentifier = (ServiceIdentifier) input.readObject();
- protocolContext.closeService(serviceIdentifier);
+ protocolContext.receiveServiceClose(serviceIdentifier);
return;
}
case CLOSE_STREAM: {
@@ -880,7 +883,7 @@
}
case SERVICE_TERMINATE: {
final ServiceIdentifier serviceIdentifier = (ServiceIdentifier) input.readObject();
- protocolContext.receiveServiceTerminate(serviceIdentifier);
+ protocolContext.receiveServiceClosing(serviceIdentifier);
return;
}
case STREAM_DATA: {
Modified: remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/id/JrppRequestIdentifier.java
===================================================================
--- remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/id/JrppRequestIdentifier.java 2008-03-09 03:56:14 UTC (rev 3598)
+++ remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/id/JrppRequestIdentifier.java 2008-03-11 23:46:24 UTC (rev 3599)
@@ -1,7 +1,5 @@
package org.jboss.cx.remoting.jrpp.id;
-import java.io.IOException;
-import org.jboss.cx.remoting.util.MessageInput;
import org.jboss.cx.remoting.spi.protocol.RequestIdentifier;
/**
Modified: remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/id/JrppServiceIdentifier.java
===================================================================
--- remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/id/JrppServiceIdentifier.java 2008-03-09 03:56:14 UTC (rev 3598)
+++ remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/id/JrppServiceIdentifier.java 2008-03-11 23:46:24 UTC (rev 3599)
@@ -1,7 +1,5 @@
package org.jboss.cx.remoting.jrpp.id;
-import java.io.IOException;
-import org.jboss.cx.remoting.util.MessageInput;
import org.jboss.cx.remoting.spi.protocol.ServiceIdentifier;
/**
Deleted: remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/mina/IoBufferByteInput.java
===================================================================
--- remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/mina/IoBufferByteInput.java 2008-03-09 03:56:14 UTC (rev 3598)
+++ remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/mina/IoBufferByteInput.java 2008-03-11 23:46:24 UTC (rev 3599)
@@ -1,40 +0,0 @@
-package org.jboss.cx.remoting.jrpp.mina;
-
-import java.io.IOException;
-import org.apache.mina.common.IoBuffer;
-import org.jboss.cx.remoting.util.ByteInput;
-
-/**
- *
- */
-public final class IoBufferByteInput implements ByteInput {
- private final IoBuffer ioBuffer;
-
- public IoBufferByteInput(final IoBuffer ioBuffer) {
- this.ioBuffer = ioBuffer;
- }
-
- public int read() throws IOException {
- return ioBuffer.hasRemaining() ? ioBuffer.get() : -1;
- }
-
- public int read(byte[] data) throws IOException {
- return read(data, 0, data.length);
- }
-
- public int read(byte[] data, int offs, int len) throws IOException {
- if (! ioBuffer.hasRemaining()) {
- return -1;
- }
- int c = Math.min(ioBuffer.remaining(), len);
- ioBuffer.get(data, offs, c);
- return c;
- }
-
- public int remaining() {
- return ioBuffer.remaining();
- }
-
- public void close() throws IOException {
- }
-}
Copied: remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/mina/IoBufferByteMessageInput.java (from rev 3517, remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/mina/IoBufferByteInput.java)
===================================================================
--- remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/mina/IoBufferByteMessageInput.java (rev 0)
+++ remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/mina/IoBufferByteMessageInput.java 2008-03-11 23:46:24 UTC (rev 3599)
@@ -0,0 +1,40 @@
+package org.jboss.cx.remoting.jrpp.mina;
+
+import java.io.IOException;
+import org.apache.mina.common.IoBuffer;
+import org.jboss.cx.remoting.spi.ByteMessageInput;
+
+/**
+ *
+ */
+public final class IoBufferByteMessageInput implements ByteMessageInput {
+ private final IoBuffer ioBuffer;
+
+ public IoBufferByteMessageInput(final IoBuffer ioBuffer) {
+ this.ioBuffer = ioBuffer;
+ }
+
+ public int read() throws IOException {
+ return ioBuffer.hasRemaining() ? ioBuffer.get() : -1;
+ }
+
+ public int read(byte[] data) throws IOException {
+ return read(data, 0, data.length);
+ }
+
+ public int read(byte[] data, int offs, int len) throws IOException {
+ if (! ioBuffer.hasRemaining()) {
+ return -1;
+ }
+ int c = Math.min(ioBuffer.remaining(), len);
+ ioBuffer.get(data, offs, c);
+ return c;
+ }
+
+ public int remaining() {
+ return ioBuffer.remaining();
+ }
+
+ public void close() throws IOException {
+ }
+}
Copied: remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/mina/IoBufferByteMessageOutput.java (from rev 3517, remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/mina/IoBufferByteOutput.java)
===================================================================
--- remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/mina/IoBufferByteMessageOutput.java (rev 0)
+++ remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/mina/IoBufferByteMessageOutput.java 2008-03-11 23:46:24 UTC (rev 3599)
@@ -0,0 +1,46 @@
+package org.jboss.cx.remoting.jrpp.mina;
+
+import java.io.IOException;
+import org.apache.mina.common.IoBuffer;
+import org.apache.mina.common.IoSession;
+import org.jboss.cx.remoting.spi.ByteMessageOutput;
+
+/**
+ *
+ */
+public final class IoBufferByteMessageOutput implements ByteMessageOutput {
+ private final IoBuffer ioBuffer;
+ private final IoSession ioSession;
+
+ public IoBufferByteMessageOutput(final IoBuffer ioBuffer, final IoSession ioSession) {
+ this.ioBuffer = ioBuffer;
+ this.ioSession = ioSession;
+ }
+
+ public void write(int b) throws IOException {
+ ioBuffer.put((byte)b);
+ }
+
+ public void write(byte[] b) throws IOException {
+ ioBuffer.put(b);
+ }
+
+ public void write(byte[] b, int offs, int len) throws IOException {
+ ioBuffer.put(b, offs, len);
+ }
+
+ public void commit() throws IOException {
+ final IoBuffer buffer = ioBuffer.flip().skip(4);
+ ioSession.write(buffer);
+ }
+
+ public int getBytesWritten() {
+ return ioBuffer.position();
+ }
+
+ public void close() throws IOException {
+ }
+
+ public void flush() throws IOException {
+ }
+}
Deleted: remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/mina/IoBufferByteOutput.java
===================================================================
--- remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/mina/IoBufferByteOutput.java 2008-03-09 03:56:14 UTC (rev 3598)
+++ remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/mina/IoBufferByteOutput.java 2008-03-11 23:46:24 UTC (rev 3599)
@@ -1,46 +0,0 @@
-package org.jboss.cx.remoting.jrpp.mina;
-
-import java.io.IOException;
-import org.apache.mina.common.IoBuffer;
-import org.apache.mina.common.IoSession;
-import org.jboss.cx.remoting.util.ByteOutput;
-
-/**
- *
- */
-public final class IoBufferByteOutput implements ByteOutput {
- private final IoBuffer ioBuffer;
- private final IoSession ioSession;
-
- public IoBufferByteOutput(final IoBuffer ioBuffer, final IoSession ioSession) {
- this.ioBuffer = ioBuffer;
- this.ioSession = ioSession;
- }
-
- public void write(int b) throws IOException {
- ioBuffer.put((byte)b);
- }
-
- public void write(byte[] b) throws IOException {
- ioBuffer.put(b);
- }
-
- public void write(byte[] b, int offs, int len) throws IOException {
- ioBuffer.put(b, offs, len);
- }
-
- public void commit() throws IOException {
- final IoBuffer buffer = ioBuffer.flip().skip(4);
- ioSession.write(buffer);
- }
-
- public int getBytesWritten() {
- return ioBuffer.position();
- }
-
- public void close() throws IOException {
- }
-
- public void flush() throws IOException {
- }
-}
Modified: remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/AbstractTypeMap.java
===================================================================
--- remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/AbstractTypeMap.java 2008-03-09 03:56:14 UTC (rev 3598)
+++ remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/AbstractTypeMap.java 2008-03-11 23:46:24 UTC (rev 3599)
@@ -39,6 +39,7 @@
public boolean containsValue(final Object value) {
// since we key by type, we can do an O(1) search for value!
+ // todo - unless the given value is stored at a key of one of its super-types
final Class<? extends Object> claxx = value.getClass();
return map.containsKey(claxx) && isEqual(value, map.get(claxx));
}
Modified: remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/AtomicMap.java
===================================================================
--- remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/AtomicMap.java 2008-03-09 03:56:14 UTC (rev 3598)
+++ remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/AtomicMap.java 2008-03-11 23:46:24 UTC (rev 3599)
@@ -6,9 +6,10 @@
import java.util.concurrent.ConcurrentMap;
/**
- *
+ * A delegating map implementation that accepts a standard {@code Map}, but conforms to the contract
+ * for {@code ConcurrentMap}. No synchronization is done on the delegate.
*/
-public final class AtomicMap<K, V> implements ConcurrentMap<K, V> {
+public class AtomicMap<K, V> implements ConcurrentMap<K, V> {
private final Map<K, V> delegate;
public AtomicMap(final Map<K, V> delegate) {
Deleted: remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/ByteInput.java
===================================================================
--- remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/ByteInput.java 2008-03-09 03:56:14 UTC (rev 3598)
+++ remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/ByteInput.java 2008-03-11 23:46:24 UTC (rev 3599)
@@ -1,44 +0,0 @@
-package org.jboss.cx.remoting.util;
-
-import java.io.Closeable;
-import java.io.IOException;
-
-/**
- * A readable source of byte data.
- */
-public interface ByteInput extends Closeable {
- /**
- * Read one byte.
- *
- * @return the byte, or -1 if the end of the stream has been reached.
- * @throws IOException if an I/O error occurs
- */
- int read() throws IOException;
-
- /**
- * Read a series of bytes into an array.
- *
- * @param data the array into which data is to be read
- * @return the total number of bytes read, or -1 if there are no bytes remaining to read
- * @throws IOException if an I/O error occurs
- */
- int read(byte[] data) throws IOException;
-
- /**
- * Read a series of bytes into an array.
- *
- * @param data the array into which data is to be read
- * @param offs the start offset in the {@code data} array at which the data is written
- * @param len the maximum number of bytes to read
- * @return the total number of bytes read, or -1 if there are no bytes remaining to read
- * @throws IOException if an I/O error occurs
- */
- int read(byte[] data, int offs, int len) throws IOException;
-
- /**
- * Return the number of bytes remaining.
- *
- * @return the number of bytes, or -1 if the byte count cannot be determined
- */
- int remaining();
-}
Deleted: remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/ByteOutput.java
===================================================================
--- remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/ByteOutput.java 2008-03-09 03:56:14 UTC (rev 3598)
+++ remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/ByteOutput.java 2008-03-11 23:46:24 UTC (rev 3599)
@@ -1,52 +0,0 @@
-package org.jboss.cx.remoting.util;
-
-import java.io.Closeable;
-import java.io.Flushable;
-import java.io.IOException;
-
-/**
- * A writable destination for byte data.
- */
-public interface ByteOutput extends Closeable, Flushable {
- /**
- * Write a single byte of data. The input argument is truncated to 8 bits.
- *
- * @param b the byte to write
- * @throws IOException if an I/O error occurs
- */
- void write(int b) throws IOException;
-
- /**
- * Write many bytes of data.
- *
- * @param b the bytes to write
- * @throws IOException if an I/O error occurs
- */
- void write(byte[] b) throws IOException;
-
- /**
- * Write many bytes of data.
- *
- * @param b the bytes to write
- * @param offs the offset in {@code b} to start reading bytes from
- * @param len the number of bytes to write
- * @throws IOException if an I/O error occurs
- */
- void write(byte[] b, int offs, int len) throws IOException;
-
- /**
- * Commit the written data. This causes the accumulated data to be sent as a message on the underlying
- * channel.
- *
- * @throws IOException if an I/O error occurs
- */
- void commit() throws IOException;
-
- /**
- * Get a count of the number of bytes written to this message.
- *
- * @return the count
- * @throws IOException if an I/O error occurs
- */
- int getBytesWritten() throws IOException;
-}
Modified: remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/CollectionUtil.java
===================================================================
--- remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/CollectionUtil.java 2008-03-09 03:56:14 UTC (rev 3598)
+++ remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/CollectionUtil.java 2008-03-11 23:46:24 UTC (rev 3599)
@@ -17,76 +17,196 @@
import java.util.concurrent.ConcurrentMap;
/**
- *
+ * A set of helpful utility functions for collections.
*/
public final class CollectionUtil {
private CollectionUtil() {
}
+ /**
+ * Create a concurrent map instance.
+ *
+ * @return a new concurrent map
+ */
public static <K, V> ConcurrentMap<K, V> concurrentMap() {
if (true) {
- return concurrentMap(new HashMap<K,V>());
+ return synchronizedHashMap();
} else {
return new ConcurrentHashMap<K, V>();
}
}
- public static <K, V> ConcurrentMap<K, V> concurrentMap(Map<K, V> original) {
+ /**
+ * Create a synchronized map that obeys the contract for {@code ConcurrentMap}.
+ *
+ * @param original the map to be wrapped
+ * @return a synchronized map
+ */
+ public static <K, V> ConcurrentMap<K, V> synchronizedMap(Map<K, V> original) {
return new SynchronizedMap<K, V>(original);
}
+ /**
+ * Create a synchronized hash map that obeys the contract for {@code ConcurrentMap}.
+ *
+ * @return a synchronized hash map
+ */
+ public static <K, V> ConcurrentMap<K, V> synchronizedHashMap() {
+ return synchronizedMap(CollectionUtil.<K, V>hashMap());
+ }
+
+ /**
+ * Create an array-backed list.
+ *
+ * @return an array-backed list
+ */
public static <T> List<T> arrayList() {
return new ArrayList<T>();
}
+ /**
+ * Create an array-backed list whose contents are a copy of the given list.
+ *
+ * @param orig the original list
+ * @return an array backed list
+ */
public static <T> List<T> arrayList(List<T> orig) {
return new ArrayList<T>(orig);
}
+ /**
+ * Create a synchronized wrapper for the given set.
+ *
+ * @param nested the nested set
+ * @return a synchronized version of the nested set
+ */
public static <T> Set<T> synchronizedSet(Set<T> nested) {
return new SynchronizedSet<T>(nested);
}
+ /**
+ * Create a synchronized hash set.
+ *
+ * @return a synchronized hash set
+ */
+ public static <T> Set<T> synchronizedHashSet() {
+ return synchronizedSet(CollectionUtil.<T>hashSet());
+ }
+
+ /**
+ * Create a synchronized weak hash set.
+ *
+ * @return a synchronized weak hash set
+ */
+ public static <T> Set<T> synchronizedWeakHashSet() {
+ return synchronizedSet(CollectionUtil.<T>weakHashSet());
+ }
+
+ /**
+ * Create a synchronized version of the nested queue that obeys the contract for {@code BlockingQueue}.
+ *
+ * @param nested the nested queue
+ * @return the blocking queue
+ */
public static <T> BlockingQueue<T> synchronizedQueue(Queue<T> nested) {
return new SynchronizedQueue<T>(nested);
}
+ /**
+ * Create a weak hash set.
+ *
+ * @return a weak hash set
+ */
public static <T> Set<T> weakHashSet() {
return new WeakHashSet<T>();
}
+ /**
+ * Create a fixed-capacity blocking queue.
+ *
+ * @param size the fixed size
+ * @return a fixed-capacity blocking queue
+ */
public static <T> BlockingQueue<T> blockingQueue(int size) {
return new ArrayBlockingQueue<T>(size);
}
+ /**
+ * Create a hash set.
+ *
+ * @return a hash set
+ */
+ public static <T> Set<T> hashSet() {
+ return new HashSet<T>();
+ }
+
+ /**
+ * Create a hash map with weak keys. See {@link java.util.WeakHashMap}.
+ *
+ * @return a hash map with weak keys
+ */
+ public static <K, V> Map<K, V> weakHashMap() {
+ return new WeakHashMap<K, V>();
+ }
+
+ /**
+ * Create a synchronized hash map with weak keys, which obeys the {@code ConcurrentMap} contract.
+ *
+ * @return a synchronized weak hash map
+ */
+ public static <K, V> ConcurrentMap<K, V> synchronizedWeakHashMap() {
+ return CollectionUtil.<K,V>synchronizedMap(CollectionUtil.<K,V>weakHashMap());
+ }
+
+ /**
+ * Create an unmodifiable list view of an array.
+ *
+ * @param entries the array
+ * @return an unmodifiable list
+ */
+ public static <T> List<T> unmodifiableList(final T[] entries) {
+ return new UnmodifiableArrayList<T>(entries);
+ }
+
+ /**
+ * Create a hash map.
+ *
+ * @return a hash map
+ */
+ public static <K, V> Map<K, V> hashMap() {
+ return new HashMap<K, V>();
+ }
+
+ /**
+ * Create an {@code Iterable} view of another {@code Iterable} that exposes no other methods.
+ *
+ * @param original the wrapped instance
+ * @return a new {@code Iterable}
+ */
public static <T> Iterable<T> protectedIterable(Iterable<T> original) {
return new DelegateIterable<T>(original);
}
- public static <T> Set<T> hashSet() {
- return new HashSet<T>();
- }
-
+ /**
+ * Create an {@code Iterable} view of an {@code Enumeration}. The view can be used only once.
+ *
+ * @param enumeration the enumeration
+ * @return the {@code Iterable} view
+ */
public static <T> Iterable<T> loop(final Enumeration<T> enumeration) {
return new Iterable<T>() {
public Iterator<T> iterator() {
- return new Iterator<T>() {
- public boolean hasNext() {
- return enumeration.hasMoreElements();
- }
-
- public T next() {
- return enumeration.nextElement();
- }
-
- public void remove() {
- throw new UnsupportedOperationException("remove() not supported");
- }
- };
+ return CollectionUtil.iterator(enumeration);
}
};
}
+ /**
+ * Create an {@code Iterable} view of an {@code Iterator}. The view can be used only once.
+ *
+ * @param iterator the iterator
+ * @return the {@code Iterable} view
+ */
public static <T> Iterable<T> loop(final Iterator<T> iterator) {
return new Iterable<T>() {
public Iterator<T> iterator() {
@@ -95,6 +215,92 @@
};
}
+ /**
+ * Create an {@code Iterator} view of an {@code Enumeration}.
+ *
+ * @param enumeration the enumeration
+ * @return the {@code Iterator} view
+ */
+ public static <T> Iterator<T> iterator(final Enumeration<T> enumeration) {
+ return new Iterator<T>() {
+ public boolean hasNext() {
+ return enumeration.hasMoreElements();
+ }
+
+ public T next() {
+ return enumeration.nextElement();
+ }
+
+ public void remove() {
+ throw new UnsupportedOperationException("remove()");
+ }
+ };
+ }
+
+ /**
+ * Run a translation function for each element of an {@code Enumeration}.
+ *
+ * @param input the input data
+ * @param translator the translator
+ * @return the translated data
+ */
+ public static <I,O> Enumeration<O> translate(final Enumeration<I> input, final Translator<I, O> translator) {
+ return new Enumeration<O>() {
+ public boolean hasMoreElements() {
+ return input.hasMoreElements();
+ }
+
+ public O nextElement() {
+ return translator.translate(input.nextElement());
+ }
+ };
+ }
+
+ /**
+ * Run a translation function for each element of an {@code Iterator}.
+ *
+ * @param input the input data
+ * @param translator the translator
+ * @return the translated data
+ */
+ public static <I,O> Iterator<O> translate(final Iterator<I> input, final Translator<I, O> translator) {
+ return new Iterator<O>() {
+ public boolean hasNext() {
+ return input.hasNext();
+ }
+
+ public O next() {
+ return translator.translate(input.next());
+ }
+
+ public void remove() {
+ input.remove();
+ }
+ };
+ }
+
+ /**
+ * Run a translation function for each element of an {@code Iterable}.
+ *
+ * @param input the input data
+ * @param translator the translator
+ * @return the translated data
+ */
+ public static <I,O> Iterable<O> translate(final Iterable<I> input, final Translator<I, O> translator) {
+ return new Iterable<O>() {
+ public Iterator<O> iterator() {
+ return translate(input.iterator(), translator);
+ }
+ };
+ }
+
+ /**
+ * Create an iterable view of a string split by a given delimiter.
+ *
+ * @param delimiter the delimiter
+ * @param subject the original string
+ * @return the iterable split view
+ */
public static Iterable<String> split(final String delimiter, final String subject) {
return new Iterable<String>() {
public Iterator<String> iterator() {
@@ -128,21 +334,4 @@
}
};
}
-
- public static <K, V> Map<K, V> weakHashMap() {
- return new WeakHashMap<K, V>();
- }
-
-
- public static <K, V> ConcurrentMap<K, V> concurrentWeakHashMap() {
- return CollectionUtil.<K,V>concurrentMap(CollectionUtil.<K,V>weakHashMap());
- }
-
- public static <T> List<T> unmodifiableList(final T[] entries) {
- return new UnmodifiableArrayList<T>(entries);
- }
-
- public static <K, V> Map<K, V> hashMap() {
- return new HashMap<K, V>();
- }
}
Deleted: remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/MessageInput.java
===================================================================
--- remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/MessageInput.java 2008-03-09 03:56:14 UTC (rev 3598)
+++ remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/MessageInput.java 2008-03-11 23:46:24 UTC (rev 3599)
@@ -1,29 +0,0 @@
-package org.jboss.cx.remoting.util;
-
-import java.io.IOException;
-import java.io.ObjectInput;
-
-/**
- * A readable message.
- */
-public interface MessageInput extends ByteInput, ObjectInput {
- /**
- * Read an object using the current context classloader, or, if there is no such classloader, the classloader
- * which loaded this interface.
- *
- * @return the object from the message
- * @throws ClassNotFoundException if the class of the object could not be resolved by the classloader
- * @throws IOException if an I/O error occurs
- */
- Object readObject() throws ClassNotFoundException, IOException;
-
- /**
- * Read an object using the given classloader.
- *
- * @param loader the classloader to use
- * @return the object from the message
- * @throws ClassNotFoundException if the class of the object could not be resolved by the classloader
- * @throws IOException if an I/O error occurs
- */
- Object readObject(ClassLoader loader) throws ClassNotFoundException, IOException;
-}
Deleted: remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/MessageOutput.java
===================================================================
--- remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/MessageOutput.java 2008-03-09 03:56:14 UTC (rev 3598)
+++ remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/MessageOutput.java 2008-03-11 23:46:24 UTC (rev 3599)
@@ -1,9 +0,0 @@
-package org.jboss.cx.remoting.util;
-
-import java.io.ObjectOutput;
-
-/**
- * A writable message.
- */
-public interface MessageOutput extends ByteOutput, ObjectOutput {
-}
Modified: remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/ServiceURI.java
===================================================================
--- remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/ServiceURI.java 2008-03-09 03:56:14 UTC (rev 3598)
+++ remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/ServiceURI.java 2008-03-11 23:46:24 UTC (rev 3599)
@@ -6,7 +6,7 @@
import java.util.regex.Pattern;
/**
- *
+ * A parser for JBoss Remoting URI types.
*/
public final class ServiceURI {
public static final String SCHEME = "jrs";
Modified: remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/TypeMap.java
===================================================================
--- remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/TypeMap.java 2008-03-09 03:56:14 UTC (rev 3598)
+++ remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/TypeMap.java 2008-03-11 23:46:24 UTC (rev 3599)
@@ -5,7 +5,7 @@
import java.util.Set;
/**
- *
+ * A map with classes for keys.
*/
public interface TypeMap<B> {
void clear();
16 years, 9 months
JBoss Remoting SVN: r3598 - remoting2/branches/2.x/src/tests/org/jboss/test/remoting/transport/http/keep_alive.
by jboss-remoting-commits@lists.jboss.org
Author: ron.sigal(a)jboss.com
Date: 2008-03-08 22:56:14 -0500 (Sat, 08 Mar 2008)
New Revision: 3598
Modified:
remoting2/branches/2.x/src/tests/org/jboss/test/remoting/transport/http/keep_alive/StressHTTPInvokerTestClient.java
Log:
JBREM-844: (1) Added Thread.sleep() at end for oneway invocations; (2) fixed loop counter.
Modified: remoting2/branches/2.x/src/tests/org/jboss/test/remoting/transport/http/keep_alive/StressHTTPInvokerTestClient.java
===================================================================
--- remoting2/branches/2.x/src/tests/org/jboss/test/remoting/transport/http/keep_alive/StressHTTPInvokerTestClient.java 2008-03-08 15:46:42 UTC (rev 3597)
+++ remoting2/branches/2.x/src/tests/org/jboss/test/remoting/transport/http/keep_alive/StressHTTPInvokerTestClient.java 2008-03-09 03:56:14 UTC (rev 3598)
@@ -50,8 +50,10 @@
for(int x = 0; x < 1000; x++)
{
+ if ((x + 1) % 100 == 0) log.info("loop: " + (x + 1));
super.testPostInvocation();
- if (x + 1 % 100 == 0) log.info("loop: " + (x + 1));
}
+
+ Thread.sleep(5000);
}
}
\ No newline at end of file
16 years, 9 months
JBoss Remoting SVN: r3597 - remoting2/branches/2.x.
by jboss-remoting-commits@lists.jboss.org
Author: david.lloyd(a)jboss.com
Date: 2008-03-08 10:46:42 -0500 (Sat, 08 Mar 2008)
New Revision: 3597
Modified:
remoting2/branches/2.x/test.policy
Log:
More fixes for JBREM-920
Modified: remoting2/branches/2.x/test.policy
===================================================================
--- remoting2/branches/2.x/test.policy 2008-03-08 15:46:29 UTC (rev 3596)
+++ remoting2/branches/2.x/test.policy 2008-03-08 15:46:42 UTC (rev 3597)
@@ -6,12 +6,20 @@
// Can't create sockets without it
permission java.net.SocketPermission "*:*", "accept,connect,listen,resolve";
+
+ // HTTP client invokers use Class.getMethod()
+ permission java.lang.RuntimePermission "accessClassInPackage.sun.net.www.protocol.https";
+ permission java.lang.RuntimePermission "accessClassInPackage.sun.net.www.protocol.http";
+
+ // Permission to create an MBean server
+ permission javax.management.MBeanServerPermission "createMBeanServer, releaseMBeanServer";
};
// Permissions for JBoss Serialization
grant {
permission java.lang.RuntimePermission "accessDeclaredMembers";
permission java.io.SerializablePermission "enableSubstitution";
+ permission java.io.SerializablePermission "enableSubclassImplementation";
permission java.lang.RuntimePermission "accessClassInPackage.sun.reflect";
permission java.lang.RuntimePermission "reflectionFactoryAccess";
};
@@ -27,6 +35,7 @@
grant {
// Used by at least one test case
permission java.lang.reflect.ReflectPermission "suppressAccessChecks";
+ permission java.lang.RuntimePermission "setContextClassLoader";
};
// Permissions for JUnit itself to function
16 years, 9 months
JBoss Remoting SVN: r3596 - remoting2/branches/2.x.
by jboss-remoting-commits@lists.jboss.org
Author: david.lloyd(a)jboss.com
Date: 2008-03-08 10:46:29 -0500 (Sat, 08 Mar 2008)
New Revision: 3596
Modified:
remoting2/branches/2.x/build.xml
Log:
Back memory back down again - this did not help anyway
Modified: remoting2/branches/2.x/build.xml
===================================================================
--- remoting2/branches/2.x/build.xml 2008-03-08 07:35:46 UTC (rev 3595)
+++ remoting2/branches/2.x/build.xml 2008-03-08 15:46:29 UTC (rev 3596)
@@ -927,7 +927,7 @@
<mkdir dir="${output.tests.tmp}"/>
<junit
printsummary="true" fork="yes" includeantruntime="true"
- tempdir="${output.tests.tmp}" maxmemory="1280m">
+ tempdir="${output.tests.tmp}" maxmemory="1024m">
<jvmarg value="-Dloader.path=${output.lib.dir}/jboss-remoting-loading-tests.jar"/>
<jvmarg value="-D${remoting.metadata.key}=${metadata}"/>
<classpath>
16 years, 9 months