Author: david.lloyd(a)jboss.com
Date: 2008-10-22 18:01:55 -0400 (Wed, 22 Oct 2008)
New Revision: 4607
Added:
remoting3/trunk/api/src/main/java/org/jboss/remoting/RemoteReplyException.java
remoting3/trunk/core/src/main/java/org/jboss/remoting/core/RequestListenerExecutor.java
Removed:
remoting3/trunk/api/src/main/java/org/jboss/remoting/RemotingExceptionCarrier.java
remoting3/trunk/core/src/main/java/org/jboss/remoting/core/util/TaggingExecutor.java
Modified:
remoting3/trunk/api/src/main/java/org/jboss/remoting/Client.java
remoting3/trunk/api/src/main/java/org/jboss/remoting/RemotingException.java
remoting3/trunk/api/src/main/java/org/jboss/remoting/RequestContext.java
remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/SpiUtils.java
remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/remote/ReplyHandler.java
remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientExternalizer.java
remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientImpl.java
remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientSourceExternalizer.java
remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientSourceImpl.java
remoting3/trunk/core/src/main/java/org/jboss/remoting/core/LocalRequestHandler.java
remoting3/trunk/core/src/main/java/org/jboss/remoting/core/LocalRequestHandlerSource.java
remoting3/trunk/core/src/main/java/org/jboss/remoting/core/RequestContextImpl.java
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexHandler.java
Log:
Update semantics of exception handling for client/handler/listener chains, as per 10/21
discussion
Modified: remoting3/trunk/api/src/main/java/org/jboss/remoting/Client.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/remoting/Client.java 2008-10-21 05:35:10
UTC (rev 4606)
+++ remoting3/trunk/api/src/main/java/org/jboss/remoting/Client.java 2008-10-22 22:01:55
UTC (rev 4607)
@@ -1,7 +1,9 @@
package org.jboss.remoting;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CancellationException;
import java.io.IOException;
+import java.io.ObjectStreamException;
import org.jboss.xnio.IoFuture;
/**
@@ -12,29 +14,67 @@
*/
public interface Client<I, O> extends HandleableCloseable<Client<I, O>>
{
/**
- * Send a request and block until a reply is received.
+ * Send a request and block until a reply is received. If the remote side
manipulates a stream, the
+ * current thread may be used to handle it.
* <p/>
- * Uses the default invocation policy for handling remote invocations. If the remote
side manipulates a stream, the
- * current thread will be used to handle it by default.
+ * If the operation is cancelled asynchronously, a {@code CancellationException} is
thrown. This exception indicates
+ * that the request was received and was executed, but a cancellation request was
received and handled before the
+ * reply was able to be sent. The remote service will have cleanly cancelled the
operation. This exception type
+ * is a {@code RuntimeException}; thus direct handling of this exception is optional
(depending on your use case).
* <p/>
- * If the remote session cannot handle the request, a {@code RemotingException} will
be thrown.
+ * If the request is sent but the remote side sends an exception back, a {@code
RemoteExecutionException} is thrown
+ * with the cause and message initialized by the remote service. This exception
indicates an error in the execution
+ * of the service's {@code RequestListener}. The service will have cleanly
recovered from such an exception.
+ * <p/>
+ * If the request is sent and the remote side tries to reply, but sending the reply
fails, a
+ * {@code RemoteReplyException} is thrown, possibly with the cause initialized to the
reason of the failure. Typically
+ * this exception is thrown when serialization of the reply failed for some reason.
This exception type extends
+ * {@code RemoteExecutionException} and can be treated similarly in most cases.
+ * <p/>
+ * If the request is sent and the remote side sends the reply successfully but there
is an error reading the reply
+ * locally, a {@code ReplyException} is thrown. In this case the operation is known
to have completed without error
+ * but the actual detailed reply cannot be known. In cases where the reply would be
ignored anyway, this exception
+ * type may be safely ignored (possibly logging it for informational purposes). This
exception is typically caused
+ * by an {@code ObjectStreamException} thrown while marshalling the reply, though
other causes are also possible.
+ * <p/>
+ * If the result of the operation is known to be impossible to ascertain, then an
{@code IndeterminateOutcomeException}
+ * is thrown. Possible causes of this condition include (but are not limited to) the
connection to the remote side
+ * being unexpectedly broken, or the current thread being interrupted before the
reply can be read. In the latter
+ * case, a best effort is automatically made to attempt to cancel the outstanding
operation, though there is no
+ * guarantee.
+ * <p/>
+ * If the request cannot be sent, some other {@code IOException} will be thrown with
the reason, including (but not limited to)
+ * attempting to call this method on a closed client, or {@code
ObjectStreamException}s related to marshalling the
+ * request locally or unmarshalling it remotely. Such an exception indicates that
the remote side did not receive
+ * the request.
+ * <p/>
+ * All these exceptions (apart from {@code CancellationException}) extend {@code
IOException} which makes it easier
+ * to selectively catch only those exceptions that you need to implement special
policy for, while relegating the
+ * rest to common handlers.
*
* @param request the request to send
*
* @return the result of the request
*
- * @throws RemotingException if the request could not be sent
+ * @throws CancellationException if the operation was cancelled asynchronously
* @throws RemoteExecutionException if the remote handler threw an exception
+ * @throws RemoteReplyException if the remote side was unable to send the response
+ * @throws ReplyException if the operation succeeded but the reply cannot be read for
some reason
+ * @throws IndeterminateOutcomeException if the result of the operation cannot be
ascertained
+ * @throws ObjectStreamException if marshalling or unmarshalling some part of the
request failed
+ * @throws IOException if some I/O error occurred while sending the request
*/
- O invoke(I request) throws IOException;
+ O invoke(I request) throws IOException, CancellationException;
/**
- * Send a request asynchronously.
+ * Send a request asynchronously. If the remote side manipulates a stream, it
+ * may use a local policy to assign one or more thread(s) to handle the local end of
that stream, or it may
+ * fail with an exception (e.g. if this method is called on a client with no threads
to handle streaming).
* <p/>
- * Uses the default invocation policy for handling remote invocations. If the remote
side manipulates a stream, it
- * MAY fail with an exception (e.g. if this method is called on a client with no
threads to handle streaming).
- * <p/>
- * Returns immediately.
+ * Returns immediately. The returned {@code IoFuture} object can be queried at a
later time to determine the result
+ * of the operation. If the operation fails, one of the conditions described on the
{@link #invoke(Object) invoke(I)}
+ * method will result. This condition can be determined by reading the status of the
{@code IoFuture} object or
+ * by attempting to read the result.
*
* @param request the request to send
*
Added: remoting3/trunk/api/src/main/java/org/jboss/remoting/RemoteReplyException.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/remoting/RemoteReplyException.java
(rev 0)
+++
remoting3/trunk/api/src/main/java/org/jboss/remoting/RemoteReplyException.java 2008-10-22
22:01:55 UTC (rev 4607)
@@ -0,0 +1,71 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.remoting;
+
+/**
+ * An exception indicating that a the remote side tried and failed to send a reply
message; the remote side would be
+ * aware of this type of failure, so the outcome is determinate; thus it extends {@code
RemoteExecutionException}.
+ */
+public class RemoteReplyException extends RemoteExecutionException {
+
+ private static final long serialVersionUID = -8572480018652753441L;
+
+ /**
+ * Constructs a <tt>RemoteReplyException</tt> with no detail message. The
cause is not initialized, and may subsequently
+ * be initialized by a call to {@link #initCause(Throwable) initCause}.
+ */
+ public RemoteReplyException() {
+ }
+
+ /**
+ * Constructs a <tt>RemoteReplyException</tt> with the specified detail
message. The cause is not initialized, and may
+ * subsequently be initialized by a call to {@link #initCause(Throwable) initCause}.
+ *
+ * @param msg the detail message
+ */
+ public RemoteReplyException(String msg) {
+ super(msg);
+ }
+
+ /**
+ * Constructs a <tt>RemoteReplyException</tt> with the specified cause.
The detail message is set to:
+ * <pre>
+ * (cause == null ? null : cause.toString())</pre>
+ * (which typically contains the class and detail message of
<tt>cause</tt>).
+ *
+ * @param cause the cause (which is saved for later retrieval by the {@link
#getCause()} method)
+ */
+ public RemoteReplyException(Throwable cause) {
+ super(cause);
+ }
+
+ /**
+ * Constructs a <tt>RemoteReplyException</tt> with the specified detail
message and cause.
+ *
+ * @param msg the detail message
+ * @param cause the cause (which is saved for later retrieval by the {@link
#getCause()} method)
+ */
+ public RemoteReplyException(String msg, Throwable cause) {
+ super(msg, cause);
+ }
+}
Modified: remoting3/trunk/api/src/main/java/org/jboss/remoting/RemotingException.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/remoting/RemotingException.java 2008-10-21
05:35:10 UTC (rev 4606)
+++ remoting3/trunk/api/src/main/java/org/jboss/remoting/RemotingException.java 2008-10-22
22:01:55 UTC (rev 4607)
@@ -3,9 +3,10 @@
import java.io.IOException;
/**
- * A general Remoting exception.
+ * A general Remoting exception. Used as a base class in order to provide constructors
which accept any combination
+ * of {@code cause} and {@code message}.
*/
-public class RemotingException extends IOException {
+public abstract class RemotingException extends IOException {
private static final long serialVersionUID = 1540716301579397423L;
@@ -13,7 +14,7 @@
* Constructs a <tt>RemotingException</tt> with no detail message. The
cause is not initialized, and may subsequently be
* initialized by a call to {@link #initCause(Throwable) initCause}.
*/
- public RemotingException() {
+ protected RemotingException() {
}
/**
@@ -22,7 +23,7 @@
*
* @param msg the detail message
*/
- public RemotingException(String msg) {
+ protected RemotingException(String msg) {
super(msg);
}
@@ -34,7 +35,7 @@
*
* @param cause the cause (which is saved for later retrieval by the {@link
#getCause()} method)
*/
- public RemotingException(Throwable cause) {
+ protected RemotingException(Throwable cause) {
initCause(cause);
}
@@ -44,7 +45,7 @@
* @param msg the detail message
* @param cause the cause (which is saved for later retrieval by the {@link
#getCause()} method)
*/
- public RemotingException(String msg, Throwable cause) {
+ protected RemotingException(String msg, Throwable cause) {
super(msg);
initCause(cause);
}
Deleted:
remoting3/trunk/api/src/main/java/org/jboss/remoting/RemotingExceptionCarrier.java
===================================================================
---
remoting3/trunk/api/src/main/java/org/jboss/remoting/RemotingExceptionCarrier.java 2008-10-21
05:35:10 UTC (rev 4606)
+++
remoting3/trunk/api/src/main/java/org/jboss/remoting/RemotingExceptionCarrier.java 2008-10-22
22:01:55 UTC (rev 4607)
@@ -1,27 +0,0 @@
-package org.jboss.remoting;
-
-/**
- * A runtime exception that carries a {@link org.jboss.remoting.RemotingException} as a
cause.
- */
-public class RemotingExceptionCarrier extends IOExceptionCarrier {
-
- private static final long serialVersionUID = -1326735788761007331L;
-
- /**
- * Construct a new carrier.
- *
- * @param cause the nested cause
- */
- public RemotingExceptionCarrier(RemotingException cause) {
- super(cause);
- }
-
- /**
- * Get the cause.
- *
- * @return the cause
- */
- public RemotingException getCause() {
- return (RemotingException) super.getCause();
- }
-}
Modified: remoting3/trunk/api/src/main/java/org/jboss/remoting/RequestContext.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/remoting/RequestContext.java 2008-10-21
05:35:10 UTC (rev 4606)
+++ remoting3/trunk/api/src/main/java/org/jboss/remoting/RequestContext.java 2008-10-22
22:01:55 UTC (rev 4607)
@@ -4,7 +4,8 @@
import java.io.IOException;
/**
- * The context of a single request.
+ * The context of a single request. A request listener is obligated to call exactly one
of the three {@code send} methods
+ * specified in this interface.
*
* @param <O> the reply type
*/
@@ -24,30 +25,38 @@
boolean isCancelled();
/**
- * Send a reply back to the caller.
+ * Send a reply back to the caller. If transmission fails, an {@code IOException} is
thrown from this method
+ * and a reply is sent back to the client which will trigger a {@link
RemoteReplyException} to be thrown. If the
+ * client connection is interrupted in such a way that the reply cannot reach the
client, the client will (eventually)
+ * receive an {@link IndeterminateOutcomeException}.
*
* @param reply the reply to send
* @throws IOException if the transmission failed
- * @throws IllegalStateException if a reply was already sent
+ * @throws IllegalStateException if this or another of the {@code sendXXX()} methods
was already invoked for this request
*/
void sendReply(O reply) throws IOException, IllegalStateException;
/**
- * Send a failure message back to the caller.
+ * Send a failure message back to the caller. If the transmission succeeds, the
client will receive a
+ * {@link RemoteExecutionException} with the message initialized to {@code msg} and
the cause initialized to
+ * {@code cause}. If the transmission fails, an {@code IOException} is thrown from
this
+ * method and the client will (eventually) receive an {@link
IndeterminateOutcomeException}.
*
* @param msg a message describing the failure, if any (can be {@code null})
* @param cause the failure cause, if any (can be {@code null})
*
* @throws IOException if the transmission failed
- * @throws IllegalStateException if a reply was already sent
+ * @throws IllegalStateException if this or another of the {@code sendXXX()} methods
was already invoked for this request
*/
void sendFailure(String msg, Throwable cause) throws IOException,
IllegalStateException;
/**
- * Send a cancellation message back to the client.
+ * Send a cancellation message back to the client. If the transmission succeeds, the
client result will be an
+ * acknowledgement of cancellation. If the transmission fails, an {@code
IOException} is thrown from this
+ * method and the client will (eventually) receive an {@link
IndeterminateOutcomeException}.
*
* @throws IOException if the message could not be sent (the client could not be
notified about the cancellation)
- * @throws IllegalStateException if a reply was already sent
+ * @throws IllegalStateException if this or another of the {@code sendXXX()} methods
was already invoked for this request
*/
void sendCancelled() throws IOException, IllegalStateException;
Modified: remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/SpiUtils.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/SpiUtils.java 2008-10-21
05:35:10 UTC (rev 4606)
+++ remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/SpiUtils.java 2008-10-22
22:01:55 UTC (rev 4607)
@@ -48,7 +48,7 @@
try {
replyHandler.handleException(exception);
} catch (Throwable t) {
- log.error(t, "Failed to properly handle exception");
+ log.debug(t, "Failed to properly handle exception");
}
}
@@ -63,7 +63,7 @@
try {
replyHandler.handleReply(reply);
} catch (Throwable t) {
- log.error(t, "Failed to properly handle reply");
+ log.debug(t, "Failed to properly handle reply");
}
}
@@ -76,7 +76,7 @@
try {
replyHandler.handleCancellation();
} catch (Throwable t) {
- log.error(t, "Failed to properly handle cancellation");
+ log.debug(t, "Failed to properly handle cancellation");
}
}
@@ -90,7 +90,7 @@
try {
handler.notifyCancel(requestContext);
} catch (Throwable t) {
- log.error(t, "Request cancel handler threw an exception when calling
notifyCancel()");
+ log.error(t, "Request cancel handler threw an exception");
}
}
@@ -105,7 +105,7 @@
try {
handler.handleClose(closed);
} catch (Throwable t) {
- log.error(t, "Close handler failed unexpectedly");
+ log.error(t, "Close handler threw an exception");
}
}
Modified:
remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/remote/ReplyHandler.java
===================================================================
---
remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/remote/ReplyHandler.java 2008-10-21
05:35:10 UTC (rev 4606)
+++
remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/remote/ReplyHandler.java 2008-10-22
22:01:55 UTC (rev 4607)
@@ -31,21 +31,22 @@
public interface ReplyHandler {
/**
- * Handle a successful reply.
+ * Handle a successful reply. If the reply could not be forwarded, an exception is
thrown.
*
* @param reply the reply
*/
- void handleReply(Object reply);
+ void handleReply(Object reply) throws IOException;
/**
- * Handle an exception.
+ * Handle an exception. If the exception could not be forwarded, a (different)
{@code IOException} is thrown.
*
* @param exception an exception which describes the problem
*/
- void handleException(IOException exception);
+ void handleException(IOException exception) throws IOException;
/**
- * Handle a cancellation request.
+ * Handle a cancellation acknowledgement. If the cancellation acknowledgement could
not be forwarded, an
+ * exception is thrown.
*/
- void handleCancellation();
+ void handleCancellation() throws IOException;
}
Modified:
remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientExternalizer.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientExternalizer.java 2008-10-21
05:35:10 UTC (rev 4606)
+++
remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientExternalizer.java 2008-10-22
22:01:55 UTC (rev 4607)
@@ -42,10 +42,12 @@
this.endpoint = endpoint;
}
+ @SuppressWarnings({ "unchecked" })
public void writeExternal(final Object o, final ObjectOutput output) throws
IOException {
output.writeObject(((ClientImpl)o).getRequestHandlerHandle().getResource());
}
+ @SuppressWarnings({ "unchecked" })
public Object createExternal(final Class<?> aClass, final ObjectInput input,
final Creator creator) throws IOException, ClassNotFoundException {
final RequestHandler handler = (RequestHandler) input.readObject();
return new ClientImpl(handler.getHandle(), endpoint.getExecutor());
Modified: remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientImpl.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientImpl.java 2008-10-21
05:35:10 UTC (rev 4606)
+++ remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientImpl.java 2008-10-22
22:01:55 UTC (rev 4607)
@@ -23,7 +23,7 @@
package org.jboss.remoting.core;
import org.jboss.remoting.Client;
-import org.jboss.remoting.RemotingException;
+import org.jboss.remoting.IndeterminateOutcomeException;
import org.jboss.remoting.core.util.QueueExecutor;
import org.jboss.remoting.spi.remote.RequestHandler;
import org.jboss.remoting.spi.remote.ReplyHandler;
@@ -51,7 +51,7 @@
public O invoke(final I request) throws IOException {
if (! isOpen()) {
- throw new RemotingException("Client is not open");
+ throw new IOException("Client is not open");
}
final QueueExecutor executor = new QueueExecutor();
final FutureReplyImpl<O> futureReply = new
FutureReplyImpl<O>(executor);
@@ -64,12 +64,21 @@
}
});
executor.runQueue();
- return futureReply.get();
+ try {
+ return futureReply.getInterruptibly();
+ } catch (InterruptedException e) {
+ try {
+ futureReply.cancel();
+ throw new IndeterminateOutcomeException("The current thread was
interrupted before the result could be read");
+ } finally {
+ Thread.currentThread().interrupt();
+ }
+ }
}
public IoFuture<O> send(final I request) throws IOException {
if (! isOpen()) {
- throw new RemotingException("Client is not open");
+ throw new IOException("Client is not open");
}
final FutureReplyImpl<O> futureReply = new
FutureReplyImpl<O>(executor);
final ReplyHandler replyHandler = futureReply.getReplyHandler();
@@ -78,9 +87,9 @@
return futureReply;
}
- public void sendOneWay(final I request) throws RemotingException {
+ public void sendOneWay(final I request) throws IOException {
if (! isOpen()) {
- throw new RemotingException("Client is not open");
+ throw new IOException("Client is not open");
}
handle.getResource().receiveRequest(request);
}
Modified:
remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientSourceExternalizer.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientSourceExternalizer.java 2008-10-21
05:35:10 UTC (rev 4606)
+++
remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientSourceExternalizer.java 2008-10-22
22:01:55 UTC (rev 4607)
@@ -42,10 +42,12 @@
this.endpoint = endpoint;
}
+ @SuppressWarnings({ "unchecked" })
public void writeExternal(final Object o, final ObjectOutput output) throws
IOException {
output.writeObject(((ClientSourceImpl)
o).getRequestHandlerSourceHandle().getResource());
}
+ @SuppressWarnings({ "unchecked" })
public Object createExternal(final Class<?> aClass, final ObjectInput input,
final Creator creator) throws IOException, ClassNotFoundException {
final RequestHandlerSource handler = (RequestHandlerSource) input.readObject();
return new ClientSourceImpl(handler.getHandle(), endpoint);
Modified:
remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientSourceImpl.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientSourceImpl.java 2008-10-21
05:35:10 UTC (rev 4606)
+++
remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientSourceImpl.java 2008-10-22
22:01:55 UTC (rev 4607)
@@ -24,7 +24,6 @@
import org.jboss.remoting.ClientSource;
import org.jboss.remoting.Client;
-import org.jboss.remoting.RemotingException;
import org.jboss.remoting.Endpoint;
import org.jboss.remoting.spi.remote.RequestHandler;
import org.jboss.remoting.spi.remote.RequestHandlerSource;
@@ -53,7 +52,7 @@
public Client<I, O> createClient() throws IOException {
if (! isOpen()) {
- throw new RemotingException("Client source is not open");
+ throw new IOException("Client source is not open");
}
final Handle<RequestHandler> clientHandle =
handle.getResource().createRequestHandler();
try {
Modified:
remoting3/trunk/core/src/main/java/org/jboss/remoting/core/LocalRequestHandler.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/remoting/core/LocalRequestHandler.java 2008-10-21
05:35:10 UTC (rev 4606)
+++
remoting3/trunk/core/src/main/java/org/jboss/remoting/core/LocalRequestHandler.java 2008-10-22
22:01:55 UTC (rev 4607)
@@ -27,12 +27,12 @@
import org.jboss.remoting.spi.remote.ReplyHandler;
import org.jboss.remoting.spi.SpiUtils;
import org.jboss.remoting.spi.AbstractAutoCloseable;
-import org.jboss.remoting.RemotingException;
import org.jboss.remoting.RequestListener;
import org.jboss.remoting.RemoteExecutionException;
import org.jboss.remoting.CloseHandler;
import org.jboss.xnio.log.Logger;
import java.util.concurrent.Executor;
+import java.io.IOException;
/**
*
@@ -40,14 +40,12 @@
public final class LocalRequestHandler<I, O> extends
AbstractAutoCloseable<RequestHandler> implements RequestHandler {
private final RequestListener<I, O> requestListener;
- private final Executor executor;
private final ClientContextImpl clientContext;
private static final Logger log = Logger.getLogger(LocalRequestHandler.class);
private LocalRequestHandler(final Executor executor, final RequestListener<I,
O> requestListener, final ClientContextImpl clientContext) {
super(executor);
- this.executor = executor;
this.requestListener = requestListener;
this.clientContext = clientContext;
}
@@ -62,7 +60,7 @@
public void receiveRequest(final Object request) {
final RequestContextImpl<O> context = new
RequestContextImpl<O>(clientContext);
- executor.execute(new Runnable() {
+ context.execute(new Runnable() {
@SuppressWarnings({ "unchecked" })
public void run() {
try {
@@ -76,7 +74,7 @@
public RemoteRequestContext receiveRequest(final Object request, final ReplyHandler
replyHandler) {
final RequestContextImpl<O> context = new
RequestContextImpl<O>(replyHandler, clientContext);
- executor.execute(new Runnable() {
+ context.execute(new Runnable() {
@SuppressWarnings({ "unchecked" })
public void run() {
try {
@@ -95,7 +93,7 @@
};
}
- void open() throws RemotingException {
+ void open() throws IOException {
try {
requestListener.handleClientOpen(clientContext);
addCloseHandler(new CloseHandler<RequestHandler>() {
@@ -108,7 +106,9 @@
}
});
} catch (Throwable t) {
- throw new RemotingException("Failed to open client context", t);
+ final IOException ioe = new IOException("Failed to open client
context");
+ ioe.initCause(t);
+ throw ioe;
}
}
Modified:
remoting3/trunk/core/src/main/java/org/jboss/remoting/core/LocalRequestHandlerSource.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/remoting/core/LocalRequestHandlerSource.java 2008-10-21
05:35:10 UTC (rev 4606)
+++
remoting3/trunk/core/src/main/java/org/jboss/remoting/core/LocalRequestHandlerSource.java 2008-10-22
22:01:55 UTC (rev 4607)
@@ -27,7 +27,6 @@
import org.jboss.remoting.spi.remote.Handle;
import org.jboss.remoting.spi.AbstractAutoCloseable;
import org.jboss.remoting.RequestListener;
-import org.jboss.remoting.RemotingException;
import org.jboss.remoting.CloseHandler;
import org.jboss.xnio.log.Logger;
import java.util.concurrent.Executor;
@@ -57,11 +56,11 @@
localRequestHandler.open();
return localRequestHandler.getHandle();
} else {
- throw new RemotingException("LocalRequestHandlerSource is
closed");
+ throw new IOException("LocalRequestHandlerSource is closed");
}
}
- void open() throws RemotingException {
+ void open() throws IOException {
try {
requestListener.handleServiceOpen(serviceContext);
addCloseHandler(new CloseHandler<RequestHandlerSource>() {
@@ -74,7 +73,9 @@
}
});
} catch (Throwable t) {
- throw new RemotingException("Failed to open client context", t);
+ final IOException ioe = new IOException("Failed to open client
context");
+ ioe.initCause(t);
+ throw ioe;
}
}
Modified:
remoting3/trunk/core/src/main/java/org/jboss/remoting/core/RequestContextImpl.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/remoting/core/RequestContextImpl.java 2008-10-21
05:35:10 UTC (rev 4606)
+++
remoting3/trunk/core/src/main/java/org/jboss/remoting/core/RequestContextImpl.java 2008-10-22
22:01:55 UTC (rev 4607)
@@ -24,15 +24,17 @@
import org.jboss.remoting.RequestContext;
import org.jboss.remoting.ClientContext;
-import org.jboss.remoting.RemotingException;
import org.jboss.remoting.RequestCancelHandler;
import org.jboss.remoting.RemoteExecutionException;
-import org.jboss.remoting.core.util.TaggingExecutor;
+import org.jboss.remoting.RemoteReplyException;
+import org.jboss.remoting.IndeterminateOutcomeException;
import org.jboss.remoting.spi.remote.ReplyHandler;
import org.jboss.remoting.spi.SpiUtils;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.Set;
import java.util.HashSet;
+import java.io.IOException;
/**
*
@@ -43,21 +45,26 @@
private final Object cancelLock = new Object();
private final ReplyHandler replyHandler;
private final ClientContextImpl clientContext;
+ private final AtomicInteger taskCount = new AtomicInteger();
- private final AtomicBoolean cancelled = new AtomicBoolean();
// @protectedby cancelLock
+ private boolean cancelled;
+ // @protectedby cancelLock
private Set<RequestCancelHandler<O>> cancelHandlers;
- private final TaggingExecutor executor;
+ private final RequestListenerExecutor executor;
RequestContextImpl(final ReplyHandler replyHandler, final ClientContextImpl
clientContext) {
this.replyHandler = replyHandler;
this.clientContext = clientContext;
- executor = new TaggingExecutor(clientContext.getExecutor());
+ //noinspection ThisEscapedInObjectConstruction
+ executor = new RequestListenerExecutor(clientContext.getExecutor(), this);
}
+ // todo - used by one-way requests... :|
RequestContextImpl(final ClientContextImpl clientContext) {
this.clientContext = clientContext;
- executor = new TaggingExecutor(clientContext.getExecutor());
+ //noinspection ThisEscapedInObjectConstruction
+ executor = new RequestListenerExecutor(clientContext.getExecutor(), this);
replyHandler = null;
}
@@ -66,34 +73,42 @@
}
public boolean isCancelled() {
- return cancelled.get();
+ synchronized (cancelLock) {
+ return cancelled;
+ }
}
- public void sendReply(final O reply) throws RemotingException, IllegalStateException
{
+ public void sendReply(final O reply) throws IOException, IllegalStateException {
if (! closed.getAndSet(true)) {
- if (replyHandler != null) {
+ if (replyHandler != null) try {
replyHandler.handleReply(reply);
- }
+ } catch (IOException e) {
+ SpiUtils.safeHandleException(replyHandler, new
RemoteReplyException("Remote reply failed", e));
+ throw e;
+ } else throw new IllegalStateException("Cannot send a reply to a one-way
invocation");
} else {
throw new IllegalStateException("Reply already sent");
}
}
- public void sendFailure(final String msg, final Throwable cause) throws
RemotingException, IllegalStateException {
+ public void sendFailure(final String msg, final Throwable cause) throws IOException,
IllegalStateException {
if (! closed.getAndSet(true)) {
if (replyHandler != null) {
replyHandler.handleException(new RemoteExecutionException(msg, cause));
- }
+ } else throw new IllegalStateException("Cannot send a reply to a one-way
invocation");
} else {
throw new IllegalStateException("Reply already sent");
}
}
- public void sendCancelled() throws RemotingException, IllegalStateException {
+ public void sendCancelled() throws IOException, IllegalStateException {
if (! closed.getAndSet(true)) {
- if (replyHandler != null) {
+ if (replyHandler != null) try {
replyHandler.handleCancellation();
- }
+ } catch (IOException e) {
+ // this is highly unlikely to succeed
+ SpiUtils.safeHandleException(replyHandler, new
RemoteReplyException("Remote cancellation acknowledgement failed", e));
+ } else throw new IllegalStateException("Cannot send a reply to a one-way
invocation");
} else {
throw new IllegalStateException("Reply already sent");
}
@@ -101,7 +116,7 @@
public void addCancelHandler(final RequestCancelHandler<O> handler) {
synchronized (cancelLock) {
- if (cancelled.get()) {
+ if (cancelled) {
SpiUtils.safeNotifyCancellation(handler, this);
} else {
if (cancelHandlers == null) {
@@ -117,8 +132,9 @@
}
protected void cancel() {
- if (! cancelled.getAndSet(true)) {
- synchronized (cancelLock) {
+ synchronized (cancelLock) {
+ if (! cancelled) {
+ cancelled = true;
if (cancelHandlers != null) {
for (final RequestCancelHandler<O> handler : cancelHandlers) {
executor.execute(new Runnable() {
@@ -129,8 +145,19 @@
}
cancelHandlers = null;
}
+ executor.interruptAll();
}
- executor.interruptAll();
}
}
+
+ void startTask() {
+ taskCount.incrementAndGet();
+ }
+
+ void finishTask() {
+ if (taskCount.decrementAndGet() == 0 && closed.getAndSet(true)) {
+ // no response sent! send back IndeterminateOutcomeException
+ SpiUtils.safeHandleException(replyHandler, new
IndeterminateOutcomeException("No reply was sent by the request listener"));
+ }
+ }
}
Copied:
remoting3/trunk/core/src/main/java/org/jboss/remoting/core/RequestListenerExecutor.java
(from rev 4601,
remoting3/trunk/core/src/main/java/org/jboss/remoting/core/util/TaggingExecutor.java)
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/remoting/core/RequestListenerExecutor.java
(rev 0)
+++
remoting3/trunk/core/src/main/java/org/jboss/remoting/core/RequestListenerExecutor.java 2008-10-22
22:01:55 UTC (rev 4607)
@@ -0,0 +1,80 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.remoting.core;
+
+import java.util.concurrent.Executor;
+import java.util.Set;
+import org.jboss.remoting.util.CollectionUtil;
+
+/**
+ *
+ */
+public final class RequestListenerExecutor implements Executor {
+
+ private final Set<Task> tasks = CollectionUtil.synchronizedHashSet();
+ private final Executor executor;
+ private final RequestContextImpl requestContext;
+
+ public RequestListenerExecutor(final Executor executor, final RequestContextImpl
context) {
+ this.executor = executor;
+ requestContext = context;
+ }
+
+ private final class Task implements Runnable {
+ private volatile Thread thread;
+ private final Runnable runnable;
+
+ private Task(final Runnable runnable) {
+ this.runnable = runnable;
+ }
+
+ public void run() {
+ requestContext.startTask();
+ thread = Thread.currentThread();
+ tasks.add(this);
+ try {
+ runnable.run();
+ } finally {
+ requestContext.finishTask();
+ tasks.remove(this);
+ thread = null;
+ }
+ }
+ }
+
+ public void execute(final Runnable command) {
+ executor.execute(new Task(command));
+ }
+
+ public void interruptAll() {
+ synchronized (tasks) {
+ final Thread currentThread = Thread.currentThread();
+ for (Task task : tasks) {
+ final Thread thread = task.thread;
+ if (thread != null && thread != currentThread) {
+ thread.interrupt();
+ }
+ }
+ }
+ }
+}
Deleted:
remoting3/trunk/core/src/main/java/org/jboss/remoting/core/util/TaggingExecutor.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/remoting/core/util/TaggingExecutor.java 2008-10-21
05:35:10 UTC (rev 4606)
+++
remoting3/trunk/core/src/main/java/org/jboss/remoting/core/util/TaggingExecutor.java 2008-10-22
22:01:55 UTC (rev 4607)
@@ -1,75 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2008, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
- */
-
-package org.jboss.remoting.core.util;
-
-import java.util.concurrent.Executor;
-import java.util.Set;
-import org.jboss.remoting.util.CollectionUtil;
-
-/**
- *
- */
-public final class TaggingExecutor implements Executor {
-
- private final Set<Task> tasks = CollectionUtil.synchronizedHashSet();
- private final Executor executor;
-
- public TaggingExecutor(final Executor executor) {
- this.executor = executor;
- }
-
- private final class Task implements Runnable {
- private volatile Thread thread;
- private final Runnable runnable;
-
- private Task(final Runnable runnable) {
- this.runnable = runnable;
- }
-
- public void run() {
- thread = Thread.currentThread();
- tasks.add(this);
- try {
- runnable.run();
- } finally {
- tasks.remove(this);
- thread = null;
- }
- }
- }
-
- public void execute(final Runnable command) {
- executor.execute(new Task(command));
- }
-
- public void interruptAll() {
- synchronized (tasks) {
- for (Task task : tasks) {
- final Thread thread = task.thread;
- if (thread != null) {
- thread.interrupt();
- }
- }
- }
- }
-}
Modified:
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexHandler.java
===================================================================
---
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexHandler.java 2008-10-21
05:35:10 UTC (rev 4606)
+++
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexHandler.java 2008-10-22
22:01:55 UTC (rev 4607)
@@ -49,6 +49,8 @@
import org.jboss.remoting.SimpleCloseable;
import org.jboss.remoting.RemoteExecutionException;
import org.jboss.remoting.IndeterminateOutcomeException;
+import org.jboss.remoting.ReplyException;
+import org.jboss.remoting.RemoteReplyException;
import org.jboss.marshalling.MarshallerFactory;
import org.jboss.marshalling.Unmarshaller;
import org.jboss.marshalling.ByteOutput;
@@ -66,6 +68,7 @@
import java.nio.BufferUnderflowException;
import java.io.IOException;
import java.io.InvalidClassException;
+import java.io.InterruptedIOException;
/**
* Protocol handler for the basic message-oriented Remoting protocol.
@@ -257,7 +260,7 @@
IoUtils.safeClose(unmarshaller);
}
} catch (IOException ex) {
- log.trace("Failed to unmarshal a reply (%s), sending a
ReplyException");
+ log.trace("Failed to unmarshal a reply (%s), sending a
ReplyException", ex);
// todo
SpiUtils.safeHandleException(replyHandler, ex);
break;
@@ -276,8 +279,10 @@
case CANCEL_ACK: {
final int requestId = buffer.getInt();
final ReplyHandler replyHandler = remoteRequests.get(requestId);
- if (replyHandler != null) {
+ if (replyHandler != null) try {
replyHandler.handleCancellation();
+ } catch (IOException e) {
+ log.trace("Failed to forward a cancellation acknowledgement
(%s)", e);
}
break;
}
@@ -458,7 +463,7 @@
this.allocator = allocator;
}
- public void handleReply(final Object reply) {
+ public void handleReply(final Object reply) throws IOException {
ByteBuffer buffer = allocator.allocate();
buffer.put((byte) MessageType.REPLY.getId());
buffer.putInt(requestId);
@@ -479,15 +484,13 @@
} finally {
IoUtils.safeClose(marshaller);
}
- } catch (IOException e) {
- log.error(e, "Failed to send a reply to the remote side");
} catch (InterruptedException e) {
- log.error(e, "Reply handler thread interrupted before a reply could
be sent");
Thread.currentThread().interrupt();
+ throw new InterruptedIOException("Reply handler thread interrupted
before a reply could be sent");
}
}
- public void handleException(final IOException exception) {
+ public void handleException(final IOException exception) throws IOException {
ByteBuffer buffer = allocator.allocate();
buffer.put((byte) MessageType.REQUEST_FAILED.getId());
buffer.putInt(requestId);
@@ -508,15 +511,13 @@
} finally {
IoUtils.safeClose(marshaller);
}
- } catch (IOException e) {
- log.error(e, "Failed to send an exception to the remote
side");
} catch (InterruptedException e) {
- log.error(e, "Reply handler thread interrupted before an exception
could be sent");
Thread.currentThread().interrupt();
+ throw new InterruptedIOException("Reply handler thread interrupted
before an exception could be sent");
}
}
- public void handleCancellation() {
+ public void handleCancellation() throws InterruptedIOException {
final ByteBuffer buffer = allocator.allocate();
buffer.put((byte) MessageType.CANCEL_ACK.getId());
buffer.putInt(requestId);
@@ -524,8 +525,8 @@
try {
registerWriter(channel, new SimpleWriteHandler(allocator, buffer));
} catch (InterruptedException e) {
- // todo log
Thread.currentThread().interrupt();
+ throw new InterruptedIOException("Reply handler thread interrupted
before cancellation could be sent");
}
}
}