Author: david.lloyd(a)jboss.com
Date: 2008-07-29 15:34:26 -0400 (Tue, 29 Jul 2008)
New Revision: 4437
Removed:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/FutureReply.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/RequestCompletionHandler.java
Modified:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Client.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/ClientSource.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Closeable.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/RemoteExecutionException.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/RequestCancelHandler.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractCloseable.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/SpiUtils.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteRequestContext.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/ClientWrapper.java
remoting3/trunk/build.properties
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientImpl.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/FutureReplyImpl.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/LocalRequestHandler.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RequestContextImpl.java
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicHandler.java
remoting3/trunk/protocol/basic/src/test/java/org/jboss/cx/remoting/protocol/basic/ConnectionTestCase.java
remoting3/trunk/testing-support/src/main/resources/testing.policy
Log:
Change exception structure; use IoFuture as a basis for future replies rather than
Future.
Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Client.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Client.java 2008-07-29
15:17:07 UTC (rev 4436)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Client.java 2008-07-29
19:34:26 UTC (rev 4437)
@@ -1,6 +1,8 @@
package org.jboss.cx.remoting;
import java.util.concurrent.ConcurrentMap;
+import java.io.IOException;
+import org.jboss.xnio.IoFuture;
/**
* A communications client. The client may be associated with state maintained by the
local and/or remote side.
@@ -24,7 +26,7 @@
* @throws RemotingException if the request could not be sent
* @throws RemoteExecutionException if the remote handler threw an exception
*/
- O invoke(I request) throws RemotingException, RemoteExecutionException;
+ O invoke(I request) throws IOException;
/**
* Send a request asynchronously.
@@ -38,9 +40,9 @@
*
* @return a future representing the result of the request
*
- * @throws RemotingException if the request could not be sent
+ * @throws IOException if the request could not be sent
*/
- FutureReply<O> send(I request) throws RemotingException;
+ IoFuture<O> send(I request) throws IOException;
/**
* Send a request asynchronously, ignoring the reply.
@@ -51,9 +53,9 @@
* Returns immediately.
*
* @param request the request to send
- * @throws RemotingException if the request could not be sent
+ * @throws IOException if the request could not be sent
*/
- void sendOneWay(I request) throws RemotingException;
+ void sendOneWay(I request) throws IOException;
/**
* Get the attribute map. This map holds metadata about the current client.
Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/ClientSource.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/ClientSource.java 2008-07-29
15:17:07 UTC (rev 4436)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/ClientSource.java 2008-07-29
19:34:26 UTC (rev 4437)
@@ -1,5 +1,7 @@
package org.jboss.cx.remoting;
+import java.io.IOException;
+
/**
* A source for new Remoting contexts.
*
@@ -11,12 +13,12 @@
* Close the context source. New contexts may no longer be created after this
* method is called. Subsequent calls to this method have no additional effect.
*/
- void close() throws RemotingException;
+ void close() throws IOException;
/**
* Create a new communications context.
*
* @return the new context
*/
- Client<I, O> createClient() throws RemotingException;
+ Client<I, O> createClient() throws IOException;
}
Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Closeable.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Closeable.java 2008-07-29
15:17:07 UTC (rev 4436)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Closeable.java 2008-07-29
19:34:26 UTC (rev 4437)
@@ -1,5 +1,7 @@
package org.jboss.cx.remoting;
+import java.io.IOException;
+
/**
* A Remoting resource that can be closed.
*
@@ -10,9 +12,9 @@
/**
* Close, waiting for any outstanding processing to finish.
*
- * @throws RemotingException if the close failed
+ * @throws IOException if the close failed
*/
- void close() throws RemotingException;
+ void close() throws IOException;
/**
* Add a handler that will be called upon close. The handler may be called before or
after the close actually
Deleted: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/FutureReply.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/FutureReply.java 2008-07-29
15:17:07 UTC (rev 4436)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/FutureReply.java 2008-07-29
19:34:26 UTC (rev 4437)
@@ -1,121 +0,0 @@
-package org.jboss.cx.remoting;
-
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-
-/**
- * The result of an invocation that may or may not yet have completed.
- * <p/>
- * In addition to representing the invocation results, this interface allows the user to
cancel the request, or schedule
- * an asynchronous callback for when the request completes.
- *
- * @param <T> the result type of the future operation
- */
-public interface FutureReply<T> extends Future<T> {
-
- /**
- * Attempts to cancel execution of this request. This attempt will fail if the
request has already completed,
- * already been cancelled, or could not be cancelled for some other reason. The
{@code mayInterruptIfRunning}
- * parameter determines whether the thread executing this task should be interrupted
in an attempt to stop the
- * task. If {@code false}, the thread will not be interrupted. If {@code true},
then the remote service's
- * interruption policy will be used.
- *
- * @param mayInterruptIfRunning {@code true} if the thread executing this task should
be interrupted; otherwise,
- * in-progress tasks are allowed to complete
- *
- * @return {@code false} if the task could not be cancelled, typically because it has
already completed normally;
- * {@code true} otherwise
- */
- boolean cancel(boolean mayInterruptIfRunning);
-
- /**
- * Asynchronously send a request to cancel this request. Does not block the current
method. Use the
- * {@link #addCompletionHandler(RequestCompletionHandler)} method to add a notifier
to be called upon completion.
- *
- * @param mayInterruptIfRunning
- */
- FutureReply<T> sendCancel(boolean mayInterruptIfRunning);
-
- /**
- * Returns {@code true} if this task was cancelled before it completed normally.
- *
- * @return {@code true} if task was cancelled before it completed
- */
- boolean isCancelled();
-
- /**
- * Returns {@code true} if this request completed.
- * <p/>
- * Completion may be due to normal termination, an exception, or cancellation -- in
all of these cases, this method
- * will return {@code true}.
- *
- * @return {@code true} if this request completed
- */
- boolean isDone();
-
- /**
- * Waits if necessary for the request to complete, and then retrieves its reply.
- *
- * @return the reply
- *
- * @throws CancellationException if the computation was cancelled
- * @throws RemoteExecutionException if the computation threw an exception
- */
- T get() throws CancellationException, RemoteExecutionException;
-
- /**
- * Waits if necessary for the request to complete, and then retrieves its reply.
- *
- * @return the reply
- *
- * @throws CancellationException if the computation was cancelled
- * @throws RemoteExecutionException if the computation threw an exception
- * @throws InterruptedException if the current thread was interrupted while waiting
- */
- T getInterruptibly() throws InterruptedException, CancellationException,
RemoteExecutionException;
-
- /**
- * Waits if necessary for at most the given time for the request to complete, and
then retrieves the reply, if
- * available. If no reply was available, {@code null} is returned.
- *
- * @param timeout the maximum time to wait
- * @param unit the time unit of the timeout argument
- *
- * @return the reply, or {@code null} if the operation timed out
- *
- * @throws CancellationException if the computation was cancelled
- * @throws RemoteExecutionException if the computation threw an exception
- */
- T get(long timeout, TimeUnit unit) throws CancellationException,
RemoteExecutionException;
-
- /**
- * Waits if necessary for at most the given time for the request to complete, and
then retrieves the reply, if
- * available. If no reply was available, {@code null} is returned.
- *
- * @param timeout the maximum time to wait
- * @param unit the time unit of the timeout argument
- *
- * @return the reply, or {@code null} if the operation timed out
- *
- * @throws CancellationException if the computation was cancelled
- * @throws RemoteExecutionException if the computation threw an exception
- * @throws InterruptedException if the current thread was interrupted while waiting
- */
- T getInterruptibly(long timeout, TimeUnit unit) throws InterruptedException,
CancellationException, RemoteExecutionException;
-
- /**
- * Add a notifier to be called when the request has completed. The notifier may be
called from the current thread
- * or a different thread. If the request has already completed, the notifier will be
called immediately. Calling
- * this method guarantees that the supplied handler will be called. The handler may
be called at any time after
- * the request has completed, though implementations should make a reasonable effort
to ensure that the handler is
- * called in a timely manner.
- * <p/>
- * This method returns {@code this} in order to facilitate method call chaining.
- *
- * @param handler the handler to add, or {@code null} to clear the handler
- *
- * @return this future reply
- */
- FutureReply<T> addCompletionHandler(RequestCompletionHandler<T>
handler);
-}
Modified:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/RemoteExecutionException.java
===================================================================
---
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/RemoteExecutionException.java 2008-07-29
15:17:07 UTC (rev 4436)
+++
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/RemoteExecutionException.java 2008-07-29
19:34:26 UTC (rev 4437)
@@ -1,11 +1,9 @@
package org.jboss.cx.remoting;
-import java.util.concurrent.ExecutionException;
-
/**
* Exception thrown when execution of a remote operation fails for some reason.
*/
-public class RemoteExecutionException extends ExecutionException {
+public class RemoteExecutionException extends RemotingException {
private static final long serialVersionUID = 3580395686019440048L;
Modified:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/RequestCancelHandler.java
===================================================================
---
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/RequestCancelHandler.java 2008-07-29
15:17:07 UTC (rev 4436)
+++
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/RequestCancelHandler.java 2008-07-29
19:34:26 UTC (rev 4437)
@@ -11,7 +11,6 @@
* Receive notification that the request was cancelled.
*
* @param requestContext the request context
- * @param mayInterrupt the value of the cancellation {@code mayInterrupt} flag
*/
- void notifyCancel(RequestContext<O> requestContext, boolean mayInterrupt);
+ void notifyCancel(RequestContext<O> requestContext);
}
Deleted:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/RequestCompletionHandler.java
===================================================================
---
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/RequestCompletionHandler.java 2008-07-29
15:17:07 UTC (rev 4436)
+++
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/RequestCompletionHandler.java 2008-07-29
19:34:26 UTC (rev 4437)
@@ -1,16 +0,0 @@
-package org.jboss.cx.remoting;
-
-/**
- * A handler for receiving notification of request completion on the client side.
- *
- * @param <T> the reply type
- */
-public interface RequestCompletionHandler<T> {
-
- /**
- * Receive notification that the request was completed, was cancelled, or has
failed.
- *
- * @param futureReply the future result
- */
- void notifyComplete(FutureReply<T> futureReply);
-}
Modified:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractCloseable.java
===================================================================
---
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractCloseable.java 2008-07-29
15:17:07 UTC (rev 4436)
+++
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractCloseable.java 2008-07-29
19:34:26 UTC (rev 4437)
@@ -33,6 +33,8 @@
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
/**
* A basic implementation of a closeable resource. Use as a convenient base class for
your closeable resources.
@@ -53,7 +55,11 @@
static {
boolean b = false;
try {
- b =
Boolean.parseBoolean(System.getProperty("jboss.remoting.leakdebugging",
"false"));
+ b = Boolean.parseBoolean(AccessController.doPrivileged(new
PrivilegedAction<String>() {
+ public String run() {
+ return System.getProperty("jboss.remoting.leakdebugging",
"false");
+ }
+ }));
} catch (SecurityException se) {
b = false;
}
Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/SpiUtils.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/SpiUtils.java 2008-07-29
15:17:07 UTC (rev 4436)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/SpiUtils.java 2008-07-29
19:34:26 UTC (rev 4437)
@@ -27,8 +27,6 @@
import org.jboss.cx.remoting.RequestCancelHandler;
import org.jboss.cx.remoting.RequestContext;
import org.jboss.cx.remoting.CloseHandler;
-import org.jboss.cx.remoting.RequestCompletionHandler;
-import org.jboss.cx.remoting.FutureReply;
import org.jboss.xnio.log.Logger;
/**
@@ -85,14 +83,12 @@
/**
* Safely notify a request listener's cancel handler of cancellation.
*
- * @param <O> the reply
* @param handler the request cancel handler
* @param requestContext the request context
- * @param mayInterrupt {@code true} if the request listener threads may be
interrupted
*/
- public static <O> void safeNotifyCancellation(final
RequestCancelHandler<O> handler, final RequestContext<O> requestContext,
boolean mayInterrupt) {
+ public static <O> void safeNotifyCancellation(final
RequestCancelHandler<O> handler, final RequestContext<O> requestContext) {
try {
- handler.notifyCancel(requestContext, mayInterrupt);
+ handler.notifyCancel(requestContext);
} catch (Throwable t) {
log.error(t, "Request cancel handler threw an exception when calling
notifyCancel()");
}
@@ -113,21 +109,6 @@
}
}
- /**
- * Safely handle a future request completion.
- *
- * @param <O> the reply type
- * @param handler
- * @param futureReply
- */
- public static <O> void safeHandleRequestCompletion(final
RequestCompletionHandler<O> handler, final FutureReply<O> futureReply) {
- try {
- handler.notifyComplete(futureReply);
- } catch (Throwable t) {
- log.error(t, "Request completion handler failed unexpectedly");
- }
- }
-
public static RemoteRequestContext getBlankRemoteRequestContext() {
return BLANK_REMOTE_REQUEST_CONTEXT;
}
@@ -135,7 +116,7 @@
private static final RemoteRequestContext BLANK_REMOTE_REQUEST_CONTEXT = new
BlankRemoteRequestContext();
private static final class BlankRemoteRequestContext implements RemoteRequestContext
{
- public void cancel(final boolean mayInterrupt) {
+ public void cancel() {
}
}
}
Modified:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteRequestContext.java
===================================================================
---
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteRequestContext.java 2008-07-29
15:17:07 UTC (rev 4436)
+++
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteRequestContext.java 2008-07-29
19:34:26 UTC (rev 4437)
@@ -30,8 +30,6 @@
/**
* Signal that the request should be cancelled, if possible.
- *
- * @param mayInterrupt {@code true} if the task can be interrupted (advisory)
*/
- void cancel(final boolean mayInterrupt);
+ void cancel();
}
Modified:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/ClientWrapper.java
===================================================================
---
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/ClientWrapper.java 2008-07-29
15:17:07 UTC (rev 4436)
+++
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/ClientWrapper.java 2008-07-29
19:34:26 UTC (rev 4437)
@@ -1,11 +1,10 @@
package org.jboss.cx.remoting.spi.wrapper;
import java.util.concurrent.ConcurrentMap;
+import java.io.IOException;
import org.jboss.cx.remoting.CloseHandler;
import org.jboss.cx.remoting.Client;
-import org.jboss.cx.remoting.FutureReply;
-import org.jboss.cx.remoting.RemoteExecutionException;
-import org.jboss.cx.remoting.RemotingException;
+import org.jboss.xnio.IoFuture;
/**
* A simple delegating wrapper for clients.
@@ -28,7 +27,7 @@
/**
* {@inheritDoc} This implementation calls the same method on the delegate object.
*/
- public void close() throws RemotingException {
+ public void close() throws IOException {
delegate.close();
}
@@ -46,21 +45,21 @@
/**
* {@inheritDoc} This implementation calls the same method on the delegate object.
*/
- public O invoke(final I request) throws RemotingException, RemoteExecutionException
{
+ public O invoke(final I request) throws IOException {
return delegate.invoke(request);
}
/**
* {@inheritDoc} This implementation calls the same method on the delegate object.
*/
- public FutureReply<O> send(final I request) throws RemotingException {
+ public IoFuture<O> send(final I request) throws IOException {
return delegate.send(request);
}
/**
* {@inheritDoc} This implementation calls the same method on the delegate object.
*/
- public void sendOneWay(final I request) throws RemotingException {
+ public void sendOneWay(final I request) throws IOException {
delegate.sendOneWay(request);
}
Modified: remoting3/trunk/build.properties
===================================================================
--- remoting3/trunk/build.properties 2008-07-29 15:17:07 UTC (rev 4436)
+++ remoting3/trunk/build.properties 2008-07-29 19:34:26 UTC (rev 4437)
@@ -173,7 +173,7 @@
lib.trove.local=${local.repository}/${lib.trove.path}
lib.trove.remote=${remote.repository}/${lib.trove.path}
-lib.xnio.version=1.1.0.Alpha2008072101
+lib.xnio.version=1.1.0.Alpha2008072901
lib.xnio-api.name=xnio-api-${lib.xnio.version}.jar
lib.xnio-api.license=lgpl
Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientImpl.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientImpl.java 2008-07-29
15:17:07 UTC (rev 4436)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientImpl.java 2008-07-29
19:34:26 UTC (rev 4437)
@@ -24,15 +24,14 @@
import org.jboss.cx.remoting.Client;
import org.jboss.cx.remoting.RemotingException;
-import org.jboss.cx.remoting.RemoteExecutionException;
-import org.jboss.cx.remoting.FutureReply;
-import org.jboss.cx.remoting.RequestCompletionHandler;
import org.jboss.cx.remoting.core.util.QueueExecutor;
import org.jboss.cx.remoting.spi.remote.RequestHandler;
import org.jboss.cx.remoting.spi.remote.ReplyHandler;
import org.jboss.cx.remoting.spi.remote.RemoteRequestContext;
import org.jboss.cx.remoting.spi.remote.Handle;
+import org.jboss.xnio.IoFuture;
import java.util.concurrent.Executor;
+import java.io.IOException;
/**
*
@@ -50,7 +49,7 @@
handle.close();
}
- public O invoke(final I request) throws RemotingException, RemoteExecutionException
{
+ public O invoke(final I request) throws IOException {
if (! isOpen()) {
throw new RemotingException("Client is not open");
}
@@ -59,8 +58,8 @@
final ReplyHandler replyHandler = futureReply.getReplyHandler();
final RemoteRequestContext requestContext =
handle.getResource().receiveRequest(request, replyHandler);
futureReply.setRemoteRequestContext(requestContext);
- futureReply.addCompletionHandler(new RequestCompletionHandler<O>() {
- public void notifyComplete(final FutureReply<O> reply) {
+ futureReply.addNotifier(new IoFuture.Notifier<O>() {
+ public void notify(final IoFuture<O> future) {
executor.shutdown();
}
});
@@ -68,7 +67,7 @@
return futureReply.get();
}
- public FutureReply<O> send(final I request) throws RemotingException {
+ public IoFuture<O> send(final I request) throws IOException {
if (! isOpen()) {
throw new RemotingException("Client is not open");
}
Modified:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/FutureReplyImpl.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/FutureReplyImpl.java 2008-07-29
15:17:07 UTC (rev 4436)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/FutureReplyImpl.java 2008-07-29
19:34:26 UTC (rev 4437)
@@ -22,347 +22,57 @@
package org.jboss.cx.remoting.core;
-import org.jboss.cx.remoting.FutureReply;
-import org.jboss.cx.remoting.RemoteExecutionException;
-import org.jboss.cx.remoting.RequestCompletionHandler;
-import org.jboss.cx.remoting.util.CollectionUtil;
+import org.jboss.cx.remoting.RemotingException;
import org.jboss.cx.remoting.spi.remote.ReplyHandler;
import org.jboss.cx.remoting.spi.remote.RemoteRequestContext;
-import org.jboss.cx.remoting.spi.SpiUtils;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.TimeUnit;
+import org.jboss.xnio.AbstractIoFuture;
+import org.jboss.xnio.IoFuture;
import java.util.concurrent.Executor;
-import java.util.List;
/**
*
*/
-public final class FutureReplyImpl<O> implements FutureReply<O> {
+public final class FutureReplyImpl<O> extends AbstractIoFuture<O> {
private final Executor executor;
private final ReplyHandler replyHandler = new Handler();
- private final Object lock = new Object();
- // @protectedby lock
- private State state = State.NEW;
- // @protectedby lock
- private RemoteRequestContext remoteRequestContext;
- // @protectedby lock
- private O result;
- // @protectedby lock
- private Throwable cause;
- // @protectedby lock
- private String msg;
- // @protectedby lock
- private List<RequestCompletionHandler<O>> completionHandlers;
+ private volatile RemoteRequestContext remoteRequestContext;
public FutureReplyImpl(final Executor executor) {
this.executor = executor;
}
- private enum State {
- NEW,
- WAITING,
- DONE,
- CANCELLED,
- FAILED,
- }
-
void setRemoteRequestContext(final RemoteRequestContext remoteRequestContext) {
- synchronized (lock) {
- if (state != State.NEW) {
- throw new IllegalStateException("Wrong state");
- }
- state = State.WAITING;
- this.remoteRequestContext = remoteRequestContext;
- lock.notifyAll();
- }
+ this.remoteRequestContext = remoteRequestContext;
}
- public boolean cancel(final boolean mayInterruptIfRunning) {
- final RemoteRequestContext context;
- synchronized (lock) {
- while (state == State.NEW) {
- try {
- lock.wait();
- } catch (InterruptedException e) {
- return false;
- }
- }
- context = remoteRequestContext;
- }
- context.cancel(mayInterruptIfRunning);
- synchronized (lock) {
- while (state == State.WAITING) {
- try {
- lock.wait();
- } catch (InterruptedException e) {
- return false;
- }
- }
- return state == State.CANCELLED;
- }
- }
-
- public FutureReply<O> sendCancel(final boolean mayInterruptIfRunning) {
- final RemoteRequestContext context;
- synchronized (lock) {
- while (state == State.NEW) {
- try {
- lock.wait();
- } catch (InterruptedException e) {
- return this;
- }
- }
- context = remoteRequestContext;
- }
- context.cancel(mayInterruptIfRunning);
+ public IoFuture<O> cancel() {
+ // must not be called before setRemoteRequestContext
+ remoteRequestContext.cancel();
return this;
}
- public boolean isCancelled() {
- synchronized (lock) {
- return state == State.CANCELLED;
- }
+ protected Executor getNotifierExecutor() {
+ return executor;
}
- public boolean isDone() {
- synchronized (lock) {
- return state == State.DONE;
- }
- }
-
- public O get() throws CancellationException, RemoteExecutionException {
- boolean intr = false;
- try {
- synchronized (lock) {
- while (state == State.WAITING || state == State.NEW) {
- try {
- lock.wait();
- } catch (InterruptedException e) {
- intr = true;
- }
- }
- switch (state) {
- case CANCELLED:
- throw new CancellationException("Request was
cancelled");
- case DONE:
- return result;
- case FAILED:
- throw new RemoteExecutionException(msg, cause);
- default:
- throw new IllegalStateException("Wrong state");
- }
- }
- } finally {
- if (intr) {
- Thread.currentThread().interrupt();
- }
- }
- }
-
- public O getInterruptibly() throws InterruptedException, CancellationException,
RemoteExecutionException {
- synchronized (lock) {
- while (state == State.WAITING || state == State.NEW) {
- lock.wait();
- }
- switch (state) {
- case CANCELLED:
- throw new CancellationException("Request was cancelled");
- case DONE:
- return result;
- case FAILED:
- throw new RemoteExecutionException(msg, cause);
- default:
- throw new IllegalStateException("Wrong state");
- }
- }
- }
-
- public O get(final long timeout, final TimeUnit unit) throws CancellationException,
RemoteExecutionException {
- if (unit == null) {
- throw new NullPointerException("unit is null");
- }
- if (timeout < 0L) {
- throw new IllegalArgumentException("timeout is negative");
- }
- boolean intr = false;
- try {
- synchronized (lock) {
- long now = System.currentTimeMillis();
- final long deadline = now + unit.toMillis(timeout);
- if (deadline < 0L) {
- return get();
- }
- while (state == State.WAITING || state == State.NEW) {
- try {
- if (now >= deadline) {
- return null;
- }
- lock.wait(deadline - now);
- } catch (InterruptedException e) {
- intr = true;
- }
- now = System.currentTimeMillis();
- }
- switch (state) {
- case CANCELLED:
- throw new CancellationException("Request was
cancelled");
- case DONE:
- return result;
- case FAILED:
- throw new RemoteExecutionException(msg, cause);
- default:
- throw new IllegalStateException("Wrong state");
- }
- }
- } finally {
- if (intr) {
- Thread.currentThread().interrupt();
- }
- }
- }
-
- public O getInterruptibly(final long timeout, final TimeUnit unit) throws
InterruptedException, CancellationException, RemoteExecutionException {
- if (unit == null) {
- throw new NullPointerException("unit is null");
- }
- if (timeout < 0L) {
- throw new IllegalArgumentException("timeout is negative");
- }
- synchronized (lock) {
- while (state == State.WAITING || state == State.NEW) {
- unit.timedWait(lock, timeout);
- }
- switch (state) {
- case CANCELLED:
- throw new CancellationException("Request was cancelled");
- case DONE:
- return result;
- case FAILED:
- throw new RemoteExecutionException(msg, cause);
- case WAITING:
- case NEW:
- return null;
- default:
- throw new IllegalStateException("Wrong state");
- }
- }
- }
-
- public FutureReply<O> addCompletionHandler(final
RequestCompletionHandler<O> handler) {
- synchronized (lock) {
- switch (state) {
- case NEW:
- case WAITING:
- if (completionHandlers == null) {
- completionHandlers = CollectionUtil.arrayList();
- }
- completionHandlers.add(handler);
- break;
- default:
- SpiUtils.safeHandleRequestCompletion(handler, this);
- break;
- }
- }
- return this;
- }
-
ReplyHandler getReplyHandler() {
return replyHandler;
}
- private void runCompletionHandlers() {
- synchronized (lock) {
- final List<RequestCompletionHandler<O>> handlers =
completionHandlers;
- if (handlers != null) {
- completionHandlers = null;
- executor.execute(new Runnable() {
- public void run() {
- for (RequestCompletionHandler<O> handler : handlers) {
- SpiUtils.safeHandleRequestCompletion(handler,
FutureReplyImpl.this);
- }
- }
- });
- }
- }
- }
-
private final class Handler implements ReplyHandler {
@SuppressWarnings({ "unchecked" })
public void handleReply(final Object reply) {
- synchronized (lock) {
- while (state == State.NEW) {
- boolean intr = false;
- try {
- try {
- lock.wait();
- } catch (InterruptedException e) {
- intr = true;
- }
- } finally {
- if (intr) {
- Thread.currentThread().interrupt();
- }
- }
- }
- if (state == State.WAITING) {
- state = State.DONE;
- result = (O) reply;
- runCompletionHandlers();
- lock.notifyAll();
- }
- }
+ setResult((O) reply);
}
public void handleException(final String exMsg, final Throwable exCause) {
- synchronized (lock) {
- while (state == State.NEW) {
- boolean intr = false;
- try {
- try {
- lock.wait();
- } catch (InterruptedException e) {
- intr = true;
- }
- } finally {
- if (intr) {
- Thread.currentThread().interrupt();
- }
- }
- }
- if (state == State.WAITING) {
- state = State.FAILED;
- msg = exMsg;
- cause = exCause;
- runCompletionHandlers();
- lock.notifyAll();
- }
- }
+ setException(new RemotingException(exMsg, exCause));
}
public void handleCancellation() {
- synchronized (lock) {
- while (state == State.NEW) {
- boolean intr = false;
- try {
- try {
- lock.wait();
- } catch (InterruptedException e) {
- intr = true;
- }
- } finally {
- if (intr) {
- Thread.currentThread().interrupt();
- }
- }
- }
- if (state == State.WAITING) {
- state = State.CANCELLED;
- runCompletionHandlers();
- lock.notifyAll();
- }
- }
+ finishCancel();
}
}
}
Modified:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/LocalRequestHandler.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/LocalRequestHandler.java 2008-07-29
15:17:07 UTC (rev 4436)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/LocalRequestHandler.java 2008-07-29
19:34:26 UTC (rev 4437)
@@ -89,8 +89,8 @@
}
});
return new RemoteRequestContext() {
- public void cancel(final boolean mayInterrupt) {
- context.cancel(mayInterrupt);
+ public void cancel() {
+ context.cancel();
}
};
}
Modified:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RequestContextImpl.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RequestContextImpl.java 2008-07-29
15:17:07 UTC (rev 4436)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RequestContextImpl.java 2008-07-29
19:34:26 UTC (rev 4437)
@@ -101,7 +101,7 @@
public void addCancelHandler(final RequestCancelHandler<O> handler) {
synchronized (cancelLock) {
if (cancelled.get()) {
- SpiUtils.safeNotifyCancellation(handler, this, false);
+ SpiUtils.safeNotifyCancellation(handler, this);
} else {
if (cancelHandlers == null) {
cancelHandlers = new HashSet<RequestCancelHandler<O>>();
@@ -115,23 +115,21 @@
executor.execute(command);
}
- protected void cancel(final boolean mayInterrupt) {
+ protected void cancel() {
if (! cancelled.getAndSet(true)) {
synchronized (cancelLock) {
if (cancelHandlers != null) {
for (final RequestCancelHandler<O> handler : cancelHandlers) {
executor.execute(new Runnable() {
public void run() {
- SpiUtils.safeNotifyCancellation(handler, RequestContextImpl.this, mayInterrupt);
+ SpiUtils.safeNotifyCancellation(handler, RequestContextImpl.this);
}
});
}
cancelHandlers = null;
}
}
- if (mayInterrupt) {
- executor.interruptAll();
- }
+ executor.interruptAll();
}
}
}
Modified:
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicHandler.java
===================================================================
---
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicHandler.java 2008-07-29
15:17:07 UTC (rev 4436)
+++
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicHandler.java 2008-07-29
19:34:26 UTC (rev 4437)
@@ -727,12 +727,11 @@
this.channel = channel;
}
- public void cancel(final boolean mayInterrupt) {
+ public void cancel() {
try {
final ByteBuffer buffer = allocator.allocate();
buffer.put((byte) MessageType.CANCEL_REQUEST);
buffer.putInt(id);
- buffer.put((byte) (mayInterrupt ? 1 : 0));
buffer.flip();
registerWriter(channel, new SimpleWriteHandler(allocator, buffer));
} catch (InterruptedException e) {
Modified:
remoting3/trunk/protocol/basic/src/test/java/org/jboss/cx/remoting/protocol/basic/ConnectionTestCase.java
===================================================================
---
remoting3/trunk/protocol/basic/src/test/java/org/jboss/cx/remoting/protocol/basic/ConnectionTestCase.java 2008-07-29
15:17:07 UTC (rev 4436)
+++
remoting3/trunk/protocol/basic/src/test/java/org/jboss/cx/remoting/protocol/basic/ConnectionTestCase.java 2008-07-29
19:34:26 UTC (rev 4437)
@@ -38,7 +38,6 @@
import org.jboss.cx.remoting.ClientSource;
import org.jboss.cx.remoting.Client;
import org.jboss.cx.remoting.RemotingException;
-import org.jboss.cx.remoting.FutureReply;
import org.jboss.cx.remoting.AbstractRequestListener;
import org.jboss.cx.remoting.spi.remote.RequestHandlerSource;
import org.jboss.cx.remoting.spi.remote.Handle;
@@ -51,6 +50,7 @@
import org.jboss.xnio.ChannelSource;
import org.jboss.xnio.CloseableTcpConnector;
import org.jboss.xnio.CloseableExecutor;
+import org.jboss.xnio.nio.NioXnio;
import org.jboss.xnio.channels.AllocatedMessageChannel;
import org.jboss.xnio.channels.Channels;
import org.jboss.xnio.channels.StreamChannel;
@@ -77,7 +77,7 @@
public void free(final ByteBuffer buffer) {
}
};
- final Xnio xnio = Xnio.createNio();
+ final Xnio xnio = NioXnio.create();
try {
final EndpointImpl endpoint = new EndpointImpl();
endpoint.setExecutor(closeableExecutor);
@@ -113,8 +113,9 @@
try {
final Client<Object,Object> client = clientSource.createClient();
try {
- final FutureReply<Object> future = client.send(REQUEST);
- assertEquals(REPLY, future.get(500L, TimeUnit.MILLISECONDS));
+ final IoFuture<Object> future = client.send(REQUEST);
+ assertEquals(IoFuture.Status.DONE, future.await(TimeUnit.MILLISECONDS, 500L));
+ assertEquals(REPLY, future.get());
client.close();
clientSource.close();
handleThirteen.close();
Modified: remoting3/trunk/testing-support/src/main/resources/testing.policy
===================================================================
--- remoting3/trunk/testing-support/src/main/resources/testing.policy 2008-07-29 15:17:07
UTC (rev 4436)
+++ remoting3/trunk/testing-support/src/main/resources/testing.policy 2008-07-29 19:34:26
UTC (rev 4437)
@@ -15,20 +15,30 @@
{
permission java.lang.RuntimePermission "modifyThread"; // for executor control
permission java.net.SocketPermission "*:*", "accept, connect, resolve";
+ permission java.util.PropertyPermission "xnio.provider", "read"; // todo - fixed in XNIO trunk...
};
-grant codeBase "file:${build.home}/protocol/basic/target/main/classes/-"
+// Permissions for Remoting itself
+
+grant codeBase "file:${build.home}/api/target/main/classes/-"
{
- permission java.net.SocketPermission "*:*", "accept, connect, resolve";
+ permission java.util.PropertyPermission "jboss.remoting.*", "read";
};
-// TODO: this is for the marshallers, which ought to be in their own module/module set
-
grant codeBase "file:${build.home}/core/target/main/classes/-"
{
+ // TODO: this is for the marshallers, which ought to be in their own module/module set
permission java.io.SerializablePermission "enableSubstitution";
+ permission java.util.PropertyPermission "jboss.remoting.*", "read";
};
+grant codeBase "file:${build.home}/protocol/basic/target/main/classes/-"
+{
+ permission java.net.SocketPermission "*:*", "accept, connect, resolve"; // todo - need a better solution
+ permission java.util.PropertyPermission "jboss.remoting.*", "read";
+};
+
+
// Support classes
grant codeBase "file:${build.home}/testing-support/target/main/classes/-"