[jboss-remoting-commits] JBoss Remoting SVN: r4437 - in remoting3/trunk: api/src/main/java/org/jboss/cx/remoting and 7 other directories.

jboss-remoting-commits at lists.jboss.org jboss-remoting-commits at lists.jboss.org
Tue Jul 29 15:34:26 EDT 2008


Author: david.lloyd at jboss.com
Date: 2008-07-29 15:34:26 -0400 (Tue, 29 Jul 2008)
New Revision: 4437

Removed:
   remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/FutureReply.java
   remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/RequestCompletionHandler.java
Modified:
   remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Client.java
   remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/ClientSource.java
   remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Closeable.java
   remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/RemoteExecutionException.java
   remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/RequestCancelHandler.java
   remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractCloseable.java
   remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/SpiUtils.java
   remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteRequestContext.java
   remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/ClientWrapper.java
   remoting3/trunk/build.properties
   remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientImpl.java
   remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/FutureReplyImpl.java
   remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/LocalRequestHandler.java
   remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RequestContextImpl.java
   remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicHandler.java
   remoting3/trunk/protocol/basic/src/test/java/org/jboss/cx/remoting/protocol/basic/ConnectionTestCase.java
   remoting3/trunk/testing-support/src/main/resources/testing.policy
Log:
Change exception structure; use IoFuture as a basis for future replies rather than Future.

Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Client.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Client.java	2008-07-29 15:17:07 UTC (rev 4436)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Client.java	2008-07-29 19:34:26 UTC (rev 4437)
@@ -1,6 +1,8 @@
 package org.jboss.cx.remoting;
 
 import java.util.concurrent.ConcurrentMap;
+import java.io.IOException;
+import org.jboss.xnio.IoFuture;
 
 /**
  * A communications client.  The client may be associated with state maintained by the local and/or remote side.
@@ -24,7 +26,7 @@
      * @throws RemotingException if the request could not be sent
      * @throws RemoteExecutionException if the remote handler threw an exception
      */
-    O invoke(I request) throws RemotingException, RemoteExecutionException;
+    O invoke(I request) throws IOException;
 
     /**
      * Send a request asynchronously.
@@ -38,9 +40,9 @@
      *
      * @return a future representing the result of the request
      *
-     * @throws RemotingException if the request could not be sent
+     * @throws IOException if the request could not be sent
      */
-    FutureReply<O> send(I request) throws RemotingException;
+    IoFuture<O> send(I request) throws IOException;
 
     /**
      * Send a request asynchronously, ignoring the reply.
@@ -51,9 +53,9 @@
      * Returns immediately.
      *
      * @param request the request to send
-     * @throws RemotingException if the request could not be sent
+     * @throws IOException if the request could not be sent
      */
-    void sendOneWay(I request) throws RemotingException;
+    void sendOneWay(I request) throws IOException;
 
     /**
      * Get the attribute map.  This map holds metadata about the current clinet.

Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/ClientSource.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/ClientSource.java	2008-07-29 15:17:07 UTC (rev 4436)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/ClientSource.java	2008-07-29 19:34:26 UTC (rev 4437)
@@ -1,5 +1,7 @@
 package org.jboss.cx.remoting;
 
+import java.io.IOException;
+
 /**
  * A source for new Remoting contexts.
  *
@@ -11,12 +13,12 @@
      * Close the context source.  New contexts may no longer be created after this
      * method is called.  Subsequent calls to this method have no additional effect.
      */
-    void close() throws RemotingException;
+    void close() throws IOException;
 
     /**
      * Create a new communications context.
      *
      * @return the new context
      */
-    Client<I, O> createClient() throws RemotingException;
+    Client<I, O> createClient() throws IOException;
 }

Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Closeable.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Closeable.java	2008-07-29 15:17:07 UTC (rev 4436)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Closeable.java	2008-07-29 19:34:26 UTC (rev 4437)
@@ -1,5 +1,7 @@
 package org.jboss.cx.remoting;
 
+import java.io.IOException;
+
 /**
  * A Remoting resource that can be closed.
  *
@@ -10,9 +12,9 @@
     /**
      * Close, waiting for any outstanding processing to finish.
      *
-     * @throws RemotingException if the close failed
+     * @throws IOException if the close failed
      */
