Author: david.lloyd(a)jboss.com
Date: 2008-07-03 00:48:51 -0400 (Thu, 03 Jul 2008)
New Revision: 4343
Added:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/AbstractAutoCloseable.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/AbstractCloseable.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/AbstractContextImpl.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientImpl.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientSourceImpl.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/FutureReplyImpl.java
Modified:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Client.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Endpoint.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/FutureReply.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/SpiUtils.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteClientEndpoint.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteRequestContext.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteServiceEndpoint.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/ClientWrapper.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/EndpointWrapper.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientContextImpl.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/EndpointImpl.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RemoteClientEndpointLocalImpl.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RemoteServiceEndpointLocalImpl.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RequestContextImpl.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ServiceContextImpl.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/util/QueueExecutor.java
Log:
Working endpoint implementation again
Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Client.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Client.java 2008-07-03
00:44:19 UTC (rev 4342)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Client.java 2008-07-03
04:48:51 UTC (rev 4343)
@@ -61,12 +61,4 @@
* @return the attribute map
*/
ConcurrentMap<Object, Object> getAttributes();
-
- /**
- * Get the classloader associated with this client. This is the classloader that
will be used to resolve any
- * remote classes.
- *
- * @return the classloader
- */
- ClassLoader getClassLoader();
}
Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Endpoint.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Endpoint.java 2008-07-03
00:44:19 UTC (rev 4342)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Endpoint.java 2008-07-03
04:48:51 UTC (rev 4343)
@@ -38,7 +38,7 @@
* @param requestListener the request listener
* @return the client
*/
- <I, O> RemoteClientEndpoint<I, O> createClient(RequestListener<I,
O> requestListener);
+ <I, O> RemoteClientEndpoint<I, O> createClient(RequestListener<I,
O> requestListener) throws RemotingException;
/**
* Create a client source that can be used to acquire clients associated with a
request listener on this endpoint.
@@ -52,7 +52,7 @@
* @param requestListener the request listener
* @return the context source
*/
- <I, O> RemoteServiceEndpoint<I, O> createService(RequestListener<I,
O> requestListener);
+ <I, O> RemoteServiceEndpoint<I, O> createService(RequestListener<I,
O> requestListener) throws RemotingException;
/**
* Add a listener that is notified when a session is created.
Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/FutureReply.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/FutureReply.java 2008-07-03
00:44:19 UTC (rev 4342)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/FutureReply.java 2008-07-03
04:48:51 UTC (rev 4343)
@@ -31,7 +31,7 @@
/**
* Asynchronously send a request to cancel this request. Does not block the current
method. Use the
- * {@link #addCompletionNotifier(RequestCompletionHandler)} method to add a notifier
to be called upon completion.
+ * {@link #addCompletionHandler(RequestCompletionHandler)} method to add a notifier
to be called upon completion.
*
* @param mayInterruptIfRunning
*/
@@ -117,5 +117,5 @@
*
* @return this future reply
*/
- FutureReply<T> addCompletionNotifier(RequestCompletionHandler<T>
handler);
+ FutureReply<T> addCompletionHandler(RequestCompletionHandler<T>
handler);
}
Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/SpiUtils.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/SpiUtils.java 2008-07-03
00:44:19 UTC (rev 4342)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/SpiUtils.java 2008-07-03
04:48:51 UTC (rev 4343)
@@ -25,6 +25,9 @@
import org.jboss.cx.remoting.spi.remote.ReplyHandler;
import org.jboss.cx.remoting.RequestCancelHandler;
import org.jboss.cx.remoting.RequestContext;
+import org.jboss.cx.remoting.CloseHandler;
+import org.jboss.cx.remoting.RequestCompletionHandler;
+import org.jboss.cx.remoting.FutureReply;
import org.jboss.xnio.log.Logger;
/**
@@ -42,7 +45,7 @@
* @param msg the message
* @param cause the cause
*/
- public static void safeHandleException(ReplyHandler<?> replyHandler, String
msg, Throwable cause) {
+ public static void safeHandleException(final ReplyHandler<?> replyHandler,
final String msg, final Throwable cause) {
try {
replyHandler.handleException(msg, cause);
} catch (Throwable t) {
@@ -57,7 +60,7 @@
* @param replyHandler the reply handler
* @param reply the reply
*/
- public static <O> void safeHandleReply(ReplyHandler<O> replyHandler, O
reply) {
+ public static <O> void safeHandleReply(final ReplyHandler<O>
replyHandler, final O reply) {
try {
replyHandler.handleReply(reply);
} catch (Throwable t) {
@@ -70,7 +73,7 @@
*
* @param replyHandler the reply handler
*/
- public static void safeHandleCancellation(ReplyHandler<?> replyHandler) {
+ public static void safeHandleCancellation(final ReplyHandler<?> replyHandler)
{
try {
replyHandler.handleCancellation();
} catch (Throwable t) {
@@ -81,11 +84,12 @@
/**
* Safely notify a request listener's cancel handler of cancellation.
*
+ * @param <O> the reply
* @param handler the request cancel handler
* @param requestContext the request context
* @param mayInterrupt {@code true} if the request listener threads may be
interrupted
*/
- public static <O> void safeNotifyCancellation(RequestCancelHandler<O>
handler, RequestContext<O> requestContext, boolean mayInterrupt) {
+ public static <O> void safeNotifyCancellation(final
RequestCancelHandler<O> handler, final RequestContext<O> requestContext,
boolean mayInterrupt) {
try {
handler.notifyCancel(requestContext, mayInterrupt);
} catch (Throwable t) {
@@ -93,6 +97,34 @@
}
}
+ /**
+ * Safely handle a close notification.
+ *
+ * @param <T> the type of the closable resource
+ * @param handler the close handler
+ * @param closed the object that was closed
+ */
+ public static <T> void safeHandleClose(final CloseHandler<T> handler,
final T closed) {
+ try {
+ handler.handleClose(closed);
+ } catch (Throwable t) {
+ log.error(t, "Close handler failed unexpectedly");
+ }
+ }
+ /**
+ * Safely handle a future request completion.
+ *
+ * @param <O> the reply type
+ * @param handler
+ * @param futureReply
+ */
+ public static <O> void safeHandleRequestCompletion(final
RequestCompletionHandler<O> handler, final FutureReply<O> futureReply) {
+ try {
+ handler.notifyComplete(futureReply);
+ } catch (Throwable t) {
+ log.error(t, "Request completion handler failed unexpectedly");
+ }
+ }
}
Modified:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteClientEndpoint.java
===================================================================
---
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteClientEndpoint.java 2008-07-03
00:44:19 UTC (rev 4342)
+++
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteClientEndpoint.java 2008-07-03
04:48:51 UTC (rev 4343)
@@ -34,6 +34,14 @@
public interface RemoteClientEndpoint<I, O> extends
Closeable<RemoteClientEndpoint<I, O>> {
/**
+ * Receive a one-way request from a remote system. This method is intended to be
called by protocol handlers. No
+ * reply will be sent back to the client.
+ *
+ * @param request the request
+ */
+ void receiveRequest(I request);
+
+ /**
* Receive a request from a remote system. This method is intended to be called by
protocol handlers. If the
* request cannot be accepted for some reason, the
* {@link ReplyHandler#handleException(String, Throwable)}
@@ -67,7 +75,7 @@
/**
* Automatically close this client endpoint when all handles and local client
instances are closed.
*/
- void autoClose();
+ void autoClose() throws RemotingException;
/**
* Close this client endpoint. The outcome of any outstanding requests is not
defined, though implementations
Modified:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteRequestContext.java
===================================================================
---
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteRequestContext.java 2008-07-03
00:44:19 UTC (rev 4342)
+++
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteRequestContext.java 2008-07-03
04:48:51 UTC (rev 4343)
@@ -30,6 +30,8 @@
/**
* Signal that the request should be cancelled, if possible.
+ *
+ * @param mayInterrupt {@code true} if the task can be interrupted (advisory)
*/
- void cancel();
+ void cancel(final boolean mayInterrupt);
}
Modified:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteServiceEndpoint.java
===================================================================
---
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteServiceEndpoint.java 2008-07-03
00:44:19 UTC (rev 4342)
+++
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteServiceEndpoint.java 2008-07-03
04:48:51 UTC (rev 4343)
@@ -63,7 +63,7 @@
* Automatically close this service endpoint when all handles and local client source
instances
* are closed.
*/
- void autoClose();
+ void autoClose() throws RemotingException;
/**
* Close this service endpoint immediately.
Modified:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/ClientWrapper.java
===================================================================
---
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/ClientWrapper.java 2008-07-03
00:44:19 UTC (rev 4342)
+++
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/ClientWrapper.java 2008-07-03
04:48:51 UTC (rev 4343)
@@ -70,11 +70,4 @@
public ConcurrentMap<Object, Object> getAttributes() {
return delegate.getAttributes();
}
-
- /**
- * {@inheritDoc} This implementation calls the same method on the delegate object.
- */
- public ClassLoader getClassLoader() {
- return delegate.getClassLoader();
- }
}
Modified:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/EndpointWrapper.java
===================================================================
---
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/EndpointWrapper.java 2008-07-03
00:44:19 UTC (rev 4342)
+++
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/EndpointWrapper.java 2008-07-03
04:48:51 UTC (rev 4343)
@@ -45,14 +45,14 @@
/**
* {@inheritDoc} This implementation calls the same method on the delegate object.
*/
- public <I, O> RemoteClientEndpoint<I, O> createClient(final
RequestListener<I, O> requestListener) {
+ public <I, O> RemoteClientEndpoint<I, O> createClient(final
RequestListener<I, O> requestListener) throws RemotingException {
return delegate.createClient(requestListener);
}
/**
* {@inheritDoc} This implementation calls the same method on the delegate object.
*/
- public <I, O> RemoteServiceEndpoint<I,O> createService(final
RequestListener<I, O> requestListener) {
+ public <I, O> RemoteServiceEndpoint<I, O> createService(final
RequestListener<I, O> requestListener) throws RemotingException {
return delegate.createService(requestListener);
}
Added:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/AbstractAutoCloseable.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/AbstractAutoCloseable.java
(rev 0)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/AbstractAutoCloseable.java 2008-07-03
04:48:51 UTC (rev 4343)
@@ -0,0 +1,103 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.cx.remoting.core;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.jboss.cx.remoting.RemotingException;
+import org.jboss.cx.remoting.spi.remote.Handle;
+import org.jboss.xnio.log.Logger;
+
+/**
+ *
+ */
+public abstract class AbstractAutoCloseable<T> extends AbstractCloseable<T>
{
+
+ private final AtomicBoolean autoClose = new AtomicBoolean();
+ private final AtomicInteger refcount = new AtomicInteger(1);
+ private final Executor executor;
+
+ private static Logger log = Logger.getLogger(AbstractAutoCloseable.class);
+
+ protected AbstractAutoCloseable(final Executor executor) {
+ super(executor);
+ this.executor = executor;
+ }
+
+ protected void safeDec() {
+ try {
+ dec();
+ } catch (Throwable t) {
+ log.trace("Failed to decrement reference count: %s", t);
+ }
+ }
+
+ protected void dec() throws RemotingException {
+ final int v = refcount.decrementAndGet();
+ if (v == 0) {
+ // we dropped the refcount to zero
+ if (refcount.compareAndSet(0, -65536)) {
+ // we are closing
+ close();
+ }
+ // someone incremented it in the meantime... lucky them
+ } else if (v < 0) {
+ // was already closed; put the count back
+ refcount.incrementAndGet();
+ }
+ // otherwise, the resource remains open
+ }
+
+ protected void inc() throws RemotingException {
+ final int v = refcount.getAndIncrement();
+ if (v < 0) {
+ // was already closed
+ refcount.decrementAndGet();
+ throw new RemotingException("Resource is closed");
+ }
+ }
+
+ public void autoClose() throws RemotingException {
+ if (! autoClose.getAndSet(true)) {
+ dec();
+ }
+ }
+
+ public Handle<T> getHandle() throws RemotingException {
+ return new HandleImpl();
+ }
+
+ private final class HandleImpl extends AbstractCloseable<Handle<T>>
implements Handle<T> {
+
+ private HandleImpl() throws RemotingException {
+ super(AbstractAutoCloseable.this.executor);
+ inc();
+ }
+
+ @SuppressWarnings({ "unchecked" })
+ public T getResource() {
+ return (T) AbstractAutoCloseable.this;
+ }
+ }
+}
Added:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/AbstractCloseable.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/AbstractCloseable.java
(rev 0)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/AbstractCloseable.java 2008-07-03
04:48:51 UTC (rev 4343)
@@ -0,0 +1,82 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.cx.remoting.core;
+
+import org.jboss.cx.remoting.Closeable;
+import org.jboss.cx.remoting.RemotingException;
+import org.jboss.cx.remoting.CloseHandler;
+import org.jboss.cx.remoting.spi.SpiUtils;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ *
+ */
+public abstract class AbstractCloseable<T> implements Closeable<T> {
+
+ protected final Executor executor;
+ private final Object closeLock = new Object();
+ private final AtomicBoolean closed = new AtomicBoolean();
+ private Set<CloseHandler<T>> closeHandlers;
+
+ protected AbstractCloseable(final Executor executor) {
+ this.executor = executor;
+ }
+
+ protected boolean isOpen() {
+ return ! closed.get();
+ }
+
+ public void close() throws RemotingException {
+ if (! closed.getAndSet(true)) {
+ synchronized (closeLock) {
+ if (closeHandlers != null) {
+ for (final CloseHandler<T> handler : closeHandlers) {
+ executor.execute(new Runnable() {
+ @SuppressWarnings({ "unchecked" })
+ public void run() {
+ SpiUtils.safeHandleClose(handler, (T)
AbstractCloseable.this);
+ }
+ });
+ }
+ closeHandlers = null;
+ }
+ }
+ }
+ }
+
+ public void addCloseHandler(final CloseHandler<T> handler) {
+ synchronized (closeLock) {
+ if (closeHandlers == null) {
+ closeHandlers = new HashSet<CloseHandler<T>>();
+ }
+ closeHandlers.add(handler);
+ }
+ }
+
+ protected Executor getExecutor() {
+ return executor;
+ }
+}
Added:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/AbstractContextImpl.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/AbstractContextImpl.java
(rev 0)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/AbstractContextImpl.java 2008-07-03
04:48:51 UTC (rev 4343)
@@ -0,0 +1,43 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.cx.remoting.core;
+
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executor;
+import org.jboss.cx.remoting.util.CollectionUtil;
+
+/**
+ *
+ */
+public abstract class AbstractContextImpl<T> extends AbstractCloseable<T> {
+
+ private final ConcurrentMap<Object, Object> attributes =
CollectionUtil.concurrentMap();
+
+ protected AbstractContextImpl(final Executor executor) {
+ super(executor);
+ }
+
+ public ConcurrentMap<Object, Object> getAttributes() {
+ return attributes;
+ }
+}
Modified:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientContextImpl.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientContextImpl.java 2008-07-03
00:44:19 UTC (rev 4342)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientContextImpl.java 2008-07-03
04:48:51 UTC (rev 4343)
@@ -26,31 +26,32 @@
import org.jboss.cx.remoting.ServiceContext;
import org.jboss.cx.remoting.RemotingException;
import org.jboss.cx.remoting.CloseHandler;
+import org.jboss.cx.remoting.spi.SpiUtils;
+import org.jboss.cx.remoting.util.CollectionUtil;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.Set;
+import java.util.HashSet;
/**
*
*/
-public final class ClientContextImpl implements ClientContext {
+public final class ClientContextImpl extends AbstractContextImpl<ClientContext>
implements ClientContext {
- private Executor executor;
+ private final ServiceContextImpl serviceContext;
- public ConcurrentMap<Object, Object> getAttributes() {
- return null;
+ ClientContextImpl(final Executor executor) {
+ super(executor);
+ serviceContext = null;
}
- public ServiceContext getServiceContext() {
- return null;
+ ClientContextImpl(final ServiceContextImpl serviceContext) {
+ super(serviceContext.getExecutor());
+ this.serviceContext = serviceContext;
}
- public void close() throws RemotingException {
+ public ServiceContext getServiceContext() {
+ return serviceContext;
}
-
- public void addCloseHandler(final CloseHandler<ClientContext>
clientContextCloseHandler) {
- }
-
- public Executor getExecutor() {
- return executor;
- }
}
Added: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientImpl.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientImpl.java
(rev 0)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientImpl.java 2008-07-03
04:48:51 UTC (rev 4343)
@@ -0,0 +1,83 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.cx.remoting.core;
+
+import org.jboss.cx.remoting.Client;
+import org.jboss.cx.remoting.RemotingException;
+import org.jboss.cx.remoting.RemoteExecutionException;
+import org.jboss.cx.remoting.FutureReply;
+import org.jboss.cx.remoting.RequestCompletionHandler;
+import org.jboss.cx.remoting.core.util.QueueExecutor;
+import org.jboss.cx.remoting.spi.remote.RemoteClientEndpoint;
+import org.jboss.cx.remoting.spi.remote.ReplyHandler;
+import org.jboss.cx.remoting.spi.remote.RemoteRequestContext;
+import java.util.concurrent.Executor;
+
+/**
+ *
+ */
+public final class ClientImpl<I, O> extends AbstractContextImpl<Client<I,
O>> implements Client<I, O> {
+
+ private final RemoteClientEndpoint<I, O> remoteClientEndpoint;
+
+ ClientImpl(final RemoteClientEndpoint<I, O> remoteClientEndpoint, final
Executor executor) {
+ super(executor);
+ this.remoteClientEndpoint = remoteClientEndpoint;
+ }
+
+ public O invoke(final I request) throws RemotingException, RemoteExecutionException
{
+ if (! isOpen()) {
+ throw new RemotingException("Client is not open");
+ }
+ final QueueExecutor executor = new QueueExecutor();
+ final FutureReplyImpl<O> futureReply = new
FutureReplyImpl<O>(executor);
+ final ReplyHandler<O> replyHandler = futureReply.getReplyHandler();
+ final RemoteRequestContext requestContext =
remoteClientEndpoint.receiveRequest(request, replyHandler);
+ futureReply.setRemoteRequestContext(requestContext);
+ futureReply.addCompletionHandler(new RequestCompletionHandler<O>() {
+ public void notifyComplete(final FutureReply<O> reply) {
+ executor.shutdown();
+ }
+ });
+ executor.runQueue();
+ return futureReply.get();
+ }
+
+ public FutureReply<O> send(final I request) throws RemotingException {
+ if (! isOpen()) {
+ throw new RemotingException("Client is not open");
+ }
+ final FutureReplyImpl<O> futureReply = new
FutureReplyImpl<O>(executor);
+ final ReplyHandler<O> replyHandler = futureReply.getReplyHandler();
+ final RemoteRequestContext requestContext =
remoteClientEndpoint.receiveRequest(request, replyHandler);
+ futureReply.setRemoteRequestContext(requestContext);
+ return futureReply;
+ }
+
+ public void sendOneWay(final I request) throws RemotingException {
+ if (! isOpen()) {
+ throw new RemotingException("Client is not open");
+ }
+ remoteClientEndpoint.receiveRequest(request);
+ }
+}
Added:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientSourceImpl.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientSourceImpl.java
(rev 0)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientSourceImpl.java 2008-07-03
04:48:51 UTC (rev 4343)
@@ -0,0 +1,52 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.cx.remoting.core;
+
+import org.jboss.cx.remoting.ClientSource;
+import org.jboss.cx.remoting.Client;
+import org.jboss.cx.remoting.RemotingException;
+import org.jboss.cx.remoting.spi.remote.RemoteClientEndpoint;
+import java.util.concurrent.Executor;
+
+/**
+ *
+ */
+public final class ClientSourceImpl<I, O> extends
AbstractCloseable<ClientSource<I, O>> implements ClientSource<I, O> {
+
+ private final RemoteServiceEndpointLocalImpl<I, O> serviceEndpoint;
+
+ ClientSourceImpl(final RemoteServiceEndpointLocalImpl<I, O> serviceEndpoint,
final Executor executor) {
+ super(executor);
+ this.serviceEndpoint = serviceEndpoint;
+ }
+
+ public Client<I, O> createContext() throws RemotingException {
+ if (! isOpen()) {
+ throw new RemotingException("Client source is not open");
+ }
+ final RemoteClientEndpoint<I,O> clientEndpoint =
serviceEndpoint.openClient();
+ final Client<I, O> client = clientEndpoint.getClient();
+ clientEndpoint.autoClose();
+ return client;
+ }
+}
Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/EndpointImpl.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/EndpointImpl.java 2008-07-03
00:44:19 UTC (rev 4342)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/EndpointImpl.java 2008-07-03
04:48:51 UTC (rev 4343)
@@ -9,6 +9,7 @@
import org.jboss.cx.remoting.Endpoint;
import org.jboss.cx.remoting.RequestListener;
import org.jboss.cx.remoting.SessionListener;
+import org.jboss.cx.remoting.RemotingException;
import org.jboss.cx.remoting.core.util.OrderedExecutorFactory;
import org.jboss.cx.remoting.spi.remote.RemoteClientEndpoint;
import org.jboss.cx.remoting.spi.remote.RemoteServiceEndpoint;
@@ -112,12 +113,16 @@
return endpointMap;
}
- public <I, O> RemoteClientEndpoint<I, O> createClient(final
RequestListener<I, O> requestListener) {
- return new RemoteClientEndpointLocalImpl<I, O>(this, requestListener);
+ public <I, O> RemoteClientEndpoint<I, O> createClient(final
RequestListener<I, O> requestListener) throws RemotingException {
+ final RemoteClientEndpointLocalImpl<I, O> clientEndpoint = new
RemoteClientEndpointLocalImpl<I, O>(executor, requestListener);
+ clientEndpoint.open();
+ return clientEndpoint;
}
- public <I, O> RemoteServiceEndpoint<I, O> createService(final
RequestListener<I, O> requestListener) {
- return new RemoteServiceEndpointLocalImpl<I, O>(this, requestListener);
+ public <I, O> RemoteServiceEndpoint<I, O> createService(final
RequestListener<I, O> requestListener) throws RemotingException {
+ final RemoteServiceEndpointLocalImpl<I, O> serviceEndpoint = new
RemoteServiceEndpointLocalImpl<I, O>(executor, requestListener);
+ serviceEndpoint.open();
+ return serviceEndpoint;
}
public void addSessionListener(final SessionListener sessionListener) {
Added: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/FutureReplyImpl.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/FutureReplyImpl.java
(rev 0)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/FutureReplyImpl.java 2008-07-03
04:48:51 UTC (rev 4343)
@@ -0,0 +1,360 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.cx.remoting.core;
+
+import org.jboss.cx.remoting.FutureReply;
+import org.jboss.cx.remoting.RemoteExecutionException;
+import org.jboss.cx.remoting.RequestCompletionHandler;
+import org.jboss.cx.remoting.util.CollectionUtil;
+import org.jboss.cx.remoting.spi.remote.ReplyHandler;
+import org.jboss.cx.remoting.spi.remote.RemoteRequestContext;
+import org.jboss.cx.remoting.spi.SpiUtils;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.Executor;
+import java.util.List;
+
+/**
+ *
+ */
+public final class FutureReplyImpl<O> implements FutureReply<O> {
+
+ private final Executor executor;
+ private final ReplyHandler<O> replyHandler = new Handler();
+ private final Object lock = new Object();
+ // @protectedby lock
+ private State state = State.WAITING;
+ // @protectedby lock
+ private RemoteRequestContext remoteRequestContext;
+ // @protectedby lock
+ private O result;
+ // @protectedby lock
+ private Throwable cause;
+ // @protectedby lock
+ private String msg;
+ // @protectedby lock
+ private List<RequestCompletionHandler<O>> completionHandlers;
+
+ public FutureReplyImpl(final Executor executor) {
+ this.executor = executor;
+ }
+
+ private enum State {
+ NEW,
+ WAITING,
+ DONE,
+ CANCELLED,
+ FAILED,
+ }
+
+ void setRemoteRequestContext(final RemoteRequestContext remoteRequestContext) {
+ synchronized (lock) {
+ if (state != State.NEW) {
+ throw new IllegalStateException("Wrong state");
+ }
+ state = State.WAITING;
+ this.remoteRequestContext = remoteRequestContext;
+ }
+ }
+
+ public boolean cancel(final boolean mayInterruptIfRunning) {
+ final RemoteRequestContext context;
+ synchronized (lock) {
+ while (state == State.NEW) {
+ try {
+ lock.wait();
+ } catch (InterruptedException e) {
+ return false;
+ }
+ }
+ context = remoteRequestContext;
+ }
+ context.cancel(mayInterruptIfRunning);
+ synchronized (lock) {
+ while (state == State.WAITING) {
+ try {
+ lock.wait();
+ } catch (InterruptedException e) {
+ return false;
+ }
+ }
+ return state == State.CANCELLED;
+ }
+ }
+
+ public FutureReply<O> sendCancel(final boolean mayInterruptIfRunning) {
+ final RemoteRequestContext context;
+ synchronized (lock) {
+ while (state == State.NEW) {
+ try {
+ lock.wait();
+ } catch (InterruptedException e) {
+ return this;
+ }
+ }
+ context = remoteRequestContext;
+ }
+ context.cancel(mayInterruptIfRunning);
+ return this;
+ }
+
+ public boolean isCancelled() {
+ synchronized (lock) {
+ return state == State.CANCELLED;
+ }
+ }
+
+ public boolean isDone() {
+ synchronized (lock) {
+ return state == State.DONE;
+ }
+ }
+
+ public O get() throws CancellationException, RemoteExecutionException {
+ boolean intr = false;
+ try {
+ synchronized (lock) {
+ while (state == State.WAITING || state == State.NEW) {
+ try {
+ lock.wait();
+ } catch (InterruptedException e) {
+ intr = true;
+ }
+ }
+ switch (state) {
+ case CANCELLED:
+ throw new CancellationException("Request was
cancelled");
+ case DONE:
+ return result;
+ case FAILED:
+ throw new RemoteExecutionException(msg, cause);
+ default:
+ throw new IllegalStateException("Wrong state");
+ }
+ }
+ } finally {
+ if (intr) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ public O getInterruptibly() throws InterruptedException, CancellationException,
RemoteExecutionException {
+ synchronized (lock) {
+ while (state == State.WAITING || state == State.NEW) {
+ lock.wait();
+ }
+ switch (state) {
+ case CANCELLED:
+ throw new CancellationException("Request was cancelled");
+ case DONE:
+ return result;
+ case FAILED:
+ throw new RemoteExecutionException(msg, cause);
+ default:
+ throw new IllegalStateException("Wrong state");
+ }
+ }
+ }
+
+ public O get(final long timeout, final TimeUnit unit) throws CancellationException,
RemoteExecutionException {
+ if (unit == null) {
+ throw new NullPointerException("unit is null");
+ }
+ if (timeout < 0L) {
+ throw new IllegalArgumentException("timeout is negative");
+ }
+ boolean intr = false;
+ try {
+ synchronized (lock) {
+ long now = System.currentTimeMillis();
+ final long deadline = now + unit.toMillis(timeout);
+ if (deadline < 0L) {
+ return get();
+ }
+ while (state == State.WAITING || state == State.NEW) {
+ try {
+ lock.wait(deadline - now);
+ } catch (InterruptedException e) {
+ intr = true;
+ }
+ now = System.currentTimeMillis();
+ }
+ switch (state) {
+ case CANCELLED:
+ throw new CancellationException("Request was
cancelled");
+ case DONE:
+ return result;
+ case FAILED:
+ throw new RemoteExecutionException(msg, cause);
+ default:
+ throw new IllegalStateException("Wrong state");
+ }
+ }
+ } finally {
+ if (intr) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ public O getInterruptibly(final long timeout, final TimeUnit unit) throws
InterruptedException, CancellationException, RemoteExecutionException {
+ if (unit == null) {
+ throw new NullPointerException("unit is null");
+ }
+ if (timeout < 0L) {
+ throw new IllegalArgumentException("timeout is negative");
+ }
+ synchronized (lock) {
+ while (state == State.WAITING || state == State.NEW) {
+ unit.timedWait(lock, timeout);
+ }
+ switch (state) {
+ case CANCELLED:
+ throw new CancellationException("Request was cancelled");
+ case DONE:
+ return result;
+ case FAILED:
+ throw new RemoteExecutionException(msg, cause);
+ case WAITING:
+ case NEW:
+ return null;
+ default:
+ throw new IllegalStateException("Wrong state");
+ }
+ }
+ }
+
+ public FutureReply<O> addCompletionHandler(final
RequestCompletionHandler<O> handler) {
+ synchronized (lock) {
+ switch (state) {
+ case NEW:
+ case WAITING:
+ if (completionHandlers == null) {
+ completionHandlers = CollectionUtil.arrayList();
+ }
+ completionHandlers.add(handler);
+ break;
+ default:
+ SpiUtils.safeHandleRequestCompletion(handler, this);
+ break;
+ }
+ }
+ return this;
+ }
+
+ ReplyHandler<O> getReplyHandler() {
+ return replyHandler;
+ }
+
+ private void runCompletionHandlers() {
+ synchronized (lock) {
+ final List<RequestCompletionHandler<O>> handlers =
completionHandlers;
+ if (handlers != null) {
+ completionHandlers = null;
+ executor.execute(new Runnable() {
+ public void run() {
+ for (RequestCompletionHandler<O> handler : handlers) {
+ SpiUtils.safeHandleRequestCompletion(handler,
FutureReplyImpl.this);
+ }
+ }
+ });
+ }
+ }
+ }
+
+ private final class Handler implements ReplyHandler<O> {
+
+ public void handleReply(final O reply) {
+ synchronized (lock) {
+ while (state == State.NEW) {
+ boolean intr = false;
+ try {
+ try {
+ lock.wait();
+ } catch (InterruptedException e) {
+ intr = true;
+ }
+ } finally {
+ if (intr) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+ if (state == State.WAITING) {
+ state = State.DONE;
+ result = reply;
+ runCompletionHandlers();
+ }
+ }
+ }
+
+ public void handleException(final String exMsg, final Throwable exCause) {
+ synchronized (lock) {
+ while (state == State.NEW) {
+ boolean intr = false;
+ try {
+ try {
+ lock.wait();
+ } catch (InterruptedException e) {
+ intr = true;
+ }
+ } finally {
+ if (intr) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+ if (state == State.WAITING) {
+ state = State.FAILED;
+ msg = exMsg;
+ cause = exCause;
+ runCompletionHandlers();
+ }
+ }
+ }
+
+ public void handleCancellation() {
+ synchronized (lock) {
+ while (state == State.NEW) {
+ boolean intr = false;
+ try {
+ try {
+ lock.wait();
+ } catch (InterruptedException e) {
+ intr = true;
+ }
+ } finally {
+ if (intr) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+ if (state == State.WAITING) {
+ state = State.CANCELLED;
+ runCompletionHandlers();
+ }
+ }
+ }
+ }
+}
Modified:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RemoteClientEndpointLocalImpl.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RemoteClientEndpointLocalImpl.java 2008-07-03
00:44:19 UTC (rev 4342)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RemoteClientEndpointLocalImpl.java 2008-07-03
04:48:51 UTC (rev 4343)
@@ -25,40 +25,54 @@
import org.jboss.cx.remoting.spi.remote.RemoteClientEndpoint;
import org.jboss.cx.remoting.spi.remote.RemoteRequestContext;
import org.jboss.cx.remoting.spi.remote.ReplyHandler;
-import org.jboss.cx.remoting.spi.remote.Handle;
import org.jboss.cx.remoting.spi.SpiUtils;
import org.jboss.cx.remoting.RemotingException;
import org.jboss.cx.remoting.Client;
-import org.jboss.cx.remoting.CloseHandler;
import org.jboss.cx.remoting.RequestListener;
import org.jboss.cx.remoting.RemoteExecutionException;
+import org.jboss.cx.remoting.CloseHandler;
import org.jboss.xnio.log.Logger;
import java.util.concurrent.Executor;
/**
*
*/
-public final class RemoteClientEndpointLocalImpl<I, O> implements
RemoteClientEndpoint<I, O> {
+public final class RemoteClientEndpointLocalImpl<I, O> extends
AbstractAutoCloseable<RemoteClientEndpoint<I, O>> implements
RemoteClientEndpoint<I, O> {
- private final EndpointImpl endpointImpl;
private final RequestListener<I, O> requestListener;
private final Executor executor;
- private final ClientContextImpl clientContext = new ClientContextImpl();
+ private final ClientContextImpl clientContext;
private static final Logger log =
Logger.getLogger(RemoteClientEndpointLocalImpl.class);
- public RemoteClientEndpointLocalImpl(final EndpointImpl endpointImpl, final
RequestListener<I, O> requestListener) {
- this.endpointImpl = endpointImpl;
+ private RemoteClientEndpointLocalImpl(final Executor executor, final
RequestListener<I, O> requestListener, final ClientContextImpl clientContext) {
+ super(executor);
+ this.executor = executor;
this.requestListener = requestListener;
- executor = endpointImpl.getExecutor();
+ this.clientContext = clientContext;
}
- public RemoteClientEndpointLocalImpl(final EndpointImpl endpointImpl, final
RemoteServiceEndpointLocalImpl<I, O> service, final RequestListener<I, O>
requestListener) {
- this.endpointImpl = endpointImpl;
- this.requestListener = requestListener;
- executor = endpointImpl.getExecutor();
+ RemoteClientEndpointLocalImpl(final Executor executor, final
RemoteServiceEndpointLocalImpl<I, O> service, final RequestListener<I, O>
requestListener) {
+ this(executor, requestListener, new
ClientContextImpl(service.getServiceContext()));
}
+ RemoteClientEndpointLocalImpl(final Executor executor, final RequestListener<I,
O> requestListener) {
+ this(executor, requestListener, new ClientContextImpl(executor));
+ }
+
+ public void receiveRequest(final I request) {
+ final RequestContextImpl<O> context = new
RequestContextImpl<O>(clientContext);
+ executor.execute(new Runnable() {
+ public void run() {
+ try {
+ requestListener.handleRequest(context, request);
+ } catch (Throwable t) {
+ log.error(t, "Unexpected exception in request listener");
+ }
+ }
+ });
+ }
+
public RemoteRequestContext receiveRequest(final I request, final
ReplyHandler<O> replyHandler) {
final RequestContextImpl<O> context = new
RequestContextImpl<O>(replyHandler, clientContext);
executor.execute(new Runnable() {
@@ -73,26 +87,45 @@
}
});
return new RemoteRequestContext() {
- public void cancel() {
- context.cancel();
+ public void cancel(final boolean mayInterrupt) {
+ context.cancel(mayInterrupt);
}
};
}
- public Handle<RemoteClientEndpoint<I, O>> getHandle() throws
RemotingException {
- return null;
- }
-
public Client<I, O> getClient() throws RemotingException {
- return null;
+ inc();
+ boolean ok = false;
+ try {
+ final ClientImpl<I, O> client = new ClientImpl<I, O>(this,
executor);
+ client.addCloseHandler(new CloseHandler<Client<I, O>>() {
+ public void handleClose(final Client<I, O> closed) {
+ safeDec();
+ }
+ });
+ ok = true;
+ return client;
+ } finally {
+ if (! ok) {
+ safeDec();
+ }
+ }
}
- public void autoClose() {
+ void open() throws RemotingException {
+ try {
+ requestListener.handleClientOpen(clientContext);
+ addCloseHandler(new CloseHandler<RemoteClientEndpoint<I, O>>() {
+ public void handleClose(final RemoteClientEndpoint<I, O> closed) {
+ try {
+ requestListener.handleClientClose(clientContext);
+ } catch (Throwable t) {
+ log.error(t, "Unexpected exception in request listener
client close handler method");
+ }
+ }
+ });
+ } catch (Throwable t) {
+ throw new RemotingException("Failed to open client context", t);
+ }
}
-
- public void close() throws RemotingException {
- }
-
- public void addCloseHandler(final CloseHandler<RemoteClientEndpoint<I,
O>> handler) {
- }
}
Modified:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RemoteServiceEndpointLocalImpl.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RemoteServiceEndpointLocalImpl.java 2008-07-03
00:44:19 UTC (rev 4342)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RemoteServiceEndpointLocalImpl.java 2008-07-03
04:48:51 UTC (rev 4343)
@@ -24,43 +24,78 @@
import org.jboss.cx.remoting.spi.remote.RemoteServiceEndpoint;
import org.jboss.cx.remoting.spi.remote.RemoteClientEndpoint;
-import org.jboss.cx.remoting.spi.remote.Handle;
import org.jboss.cx.remoting.RequestListener;
import org.jboss.cx.remoting.RemotingException;
import org.jboss.cx.remoting.ClientSource;
import org.jboss.cx.remoting.CloseHandler;
+import org.jboss.xnio.log.Logger;
+import java.util.concurrent.Executor;
/**
*
*/
-public final class RemoteServiceEndpointLocalImpl<I, O> implements
RemoteServiceEndpoint<I, O> {
+public final class RemoteServiceEndpointLocalImpl<I, O> extends
AbstractAutoCloseable<RemoteServiceEndpoint<I, O>> implements
RemoteServiceEndpoint<I, O> {
- private final EndpointImpl endpointImpl;
private final RequestListener<I, O> requestListener;
+ private final ServiceContextImpl serviceContext;
+ private final Executor executor;
- public RemoteServiceEndpointLocalImpl(final EndpointImpl endpointImpl, final
RequestListener<I, O> requestListener) {
- this.endpointImpl = endpointImpl;
+ private static final Logger log =
Logger.getLogger(RemoteServiceEndpointLocalImpl.class);
+
+ RemoteServiceEndpointLocalImpl(final Executor executor, final RequestListener<I,
O> requestListener) {
+ super(executor);
this.requestListener = requestListener;
+ this.executor = executor;
+ serviceContext = new ServiceContextImpl(executor);
}
public RemoteClientEndpoint<I, O> openClient() throws RemotingException {
- return new RemoteClientEndpointLocalImpl<I, O>(endpointImpl, this,
requestListener);
+ if (isOpen()) {
+ final RemoteClientEndpointLocalImpl<I, O> clientEndpoint = new
RemoteClientEndpointLocalImpl<I, O>(executor, this, requestListener);
+ clientEndpoint.open();
+ return clientEndpoint;
+ } else {
+ throw new RemotingException("RemotingServiceEndpoint is closed");
+ }
}
- public Handle<RemoteServiceEndpoint<I, O>> getHandle() throws
RemotingException {
- return null;
- }
-
public ClientSource<I, O> getClientSource() throws RemotingException {
- return null;
+ inc();
+ boolean ok = false;
+ try {
+ final ClientSourceImpl<I, O> clientSource = new ClientSourceImpl<I,
O>(this, executor);
+ clientSource.addCloseHandler(new CloseHandler<ClientSource<I,
O>>() {
+ public void handleClose(final ClientSource<I, O> closed) {
+ safeDec();
+ }
+ });
+ ok = true;
+ return clientSource;
+ } finally {
+ if (! ok) {
+ safeDec();
+ }
+ }
}
- public void autoClose() {
+ void open() throws RemotingException {
+ try {
+ requestListener.handleServiceOpen(serviceContext);
+ addCloseHandler(new CloseHandler<RemoteServiceEndpoint<I, O>>()
{
+ public void handleClose(final RemoteServiceEndpoint<I, O> closed)
{
+ try {
+ requestListener.handleServiceClose(serviceContext);
+ } catch (Throwable t) {
+ log.error(t, "Unexpected exception in request listener
client close handler method");
+ }
+ }
+ });
+ } catch (Throwable t) {
+ throw new RemotingException("Failed to open client context", t);
+ }
}
- public void close() throws RemotingException {
+ ServiceContextImpl getServiceContext() {
+ return serviceContext;
}
-
- public void addCloseHandler(final CloseHandler<RemoteServiceEndpoint<I,
O>> handler) {
- }
}
Modified:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RequestContextImpl.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RequestContextImpl.java 2008-07-03
00:44:19 UTC (rev 4342)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RequestContextImpl.java 2008-07-03
04:48:51 UTC (rev 4343)
@@ -54,6 +54,12 @@
executor = new TaggingExecutor(clientContext.getExecutor());
}
+ RequestContextImpl(final ClientContextImpl clientContext) {
+ this.clientContext = clientContext;
+ executor = new TaggingExecutor(clientContext.getExecutor());
+ replyHandler = null;
+ }
+
public ClientContext getContext() {
return clientContext;
}
@@ -64,7 +70,9 @@
public void sendReply(final O reply) throws RemotingException, IllegalStateException
{
if (! closed.getAndSet(true)) {
- replyHandler.handleReply(reply);
+ if (replyHandler != null) {
+ replyHandler.handleReply(reply);
+ }
} else {
throw new IllegalStateException("Reply already sent");
}
@@ -72,7 +80,9 @@
public void sendFailure(final String msg, final Throwable cause) throws
RemotingException, IllegalStateException {
if (! closed.getAndSet(true)) {
- replyHandler.handleException(msg, cause);
+ if (replyHandler != null) {
+ replyHandler.handleException(msg, cause);
+ }
} else {
throw new IllegalStateException("Reply already sent");
}
@@ -80,7 +90,9 @@
public void sendCancelled() throws RemotingException, IllegalStateException {
if (! closed.getAndSet(true)) {
- replyHandler.handleCancellation();
+ if (replyHandler != null) {
+ replyHandler.handleCancellation();
+ }
} else {
throw new IllegalStateException("Reply already sent");
}
@@ -103,17 +115,23 @@
executor.execute(command);
}
- protected void cancel() {
+ protected void cancel(final boolean mayInterrupt) {
if (! cancelled.getAndSet(true)) {
synchronized (cancelLock) {
if (cancelHandlers != null) {
- for (RequestCancelHandler<O> handler : cancelHandlers) {
- SpiUtils.safeNotifyCancellation(handler, this, false);
+ for (final RequestCancelHandler<O> handler : cancelHandlers) {
+ executor.execute(new Runnable() {
+ public void run() {
+ SpiUtils.safeNotifyCancellation(handler,
RequestContextImpl.this, mayInterrupt);
+ }
+ });
}
cancelHandlers = null;
}
}
- executor.interruptAll();
+ if (mayInterrupt) {
+ executor.interruptAll();
+ }
}
}
}
Modified:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ServiceContextImpl.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ServiceContextImpl.java 2008-07-03
00:44:19 UTC (rev 4342)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ServiceContextImpl.java 2008-07-03
04:48:51 UTC (rev 4343)
@@ -25,20 +25,15 @@
import org.jboss.cx.remoting.ServiceContext;
import org.jboss.cx.remoting.RemotingException;
import org.jboss.cx.remoting.CloseHandler;
+import org.jboss.cx.remoting.ClientContext;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executor;
/**
*
*/
-public final class ServiceContextImpl implements ServiceContext {
-
- public ConcurrentMap<Object, Object> getAttributes() {
- return null;
+public final class ServiceContextImpl extends AbstractContextImpl<ServiceContext>
implements ServiceContext {
+ protected ServiceContextImpl(final Executor executor) {
+ super(executor);
}
-
- public void close() throws RemotingException {
- }
-
- public void addCloseHandler(final CloseHandler<ServiceContext>
serviceContextCloseHandler) {
- }
}
Modified:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/util/QueueExecutor.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/util/QueueExecutor.java 2008-07-03
00:44:19 UTC (rev 4342)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/util/QueueExecutor.java 2008-07-03
04:48:51 UTC (rev 4343)
@@ -32,6 +32,7 @@
state = State.RUNNING;
queue.notify();
// fall thru
+ case STOPPING:
case RUNNING:
queue.add(command);
break;
@@ -51,7 +52,7 @@
try {
queue.wait();
} catch (InterruptedException e) {
- e.printStackTrace();
+ intr = true;
}
}
if (state == State.DOWN) {