[jboss-remoting-commits] JBoss Remoting SVN: r4607 - in remoting3/trunk: api/src/main/java/org/jboss/remoting/spi and 4 other directories.

jboss-remoting-commits at lists.jboss.org jboss-remoting-commits at lists.jboss.org
Wed Oct 22 18:01:56 EDT 2008


Author: david.lloyd at jboss.com
Date: 2008-10-22 18:01:55 -0400 (Wed, 22 Oct 2008)
New Revision: 4607

Added:
   remoting3/trunk/api/src/main/java/org/jboss/remoting/RemoteReplyException.java
   remoting3/trunk/core/src/main/java/org/jboss/remoting/core/RequestListenerExecutor.java
Removed:
   remoting3/trunk/api/src/main/java/org/jboss/remoting/RemotingExceptionCarrier.java
   remoting3/trunk/core/src/main/java/org/jboss/remoting/core/util/TaggingExecutor.java
Modified:
   remoting3/trunk/api/src/main/java/org/jboss/remoting/Client.java
   remoting3/trunk/api/src/main/java/org/jboss/remoting/RemotingException.java
   remoting3/trunk/api/src/main/java/org/jboss/remoting/RequestContext.java
   remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/SpiUtils.java
   remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/remote/ReplyHandler.java
   remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientExternalizer.java
   remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientImpl.java
   remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientSourceExternalizer.java
   remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientSourceImpl.java
   remoting3/trunk/core/src/main/java/org/jboss/remoting/core/LocalRequestHandler.java
   remoting3/trunk/core/src/main/java/org/jboss/remoting/core/LocalRequestHandlerSource.java
   remoting3/trunk/core/src/main/java/org/jboss/remoting/core/RequestContextImpl.java
   remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexHandler.java
Log:
Update semantics of exception handling for client/handler/listener chains, as per 10/21 discussion

Modified: remoting3/trunk/api/src/main/java/org/jboss/remoting/Client.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/remoting/Client.java	2008-10-21 05:35:10 UTC (rev 4606)
+++ remoting3/trunk/api/src/main/java/org/jboss/remoting/Client.java	2008-10-22 22:01:55 UTC (rev 4607)
@@ -1,7 +1,9 @@
 package org.jboss.remoting;
 
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CancellationException;
 import java.io.IOException;