-    void close() throws RemotingException;
+    void close() throws IOException;
 
     /**
      * Add a handler that will be called upon close.  The handler may be called before or after the close acutally

Deleted: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/FutureReply.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/FutureReply.java	2008-07-29 15:17:07 UTC (rev 4436)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/FutureReply.java	2008-07-29 19:34:26 UTC (rev 4437)
@@ -1,121 +0,0 @@
-package org.jboss.cx.remoting;
-
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-
-/**
- * The result of an invocation that may or may not yet have completed.
- * <p/>
- * In addition to representing the invocation results, this interface allows the user to cancel the request, or schedule
- * an asynchronous callback for when the request completes.
- *
- * @param <T> the result type of the future operation
- */
-public interface FutureReply<T> extends Future<T> {
-
-    /**
-     * Attempts to cancel execution of this request.  This attempt will fail if the request has already completed,
-     * already been cancelled, or could not be cancelled for some other reason.  The {@code mayInterruptIfRunning}
-     * parameter determines whether the thread executing this task should be interrupted in an attempt to stop the
-     * task.  If {@code false}, the thread will not be interrupted.  If {@code true}, then the remote service's
-     * interruption policy will be used.
-     *
-     * @param mayInterruptIfRunning {@code true} if the thread executing this task should be interrupted; otherwise,
-     * in-progress tasks are allowed to complete
-     *
-     * @return {@code false} if the task could not be cancelled, typically because it has already completed normally;
-     *         {@code true} otherwise
-     */
-    boolean cancel(boolean mayInterruptIfRunning);
-
-    /**
-     * Asynchronously send a request to cancel this request.  Does not block the current method.  Use the
-     * {@link #addCompletionHandler(RequestCompletionHandler)} method to add a notifier to be called upon completion.
-     *
-     * @param mayInterruptIfRunning
-     */
-    FutureReply<T> sendCancel(boolean mayInterruptIfRunning);
-
-    /**
-     * Returns {@code true} if this task was cancelled before it completed normally.
-     *
-     * @return {@code true} if task was cancelled before it completed
-     */
-    boolean isCancelled();
-
-    /**
-     * Returns {@code true} if this request completed.
-     * <p/>
-     * Completion may be due to normal termination, an exception, or cancellation -- in all of these cases, this method
-     * will return {@code true}.
-     *
-     * @return {@code true} if this request completed
-     */
-    boolean isDone();
-
-    /**
-     * Waits if necessary for the request to complete, and then retrieves its reply.
-     *
-     * @return the reply
-     *
-     * @throws CancellationException if the computation was cancelled
-     * @throws RemoteExecutionException if the computation threw an exception
-     */
-    T get() throws CancellationException, RemoteExecutionException;
-
-    /**
-     * Waits if necessary for the request to complete, and then retrieves its reply.
-     *
-     * @return the reply
-     *
-     * @throws CancellationException if the computation was cancelled
-     * @throws RemoteExecutionException if the computation threw an exception
-     * @throws InterruptedException if the current thread was interrupted while waiting
-     */
-    T getInterruptibly() throws InterruptedException, CancellationException, RemoteExecutionException;
-
-    /**
-     * Waits if necessary for at most the given time for the request to complete, and then retrieves the reply, if
-     * available.  If no reply was available, {@code null} is returned.
-     *
-     * @param timeout the maximum time to wait
-     * @param unit the time unit of the timeout argument
-     *
-     * @return the reply, or {@code null} if the operation timed out
-     *
-     * @throws CancellationException if the computation was cancelled
-     * @throws RemoteExecutionException if the computation threw an exception
-     */
-    T get(long timeout, TimeUnit unit) throws CancellationException, RemoteExecutionException;
-
-    /**
-     * Waits if necessary for at most the given time for the request to complete, and then retrieves the reply, if
-     * available.  If no reply was available, {@code null} is returned.
-     *
-     * @param timeout the maximum time to wait
-     * @param unit the time unit of the timeout argument
-     *
-     * @return the reply, or {@code null} if the operation timed out
-     *
-     * @throws CancellationException if the computation was cancelled
-     * @throws RemoteExecutionException if the computation threw an exception
-     * @throws InterruptedException if the current thread was interrupted while waiting
-     */
-    T getInterruptibly(long timeout, TimeUnit unit) throws InterruptedException, CancellationException, RemoteExecutionException;
-
-    /**
-     * Add a notifier to be called when the request has completed.  The notifier may be called from the current thread
-     * or a different thread.  If the request has already completed, the notifier will be called immediately. Calling
-     * this method guarantees that the supplied handler will be called.  The handler may be called at any time after
-     * the request has completed, though implementations should make a reasonable effort to ensure that the handler is
-     * called in a timely manner.
-     * <p/>
-     * This method returns {@code this} in order to facilitate method call chaining.
-     *
-     * @param handler the handler to add, or {@code null} to clear the handler
-     *
-     * @return this future reply
-     */
-    FutureReply<T> addCompletionHandler(RequestCompletionHandler<T> handler);
-}

Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/RemoteExecutionException.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/RemoteExecutionException.java	2008-07-29 15:17:07 UTC (rev 4436)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/RemoteExecutionException.java	2008-07-29 19:34:26 UTC (rev 4437)
@@ -1,11 +1,9 @@
 package org.jboss.cx.remoting;
 
-import java.util.concurrent.ExecutionException;
-
 /**
  * Exception thrown when execution of a remote operation fails for some reason.
  */
-public class RemoteExecutionException extends ExecutionException {
+public class RemoteExecutionException extends RemotingException {
 
     private static final long serialVersionUID = 3580395686019440048L;
 

Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/RequestCancelHandler.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/RequestCancelHandler.java	2008-07-29 15:17:07 UTC (rev 4436)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/RequestCancelHandler.java	2008-07-29 19:34:26 UTC (rev 4437)
@@ -11,7 +11,6 @@
      * Receive notification that the request was cancelled.
      *
      * @param requestContext the request context
-     * @param mayInterrupt the value of the cancellation {@code mayInterrupt} flag
      */
-    void notifyCancel(RequestContext<O> requestContext, boolean mayInterrupt);
+    void notifyCancel(RequestContext<O> requestContext);
 }

Deleted: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/RequestCompletionHandler.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/RequestCompletionHandler.java	2008-07-29 15:17:07 UTC (rev 4436)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/RequestCompletionHandler.java	2008-07-29 19:34:26 UTC (rev 4437)
@@ -1,16 +0,0 @@
-package org.jboss.cx.remoting;
-
-/**
- * A handler for receiving notification of request completion on the client side.
- *
- * @param <T> the reply type
- */
-public interface RequestCompletionHandler<T> {
-
-    /**
-     * Receive notification that the request was completed, was cancelled, or has failed.
-     *
-     * @param futureReply the future result
-     */
-    void notifyComplete(FutureReply<T> futureReply);
-}

Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractCloseable.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractCloseable.java	2008-07-29 15:17:07 UTC (rev 4436)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractCloseable.java	2008-07-29 19:34:26 UTC (rev 4437)
@@ -33,6 +33,8 @@
 import java.util.concurrent.Executor;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
 
 /**
  * A basic implementation of a closeable resource.  Use as a convenient base class for your closeable resources.
@@ -53,7 +55,11 @@
     static {
         boolean b = false;
         try {
-            b = Boolean.parseBoolean(System.getProperty("jboss.remoting.leakdebugging", "false"));
+            b = Boolean.parseBoolean(AccessController.doPrivileged(new PrivilegedAction<String>() {
+                public String run() {
+                    return System.getProperty("jboss.remoting.leakdebugging", "false");
+                }
+            }));
         } catch (SecurityException se) {
             b = false;
         }

Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/SpiUtils.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/SpiUtils.java	2008-07-29 15:17:07 UTC (rev 4436)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/SpiUtils.java	2008-07-29 19:34:26 UTC (rev 4437)
@@ -27,8 +27,6 @@
 import org.jboss.cx.remoting.RequestCancelHandler;
 import org.jboss.cx.remoting.RequestContext;
 import org.jboss.cx.remoting.CloseHandler;
-import org.jboss.cx.remoting.RequestCompletionHandler;
-import org.jboss.cx.remoting.FutureReply;
 import org.jboss.xnio.log.Logger;
 
 /**
@@ -85,14 +83,12 @@
     /**
      * Safely notify a request listener's cancel handler of cancellation.
      *
-     * @param <O> the reply
      * @param handler the request cancel handler
      * @param requestContext the request context
-     * @param mayInterrupt {@code true} if the request listener threads may be interrupted
      */
-    public static <O> void safeNotifyCancellation(final RequestCancelHandler<O> handler, final RequestContext<O> requestContext, boolean mayInterrupt) {
+    public static <O> void safeNotifyCancellation(final RequestCancelHandler<O> handler, final RequestContext<O> requestContext) {
         try {
-            handler.notifyCancel(requestContext, mayInterrupt);
+            handler.notifyCancel(requestContext);
         } catch (Throwable t) {
             log.error(t, "Request cancel handler threw an exception when calling notifyCancel()");
         }
@@ -113,21 +109,6 @@
         }
     }
 
-    /**
-     * Safely handle a future request completion.
-     *
-     * @param <O> the reply type
-     * @param handler
-     * @param futureReply
-     */
-    public static <O> void safeHandleRequestCompletion(final RequestCompletionHandler<O> handler, final FutureReply<O> futureReply) {
-        try {
-            handler.notifyComplete(futureReply);
-        } catch (Throwable t) {
-            log.error(t, "Request completion handler failed unexpectedly");
-        }
-    }
-
     public static RemoteRequestContext getBlankRemoteRequestContext() {
         return BLANK_REMOTE_REQUEST_CONTEXT;
     }
@@ -135,7 +116,7 @@
     private static final RemoteRequestContext BLANK_REMOTE_REQUEST_CONTEXT = new BlankRemoteRequestContext();
 
     private static final class BlankRemoteRequestContext implements RemoteRequestContext {
-        public void cancel(final boolean mayInterrupt) {
+        public void cancel() {
         }
     }
 }

Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteRequestContext.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteRequestContext.java	2008-07-29 15:17:07 UTC (rev 4436)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteRequestContext.java	2008-07-29 19:34:26 UTC (rev 4437)
@@ -30,8 +30,6 @@
 
     /**
      * Signal that the request should be cancelled, if possible.
-     *
-     * @param mayInterrupt {@code true} if the task can be interrupted (advisory)
      */
-    void cancel(final boolean mayInterrupt);
+    void cancel();
 }

Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/ClientWrapper.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/ClientWrapper.java	2008-07-29 15:17:07 UTC (rev 4436)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/ClientWrapper.java	2008-07-29 19:34:26 UTC (rev 4437)
@@ -1,11 +1,10 @@
 package org.jboss.cx.remoting.spi.wrapper;
 
 import java.util.concurrent.ConcurrentMap;