+import java.io.ObjectStreamException;
 import org.jboss.xnio.IoFuture;
 
 /**
@@ -12,29 +14,67 @@
  */
 public interface Client<I, O> extends HandleableCloseable<Client<I, O>> {
     /**
-     * Send a request and block until a reply is received.
+     * Send a request and block until a reply is received.  If the remote side manipulates a stream, the
+     * current thread may be used to handle it.
      * <p/>
-     * Uses the default invocation policy for handling remote invocations. If the remote side manipulates a stream, the
-     * current thread will be used to handle it by default.
+     * If the operation is cancelled asynchronously, a {@code CancellationException} is thrown.  This exception indicates
+     * that the request was received and was executed, but a cancellation request was received and handled before the
+     * reply was able to be sent.  The remote service will have cleanly cancelled the operation.  This exception type
+     * is a {@code RuntimeException}; thus direct handling of this exception is optional (depending on your use case).
      * <p/>
-     * If the remote session cannot handle the request, a {@code RemotingException} will be thrown.
+     * If the request is sent but the remote side sends an exception back, a {@code RemoteExecutionException} is thrown
+     * with the cause and message initialized by the remote service.  This exception indicates an error in the execution
+     * of the service's {@code RequestListener}.  The service will have cleanly recovered from such an exception.
+     * <p/>
+     * If the request is sent and the remote side tries to reply, but sending the reply fails, a
+     * {@code RemoteReplyException} is thrown, possibly with the cause initialized to the reason of the failure.  Typically
+     * this exception is thrown when serialization of the reply failed for some reason.  This exception type extends
+     * {@code RemoteExecutionException} and can be treated similarly in most cases.
+     * <p/>
+     * If the request is sent and the remote side sends the reply successfully but there is an error reading the reply
+     * locally, a {@code ReplyException} is thrown.  In this case the operation is known to have completed without error
+     * but the actual detailed reply cannot be known.  In cases where the reply would be ignored anyway, this exception
+     * type may be safely ignored (possibly logging it for informational purposes).  This exception is typically caused
+     * by an {@code ObjectStreamException} thrown while unmarshalling the reply, though other causes are also possible.
+     * <p/>
+     * If the result of the operation is known to be impossible to ascertain, then an {@code IndeterminateOutcomeException}
+     * is thrown.  Possible causes of this condition include (but are not limited to) the connection to the remote side
+     * being unexpectedly broken, or the current thread being interrupted before the reply can be read.  In the latter
+     * case, a best effort is automatically made to attempt to cancel the outstanding operation, though there is no
+     * guarantee.
+     * <p/>
+     * If the request cannot be sent, some other {@code IOException} will be thrown with the reason, including (but not limited to)
+     * attempting to call this method on a closed client, or {@code ObjectStreamException}s related to marshalling the
+     * request locally or unmarshalling it remotely.  Such an exception indicates that the remote side did not receive
+     * the request.
+     * <p/>
+     * All these exceptions (apart from {@code CancellationException}) extend {@code IOException} which makes it easier
+     * to selectively catch only those exceptions that you need to implement special policy for, while relegating the
+     * rest to common handlers.
      *
      * @param request the request to send
      *
      * @return the result of the request
      *
-     * @throws RemotingException if the request could not be sent
+     * @throws CancellationException if the operation was cancelled asynchronously
      * @throws RemoteExecutionException if the remote handler threw an exception
+     * @throws RemoteReplyException if the remote side was unable to send the response
+     * @throws ReplyException if the operation succeeded but the reply cannot be read for some reason
+     * @throws IndeterminateOutcomeException if the result of the operation cannot be ascertained
+     * @throws ObjectStreamException if marshalling or unmarshalling some part of the request failed
+     * @throws IOException if some I/O error occurred while sending the request
      */
-    O invoke(I request) throws IOException;
+    O invoke(I request) throws IOException, CancellationException;
 
     /**
-     * Send a request asynchronously.
+     * Send a request asynchronously.  If the remote side manipulates a stream, it
+     * may use a local policy to assign one or more thread(s) to handle the local end of that stream, or it may
+     * fail with an exception (e.g. if this method is called on a client with no threads to handle streaming).
      * <p/>
-     * Uses the default invocation policy for handling remote invocations. If the remote side manipulates a stream, it
-     * MAY fail with an exception (e.g. if this method is called on a client with no threads to handle streaming).
-     * <p/>
-     * Returns immediately.
+     * Returns immediately.  The returned {@code IoFuture} object can be queried at a later time to determine the result
+     * of the operation.  If the operation fails, one of the conditions described on the {@link #invoke(Object) invoke(I)}
+     * method will result.  This condition can be determined by reading the status of the {@code IoFuture} object or
+     * by attempting to read the result.
      *
      * @param request the request to send
      *

Added: remoting3/trunk/api/src/main/java/org/jboss/remoting/RemoteReplyException.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/remoting/RemoteReplyException.java	                        (rev 0)
+++ remoting3/trunk/api/src/main/java/org/jboss/remoting/RemoteReplyException.java	2008-10-22 22:01:55 UTC (rev 4607)
@@ -0,0 +1,71 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.remoting;
+
+/**
+ * An exception indicating that the remote side tried and failed to send a reply message; the remote side would be
+ * aware of this type of failure, so the outcome is determinate; thus it extends {@code RemoteExecutionException}.
+ */
+public class RemoteReplyException extends RemoteExecutionException {
+
+    private static final long serialVersionUID = -8572480018652753441L;
+
+    /**
+     * Constructs a <tt>RemoteReplyException</tt> with no detail message. The cause is not initialized, and may subsequently
+     * be initialized by a call to {@link #initCause(Throwable) initCause}.
+     */
+    public RemoteReplyException() {
+    }
+
+    /**
+     * Constructs a <tt>RemoteReplyException</tt> with the specified detail message. The cause is not initialized, and may
+     * subsequently be initialized by a call to {@link #initCause(Throwable) initCause}.
+     *
+     * @param msg the detail message
+     */
+    public RemoteReplyException(String msg) {
+        super(msg);
+    }
+
+    /**
+     * Constructs a <tt>RemoteReplyException</tt> with the specified cause. The detail message is set to:
+     * <pre>
+     *  (cause == null ? null : cause.toString())</pre>
+     * (which typically contains the class and detail message of <tt>cause</tt>).
+     *
+     * @param cause the cause (which is saved for later retrieval by the {@link #getCause()} method)
+     */
+    public RemoteReplyException(Throwable cause) {
+        super(cause);
+    }
+
+    /**
+     * Constructs a <tt>RemoteReplyException</tt> with the specified detail message and cause.
+     *
+     * @param msg the detail message
+     * @param cause the cause (which is saved for later retrieval by the {@link #getCause()} method)
+     */
+    public RemoteReplyException(String msg, Throwable cause) {
+        super(msg, cause);
+    }
+}

Modified: remoting3/trunk/api/src/main/java/org/jboss/remoting/RemotingException.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/remoting/RemotingException.java	2008-10-21 05:35:10 UTC (rev 4606)
+++ remoting3/trunk/api/src/main/java/org/jboss/remoting/RemotingException.java	2008-10-22 22:01:55 UTC (rev 4607)
@@ -3,9 +3,10 @@
 import java.io.IOException;
 
 /**
- * A general Remoting exception.
+ * A general Remoting exception.  Used as a base class in order to provide constructors which accept any combination
+ * of {@code cause} and {@code message}.
  */
-public class RemotingException extends IOException {
+public abstract class RemotingException extends IOException {
 
     private static final long serialVersionUID = 1540716301579397423L;
 
@@ -13,7 +14,7 @@
      * Constructs a <tt>RemotingException</tt> with no detail message. The cause is not initialized, and may subsequently be
      * initialized by a call to {@link #initCause(Throwable) initCause}.
      */
-    public RemotingException() {
+    protected RemotingException() {
     }
 
     /**
@@ -22,7 +23,7 @@
      *
      * @param msg the detail message
      */
-    public RemotingException(String msg) {
+    protected RemotingException(String msg) {
         super(msg);
     }
 
@@ -34,7 +35,7 @@
      *
      * @param cause the cause (which is saved for later retrieval by the {@link #getCause()} method)
      */
-    public RemotingException(Throwable cause) {
+    protected RemotingException(Throwable cause) {
         initCause(cause);
     }
 
@@ -44,7 +45,7 @@
      * @param msg the detail message
      * @param cause the cause (which is saved for later retrieval by the {@link #getCause()} method)
      */
-    public RemotingException(String msg, Throwable cause) {
+    protected RemotingException(String msg, Throwable cause) {
         super(msg);
         initCause(cause);
     }

Deleted: remoting3/trunk/api/src/main/java/org/jboss/remoting/RemotingExceptionCarrier.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/remoting/RemotingExceptionCarrier.java	2008-10-21 05:35:10 UTC (rev 4606)
+++ remoting3/trunk/api/src/main/java/org/jboss/remoting/RemotingExceptionCarrier.java	2008-10-22 22:01:55 UTC (rev 4607)
@@ -1,27 +0,0 @@
-package org.jboss.remoting;
-
-/**
- * A runtime exception that carries a {@link org.jboss.remoting.RemotingException} as a cause.
- */
-public class RemotingExceptionCarrier extends IOExceptionCarrier {
-
-    private static final long serialVersionUID = -1326735788761007331L;
-
-    /**
-     * Construct a new carrier.
-     *
-     * @param cause the nested cause
-     */
-    public RemotingExceptionCarrier(RemotingException cause) {
-        super(cause);
-    }
-
-    /**
-     * Get the cause.
-     *
-     * @return the cause
-     */
-    public RemotingException getCause() {
-        return (RemotingException) super.getCause();
-    }
-}

Modified: remoting3/trunk/api/src/main/java/org/jboss/remoting/RequestContext.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/remoting/RequestContext.java	2008-10-21 05:35:10 UTC (rev 4606)
+++ remoting3/trunk/api/src/main/java/org/jboss/remoting/RequestContext.java	2008-10-22 22:01:55 UTC (rev 4607)
@@ -4,7 +4,8 @@
 import java.io.IOException;
 
 /**
- * The context of a single request.
+ * The context of a single request.  A request listener is obligated to call exactly one of the three {@code send} methods
+ * specified in this interface.
  *
  * @param <O> the reply type
  */
@@ -24,30 +25,38 @@
     boolean isCancelled();
 
     /**
-     * Send a reply back to the caller.
+     * Send a reply back to the caller.  If transmission fails, an {@code IOException} is thrown from this method
+     * and a reply is sent back to the client which will trigger a {@link RemoteReplyException} to be thrown.  If the
+     * client connection is interrupted in such a way that the reply cannot reach the client, the client will (eventually)
+     * receive an {@link IndeterminateOutcomeException}.
      *
      * @param reply the reply to send
      * @throws IOException if the transmission failed
-     * @throws IllegalStateException if a reply was already sent
+     * @throws IllegalStateException if this or another of the {@code sendXXX()} methods was already invoked for this request
      */
     void sendReply(O reply) throws IOException, IllegalStateException;
 
     /**
-     * Send a failure message back to the caller.
+     * Send a failure message back to the caller.  If the transmission succeeds, the client will receive a
+     * {@link RemoteExecutionException} with the message initialized to {@code msg} and the cause initialized to
+     * {@code cause}.  If the transmission fails, an {@code IOException} is thrown from this
+     * method and the client will (eventually) receive an {@link IndeterminateOutcomeException}.
      *
      * @param msg a message describing the failure, if any (can be {@code null})
      * @param cause the failure cause, if any (can be {@code null})
      *
      * @throws IOException if the transmission failed
-     * @throws IllegalStateException if a reply was already sent
+     * @throws IllegalStateException if this or another of the {@code sendXXX()} methods was already invoked for this request
      */
     void sendFailure(String msg, Throwable cause) throws IOException, IllegalStateException;
 
     /**
-     * Send a cancellation message back to the client.
+     * Send a cancellation message back to the client.  If the transmission succeeds, the client result will be an
+     * acknowledgement of cancellation.  If the transmission fails, an {@code IOException} is thrown from this
+     * method and the client will (eventually) receive an {@link IndeterminateOutcomeException}.
      *
      * @throws IOException if the message could not be sent (the client could not be notified about the cancellation)
-     * @throws IllegalStateException if a reply was already sent
+     * @throws IllegalStateException if this or another of the {@code sendXXX()} methods was already invoked for this request
      */
     void sendCancelled() throws IOException, IllegalStateException;
 

Modified: remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/SpiUtils.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/SpiUtils.java	2008-10-21 05:35:10 UTC (rev 4606)
+++ remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/SpiUtils.java	2008-10-22 22:01:55 UTC (rev 4607)
@@ -48,7 +48,7 @@
         try {
             replyHandler.handleException(exception);
         } catch (Throwable t) {
-            log.error(t, "Failed to properly handle exception");
+            log.debug(t, "Failed to properly handle exception");
         }
     }
 
@@ -63,7 +63,7 @@
         try {
             replyHandler.handleReply(reply);
         } catch (Throwable t) {
-            log.error(t, "Failed to properly handle reply");
+            log.debug(t, "Failed to properly handle reply");
         }
     }
 
@@ -76,7 +76,7 @@
         try {
             replyHandler.handleCancellation();
         } catch (Throwable t) {
-            log.error(t, "Failed to properly handle cancellation");
+            log.debug(t, "Failed to properly handle cancellation");
         }
     }
 
@@ -90,7 +90,7 @@
         try {
             handler.notifyCancel(requestContext);
         } catch (Throwable t) {
-            log.error(t, "Request cancel handler threw an exception when calling notifyCancel()");
+            log.error(t, "Request cancel handler threw an exception");
         }
     }
 
@@ -105,7 +105,7 @@
         try {
             handler.handleClose(closed);
         } catch (Throwable t) {
-            log.error(t, "Close handler failed unexpectedly");
+            log.error(t, "Close handler threw an exception");
         }
     }
 

Modified: remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/remote/ReplyHandler.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/remote/ReplyHandler.java	2008-10-21 05:35:10 UTC (rev 4606)
+++ remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/remote/ReplyHandler.java	2008-10-22 22:01:55 UTC (rev 4607)
@@ -31,21 +31,22 @@
 public interface ReplyHandler {
 
     /**
-     * Handle a successful reply.
+     * Handle a successful reply.  If the reply could not be forwarded, an exception is thrown.
      *
      * @param reply the reply
      */
-    void handleReply(Object reply);
+    void handleReply(Object reply) throws IOException;
 
     /**
-     * Handle an exception.
+     * Handle an exception.  If the exception could not be forwarded, a (different) {@code IOException} is thrown.
      *
      * @param exception an exception which describes the problem
      */
-    void handleException(IOException exception);
+    void handleException(IOException exception) throws IOException;
 
     /**
-     * Handle a cancellation request.
+     * Handle a cancellation acknowledgement.  If the cancellation acknowledgement could not be forwarded, an
+     * exception is thrown.
      */
-    void handleCancellation();
+    void handleCancellation() throws IOException;
 }

Modified: remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientExternalizer.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientExternalizer.java	2008-10-21 05:35:10 UTC (rev 4606)
+++ remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientExternalizer.java	2008-10-22 22:01:55 UTC (rev 4607)
@@ -42,10 +42,12 @@
         this.endpoint = endpoint;
     }
 
+    @SuppressWarnings({ "unchecked" })
     public void writeExternal(final Object o, final ObjectOutput output) throws IOException {
         output.writeObject(((ClientImpl)o).getRequestHandlerHandle().getResource());
     }
 
+    @SuppressWarnings({ "unchecked" })
     public Object createExternal(final Class<?> aClass, final ObjectInput input, final Creator creator) throws IOException, ClassNotFoundException {
         final RequestHandler handler = (RequestHandler) input.readObject();
         return new ClientImpl(handler.getHandle(), endpoint.getExecutor());

Modified: remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientImpl.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientImpl.java	2008-10-21 05:35:10 UTC (rev 4606)
+++ remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientImpl.java	2008-10-22 22:01:55 UTC (rev 4607)
@@ -23,7 +23,7 @@
 package org.jboss.remoting.core;
 
 import org.jboss.remoting.Client;
-import org.jboss.remoting.RemotingException;
+import org.jboss.remoting.IndeterminateOutcomeException;
 import org.jboss.remoting.core.util.QueueExecutor;
 import org.jboss.remoting.spi.remote.RequestHandler;
 import org.jboss.remoting.spi.remote.ReplyHandler;
@@ -51,7 +51,7 @@
 
     public O invoke(final I request) throws IOException {
         if (! isOpen()) {
-            throw new RemotingException("Client is not open");
+            throw new IOException("Client is not open");
         }
         final QueueExecutor executor = new QueueExecutor();
         final FutureReplyImpl<O> futureReply = new FutureReplyImpl<O>(executor);
@@ -64,12 +64,21 @@
             }
         });
         executor.runQueue();
-        return futureReply.get();
+        try {
+            return futureReply.getInterruptibly();
+        } catch (InterruptedException e) {
+            try {
+                futureReply.cancel();
+                throw new IndeterminateOutcomeException("The current thread was interrupted before the result could be read");
+            } finally {
+                Thread.currentThread().interrupt();
+            }
+        }
     }
 
     public IoFuture<O> send(final I request) throws IOException {
         if (! isOpen()) {
-            throw new RemotingException("Client is not open");
+            throw new IOException("Client is not open");
         }
         final FutureReplyImpl<O> futureReply = new FutureReplyImpl<O>(executor);
         final ReplyHandler replyHandler = futureReply.getReplyHandler();
@@ -78,9 +87,9 @@
         return futureReply;
     }
 
-    public void sendOneWay(final I request) throws RemotingException {
+    public void sendOneWay(final I request) throws IOException {
         if (! isOpen()) {
-            throw new RemotingException("Client is not open");
+            throw new IOException("Client is not open");
         }
         handle.getResource().receiveRequest(request);
     }

Modified: remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientSourceExternalizer.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientSourceExternalizer.java	2008-10-21 05:35:10 UTC (rev 4606)
+++ remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientSourceExternalizer.java	2008-10-22 22:01:55 UTC (rev 4607)
@@ -42,10 +42,12 @@
         this.endpoint = endpoint;
     }
 
+    @SuppressWarnings({ "unchecked" })
     public void writeExternal(final Object o, final ObjectOutput output) throws IOException {
         output.writeObject(((ClientSourceImpl) o).getRequestHandlerSourceHandle().getResource());
     }
 
+    @SuppressWarnings({ "unchecked" })
     public Object createExternal(final Class<?> aClass, final ObjectInput input, final Creator creator) throws IOException, ClassNotFoundException {
         final RequestHandlerSource handler = (RequestHandlerSource) input.readObject();
         return new ClientSourceImpl(handler.getHandle(), endpoint);

Modified: remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientSourceImpl.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientSourceImpl.java	2008-10-21 05:35:10 UTC (rev 4606)
+++ remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientSourceImpl.java	2008-10-22 22:01:55 UTC (rev 4607)
@@ -24,7 +24,6 @@
 
 import org.jboss.remoting.ClientSource;
 import org.jboss.remoting.Client;
-import org.jboss.remoting.RemotingException;
 import org.jboss.remoting.Endpoint;
 import org.jboss.remoting.spi.remote.RequestHandler;
 import org.jboss.remoting.spi.remote.RequestHandlerSource;
@@ -53,7 +52,7 @@
 
     public Client<I, O> createClient() throws IOException {
         if (! isOpen()) {
-            throw new RemotingException("Client source is not open");
+            throw new IOException("Client source is not open");
         }
         final Handle<RequestHandler> clientHandle = handle.getResource().createRequestHandler();
         try {

Modified: remoting3/trunk/core/src/main/java/org/jboss/remoting/core/LocalRequestHandler.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/remoting/core/LocalRequestHandler.java	2008-10-21 05:35:10 UTC (rev 4606)
+++ remoting3/trunk/core/src/main/java/org/jboss/remoting/core/LocalRequestHandler.java	2008-10-22 22:01:55 UTC (rev 4607)
@@ -27,12 +27,12 @@
 import org.jboss.remoting.spi.remote.ReplyHandler;
 import org.jboss.remoting.spi.SpiUtils;
 import org.jboss.remoting.spi.AbstractAutoCloseable;
-import org.jboss.remoting.RemotingException;
 import org.jboss.remoting.RequestListener;
 import org.jboss.remoting.RemoteExecutionException;
 import org.jboss.remoting.CloseHandler;
 import org.jboss.xnio.log.Logger;
 import java.util.concurrent.Executor;
+import java.io.IOException;
 
 /**
  *
@@ -40,14 +40,12 @@
 public final class LocalRequestHandler<I, O> extends AbstractAutoCloseable<RequestHandler> implements RequestHandler {
 
     private final RequestListener<I, O> requestListener;
-    private final Executor executor;
     private final ClientContextImpl clientContext;
 
     private static final Logger log = Logger.getLogger(LocalRequestHandler.class);
 
     private LocalRequestHandler(final Executor executor, final RequestListener<I, O> requestListener, final ClientContextImpl clientContext) {
         super(executor);
-        this.executor = executor;
         this.requestListener = requestListener;
         this.clientContext = clientContext;
     }
@@ -62,7 +60,7 @@
 
     public void receiveRequest(final Object request) {
         final RequestContextImpl<O> context = new RequestContextImpl<O>(clientContext);
-        executor.execute(new Runnable() {
+        context.execute(new Runnable() {
             @SuppressWarnings({ "unchecked" })
             public void run() {
                 try {
@@ -76,7 +74,7 @@
 
     public RemoteRequestContext receiveRequest(final Object request, final ReplyHandler replyHandler) {
         final RequestContextImpl<O> context = new RequestContextImpl<O>(replyHandler, clientContext);
-        executor.execute(new Runnable() {
+        context.execute(new Runnable() {
             @SuppressWarnings({ "unchecked" })
             public void run() {
                 try {
@@ -95,7 +93,7 @@
         };
     }
 
-    void open() throws RemotingException {
+    void open() throws IOException {
         try {
             requestListener.handleClientOpen(clientContext);
             addCloseHandler(new CloseHandler<RequestHandler>() {
@@ -108,7 +106,9 @@
                 }
             });
         } catch (Throwable t) {
-            throw new RemotingException("Failed to open client context", t);
+            final IOException ioe = new IOException("Failed to open client context");
+            ioe.initCause(t);
+            throw ioe;
         }
     }
 

Modified: remoting3/trunk/core/src/main/java/org/jboss/remoting/core/LocalRequestHandlerSource.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/remoting/core/LocalRequestHandlerSource.java	2008-10-21 05:35:10 UTC (rev 4606)
+++ remoting3/trunk/core/src/main/java/org/jboss/remoting/core/LocalRequestHandlerSource.java	2008-10-22 22:01:55 UTC (rev 4607)
@@ -27,7 +27,6 @@
 import org.jboss.remoting.spi.remote.Handle;
 import org.jboss.remoting.spi.AbstractAutoCloseable;
 import org.jboss.remoting.RequestListener;
-import org.jboss.remoting.RemotingException;
 import org.jboss.remoting.CloseHandler;
 import org.jboss.xnio.log.Logger;
 import java.util.concurrent.Executor;
@@ -57,11 +56,11 @@
             localRequestHandler.open();
             return localRequestHandler.getHandle();
         } else {
-            throw new RemotingException("LocalRequestHandlerSource is closed");
+            throw new IOException("LocalRequestHandlerSource is closed");
         }
     }
 
-    void open() throws RemotingException {
+    void open() throws IOException {
         try {
             requestListener.handleServiceOpen(serviceContext);
             addCloseHandler(new CloseHandler<RequestHandlerSource>() {
@@ -74,7 +73,9 @@
                 }
             });
         } catch (Throwable t) {
-            throw new RemotingException("Failed to open client context", t);
+            final IOException ioe = new IOException("Failed to open client context");
+            ioe.initCause(t);
+            throw ioe;
         }
     }
 

Modified: remoting3/trunk/core/src/main/java/org/jboss/remoting/core/RequestContextImpl.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/remoting/core/RequestContextImpl.java	2008-10-21 05:35:10 UTC (rev 4606)
+++ remoting3/trunk/core/src/main/java/org/jboss/remoting/core/RequestContextImpl.java	2008-10-22 22:01:55 UTC (rev 4607)
@@ -24,15 +24,17 @@
 
 import org.jboss.remoting.RequestContext;
 import org.jboss.remoting.ClientContext;
-import org.jboss.remoting.RemotingException;
 import org.jboss.remoting.RequestCancelHandler;
 import org.jboss.remoting.RemoteExecutionException;
-import org.jboss.remoting.core.util.TaggingExecutor;
+import org.jboss.remoting.RemoteReplyException;
+import org.jboss.remoting.IndeterminateOutcomeException;
 import org.jboss.remoting.spi.remote.ReplyHandler;
 import org.jboss.remoting.spi.SpiUtils;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.Set;
 import java.util.HashSet;
+import java.io.IOException;
 
 /**
  *
@@ -43,21 +45,26 @@
     private final Object cancelLock = new Object();
     private final ReplyHandler replyHandler;
     private final ClientContextImpl clientContext;
+    private final AtomicInteger taskCount = new AtomicInteger();
 
-    private final AtomicBoolean cancelled = new AtomicBoolean();
     // @protectedby cancelLock
+    private boolean cancelled;
+    // @protectedby cancelLock
     private Set<RequestCancelHandler<O>> cancelHandlers;
-    private final TaggingExecutor executor;
+    private final RequestListenerExecutor executor;
 
     RequestContextImpl(final ReplyHandler replyHandler, final ClientContextImpl clientContext) {
         this.replyHandler = replyHandler;
         this.clientContext = clientContext;
-        executor = new TaggingExecutor(clientContext.getExecutor());
+        //noinspection ThisEscapedInObjectConstruction
+        executor = new RequestListenerExecutor(clientContext.getExecutor(), this);
     }
 
+    // todo - used by one-way requests... :|
     RequestContextImpl(final ClientContextImpl clientContext) {
         this.clientContext = clientContext;
-        executor = new TaggingExecutor(clientContext.getExecutor());
+        //noinspection ThisEscapedInObjectConstruction
+        executor = new RequestListenerExecutor(clientContext.getExecutor(), this);
         replyHandler = null;
     }
 
@@ -66,34 +73,42 @@
     }
 
     public boolean isCancelled() {
-        return cancelled.get();
+        synchronized (cancelLock) {
+            return cancelled;
+        }
     }
 
-    public void sendReply(final O reply) throws RemotingException, IllegalStateException {
+    public void sendReply(final O reply) throws IOException, IllegalStateException {
         if (! closed.getAndSet(true)) {
-            if (replyHandler != null) {
+            if (replyHandler != null) try {
                 replyHandler.handleReply(reply);
-            }
+            } catch (IOException e) {
+                SpiUtils.safeHandleException(replyHandler, new RemoteReplyException("Remote reply failed", e));
+                throw e;
+            } else throw new IllegalStateException("Cannot send a reply to a one-way invocation");
         } else {
             throw new IllegalStateException("Reply already sent");
         }
     }
 
-    public void sendFailure(final String msg, final Throwable cause) throws RemotingException, IllegalStateException {
+    public void sendFailure(final String msg, final Throwable cause) throws IOException, IllegalStateException {
         if (! closed.getAndSet(true)) {
             if (replyHandler != null) {
                 replyHandler.handleException(new RemoteExecutionException(msg, cause));
-            }
+            } else throw new IllegalStateException("Cannot send a reply to a one-way invocation");
         } else {
             throw new IllegalStateException("Reply already sent");
         }
     }
 
-    public void sendCancelled() throws RemotingException, IllegalStateException {
+    public void sendCancelled() throws IOException, IllegalStateException {
         if (! closed.getAndSet(true)) {
-            if (replyHandler != null) {
+            if (replyHandler != null) try {
                 replyHandler.handleCancellation();
-            }
+            } catch (IOException e) {
+                // this is highly unlikely to succeed
+                SpiUtils.safeHandleException(replyHandler, new RemoteReplyException("Remote cancellation acknowledgement failed", e));
+            } else throw new IllegalStateException("Cannot send a reply to a one-way invocation");
         } else {
             throw new IllegalStateException("Reply already sent");
         }
@@ -101,7 +116,7 @@
 
     public void addCancelHandler(final RequestCancelHandler<O> handler) {
         synchronized (cancelLock) {
-            if (cancelled.get()) {
+            if (cancelled) {
                 SpiUtils.safeNotifyCancellation(handler, this);
             } else {
                 if (cancelHandlers == null) {
@@ -117,8 +132,9 @@
     }
 
     protected void cancel() {
-        if (! cancelled.getAndSet(true)) {
-            synchronized (cancelLock) {
+        synchronized (cancelLock) {
+            if (! cancelled) {
+                cancelled = true;
                 if (cancelHandlers != null) {
                     for (final RequestCancelHandler<O> handler : cancelHandlers) {
                         executor.execute(new Runnable() {
@@ -129,8 +145,19 @@
                     }
                     cancelHandlers = null;
                 }
+                executor.interruptAll();
             }
-            executor.interruptAll();
         }
     }
+
+    void startTask() {
+        taskCount.incrementAndGet();
+    }
+
+    void finishTask() {
+        if (taskCount.decrementAndGet() == 0 && closed.getAndSet(true)) {
+            // no response sent!  send back IndeterminateOutcomeException
+            SpiUtils.safeHandleException(replyHandler, new IndeterminateOutcomeException("No reply was sent by the request listener"));
+        }
+    }
 }

Copied: remoting3/trunk/core/src/main/java/org/jboss/remoting/core/RequestListenerExecutor.java (from rev 4601, remoting3/trunk/core/src/main/java/org/jboss/remoting/core/util/TaggingExecutor.java)
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/remoting/core/RequestListenerExecutor.java	                        (rev 0)
+++ remoting3/trunk/core/src/main/java/org/jboss/remoting/core/RequestListenerExecutor.java	2008-10-22 22:01:55 UTC (rev 4607)
@@ -0,0 +1,80 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.remoting.core;
+
+import java.util.concurrent.Executor;
+import java.util.Set;
+import org.jboss.remoting.util.CollectionUtil;
+
+/**
+ *
+ */
+public final class RequestListenerExecutor implements Executor {
+
+    private final Set<Task> tasks = CollectionUtil.synchronizedHashSet();
+    private final Executor executor;
+    private final RequestContextImpl requestContext;
+
+    public RequestListenerExecutor(final Executor executor, final RequestContextImpl context) {
+        this.executor = executor;
+        requestContext = context;
+    }
+
+    private final class Task implements Runnable {
+        private volatile Thread thread;
+        private final Runnable runnable;
+
+        private Task(final Runnable runnable) {
+            this.runnable = runnable;
+        }
+
+        public void run() {
+            requestContext.startTask();
+            thread = Thread.currentThread();
+            tasks.add(this);
+            try {
+                runnable.run();
+            } finally {
+                requestContext.finishTask();
+                tasks.remove(this);
+                thread = null;
+            }
+        }
+    }
+
+    public void execute(final Runnable command) {
+        executor.execute(new Task(command));
+    }
+
+    public void interruptAll() {
+        synchronized (tasks) {
+            final Thread currentThread = Thread.currentThread();
+            for (Task task : tasks) {
+                final Thread thread = task.thread;
+                if (thread != null && thread != currentThread) {
+                    thread.interrupt();
+                }
+            }
+        }
+    }
+}

Deleted: remoting3/trunk/core/src/main/java/org/jboss/remoting/core/util/TaggingExecutor.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/remoting/core/util/TaggingExecutor.java	2008-10-21 05:35:10 UTC (rev 4606)
+++ remoting3/trunk/core/src/main/java/org/jboss/remoting/core/util/TaggingExecutor.java	2008-10-22 22:01:55 UTC (rev 4607)
@@ -1,75 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2008, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.remoting.core.util;
-
-import java.util.concurrent.Executor;
-import java.util.Set;
-import org.jboss.remoting.util.CollectionUtil;
-
-/**
- *
- */
-public final class TaggingExecutor implements Executor {
-
-    private final Set<Task> tasks = CollectionUtil.synchronizedHashSet();
-    private final Executor executor;
-
-    public TaggingExecutor(final Executor executor) {
-        this.executor = executor;
-    }
-
-    private final class Task implements Runnable {
-        private volatile Thread thread;
-        private final Runnable runnable;
-
-        private Task(final Runnable runnable) {
-            this.runnable = runnable;
-        }
-
-        public void run() {
-            thread = Thread.currentThread();
-            tasks.add(this);
-            try {
-                runnable.run();
-            } finally {
-                tasks.remove(this);
-                thread = null;
-            }
-        }
-    }
-
-    public void execute(final Runnable command) {
-        executor.execute(new Task(command));
-    }
-
-    public void interruptAll() {
-        synchronized (tasks) {
-            for (Task task : tasks) {
-                final Thread thread = task.thread;
-                if (thread != null) {
-                    thread.interrupt();
-                }
-            }
-        }
-    }
-}

Modified: remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexHandler.java
===================================================================
--- remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexHandler.java	2008-10-21 05:35:10 UTC (rev 4606)
+++ remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexHandler.java	2008-10-22 22:01:55 UTC (rev 4607)
@@ -49,6 +49,8 @@
 import org.jboss.remoting.SimpleCloseable;
 import org.jboss.remoting.RemoteExecutionException;
 import org.jboss.remoting.IndeterminateOutcomeException;
+import org.jboss.remoting.ReplyException;
+import org.jboss.remoting.RemoteReplyException;
 import org.jboss.marshalling.MarshallerFactory;
 import org.jboss.marshalling.Unmarshaller;
 import org.jboss.marshalling.ByteOutput;
@@ -66,6 +68,7 @@
 import java.nio.BufferUnderflowException;
 import java.io.IOException;
 import java.io.InvalidClassException;
+import java.io.InterruptedIOException;
 
 /**
  * Protocol handler for the basic message-oriented Remoting protocol.
@@ -257,7 +260,7 @@
                             IoUtils.safeClose(unmarshaller);
                         }
                     } catch (IOException ex) {
-                        log.trace("Failed to unmarshal a reply (%s), sending a ReplyException");
+                        log.trace("Failed to unmarshal a reply (%s), sending a ReplyException", ex);
                         // todo
                         SpiUtils.safeHandleException(replyHandler, ex);
                         break;
@@ -276,8 +279,10 @@
                 case CANCEL_ACK: {
                     final int requestId = buffer.getInt();
                     final ReplyHandler replyHandler = remoteRequests.get(requestId);
-                    if (replyHandler != null) {
+                    if (replyHandler != null) try {
                         replyHandler.handleCancellation();
+                    } catch (IOException e) {
+                        log.trace("Failed to forward a cancellation acknowledgement (%s)", e);
                     }
                     break;
                 }
@@ -458,7 +463,7 @@
             this.allocator = allocator;
         }
 
-        public void handleReply(final Object reply) {
+        public void handleReply(final Object reply) throws IOException {
             ByteBuffer buffer = allocator.allocate();
             buffer.put((byte) MessageType.REPLY.getId());
             buffer.putInt(requestId);
@@ -479,15 +484,13 @@
                 } finally {
                     IoUtils.safeClose(marshaller);
                 }
-            } catch (IOException e) {
-                log.error(e, "Failed to send a reply to the remote side");
             } catch (InterruptedException e) {
-                log.error(e, "Reply handler thread interrupted before a reply could be sent");
                 Thread.currentThread().interrupt();
+                throw new InterruptedIOException("Reply handler thread interrupted before a reply could be sent");
             }
         }
 
-        public void handleException(final IOException exception) {
+        public void handleException(final IOException exception) throws IOException {
             ByteBuffer buffer = allocator.allocate();
             buffer.put((byte) MessageType.REQUEST_FAILED.getId());
             buffer.putInt(requestId);
@@ -508,15 +511,13 @@
                 } finally {
                     IoUtils.safeClose(marshaller);
                 }
-            } catch (IOException e) {
-                log.error(e, "Failed to send an exception to the remote side");
             } catch (InterruptedException e) {
-                log.error(e, "Reply handler thread interrupted before an exception could be sent");
                 Thread.currentThread().interrupt();
+                throw new InterruptedIOException("Reply handler thread interrupted before an exception could be sent");
             }
         }
 
-        public void handleCancellation() {
+        public void handleCancellation() throws InterruptedIOException {
             final ByteBuffer buffer = allocator.allocate();
             buffer.put((byte) MessageType.CANCEL_ACK.getId());
             buffer.putInt(requestId);
@@ -524,8 +525,8 @@
             try {
                 registerWriter(channel, new SimpleWriteHandler(allocator, buffer));
             } catch (InterruptedException e) {
-                // todo log
                 Thread.currentThread().interrupt();
+                throw new InterruptedIOException("Reply handler thread interrupted before cancellation could be sent");
             }
         }
     }




More information about the jboss-remoting-commits mailing list