+import java.io.IOException;
 import org.jboss.cx.remoting.CloseHandler;
 import org.jboss.cx.remoting.Client;
-import org.jboss.cx.remoting.FutureReply;
-import org.jboss.cx.remoting.RemoteExecutionException;
-import org.jboss.cx.remoting.RemotingException;
+import org.jboss.xnio.IoFuture;
 
 /**
  * A simple delegating wrapper for clients.
@@ -28,7 +27,7 @@
     /**
      * {@inheritDoc}  This implementation calls the same method on the delegate object.
      */
-    public void close() throws RemotingException {
+    public void close() throws IOException {
         delegate.close();
     }
 
@@ -46,21 +45,21 @@
     /**
      * {@inheritDoc}  This implementation calls the same method on the delegate object.
      */
-    public O invoke(final I request) throws RemotingException, RemoteExecutionException {
+    public O invoke(final I request) throws IOException {
         return delegate.invoke(request);
     }
 
     /**
      * {@inheritDoc}  This implementation calls the same method on the delegate object.
      */
-    public FutureReply<O> send(final I request) throws RemotingException {
+    public IoFuture<O> send(final I request) throws IOException {
         return delegate.send(request);
     }
 
     /**
      * {@inheritDoc}  This implementation calls the same method on the delegate object.
      */
-    public void sendOneWay(final I request) throws RemotingException {
+    public void sendOneWay(final I request) throws IOException {
         delegate.sendOneWay(request);
     }
 

Modified: remoting3/trunk/build.properties
===================================================================
--- remoting3/trunk/build.properties	2008-07-29 15:17:07 UTC (rev 4436)
+++ remoting3/trunk/build.properties	2008-07-29 19:34:26 UTC (rev 4437)
@@ -173,7 +173,7 @@
 lib.trove.local=${local.repository}/${lib.trove.path}
 lib.trove.remote=${remote.repository}/${lib.trove.path}
 
-lib.xnio.version=1.1.0.Alpha2008072101
+lib.xnio.version=1.1.0.Alpha2008072901
 
 lib.xnio-api.name=xnio-api-${lib.xnio.version}.jar
 lib.xnio-api.license=lgpl

Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientImpl.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientImpl.java	2008-07-29 15:17:07 UTC (rev 4436)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientImpl.java	2008-07-29 19:34:26 UTC (rev 4437)
@@ -24,15 +24,14 @@
 
 import org.jboss.cx.remoting.Client;
 import org.jboss.cx.remoting.RemotingException;
-import org.jboss.cx.remoting.RemoteExecutionException;
-import org.jboss.cx.remoting.FutureReply;
-import org.jboss.cx.remoting.RequestCompletionHandler;
 import org.jboss.cx.remoting.core.util.QueueExecutor;
 import org.jboss.cx.remoting.spi.remote.RequestHandler;
 import org.jboss.cx.remoting.spi.remote.ReplyHandler;
 import org.jboss.cx.remoting.spi.remote.RemoteRequestContext;
 import org.jboss.cx.remoting.spi.remote.Handle;
+import org.jboss.xnio.IoFuture;
 import java.util.concurrent.Executor;
+import java.io.IOException;
 
 /**
  *
@@ -50,7 +49,7 @@
         handle.close();
     }
 
-    public O invoke(final I request) throws RemotingException, RemoteExecutionException {
+    public O invoke(final I request) throws IOException {
         if (! isOpen()) {
             throw new RemotingException("Client is not open");
         }
@@ -59,8 +58,8 @@
         final ReplyHandler replyHandler = futureReply.getReplyHandler();
         final RemoteRequestContext requestContext = handle.getResource().receiveRequest(request, replyHandler);
         futureReply.setRemoteRequestContext(requestContext);
-        futureReply.addCompletionHandler(new RequestCompletionHandler<O>() {
-            public void notifyComplete(final FutureReply<O> reply) {
+        futureReply.addNotifier(new IoFuture.Notifier<O>() {
+            public void notify(final IoFuture<O> future) {
                 executor.shutdown();
             }
         });
@@ -68,7 +67,7 @@
         return futureReply.get();
     }
 
-    public FutureReply<O> send(final I request) throws RemotingException {
+    public IoFuture<O> send(final I request) throws IOException {
         if (! isOpen()) {
             throw new RemotingException("Client is not open");
         }

Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/FutureReplyImpl.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/FutureReplyImpl.java	2008-07-29 15:17:07 UTC (rev 4436)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/FutureReplyImpl.java	2008-07-29 19:34:26 UTC (rev 4437)
@@ -22,347 +22,57 @@
 
 package org.jboss.cx.remoting.core;
 
-import org.jboss.cx.remoting.FutureReply;
-import org.jboss.cx.remoting.RemoteExecutionException;
-import org.jboss.cx.remoting.RequestCompletionHandler;
-import org.jboss.cx.remoting.util.CollectionUtil;
+import org.jboss.cx.remoting.RemotingException;
 import org.jboss.cx.remoting.spi.remote.ReplyHandler;
 import org.jboss.cx.remoting.spi.remote.RemoteRequestContext;
-import org.jboss.cx.remoting.spi.SpiUtils;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.TimeUnit;
+import org.jboss.xnio.AbstractIoFuture;
+import org.jboss.xnio.IoFuture;
 import java.util.concurrent.Executor;
-import java.util.List;
 
 /**
  *
  */
-public final class FutureReplyImpl<O> implements FutureReply<O> {
+public final class FutureReplyImpl<O> extends AbstractIoFuture<O> {
 
     private final Executor executor;
     private final ReplyHandler replyHandler = new Handler();
-    private final Object lock = new Object();
-    // @protectedby lock
-    private State state = State.NEW;
-    // @protectedby lock
-    private RemoteRequestContext remoteRequestContext;
-    // @protectedby lock
-    private O result;
-    // @protectedby lock
-    private Throwable cause;
-    // @protectedby lock
-    private String msg;
-    // @protectedby lock
-    private List<RequestCompletionHandler<O>> completionHandlers;
+    private volatile RemoteRequestContext remoteRequestContext;
 
     public FutureReplyImpl(final Executor executor) {
         this.executor = executor;
     }
 
-    private enum State {
-        NEW,
-        WAITING,
-        DONE,
-        CANCELLED,
-        FAILED,
-    }
-
     void setRemoteRequestContext(final RemoteRequestContext remoteRequestContext) {
-        synchronized (lock) {
-            if (state != State.NEW) {
-                throw new IllegalStateException("Wrong state");
-            }
-            state = State.WAITING;
-            this.remoteRequestContext = remoteRequestContext;
-            lock.notifyAll();
-        }
+        this.remoteRequestContext = remoteRequestContext;
     }
 
-    public boolean cancel(final boolean mayInterruptIfRunning) {
-        final RemoteRequestContext context;
-        synchronized (lock) {
-            while (state == State.NEW) {
-                try {
-                    lock.wait();
-                } catch (InterruptedException e) {
-                    return false;
-                }
-            }
-            context = remoteRequestContext;
-        }
-        context.cancel(mayInterruptIfRunning);
-        synchronized (lock) {
-            while (state == State.WAITING) {
-                try {
-                    lock.wait();
-                } catch (InterruptedException e) {
-                    return false;
-                }
-            }
-            return state == State.CANCELLED;
-        }
-    }
-
-    public FutureReply<O> sendCancel(final boolean mayInterruptIfRunning) {
-        final RemoteRequestContext context;
-        synchronized (lock) {
-            while (state == State.NEW) {
-                try {
-                    lock.wait();
-                } catch (InterruptedException e) {
-                    return this;
-                }
-            }
-            context = remoteRequestContext;
-        }
-        context.cancel(mayInterruptIfRunning);
+    public IoFuture<O> cancel() {
+        // must not be called before setRemoteRequestContext
+        remoteRequestContext.cancel();
         return this;
     }
 
-    public boolean isCancelled() {
-        synchronized (lock) {
-            return state == State.CANCELLED;
-        }
+    protected Executor getNotifierExecutor() {
+        return executor;
     }
 
-    public boolean isDone() {
-        synchronized (lock) {
-            return state == State.DONE;
-        }
-    }
-
-    public O get() throws CancellationException, RemoteExecutionException {
-        boolean intr = false;
-        try {
-            synchronized (lock) {
-                while (state == State.WAITING || state == State.NEW) {
-                    try {
-                        lock.wait();
-                    } catch (InterruptedException e) {
-                        intr = true;
-                    }
-                }
-                switch (state) {
-                    case CANCELLED:
-                        throw new CancellationException("Request was cancelled");
-                    case DONE:
-                        return result;
-                    case FAILED:
-                        throw new RemoteExecutionException(msg, cause);
-                    default:
-                        throw new IllegalStateException("Wrong state");
-                }
-            }
-        } finally {
-            if (intr) {
-                Thread.currentThread().interrupt();
-            }
-        }
-    }
-
-    public O getInterruptibly() throws InterruptedException, CancellationException, RemoteExecutionException {
-        synchronized (lock) {
-            while (state == State.WAITING || state == State.NEW) {
-                lock.wait();
-            }
-            switch (state) {
-                case CANCELLED:
-                    throw new CancellationException("Request was cancelled");
-                case DONE:
-                    return result;
-                case FAILED:
-                    throw new RemoteExecutionException(msg, cause);
-                default:
-                    throw new IllegalStateException("Wrong state");
-            }
-        }
-    }
-
-    public O get(final long timeout, final TimeUnit unit) throws CancellationException, RemoteExecutionException {
-        if (unit == null) {
-            throw new NullPointerException("unit is null");
-        }
-        if (timeout < 0L) {
-            throw new IllegalArgumentException("timeout is negative");
-        }
-        boolean intr = false;
-        try {
-            synchronized (lock) {
-                long now = System.currentTimeMillis();
-                final long deadline = now + unit.toMillis(timeout);
-                if (deadline < 0L) {
-                    return get();
-                }
-                while (state == State.WAITING || state == State.NEW) {
-                    try {
-                        if (now >= deadline) {
-                            return null;
-                        }
-                        lock.wait(deadline - now);
-                    } catch (InterruptedException e) {
-                        intr = true;
-                    }
-                    now = System.currentTimeMillis();
-                }
-                switch (state) {
-                    case CANCELLED:
-                        throw new CancellationException("Request was cancelled");
-                    case DONE:
-                        return result;
-                    case FAILED:
-                        throw new RemoteExecutionException(msg, cause);
-                    default:
-                        throw new IllegalStateException("Wrong state");
-                }
-            }
-        } finally {
-            if (intr) {
-                Thread.currentThread().interrupt();
-            }
-        }
-    }
-
-    public O getInterruptibly(final long timeout, final TimeUnit unit) throws InterruptedException, CancellationException, RemoteExecutionException {
-        if (unit == null) {
-            throw new NullPointerException("unit is null");
-        }
-        if (timeout < 0L) {
-            throw new IllegalArgumentException("timeout is negative");
-        }
-        synchronized (lock) {
-            while (state == State.WAITING || state == State.NEW) {
-                unit.timedWait(lock, timeout);
-            }
-            switch (state) {
-                case CANCELLED:
-                    throw new CancellationException("Request was cancelled");
-                case DONE:
-                    return result;
-                case FAILED:
-                    throw new RemoteExecutionException(msg, cause);
-                case WAITING:
-                case NEW:
-                    return null;
-                default:
-                    throw new IllegalStateException("Wrong state");
-            }
-        }
-    }
-
-    public FutureReply<O> addCompletionHandler(final RequestCompletionHandler<O> handler) {
-        synchronized (lock) {
-            switch (state) {
-                case NEW:
-                case WAITING:
-                    if (completionHandlers == null) {
-                        completionHandlers = CollectionUtil.arrayList();
-                    }
-                    completionHandlers.add(handler);
-                    break;
-                default:
-                    SpiUtils.safeHandleRequestCompletion(handler, this);
-                    break;
-            }
-        }
-        return this;
-    }
-
     ReplyHandler getReplyHandler() {
         return replyHandler;
     }
 
-    private void runCompletionHandlers() {
-        synchronized (lock) {
-            final List<RequestCompletionHandler<O>> handlers = completionHandlers;
-            if (handlers != null) {
-                completionHandlers = null;
-                executor.execute(new Runnable() {
-                    public void run() {
-                        for (RequestCompletionHandler<O> handler : handlers) {
-                            SpiUtils.safeHandleRequestCompletion(handler, FutureReplyImpl.this);
-                        }
-                    }
-                });
-            }
-        }
-    }
-
     private final class Handler implements ReplyHandler {
 
         @SuppressWarnings({ "unchecked" })
         public void handleReply(final Object reply) {
-            synchronized (lock) {
-                while (state == State.NEW) {
-                    boolean intr = false;
-                    try {
-                        try {
-                            lock.wait();
-                        } catch (InterruptedException e) {
-                            intr = true;
-                        }
-                    } finally {
-                        if (intr) {
-                            Thread.currentThread().interrupt();
-                        }
-                    }
-                }
-                if (state == State.WAITING) {
-                    state = State.DONE;
-                    result = (O) reply;
-                    runCompletionHandlers();
-                    lock.notifyAll();
-                }
-            }
+            setResult((O) reply);
         }
 
         public void handleException(final String exMsg, final Throwable exCause) {
-            synchronized (lock) {
-                while (state == State.NEW) {
-                    boolean intr = false;
-                    try {
-                        try {
-                            lock.wait();
-                        } catch (InterruptedException e) {
-                            intr = true;
-                        }
-                    } finally {
-                        if (intr) {
-                            Thread.currentThread().interrupt();
-                        }
-                    }
-                }
-                if (state == State.WAITING) {
-                    state = State.FAILED;
-                    msg = exMsg;
-                    cause = exCause;
-                    runCompletionHandlers();
-                    lock.notifyAll();
-                }
-            }
+            setException(new RemotingException(exMsg, exCause));
         }
 
         public void handleCancellation() {
-            synchronized (lock) {
-                while (state == State.NEW) {
-                    boolean intr = false;
-                    try {
-                        try {
-                            lock.wait();
-                        } catch (InterruptedException e) {
-                            intr = true;
-                        }
-                    } finally {
-                        if (intr) {
-                            Thread.currentThread().interrupt();
-                        }
-                    }
-                }
-                if (state == State.WAITING) {
-                    state = State.CANCELLED;
-                    runCompletionHandlers();
-                    lock.notifyAll();
-                }
-            }
+            finishCancel();
         }
     }
 }

Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/LocalRequestHandler.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/LocalRequestHandler.java	2008-07-29 15:17:07 UTC (rev 4436)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/LocalRequestHandler.java	2008-07-29 19:34:26 UTC (rev 4437)
@@ -89,8 +89,8 @@
             }
         });
         return new RemoteRequestContext() {
-            public void cancel(final boolean mayInterrupt) {
-                context.cancel(mayInterrupt);
+            public void cancel() {
+                context.cancel();
             }
         };
     }

Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RequestContextImpl.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RequestContextImpl.java	2008-07-29 15:17:07 UTC (rev 4436)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RequestContextImpl.java	2008-07-29 19:34:26 UTC (rev 4437)
@@ -101,7 +101,7 @@
     public void addCancelHandler(final RequestCancelHandler<O> handler) {
         synchronized (cancelLock) {
             if (cancelled.get()) {
-                SpiUtils.safeNotifyCancellation(handler, this, false);
+                SpiUtils.safeNotifyCancellation(handler, this);
             } else {
                 if (cancelHandlers == null) {
                     cancelHandlers = new HashSet<RequestCancelHandler<O>>();
@@ -115,23 +115,21 @@
         executor.execute(command);
     }
 
-    protected void cancel(final boolean mayInterrupt) {
+    protected void cancel() {
         if (! cancelled.getAndSet(true)) {
             synchronized (cancelLock) {
                 if (cancelHandlers != null) {
                     for (final RequestCancelHandler<O> handler : cancelHandlers) {
                         executor.execute(new Runnable() {
                             public void run() {
-                                SpiUtils.safeNotifyCancellation(handler, RequestContextImpl.this, mayInterrupt);
+                                SpiUtils.safeNotifyCancellation(handler, RequestContextImpl.this);
                             }
                         });
                     }
                     cancelHandlers = null;
                 }
             }
-            if (mayInterrupt) {
-                executor.interruptAll();
-            }
+            executor.interruptAll();
         }
     }
 }

Modified: remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicHandler.java
===================================================================
--- remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicHandler.java	2008-07-29 15:17:07 UTC (rev 4436)
+++ remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicHandler.java	2008-07-29 19:34:26 UTC (rev 4437)
@@ -727,12 +727,11 @@
             this.channel = channel;
         }
 
-        public void cancel(final boolean mayInterrupt) {
+        public void cancel() {
             try {
                 final ByteBuffer buffer = allocator.allocate();
                 buffer.put((byte) MessageType.CANCEL_REQUEST);
                 buffer.putInt(id);
-                buffer.put((byte) (mayInterrupt ? 1 : 0));
                 buffer.flip();
                 registerWriter(channel, new SimpleWriteHandler(allocator, buffer));
             } catch (InterruptedException e) {

Modified: remoting3/trunk/protocol/basic/src/test/java/org/jboss/cx/remoting/protocol/basic/ConnectionTestCase.java
===================================================================
--- remoting3/trunk/protocol/basic/src/test/java/org/jboss/cx/remoting/protocol/basic/ConnectionTestCase.java	2008-07-29 15:17:07 UTC (rev 4436)
+++ remoting3/trunk/protocol/basic/src/test/java/org/jboss/cx/remoting/protocol/basic/ConnectionTestCase.java	2008-07-29 19:34:26 UTC (rev 4437)
@@ -38,7 +38,6 @@
 import org.jboss.cx.remoting.ClientSource;
 import org.jboss.cx.remoting.Client;
 import org.jboss.cx.remoting.RemotingException;
-import org.jboss.cx.remoting.FutureReply;
 import org.jboss.cx.remoting.AbstractRequestListener;
 import org.jboss.cx.remoting.spi.remote.RequestHandlerSource;
 import org.jboss.cx.remoting.spi.remote.Handle;
@@ -51,6 +50,7 @@
 import org.jboss.xnio.ChannelSource;
 import org.jboss.xnio.CloseableTcpConnector;
 import org.jboss.xnio.CloseableExecutor;
+import org.jboss.xnio.nio.NioXnio;
 import org.jboss.xnio.channels.AllocatedMessageChannel;
 import org.jboss.xnio.channels.Channels;
 import org.jboss.xnio.channels.StreamChannel;
@@ -77,7 +77,7 @@
                 public void free(final ByteBuffer buffer) {
                 }
             };
-            final Xnio xnio = Xnio.createNio();
+            final Xnio xnio = NioXnio.create();
             try {
                 final EndpointImpl endpoint = new EndpointImpl();
                 endpoint.setExecutor(closeableExecutor);
@@ -113,8 +113,9 @@
                                             try {
                                                 final Client<Object,Object> client = clientSource.createClient();
                                                 try {
-                                                    final FutureReply<Object> future = client.send(REQUEST);
-                                                    assertEquals(REPLY, future.get(500L, TimeUnit.MILLISECONDS));
+                                                    final IoFuture<Object> future = client.send(REQUEST);
+                                                    assertEquals(IoFuture.Status.DONE, future.await(TimeUnit.MILLISECONDS, 500L));
+                                                    assertEquals(REPLY, future.get());
                                                     client.close();
                                                     clientSource.close();
                                                     handleThirteen.close();

Modified: remoting3/trunk/testing-support/src/main/resources/testing.policy
===================================================================
--- remoting3/trunk/testing-support/src/main/resources/testing.policy	2008-07-29 15:17:07 UTC (rev 4436)
+++ remoting3/trunk/testing-support/src/main/resources/testing.policy	2008-07-29 19:34:26 UTC (rev 4437)
@@ -15,20 +15,30 @@
 {
     permission java.lang.RuntimePermission "modifyThread"; // for executor control
     permission java.net.SocketPermission "*:*", "accept, connect, resolve";
+    permission java.util.PropertyPermission "xnio.provider", "read"; // todo - fixed in XNIO trunk...
 };
 
-grant codeBase "file:${build.home}/protocol/basic/target/main/classes/-"
+// Permissions for Remoting itself
+
+grant codeBase "file:${build.home}/api/target/main/classes/-"
 {
-    permission java.net.SocketPermission "*:*", "accept, connect, resolve";
+    permission java.util.PropertyPermission "jboss.remoting.*", "read";
 };
 
-// TODO: this is for the marshallers, which ought to be in their own module/module set
-
 grant codeBase "file:${build.home}/core/target/main/classes/-"
 {
+    // TODO: this is for the marshallers, which ought to be in their own module/module set
     permission java.io.SerializablePermission "enableSubstitution";
+    permission java.util.PropertyPermission "jboss.remoting.*", "read";
 };
 
+grant codeBase "file:${build.home}/protocol/basic/target/main/classes/-"
+{
+    permission java.net.SocketPermission "*:*", "accept, connect, resolve"; // todo - need a better solution
+    permission java.util.PropertyPermission "jboss.remoting.*", "read";
+};
+
+
 // Support classes
 
 grant codeBase "file:${build.home}/testing-support/target/main/classes/-"




More information about the jboss-remoting-commits mailing list