JBoss Remoting SVN: r4403 - remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core.
by jboss-remoting-commits@lists.jboss.org
Author: david.lloyd(a)jboss.com
Date: 2008-07-18 20:53:56 -0400 (Fri, 18 Jul 2008)
New Revision: 4403
Modified:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/AbstractContextImpl.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientContextImpl.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientImpl.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientSourceImpl.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/EndpointImpl.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ServiceContextImpl.java
Log:
More logging improvements
Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/AbstractContextImpl.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/AbstractContextImpl.java 2008-07-19 00:37:56 UTC (rev 4402)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/AbstractContextImpl.java 2008-07-19 00:53:56 UTC (rev 4403)
@@ -49,4 +49,8 @@
protected boolean isOpen() {
return super.isOpen();
}
+
+ public String toString() {
+ return "generic context instance <" + Integer.toString(hashCode()) + ">";
+ }
}
Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientContextImpl.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientContextImpl.java 2008-07-19 00:37:56 UTC (rev 4402)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientContextImpl.java 2008-07-19 00:53:56 UTC (rev 4403)
@@ -54,4 +54,8 @@
public ServiceContext getServiceContext() {
return serviceContext;
}
+
+ public String toString() {
+ return "client context instance <" + Integer.toString(hashCode()) + ">";
+ }
}
Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientImpl.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientImpl.java 2008-07-19 00:37:56 UTC (rev 4402)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientImpl.java 2008-07-19 00:53:56 UTC (rev 4403)
@@ -80,4 +80,8 @@
}
remoteClientEndpoint.receiveRequest(request);
}
+
+ public String toString() {
+ return "client instance <" + Integer.toString(hashCode()) + ">";
+ }
}
Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientSourceImpl.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientSourceImpl.java 2008-07-19 00:37:56 UTC (rev 4402)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientSourceImpl.java 2008-07-19 00:53:56 UTC (rev 4403)
@@ -58,4 +58,8 @@
IoUtils.safeClose(handle);
}
}
+
+ public String toString() {
+ return "client source instance <" + Integer.toString(hashCode()) + ">";
+ }
}
Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/EndpointImpl.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/EndpointImpl.java 2008-07-19 00:37:56 UTC (rev 4402)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/EndpointImpl.java 2008-07-19 00:53:56 UTC (rev 4403)
@@ -184,4 +184,8 @@
}
}
}
+
+ public String toString() {
+ return "endpoint instance <" + Integer.toString(hashCode()) + ">";
+ }
}
Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ServiceContextImpl.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ServiceContextImpl.java 2008-07-19 00:37:56 UTC (rev 4402)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ServiceContextImpl.java 2008-07-19 00:53:56 UTC (rev 4403)
@@ -36,4 +36,8 @@
protected ServiceContextImpl(final Executor executor) {
super(executor);
}
+
+ public String toString() {
+ return "service context instance <" + Integer.toString(hashCode()) + ">";
+ }
}
16 years, 5 months
JBoss Remoting SVN: r4402 - remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi.
by jboss-remoting-commits@lists.jboss.org
Author: david.lloyd(a)jboss.com
Date: 2008-07-18 20:37:56 -0400 (Fri, 18 Jul 2008)
New Revision: 4402
Modified:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractCloseable.java
Log:
More logging
Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractCloseable.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractCloseable.java 2008-07-19 00:31:18 UTC (rev 4401)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractCloseable.java 2008-07-19 00:37:56 UTC (rev 4402)
@@ -56,6 +56,7 @@
throw new NullPointerException("executor is null");
}
this.executor = executor;
+ log.trace("Opened %s", this);
}
/**
16 years, 5 months
JBoss Remoting SVN: r4401 - in remoting3/trunk: core/src/main/java/org/jboss/cx/remoting/core and 1 other directory.
by jboss-remoting-commits@lists.jboss.org
Author: david.lloyd(a)jboss.com
Date: 2008-07-18 20:31:18 -0400 (Fri, 18 Jul 2008)
New Revision: 4401
Modified:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractAutoCloseable.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RemoteClientEndpointLocalImpl.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RemoteServiceEndpointLocalImpl.java
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicHandler.java
Log:
Yet more logging improvements
Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractAutoCloseable.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractAutoCloseable.java 2008-07-19 00:28:25 UTC (rev 4400)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractAutoCloseable.java 2008-07-19 00:31:18 UTC (rev 4401)
@@ -117,7 +117,7 @@
}
public String toString() {
- return "handle to " + String.valueOf(AbstractAutoCloseable.this);
+ return "handle <" + Integer.toString(hashCode(), 16) + "> to " + String.valueOf(AbstractAutoCloseable.this);
}
}
Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RemoteClientEndpointLocalImpl.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RemoteClientEndpointLocalImpl.java 2008-07-19 00:28:25 UTC (rev 4400)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RemoteClientEndpointLocalImpl.java 2008-07-19 00:31:18 UTC (rev 4401)
@@ -113,6 +113,6 @@
}
public String toString() {
- return "local client endpoint (request listener = " + String.valueOf(requestListener) + ")";
+ return "local client endpoint <" + Integer.toString(hashCode(), 16) + "> (request listener = " + String.valueOf(requestListener) + ")";
}
}
Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RemoteServiceEndpointLocalImpl.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RemoteServiceEndpointLocalImpl.java 2008-07-19 00:28:25 UTC (rev 4400)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RemoteServiceEndpointLocalImpl.java 2008-07-19 00:31:18 UTC (rev 4401)
@@ -82,6 +82,6 @@
}
public String toString() {
- return "local service endpoint (request listener = " + String.valueOf(requestListener) + ")";
+ return "local service endpoint <" + Integer.toString(hashCode(), 16) + "> (request listener = " + String.valueOf(requestListener) + ")";
}
}
Modified: remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicHandler.java
===================================================================
--- remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicHandler.java 2008-07-19 00:28:25 UTC (rev 4400)
+++ remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicHandler.java 2008-07-19 00:31:18 UTC (rev 4401)
@@ -561,7 +561,7 @@
}
public String toString() {
- return "forwarded client endpoint (id = " + identifier + ")";
+ return "forwarded client endpoint <" + Integer.toString(hashCode(), 16) + "> (id = " + identifier + ")";
}
}
@@ -645,7 +645,7 @@
}
public String toString() {
- return "forwarded service endpoint (id = " + identifier + ")";
+ return "forwarded service endpoint <" + Integer.toString(hashCode(), 16) + "> (id = " + identifier + ")";
}
}
}
16 years, 5 months
JBoss Remoting SVN: r4400 - in remoting3/trunk: core/src/main/java/org/jboss/cx/remoting/core and 1 other directory.
by jboss-remoting-commits@lists.jboss.org
Author: david.lloyd(a)jboss.com
Date: 2008-07-18 20:28:25 -0400 (Fri, 18 Jul 2008)
New Revision: 4400
Modified:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractAutoCloseable.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RemoteClientEndpointLocalImpl.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RemoteServiceEndpointLocalImpl.java
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicHandler.java
Log:
Fix logging and toString() to be more readable and useful
Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractAutoCloseable.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractAutoCloseable.java 2008-07-19 00:16:52 UTC (rev 4399)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractAutoCloseable.java 2008-07-19 00:28:25 UTC (rev 4400)
@@ -60,7 +60,7 @@
final int v = refcount.decrementAndGet();
if (v == 0) {
// we dropped the refcount to zero
- log.trace("Refcount of %s dropped to zero, closing", this);
+ log.trace("Lowering reference count of %s to 0 (closing)", this);
if (refcount.compareAndSet(0, -65536)) {
// we are closing
close();
@@ -70,7 +70,7 @@
// was already closed; put the count back
refcount.incrementAndGet();
} else {
- log.trace("Clearing reference to %s to %d", this, Integer.valueOf(v));
+ log.trace("Lowering reference count of %s to %d", this, Integer.valueOf(v));
}
// otherwise, the resource remains open
}
@@ -82,7 +82,7 @@
*/
protected void inc() throws RemotingException {
final int v = refcount.getAndIncrement();
- log.trace("Adding reference to %s to %d", this, Integer.valueOf(v + 1));
+ log.trace("Raising reference count of %s to %d", this, Integer.valueOf(v + 1));
if (v < 0) {
// was already closed
refcount.decrementAndGet();
@@ -115,5 +115,13 @@
public T getResource() {
return (T) AbstractAutoCloseable.this;
}
+
+ public String toString() {
+ return "handle to " + String.valueOf(AbstractAutoCloseable.this);
+ }
}
+
+ public String toString() {
+ return "generic resource <" + Integer.toString(hashCode(), 16) + ">";
+ }
}
Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RemoteClientEndpointLocalImpl.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RemoteClientEndpointLocalImpl.java 2008-07-19 00:16:52 UTC (rev 4399)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RemoteClientEndpointLocalImpl.java 2008-07-19 00:28:25 UTC (rev 4400)
@@ -111,4 +111,8 @@
throw new RemotingException("Failed to open client context", t);
}
}
+
+ public String toString() {
+ return "local client endpoint (request listener = " + String.valueOf(requestListener) + ")";
+ }
}
Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RemoteServiceEndpointLocalImpl.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RemoteServiceEndpointLocalImpl.java 2008-07-19 00:16:52 UTC (rev 4399)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RemoteServiceEndpointLocalImpl.java 2008-07-19 00:28:25 UTC (rev 4400)
@@ -80,4 +80,8 @@
ServiceContextImpl getServiceContext() {
return serviceContext;
}
+
+ public String toString() {
+ return "local service endpoint (request listener = " + String.valueOf(requestListener) + ")";
+ }
}
Modified: remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicHandler.java
===================================================================
--- remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicHandler.java 2008-07-19 00:16:52 UTC (rev 4399)
+++ remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicHandler.java 2008-07-19 00:28:25 UTC (rev 4400)
@@ -559,6 +559,10 @@
return SpiUtils.getBlankRemoteRequestContext();
}
}
+
+ public String toString() {
+ return "forwarded client endpoint (id = " + identifier + ")";
+ }
}
public final class RemoteRequestContextImpl implements RemoteRequestContext {
@@ -639,5 +643,9 @@
}
}
}
+
+ public String toString() {
+ return "forwarded service endpoint (id = " + identifier + ")";
+ }
}
}
16 years, 5 months
JBoss Remoting SVN: r4399 - remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi.
by jboss-remoting-commits@lists.jboss.org
Author: david.lloyd(a)jboss.com
Date: 2008-07-18 20:16:52 -0400 (Fri, 18 Jul 2008)
New Revision: 4399
Modified:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractAutoCloseable.java
Log:
Javadoc
Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractAutoCloseable.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractAutoCloseable.java 2008-07-19 00:04:32 UTC (rev 4398)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractAutoCloseable.java 2008-07-19 00:16:52 UTC (rev 4399)
@@ -31,7 +31,8 @@
import org.jboss.xnio.log.Logger;
/**
- *
+ * A closeable implementation that supports reference counting. Since the initial reference count is zero, implementors
+ * must be careful to ensure that the first operation invoked is a call to {@link #getHandle()}.
*/
public abstract class AbstractAutoCloseable<T> extends AbstractCloseable<T> {
@@ -40,19 +41,21 @@
private static final Logger log = Logger.getLogger(AbstractAutoCloseable.class);
+ /**
+ * Basic constructor.
+ *
+ * @param executor the executor used to execute the close notification handlers
+ */
protected AbstractAutoCloseable(final Executor executor) {
super(executor);
this.executor = executor;
}
- protected void safeDec() {
- try {
- dec();
- } catch (Throwable t) {
- log.trace("Failed to decrement reference count: %s", t);
- }
- }
-
+ /**
+ * Decrement the reference count by one. If the count drops to zero, the resource is closed.
+ *
+ * @throws RemotingException if the reference count dropped to zero and the close operation threw an exception
+ */
protected void dec() throws RemotingException {
final int v = refcount.decrementAndGet();
if (v == 0) {
@@ -72,6 +75,11 @@
// otherwise, the resource remains open
}
+ /**
+ * Increment the reference count by one. If the resource is closed, an exception is thrown.
+ *
+ * @throws RemotingException if the resource is closed
+ */
protected void inc() throws RemotingException {
final int v = refcount.getAndIncrement();
log.trace("Adding reference to %s to %d", this, Integer.valueOf(v + 1));
@@ -82,6 +90,13 @@
}
}
+ /**
+ * Get a handle to this resource. Increments the reference count by one. If the resource is closed, an exception
+ * is thrown.
+ *
+ * @return the handle
+ * @throws RemotingException if the resource is closed
+ */
public Handle<T> getHandle() throws RemotingException {
return new HandleImpl();
}
16 years, 5 months
JBoss Remoting SVN: r4398 - in remoting3/trunk: api/src/main/java/org/jboss/cx/remoting and 9 other directories.
by jboss-remoting-commits@lists.jboss.org
Author: david.lloyd(a)jboss.com
Date: 2008-07-18 20:04:32 -0400 (Fri, 18 Jul 2008)
New Revision: 4398
Modified:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Endpoint.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractAutoCloseable.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractCloseable.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/SpiUtils.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteClientEndpoint.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteServiceEndpoint.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/EndpointWrapper.java
remoting3/trunk/api/src/test/java/org/jboss/cx/remoting/spi/CloseableTestCase.java
remoting3/trunk/build.xml
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientSourceImpl.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/EndpointImpl.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RemoteServiceEndpointLocalImpl.java
remoting3/trunk/core/src/test/java/org/jboss/cx/remoting/core/EndpointTestCase.java
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicHandler.java
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicProtocol.java
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/ServiceRegistry.java
remoting3/trunk/protocol/basic/src/test/java/org/jboss/cx/remoting/protocol/basic/ConnectionTestCase.java
remoting3/trunk/standalone/src/main/java/org/jboss/cx/remoting/Remoting.java
Log:
Remove auto-closing; replace with a root handle notion
Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Endpoint.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Endpoint.java 2008-07-18 23:28:49 UTC (rev 4397)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Endpoint.java 2008-07-19 00:04:32 UTC (rev 4398)
@@ -3,6 +3,7 @@
import java.util.concurrent.ConcurrentMap;
import org.jboss.cx.remoting.spi.remote.RemoteClientEndpoint;
import org.jboss.cx.remoting.spi.remote.RemoteServiceEndpoint;
+import org.jboss.cx.remoting.spi.remote.Handle;
/**
* A potential participant in a JBoss Remoting communications relationship.
@@ -34,10 +35,10 @@
* @param <I> the request type
* @param <O> the reply type
* @param requestListener the request listener
- * @return the client
+ * @return a handle for the client
* @throws RemotingException if an error occurs
*/
- <I, O> RemoteClientEndpoint createClientEndpoint(RequestListener<I, O> requestListener) throws RemotingException;
+ <I, O> Handle<RemoteClientEndpoint> createClientEndpoint(RequestListener<I, O> requestListener) throws RemotingException;
/**
* Create a client source that can be used to acquire clients associated with a request listener on this endpoint.
@@ -49,10 +50,10 @@
* @param <I> the request type
* @param <O> the reply type
* @param requestListener the request listener
- * @return the context source
+ * @return a handle for the client source
* @throws RemotingException if an error occurs
*/
- <I, O> RemoteServiceEndpoint createServiceEndpoint(RequestListener<I, O> requestListener) throws RemotingException;
+ <I, O> Handle<RemoteServiceEndpoint> createServiceEndpoint(RequestListener<I, O> requestListener) throws RemotingException;
/**
* Create a client from a remote client endpoint.
Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractAutoCloseable.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractAutoCloseable.java 2008-07-18 23:28:49 UTC (rev 4397)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractAutoCloseable.java 2008-07-19 00:04:32 UTC (rev 4398)
@@ -26,6 +26,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.jboss.cx.remoting.RemotingException;
+import org.jboss.cx.remoting.CloseHandler;
import org.jboss.cx.remoting.spi.remote.Handle;
import org.jboss.xnio.log.Logger;
@@ -34,8 +35,7 @@
*/
public abstract class AbstractAutoCloseable<T> extends AbstractCloseable<T> {
- private final AtomicBoolean autoClose = new AtomicBoolean();
- private final AtomicInteger refcount = new AtomicInteger(1);
+ private final AtomicInteger refcount = new AtomicInteger(0);
private final Executor executor;
private static final Logger log = Logger.getLogger(AbstractAutoCloseable.class);
@@ -82,24 +82,17 @@
}
}
- public void autoClose() throws RemotingException {
- if (! autoClose.getAndSet(true)) {
- dec();
- }
- }
-
public Handle<T> getHandle() throws RemotingException {
return new HandleImpl();
}
private final class HandleImpl extends AbstractCloseable<Handle<T>> implements Handle<T> {
-
private HandleImpl() throws RemotingException {
super(AbstractAutoCloseable.this.executor);
inc();
}
- public void close() throws RemotingException {
+ protected void closeAction() throws RemotingException {
dec();
}
Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractCloseable.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractCloseable.java 2008-07-18 23:28:49 UTC (rev 4397)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractCloseable.java 2008-07-19 00:04:32 UTC (rev 4398)
@@ -34,7 +34,8 @@
import java.util.concurrent.atomic.AtomicBoolean;
/**
- *
+ * A basic implementation of a closeable resource. Use as a convenient base class for your closeable resources.
+ * Ensures that the {@code close()} method is idempotent; implements the registry of close handlers.
*/
public abstract class AbstractCloseable<T> implements Closeable<T> {
@@ -45,6 +46,11 @@
private final AtomicBoolean closed = new AtomicBoolean();
private Set<CloseHandler<? super T>> closeHandlers;
+ /**
+ * Basic constructor.
+ *
+ * @param executor the executor used to execute the close notification handlers
+ */
protected AbstractCloseable(final Executor executor) {
if (executor == null) {
throw new NullPointerException("executor is null");
@@ -52,11 +58,27 @@
this.executor = executor;
}
+ /**
+ * Read the status of this resource. This is just a snapshot in time; there is no guarantee that the resource
+ * will remain open for any amount of time, even if this method returns {@code true}.
+ *
+ * @return {@code true} if the resource is still open
+ */
protected boolean isOpen() {
return ! closed.get();
}
- public void close() throws RemotingException {
+ /**
+ * Called exactly once when the {@code close()} method is invoked; the actual close operation should take place here.
+ *
+ * @throws RemotingException if the close failed
+ */
+ protected void closeAction() throws RemotingException {}
+
+ /**
+ * {@inheritDoc}
+ */
+ public final void close() throws RemotingException {
if (! closed.getAndSet(true)) {
log.trace("Closed %s", this);
synchronized (closeLock) {
@@ -72,9 +94,13 @@
closeHandlers = null;
}
}
+ closeAction();
}
}
+ /**
+ * {@inheritDoc}
+ */
public void addCloseHandler(final CloseHandler<? super T> handler) {
synchronized (closeLock) {
if (closeHandlers == null) {
@@ -84,10 +110,18 @@
}
}
+ /**
+ * Get the executor to use for handler invocation.
+ *
+ * @return the executor
+ */
protected Executor getExecutor() {
return executor;
}
+ /**
+ * Finalize this closeable instance. If the instance hasn't been closed, it is closed and a warning is logged.
+ */
protected void finalize() throws Throwable {
try {
super.finalize();
Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/SpiUtils.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/SpiUtils.java 2008-07-18 23:28:49 UTC (rev 4397)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/SpiUtils.java 2008-07-19 00:04:32 UTC (rev 4398)
@@ -136,22 +136,6 @@
private static final RemoteRequestContext BLANK_REMOTE_REQUEST_CONTEXT = new BlankRemoteRequestContext();
- public static void safeAutoClose(final RemoteClientEndpoint remoteClientEndpoint) {
- try {
- remoteClientEndpoint.autoClose();
- } catch (Throwable t) {
- log.error("Failed to set autoClose on %s: %s", remoteClientEndpoint, t);
- }
- }
-
- public static void safeAutoClose(final RemoteServiceEndpoint remoteServiceEndpoint) {
- try {
- remoteServiceEndpoint.autoClose();
- } catch (Throwable t) {
- log.error("Failed to set autoClose on %s: %s", remoteServiceEndpoint, t);
- }
- }
-
private static final class BlankRemoteRequestContext implements RemoteRequestContext {
public void cancel(final boolean mayInterrupt) {
}
Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteClientEndpoint.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteClientEndpoint.java 2008-07-18 23:28:49 UTC (rev 4397)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteClientEndpoint.java 2008-07-19 00:04:32 UTC (rev 4398)
@@ -64,11 +64,6 @@
Handle<RemoteClientEndpoint> getHandle() throws RemotingException;
/**
- * Automatically close this client endpoint when all handles and local client instances are closed.
- */
- void autoClose() throws RemotingException;
-
- /**
* Close this client endpoint. The outcome of any outstanding requests is not defined, though implementations
* should make an effort to cancel any outstanding requests.
*
Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteServiceEndpoint.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteServiceEndpoint.java 2008-07-18 23:28:49 UTC (rev 4397)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteServiceEndpoint.java 2008-07-19 00:04:32 UTC (rev 4398)
@@ -40,7 +40,7 @@
* @return a client endpoint
* @throws RemotingException if a client could not be opened
*/
- RemoteClientEndpoint createClientEndpoint() throws RemotingException;
+ Handle<RemoteClientEndpoint> createClientEndpoint() throws RemotingException;
/**
* Get a handle to this service endpoint. The service endpoint will not auto-close as long as there is at least
@@ -54,12 +54,6 @@
Handle<RemoteServiceEndpoint> getHandle() throws RemotingException;
/**
- * Automatically close this service endpoint when all handles and local client source instances
- * are closed.
- */
- void autoClose() throws RemotingException;
-
- /**
* Close this service endpoint immediately.
*/
void close() throws RemotingException;
Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/EndpointWrapper.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/EndpointWrapper.java 2008-07-18 23:28:49 UTC (rev 4397)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/EndpointWrapper.java 2008-07-19 00:04:32 UTC (rev 4398)
@@ -8,6 +8,7 @@
import org.jboss.cx.remoting.ClientSource;
import org.jboss.cx.remoting.spi.remote.RemoteClientEndpoint;
import org.jboss.cx.remoting.spi.remote.RemoteServiceEndpoint;
+import org.jboss.cx.remoting.spi.remote.Handle;
/**
* A simple delegating wrapper for endpoints.
@@ -41,14 +42,14 @@
/**
* {@inheritDoc} This implementation calls the same method on the delegate object.
*/
- public <I, O> RemoteClientEndpoint createClientEndpoint(final RequestListener<I, O> requestListener) throws RemotingException {
+ public <I, O> Handle<RemoteClientEndpoint> createClientEndpoint(final RequestListener<I, O> requestListener) throws RemotingException {
return delegate.createClientEndpoint(requestListener);
}
/**
* {@inheritDoc} This implementation calls the same method on the delegate object.
*/
- public <I, O> RemoteServiceEndpoint createServiceEndpoint(final RequestListener<I, O> requestListener) throws RemotingException {
+ public <I, O> Handle<RemoteServiceEndpoint> createServiceEndpoint(final RequestListener<I, O> requestListener) throws RemotingException {
return delegate.createServiceEndpoint(requestListener);
}
Modified: remoting3/trunk/api/src/test/java/org/jboss/cx/remoting/spi/CloseableTestCase.java
===================================================================
--- remoting3/trunk/api/src/test/java/org/jboss/cx/remoting/spi/CloseableTestCase.java 2008-07-18 23:28:49 UTC (rev 4397)
+++ remoting3/trunk/api/src/test/java/org/jboss/cx/remoting/spi/CloseableTestCase.java 2008-07-19 00:04:32 UTC (rev 4398)
@@ -78,6 +78,7 @@
final AbstractAutoCloseable<Object> closeable = new AbstractAutoCloseable<Object>(executorService) {
// empty
};
+ final Handle<Object> rootHandle = closeable.getHandle();
try {
closeable.addCloseHandler(new CloseHandler<Object>() {
public void handleClose(final Object x) {
@@ -87,7 +88,7 @@
});
assertTrue(closeable.isOpen());
assertFalse(closed.get());
- closeable.autoClose();
+ rootHandle.close();
assertTrue(latch.await(500L, TimeUnit.MILLISECONDS));
assertFalse(closeable.isOpen());
assertTrue(closed.get());
@@ -107,6 +108,7 @@
final AbstractAutoCloseable<Object> closeable = new AbstractAutoCloseable<Object>(executorService) {
// empty
};
+ final Handle<Object> rootHandle = closeable.getHandle();
try {
closeable.addCloseHandler(new CloseHandler<Object>() {
public void handleClose(final Object x) {
@@ -119,7 +121,7 @@
final Handle<Object> h1 = closeable.getHandle();
assertTrue(closeable.isOpen());
assertFalse(closed.get());
- closeable.autoClose();
+ rootHandle.close();
assertTrue(closeable.isOpen());
assertFalse(closed.get());
h1.close();
@@ -142,6 +144,7 @@
final AbstractAutoCloseable<Object> closeable = new AbstractAutoCloseable<Object>(executorService) {
// empty
};
+ final Handle<Object> rootHandle = closeable.getHandle();
try {
closeable.addCloseHandler(new CloseHandler<Object>() {
public void handleClose(final Object x) {
@@ -156,7 +159,7 @@
final Handle<Object> h3 = closeable.getHandle();
assertTrue(closeable.isOpen());
assertFalse(closed.get());
- closeable.autoClose();
+ rootHandle.close();
assertTrue(closeable.isOpen());
assertFalse(closed.get());
h1.close();
Modified: remoting3/trunk/build.xml
===================================================================
--- remoting3/trunk/build.xml 2008-07-18 23:28:49 UTC (rev 4397)
+++ remoting3/trunk/build.xml 2008-07-19 00:04:32 UTC (rev 4398)
@@ -849,6 +849,7 @@
<path refid="api.classpath"/>
<path refid="core.classpath"/>
<path refid="util.classpath"/>
+ <pathelement location="${lib.xnio-api.local}"/>
</classpath>
</javac>
<touch file="standalone/target/main/.lastcompile" verbose="false"/>
@@ -858,7 +859,7 @@
<delete dir="standalone/target"/>
</target>
- <target name="standalone" description="Build the standalone module" depends="api,core,util,standalone.compile">
+ <target name="standalone" description="Build the standalone module" depends="lib.xnio-api,api,core,util,standalone.compile">
<path id="standalone.classpath">
<pathelement location="standalone/target/main/classes"/>
</path>
Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientSourceImpl.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientSourceImpl.java 2008-07-18 23:28:49 UTC (rev 4397)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientSourceImpl.java 2008-07-19 00:04:32 UTC (rev 4398)
@@ -28,7 +28,9 @@
import org.jboss.cx.remoting.Endpoint;
import org.jboss.cx.remoting.spi.remote.RemoteClientEndpoint;
import org.jboss.cx.remoting.spi.remote.RemoteServiceEndpoint;
+import org.jboss.cx.remoting.spi.remote.Handle;
import org.jboss.cx.remoting.spi.AbstractCloseable;
+import org.jboss.xnio.IoUtils;
/**
*
@@ -48,9 +50,12 @@
if (! isOpen()) {
throw new RemotingException("Client source is not open");
}
- final RemoteClientEndpoint clientEndpoint = serviceEndpoint.createClientEndpoint();
- final Client<I, O> client = endpoint.createClient(clientEndpoint);
- clientEndpoint.autoClose();
- return client;
+ final Handle<RemoteClientEndpoint> handle = serviceEndpoint.createClientEndpoint();
+ try {
+ final Client<I, O> client = endpoint.createClient(handle.getResource());
+ return client;
+ } finally {
+ IoUtils.safeClose(handle);
+ }
}
}
Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/EndpointImpl.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/EndpointImpl.java 2008-07-18 23:28:49 UTC (rev 4397)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/EndpointImpl.java 2008-07-19 00:04:32 UTC (rev 4398)
@@ -121,18 +121,18 @@
return endpointMap;
}
- public <I, O> RemoteClientEndpoint createClientEndpoint(final RequestListener<I, O> requestListener) throws RemotingException {
+ public <I, O> Handle<RemoteClientEndpoint> createClientEndpoint(final RequestListener<I, O> requestListener) throws RemotingException {
final RemoteClientEndpointLocalImpl<I, O> clientEndpoint = new RemoteClientEndpointLocalImpl<I, O>(executor, requestListener);
clientEndpoint.addCloseHandler(remover);
clientEndpoint.open();
- return clientEndpoint;
+ return clientEndpoint.getHandle();
}
- public <I, O> RemoteServiceEndpoint createServiceEndpoint(final RequestListener<I, O> requestListener) throws RemotingException {
+ public <I, O> Handle<RemoteServiceEndpoint> createServiceEndpoint(final RequestListener<I, O> requestListener) throws RemotingException {
final RemoteServiceEndpointLocalImpl<I, O> serviceEndpoint = new RemoteServiceEndpointLocalImpl<I, O>(executor, requestListener);
serviceEndpoint.addCloseHandler(remover);
serviceEndpoint.open();
- return serviceEndpoint;
+ return serviceEndpoint.getHandle();
}
public <I, O> Client<I, O> createClient(final RemoteClientEndpoint endpoint) throws RemotingException {
Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RemoteServiceEndpointLocalImpl.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RemoteServiceEndpointLocalImpl.java 2008-07-18 23:28:49 UTC (rev 4397)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RemoteServiceEndpointLocalImpl.java 2008-07-19 00:04:32 UTC (rev 4398)
@@ -24,6 +24,7 @@
import org.jboss.cx.remoting.spi.remote.RemoteServiceEndpoint;
import org.jboss.cx.remoting.spi.remote.RemoteClientEndpoint;
+import org.jboss.cx.remoting.spi.remote.Handle;
import org.jboss.cx.remoting.spi.AbstractAutoCloseable;
import org.jboss.cx.remoting.RequestListener;
import org.jboss.cx.remoting.RemotingException;
@@ -49,11 +50,11 @@
serviceContext = new ServiceContextImpl(executor);
}
- public RemoteClientEndpoint createClientEndpoint() throws RemotingException {
+ public Handle<RemoteClientEndpoint> createClientEndpoint() throws RemotingException {
if (isOpen()) {
final RemoteClientEndpointLocalImpl<I, O> clientEndpoint = new RemoteClientEndpointLocalImpl<I, O>(executor, this, requestListener);
clientEndpoint.open();
- return clientEndpoint;
+ return clientEndpoint.getHandle();
} else {
throw new RemotingException("RemotingServiceEndpoint is closed");
}
Modified: remoting3/trunk/core/src/test/java/org/jboss/cx/remoting/core/EndpointTestCase.java
===================================================================
--- remoting3/trunk/core/src/test/java/org/jboss/cx/remoting/core/EndpointTestCase.java 2008-07-18 23:28:49 UTC (rev 4397)
+++ remoting3/trunk/core/src/test/java/org/jboss/cx/remoting/core/EndpointTestCase.java 2008-07-19 00:04:32 UTC (rev 4398)
@@ -35,6 +35,7 @@
import org.jboss.cx.remoting.RemotingException;
import org.jboss.cx.remoting.test.support.LoggingHelper;
import org.jboss.cx.remoting.spi.remote.RemoteClientEndpoint;
+import org.jboss.cx.remoting.spi.remote.Handle;
import org.jboss.xnio.IoUtils;
/**
@@ -78,7 +79,7 @@
endpoint.setExecutor(executorService);
endpoint.start();
try {
- final RemoteClientEndpoint clientEndpoint = endpoint.createClientEndpoint(new AbstractRequestListener<Object, Object>() {
+ final Handle<RemoteClientEndpoint> handle = endpoint.createClientEndpoint(new AbstractRequestListener<Object, Object>() {
public void handleRequest(final RequestContext<Object> context, final Object request) throws RemoteExecutionException {
assertEquals(request, requestObj);
try {
@@ -92,6 +93,7 @@
}
}
});
+ final RemoteClientEndpoint clientEndpoint = handle.getResource();
try {
clientEndpoint.addCloseHandler(new CloseHandler<RemoteClientEndpoint>() {
public void handleClose(final RemoteClientEndpoint closed) {
@@ -106,7 +108,6 @@
}
});
assertEquals(replyObj, client.invoke(requestObj));
- clientEndpoint.autoClose();
client.close();
} finally {
IoUtils.safeClose(client);
@@ -137,7 +138,7 @@
endpoint.setExecutor(executorService);
endpoint.start();
try {
- final RemoteClientEndpoint clientEndpoint = endpoint.createClientEndpoint(new AbstractRequestListener<Object, Object>() {
+ final Handle<RemoteClientEndpoint> handle = endpoint.createClientEndpoint(new AbstractRequestListener<Object, Object>() {
public void handleRequest(final RequestContext<Object> context, final Object request) throws RemoteExecutionException {
assertEquals(request, requestObj);
try {
@@ -151,6 +152,7 @@
}
}
});
+ final RemoteClientEndpoint clientEndpoint = handle.getResource();
try {
clientEndpoint.addCloseHandler(new CloseHandler<RemoteClientEndpoint>() {
public void handleClose(final RemoteClientEndpoint closed) {
@@ -165,7 +167,6 @@
}
});
assertEquals(replyObj, client.send(requestObj).get());
- clientEndpoint.autoClose();
client.close();
} finally {
IoUtils.safeClose(client);
Modified: remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicHandler.java
===================================================================
--- remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicHandler.java 2008-07-18 23:28:49 UTC (rev 4397)
+++ remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicHandler.java 2008-07-19 00:04:32 UTC (rev 4398)
@@ -269,7 +269,6 @@
break;
}
final RemoteServiceEndpoint serviceEndpoint = handle.getResource();
- final RemoteClientEndpoint clientEndpoint = serviceEndpoint.createClientEndpoint();
break;
}
@@ -617,7 +616,7 @@
});
}
- public RemoteClientEndpoint createClientEndpoint() throws RemotingException {
+ public Handle<RemoteClientEndpoint> createClientEndpoint() throws RemotingException {
final int id = openClientFromService();
final ByteBuffer buffer = allocator.allocate();
buffer.putInt(identifier);
@@ -629,7 +628,7 @@
try {
registerWriter(channel, new SimpleWriteHandler(allocator, buffer));
try {
- return new RemoteClientEndpointImpl(id, marshallerFactory, allocator);
+ return new RemoteClientEndpointImpl(id, marshallerFactory, allocator).getHandle();
} finally {
if (intr) {
Thread.currentThread().interrupt();
Modified: remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicProtocol.java
===================================================================
--- remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicProtocol.java 2008-07-18 23:28:49 UTC (rev 4397)
+++ remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicProtocol.java 2008-07-19 00:04:32 UTC (rev 4398)
@@ -62,16 +62,8 @@
return new IoHandlerFactory<AllocatedMessageChannel>() {
public IoHandler<? super AllocatedMessageChannel> createHandler() {
try {
- final RemoteClientEndpoint remoteClientEndpoint = localRootSource.createClientEndpoint();
- try {
- return new BasicHandler(true, allocator, remoteClientEndpoint, executor, remoteListener, new JavaSerializationMarshallerFactory(executor));
- } finally {
- try {
- remoteClientEndpoint.autoClose();
- } catch (RemotingException e) {
- log.error(e, "Error setting auto-close mode");
- }
- }
+ final RemoteClientEndpoint remoteClientEndpoint = localRootSource.createClientEndpoint().getResource();
+ return new BasicHandler(true, allocator, remoteClientEndpoint, executor, remoteListener, new JavaSerializationMarshallerFactory(executor));
} catch (RemotingException e) {
throw new IllegalStateException("The local root endpoint is unusable", e);
}
@@ -95,7 +87,7 @@
return new AbstractConvertingIoFuture<RemoteClientEndpoint, AllocatedMessageChannel>(futureChannel) {
protected RemoteClientEndpoint convert(final AllocatedMessageChannel channel) throws RemotingException {
final RemoteClientEndpoint remoteClientEndpoint = basicHandler.getRemoteClient(0);
- return (RemoteClientEndpoint) remoteClientEndpoint;
+ return remoteClientEndpoint;
}
};
}
Modified: remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/ServiceRegistry.java
===================================================================
--- remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/ServiceRegistry.java 2008-07-18 23:28:49 UTC (rev 4397)
+++ remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/ServiceRegistry.java 2008-07-19 00:04:32 UTC (rev 4398)
@@ -23,6 +23,7 @@
package org.jboss.cx.remoting.protocol.basic;
import org.jboss.cx.remoting.spi.remote.RemoteServiceEndpoint;
+import org.jboss.cx.remoting.spi.remote.Handle;
import org.jboss.cx.remoting.RemotingException;
/**
@@ -35,5 +36,5 @@
void unbind(int id) throws RemotingException;
- RemoteServiceEndpoint lookup(int id) throws RemotingException;
+ Handle<RemoteServiceEndpoint> lookup(int id) throws RemotingException;
}
Modified: remoting3/trunk/protocol/basic/src/test/java/org/jboss/cx/remoting/protocol/basic/ConnectionTestCase.java
===================================================================
--- remoting3/trunk/protocol/basic/src/test/java/org/jboss/cx/remoting/protocol/basic/ConnectionTestCase.java 2008-07-18 23:28:49 UTC (rev 4397)
+++ remoting3/trunk/protocol/basic/src/test/java/org/jboss/cx/remoting/protocol/basic/ConnectionTestCase.java 2008-07-19 00:04:32 UTC (rev 4398)
@@ -88,109 +88,6 @@
final EndpointImpl endpoint = new EndpointImpl();
endpoint.setExecutor(executorService);
endpoint.start();
- try {
- final RemoteServiceEndpoint serverServiceEndpoint = endpoint.createServiceEndpoint(new RequestListener<Object, Object>() {
- public void handleClientOpen(final ClientContext context) {
- if (clientOpened.getAndSet(true)) {
- if (client2Opened.getAndSet(true)) {
- problems.add(new IllegalStateException("Too many client opens"));
- }
- }
- }
-
- public void handleServiceOpen(final ServiceContext context) {
- if (serviceOpened.getAndSet(true)) {
- problems.add(new IllegalStateException("Multiple service opens"));
- }
- }
-
- public void handleRequest(final RequestContext<Object> context, final Object request) throws RemoteExecutionException {
- try {
- System.out.println("Received request; sending response!");
- context.sendReply("response");
- } catch (RemotingException e) {
- try {
- context.sendFailure("failed", e);
- } catch (RemotingException e1) {
- problems.add(e1);
- }
- }
- }
-
- public void handleServiceClose(final ServiceContext context) {
- if (serviceClosed.getAndSet(true)) {
- problems.add(new IllegalStateException("Multiple service closes"));
- }
- cleanupLatch.countDown();
- }
-
- public void handleClientClose(final ClientContext context) {
- if (clientClosed.getAndSet(true)) {
- problems.add(new IllegalStateException("Multiple client closes"));
- }
- cleanupLatch.countDown();
- }
- });
- try {
- final Handle<RemoteServiceEndpoint> serviceHandle = serverServiceEndpoint.getHandle();
- serverServiceEndpoint.autoClose();
- try {
- final RemoteClientEndpointListener remoteListener = new RemoteClientEndpointListener() {
- public <I, O> void notifyCreated(final RemoteClientEndpoint endpoint) {
- }
- };
- final ConfigurableFactory<Closeable> tcpServer = xnio.createTcpServer(executorService, Channels.convertStreamToAllocatedMessage(BasicProtocol.createServer(executorService, serverServiceEndpoint, allocator, remoteListener), 32768, 32768), new InetSocketAddress(12345));
- final Closeable tcpServerCloseable = tcpServer.create();
- try {
- // now create a client to connect to it
- final RemoteClientEndpoint localRoot = ;
- try {
- serverServiceEndpoint.autoClose();
- final InetSocketAddress destAddr = new InetSocketAddress("localhost", 12345);
- final TcpClient tcpClient = xnio.createTcpConnector().create().createChannelSource(destAddr);
- try {
- final ChannelSource<AllocatedMessageChannel> messageChannelSource = Channels.convertStreamToAllocatedMessage(tcpClient, 32768, 32768);
- final IoFuture<RemoteClientEndpoint> futureClient = BasicProtocol.connect(executorService, localRoot, messageChannelSource, allocator);
- final RemoteClientEndpoint clientEndpoint = futureClient.get();
- try {
- final Client<Object,Object> client = endpoint.createClient(clientEndpoint);
- try {
- clientEndpoint.autoClose();
- final Object result = client.send("Test").get();
- assertEquals("response", result);
- client.close();
- cleanupLatch.await(500L, TimeUnit.MILLISECONDS);
- tcpServerCloseable.close();
- serviceHandle.close();
- assertTrue(serviceOpened.get());
- assertTrue(clientOpened.get());
- assertTrue(client2Opened.get());
- assertTrue(clientClosed.get());
- assertTrue(serviceClosed.get());
- } finally {
- IoUtils.safeClose(client);
- }
- } finally {
- IoUtils.safeClose(clientEndpoint);
- }
- } finally {
- // todo close tcpClient
- }
- } finally {
- IoUtils.safeClose(localRoot);
- }
- } finally {
- IoUtils.safeClose(tcpServerCloseable);
- }
- } finally {
- IoUtils.safeClose(serviceHandle);
- }
- } finally {
- IoUtils.safeClose(serverServiceEndpoint);
- }
- } finally {
- endpoint.stop();
- }
} finally {
IoUtils.safeClose(xnio);
}
Modified: remoting3/trunk/standalone/src/main/java/org/jboss/cx/remoting/Remoting.java
===================================================================
--- remoting3/trunk/standalone/src/main/java/org/jboss/cx/remoting/Remoting.java 2008-07-18 23:28:49 UTC (rev 4397)
+++ remoting3/trunk/standalone/src/main/java/org/jboss/cx/remoting/Remoting.java 2008-07-19 00:04:32 UTC (rev 4398)
@@ -4,6 +4,8 @@
import org.jboss.cx.remoting.core.EndpointImpl;
import org.jboss.cx.remoting.spi.remote.RemoteClientEndpoint;
import org.jboss.cx.remoting.spi.remote.RemoteServiceEndpoint;
+import org.jboss.cx.remoting.spi.remote.Handle;
+import org.jboss.xnio.IoUtils;
/**
*
@@ -31,20 +33,20 @@
}
public static <I, O> Client<I, O> createLocalClient(Endpoint endpoint, RequestListener<I, O> requestListener) throws RemotingException {
- final RemoteClientEndpoint clientEndpoint = endpoint.createClientEndpoint(requestListener);
+ final Handle<RemoteClientEndpoint> handle = endpoint.createClientEndpoint(requestListener);
try {
- return endpoint.createClient(clientEndpoint);
+ return endpoint.createClient(handle.getResource());
} finally {
- clientEndpoint.autoClose();
+ IoUtils.safeClose(handle);
}
}
public static <I, O> ClientSource<I, O> createLocalClientSource(Endpoint endpoint, RequestListener<I, O> requestListener) throws RemotingException {
- final RemoteServiceEndpoint serviceEndpoint = endpoint.createServiceEndpoint(requestListener);
+ final Handle<RemoteServiceEndpoint> handle = endpoint.createServiceEndpoint(requestListener);
try {
- return endpoint.createClientSource(serviceEndpoint);
+ return endpoint.createClientSource(handle.getResource());
} finally {
- serviceEndpoint.autoClose();
+ IoUtils.safeClose(handle);
}
}
16 years, 5 months
JBoss Remoting SVN: r4397 - remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic.
by jboss-remoting-commits@lists.jboss.org
Author: david.lloyd(a)jboss.com
Date: 2008-07-18 19:28:49 -0400 (Fri, 18 Jul 2008)
New Revision: 4397
Modified:
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicHandler.java
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicProtocol.java
Log:
More generic fixups
Modified: remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicHandler.java
===================================================================
--- remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicHandler.java 2008-07-18 23:23:07 UTC (rev 4396)
+++ remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicHandler.java 2008-07-18 23:28:49 UTC (rev 4397)
@@ -90,7 +90,6 @@
private final ObjectResolver resolver;
private final ClassLoader classLoader;
- @SuppressWarnings({ "unchecked" })
public BasicHandler(final boolean server, final BufferAllocator<ByteBuffer> allocator, final RemoteClientEndpoint root, final Executor executor, final RemoteClientEndpointListener remoteListener, final MarshallerFactory<ByteBuffer> marshallerFactory) throws RemotingException {
this.server = server;
this.allocator = allocator;
@@ -427,11 +426,11 @@
return id;
}
- @SuppressWarnings({ "unchecked" })
public void openClientForForwardedService(int id, RemoteClientEndpoint clientEndpoint) {
try {
- forwardedClients.put(Integer.valueOf(id), ((RemoteClientEndpoint)clientEndpoint).getHandle());
+ forwardedClients.put(Integer.valueOf(id), clientEndpoint.getHandle());
} catch (RemotingException e) {
+ // TODO fix
e.printStackTrace();
}
}
Modified: remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicProtocol.java
===================================================================
--- remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicProtocol.java 2008-07-18 23:23:07 UTC (rev 4396)
+++ remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicProtocol.java 2008-07-18 23:28:49 UTC (rev 4397)
@@ -82,8 +82,6 @@
/**
* Create a request client for the basic protocol.
*
- * @param <I> the request type of the new remote root service endpoint
- * @param <O> the reply type of the new remote root service endpoint
* @param executor the executor to use for invocations
* @param localRoot the client endpoint to use as the local root client
* @param channelSource the XNIO channel source to use to establish the connection
@@ -91,11 +89,10 @@
* @return the future client endpoint of the remote side's root client
* @throws IOException if an error occurs
*/
- public static <I, O> IoFuture<RemoteClientEndpoint> connect(final Executor executor, final RemoteClientEndpoint localRoot, final ChannelSource<AllocatedMessageChannel> channelSource, final BufferAllocator<ByteBuffer> allocator) throws IOException {
+ public static IoFuture<RemoteClientEndpoint> connect(final Executor executor, final RemoteClientEndpoint localRoot, final ChannelSource<AllocatedMessageChannel> channelSource, final BufferAllocator<ByteBuffer> allocator) throws IOException {
final BasicHandler basicHandler = new BasicHandler(false, allocator, localRoot, executor, null, new JavaSerializationMarshallerFactory(executor));
final IoFuture<AllocatedMessageChannel> futureChannel = channelSource.open(basicHandler);
return new AbstractConvertingIoFuture<RemoteClientEndpoint, AllocatedMessageChannel>(futureChannel) {
- @SuppressWarnings({ "unchecked" })
protected RemoteClientEndpoint convert(final AllocatedMessageChannel channel) throws RemotingException {
final RemoteClientEndpoint remoteClientEndpoint = basicHandler.getRemoteClient(0);
return (RemoteClientEndpoint) remoteClientEndpoint;
16 years, 5 months
JBoss Remoting SVN: r4396 - remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic.
by jboss-remoting-commits@lists.jboss.org
Author: david.lloyd(a)jboss.com
Date: 2008-07-18 19:23:07 -0400 (Fri, 18 Jul 2008)
New Revision: 4396
Modified:
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/ServiceRegistry.java
Log:
Fix compile problem
Modified: remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/ServiceRegistry.java
===================================================================
--- remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/ServiceRegistry.java 2008-07-18 23:18:52 UTC (rev 4395)
+++ remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/ServiceRegistry.java 2008-07-18 23:23:07 UTC (rev 4396)
@@ -35,5 +35,5 @@
void unbind(int id) throws RemotingException;
- RemoteServiceEndpoint
+ RemoteServiceEndpoint lookup(int id) throws RemotingException;
}
16 years, 5 months
JBoss Remoting SVN: r4395 - in remoting3/trunk: api/src/main/java/org/jboss/cx/remoting/spi and 7 other directories.
by jboss-remoting-commits@lists.jboss.org
Author: david.lloyd(a)jboss.com
Date: 2008-07-18 19:18:52 -0400 (Fri, 18 Jul 2008)
New Revision: 4395
Added:
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/ServiceRegistry.java
Modified:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Endpoint.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/SpiUtils.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteClientEndpoint.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteClientEndpointListener.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteServiceEndpoint.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/ReplyHandler.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/EndpointWrapper.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientImpl.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientSourceImpl.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/EndpointImpl.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/FutureReplyImpl.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RemoteClientEndpointLocalImpl.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RemoteServiceEndpointLocalImpl.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RequestContextImpl.java
remoting3/trunk/core/src/test/java/org/jboss/cx/remoting/core/EndpointTestCase.java
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicHandler.java
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicProtocol.java
remoting3/trunk/protocol/basic/src/test/java/org/jboss/cx/remoting/protocol/basic/ConnectionTestCase.java
remoting3/trunk/standalone/src/main/java/org/jboss/cx/remoting/Remoting.java
Log:
RemoteClient/ServiceEndpoints need not and should not be generic
Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Endpoint.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Endpoint.java 2008-07-18 23:18:15 UTC (rev 4394)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Endpoint.java 2008-07-18 23:18:52 UTC (rev 4395)
@@ -37,7 +37,7 @@
* @return the client
* @throws RemotingException if an error occurs
*/
- <I, O> RemoteClientEndpoint<I, O> createClientEndpoint(RequestListener<I, O> requestListener) throws RemotingException;
+ <I, O> RemoteClientEndpoint createClientEndpoint(RequestListener<I, O> requestListener) throws RemotingException;
/**
* Create a client source that can be used to acquire clients associated with a request listener on this endpoint.
@@ -52,7 +52,7 @@
* @return the context source
* @throws RemotingException if an error occurs
*/
- <I, O> RemoteServiceEndpoint<I, O> createServiceEndpoint(RequestListener<I, O> requestListener) throws RemotingException;
+ <I, O> RemoteServiceEndpoint createServiceEndpoint(RequestListener<I, O> requestListener) throws RemotingException;
/**
* Create a client from a remote client endpoint.
@@ -63,7 +63,7 @@
* @return the client
* @throws RemotingException if an error occurs
*/
- <I, O> Client<I, O> createClient(RemoteClientEndpoint<I, O> endpoint) throws RemotingException;
+ <I, O> Client<I, O> createClient(RemoteClientEndpoint endpoint) throws RemotingException;
/**
* Create a client source from a remote service endpoint.
@@ -74,5 +74,5 @@
* @return the client source
* @throws RemotingException if an error occurs
*/
- <I, O> ClientSource<I, O> createClientSource(RemoteServiceEndpoint<I, O> endpoint) throws RemotingException;
+ <I, O> ClientSource<I, O> createClientSource(RemoteServiceEndpoint endpoint) throws RemotingException;
}
Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/SpiUtils.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/SpiUtils.java 2008-07-18 23:18:15 UTC (rev 4394)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/SpiUtils.java 2008-07-18 23:18:52 UTC (rev 4395)
@@ -48,7 +48,7 @@
* @param msg the message
* @param cause the cause
*/
- public static void safeHandleException(final ReplyHandler<?> replyHandler, final String msg, final Throwable cause) {
+ public static void safeHandleException(final ReplyHandler replyHandler, final String msg, final Throwable cause) {
try {
replyHandler.handleException(msg, cause);
} catch (Throwable t) {
@@ -63,7 +63,7 @@
* @param replyHandler the reply handler
* @param reply the reply
*/
- public static <O> void safeHandleReply(final ReplyHandler<O> replyHandler, final O reply) {
+ public static <O> void safeHandleReply(final ReplyHandler replyHandler, final O reply) {
try {
replyHandler.handleReply(reply);
} catch (Throwable t) {
@@ -76,7 +76,7 @@
*
* @param replyHandler the reply handler
*/
- public static void safeHandleCancellation(final ReplyHandler<?> replyHandler) {
+ public static void safeHandleCancellation(final ReplyHandler replyHandler) {
try {
replyHandler.handleCancellation();
} catch (Throwable t) {
@@ -136,7 +136,7 @@
private static final RemoteRequestContext BLANK_REMOTE_REQUEST_CONTEXT = new BlankRemoteRequestContext();
- public static void safeAutoClose(final RemoteClientEndpoint<?, ?> remoteClientEndpoint) {
+ public static void safeAutoClose(final RemoteClientEndpoint remoteClientEndpoint) {
try {
remoteClientEndpoint.autoClose();
} catch (Throwable t) {
@@ -144,7 +144,7 @@
}
}
- public static void safeAutoClose(final RemoteServiceEndpoint<Object, Object> remoteServiceEndpoint) {
+ public static void safeAutoClose(final RemoteServiceEndpoint remoteServiceEndpoint) {
try {
remoteServiceEndpoint.autoClose();
} catch (Throwable t) {
Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteClientEndpoint.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteClientEndpoint.java 2008-07-18 23:18:15 UTC (rev 4394)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteClientEndpoint.java 2008-07-18 23:18:52 UTC (rev 4395)
@@ -25,13 +25,12 @@
import org.jboss.cx.remoting.Closeable;
import org.jboss.cx.remoting.RemotingException;
import org.jboss.cx.remoting.CloseHandler;
-import org.jboss.cx.remoting.Client;
/**
* A remote client endpoint, which can be passed to remote endpoints. Remote systems can then use the client endpoint
* to make invocations, or they may pass the client endpoint on to other remote systems.
*/
-public interface RemoteClientEndpoint<I, O> extends Closeable<RemoteClientEndpoint<I, O>> {
+public interface RemoteClientEndpoint extends Closeable<RemoteClientEndpoint> {
/**
* Receive a one-way request from a remote system. This method is intended to be called by protocol handlers. No
@@ -39,7 +38,7 @@
*
* @param request the request
*/
- void receiveRequest(I request);
+ void receiveRequest(Object request);
/**
* Receive a request from a remote system. This method is intended to be called by protocol handlers. If the
@@ -51,7 +50,7 @@
* @param replyHandler a handler for the reply
* @return a context which may be used to cancel the request
*/
- RemoteRequestContext receiveRequest(I request, ReplyHandler<O> replyHandler);
+ RemoteRequestContext receiveRequest(Object request, ReplyHandler replyHandler);
/**
* Get a handle to this client endpoint. The client endpoint will not auto-close as long as there is at least
@@ -62,7 +61,7 @@
* @return the handle
* @throws RemotingException if a handle could not be acquired
*/
- Handle<RemoteClientEndpoint<I, O>> getHandle() throws RemotingException;
+ Handle<RemoteClientEndpoint> getHandle() throws RemotingException;
/**
* Automatically close this client endpoint when all handles and local client instances are closed.
@@ -82,5 +81,5 @@
*
* @param handler the handler to be called
*/
- void addCloseHandler(final CloseHandler<? super RemoteClientEndpoint<I, O>> handler);
+ void addCloseHandler(final CloseHandler<? super RemoteClientEndpoint> handler);
}
Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteClientEndpointListener.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteClientEndpointListener.java 2008-07-18 23:18:15 UTC (rev 4394)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteClientEndpointListener.java 2008-07-18 23:18:52 UTC (rev 4395)
@@ -34,5 +34,5 @@
* @param <O> the reply type
* @param endpoint the endpoint that was created
*/
- <I, O> void notifyCreated(RemoteClientEndpoint<I, O> endpoint);
+ <I, O> void notifyCreated(RemoteClientEndpoint endpoint);
}
Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteServiceEndpoint.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteServiceEndpoint.java 2008-07-18 23:18:15 UTC (rev 4394)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteServiceEndpoint.java 2008-07-18 23:18:52 UTC (rev 4395)
@@ -25,7 +25,6 @@
import org.jboss.cx.remoting.Closeable;
import org.jboss.cx.remoting.RemotingException;
import org.jboss.cx.remoting.CloseHandler;
-import org.jboss.cx.remoting.ClientSource;
/**
* A remote service endpoint, which can be passed to remote endpoints. Remote systems can then use the service endpoint
@@ -33,7 +32,7 @@
* has the advantage that a round trip to the remote side is not necessary; the local side can spawn a client endpoint
* and simply notify the remote side of the change.
*/
-public interface RemoteServiceEndpoint<I, O> extends Closeable<RemoteServiceEndpoint<I, O>> {
+public interface RemoteServiceEndpoint extends Closeable<RemoteServiceEndpoint> {
/**
* Create a client endpoint for the service corresponding to this service endpoint.
@@ -41,7 +40,7 @@
* @return a client endpoint
* @throws RemotingException if a client could not be opened
*/
- RemoteClientEndpoint<I, O> createClientEndpoint() throws RemotingException;
+ RemoteClientEndpoint createClientEndpoint() throws RemotingException;
/**
* Get a handle to this service endpoint. The service endpoint will not auto-close as long as there is at least
@@ -52,7 +51,7 @@
* @return the handle
* @throws RemotingException if a handle could not be acquired
*/
- Handle<RemoteServiceEndpoint<I, O>> getHandle() throws RemotingException;
+ Handle<RemoteServiceEndpoint> getHandle() throws RemotingException;
/**
* Automatically close this service endpoint when all handles and local client source instances
@@ -70,5 +69,5 @@
*
* @param handler the handler to be called
*/
- void addCloseHandler(final CloseHandler<? super RemoteServiceEndpoint<I, O>> handler);
+ void addCloseHandler(final CloseHandler<? super RemoteServiceEndpoint> handler);
}
Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/ReplyHandler.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/ReplyHandler.java 2008-07-18 23:18:15 UTC (rev 4394)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/ReplyHandler.java 2008-07-18 23:18:52 UTC (rev 4395)
@@ -26,14 +26,14 @@
* A handler for replies from a request. The handler should respect the first invocation made on it, and ignore
* any subsequent invocations.
*/
-public interface ReplyHandler<O> {
+public interface ReplyHandler {
/**
* Handle a successful reply.
*
* @param reply the reply
*/
- void handleReply(O reply);
+ void handleReply(Object reply);
/**
* Handle a remote exception.
Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/EndpointWrapper.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/EndpointWrapper.java 2008-07-18 23:18:15 UTC (rev 4394)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/EndpointWrapper.java 2008-07-18 23:18:52 UTC (rev 4395)
@@ -41,28 +41,28 @@
/**
* {@inheritDoc} This implementation calls the same method on the delegate object.
*/
- public <I, O> RemoteClientEndpoint<I, O> createClientEndpoint(final RequestListener<I, O> requestListener) throws RemotingException {
+ public <I, O> RemoteClientEndpoint createClientEndpoint(final RequestListener<I, O> requestListener) throws RemotingException {
return delegate.createClientEndpoint(requestListener);
}
/**
* {@inheritDoc} This implementation calls the same method on the delegate object.
*/
- public <I, O> RemoteServiceEndpoint<I, O> createServiceEndpoint(final RequestListener<I, O> requestListener) throws RemotingException {
+ public <I, O> RemoteServiceEndpoint createServiceEndpoint(final RequestListener<I, O> requestListener) throws RemotingException {
return delegate.createServiceEndpoint(requestListener);
}
/**
* {@inheritDoc} This implementation calls the same method on the delegate object.
*/
- public <I, O> Client<I, O> createClient(final RemoteClientEndpoint<I, O> endpoint) throws RemotingException {
+ public <I, O> Client<I, O> createClient(final RemoteClientEndpoint endpoint) throws RemotingException {
return delegate.createClient(endpoint);
}
/**
* {@inheritDoc} This implementation calls the same method on the delegate object.
*/
- public <I, O> ClientSource<I, O> createClientSource(final RemoteServiceEndpoint<I, O> endpoint) throws RemotingException {
+ public <I, O> ClientSource<I, O> createClientSource(final RemoteServiceEndpoint endpoint) throws RemotingException {
return delegate.createClientSource(endpoint);
}
}
Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientImpl.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientImpl.java 2008-07-18 23:18:15 UTC (rev 4394)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientImpl.java 2008-07-18 23:18:52 UTC (rev 4395)
@@ -38,9 +38,9 @@
*/
public final class ClientImpl<I, O> extends AbstractContextImpl<Client<I, O>> implements Client<I, O> {
- private final RemoteClientEndpoint<I, O> remoteClientEndpoint;
+ private final RemoteClientEndpoint remoteClientEndpoint;
- ClientImpl(final RemoteClientEndpoint<I, O> remoteClientEndpoint, final Executor executor) {
+ ClientImpl(final RemoteClientEndpoint remoteClientEndpoint, final Executor executor) {
super(executor);
this.remoteClientEndpoint = remoteClientEndpoint;
}
@@ -51,7 +51,7 @@
}
final QueueExecutor executor = new QueueExecutor();
final FutureReplyImpl<O> futureReply = new FutureReplyImpl<O>(executor);
- final ReplyHandler<O> replyHandler = futureReply.getReplyHandler();
+ final ReplyHandler replyHandler = futureReply.getReplyHandler();
final RemoteRequestContext requestContext = remoteClientEndpoint.receiveRequest(request, replyHandler);
futureReply.setRemoteRequestContext(requestContext);
futureReply.addCompletionHandler(new RequestCompletionHandler<O>() {
@@ -68,7 +68,7 @@
throw new RemotingException("Client is not open");
}
final FutureReplyImpl<O> futureReply = new FutureReplyImpl<O>(executor);
- final ReplyHandler<O> replyHandler = futureReply.getReplyHandler();
+ final ReplyHandler replyHandler = futureReply.getReplyHandler();
final RemoteRequestContext requestContext = remoteClientEndpoint.receiveRequest(request, replyHandler);
futureReply.setRemoteRequestContext(requestContext);
return futureReply;
Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientSourceImpl.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientSourceImpl.java 2008-07-18 23:18:15 UTC (rev 4394)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientSourceImpl.java 2008-07-18 23:18:52 UTC (rev 4395)
@@ -35,10 +35,10 @@
*/
public final class ClientSourceImpl<I, O> extends AbstractCloseable<ClientSource<I, O>> implements ClientSource<I, O> {
- private final RemoteServiceEndpoint<I, O> serviceEndpoint;
+ private final RemoteServiceEndpoint serviceEndpoint;
private final Endpoint endpoint;
- ClientSourceImpl(final RemoteServiceEndpoint<I, O> serviceEndpoint, final EndpointImpl endpoint) {
+ ClientSourceImpl(final RemoteServiceEndpoint serviceEndpoint, final EndpointImpl endpoint) {
super(endpoint.getExecutor());
this.serviceEndpoint = serviceEndpoint;
this.endpoint = endpoint;
@@ -48,7 +48,7 @@
if (! isOpen()) {
throw new RemotingException("Client source is not open");
}
- final RemoteClientEndpoint<I,O> clientEndpoint = serviceEndpoint.createClientEndpoint();
+ final RemoteClientEndpoint clientEndpoint = serviceEndpoint.createClientEndpoint();
final Client<I, O> client = endpoint.createClient(clientEndpoint);
clientEndpoint.autoClose();
return client;
Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/EndpointImpl.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/EndpointImpl.java 2008-07-18 23:18:15 UTC (rev 4394)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/EndpointImpl.java 2008-07-18 23:18:52 UTC (rev 4395)
@@ -121,23 +121,23 @@
return endpointMap;
}
- public <I, O> RemoteClientEndpoint<I, O> createClientEndpoint(final RequestListener<I, O> requestListener) throws RemotingException {
+ public <I, O> RemoteClientEndpoint createClientEndpoint(final RequestListener<I, O> requestListener) throws RemotingException {
final RemoteClientEndpointLocalImpl<I, O> clientEndpoint = new RemoteClientEndpointLocalImpl<I, O>(executor, requestListener);
clientEndpoint.addCloseHandler(remover);
clientEndpoint.open();
return clientEndpoint;
}
- public <I, O> RemoteServiceEndpoint<I, O> createServiceEndpoint(final RequestListener<I, O> requestListener) throws RemotingException {
+ public <I, O> RemoteServiceEndpoint createServiceEndpoint(final RequestListener<I, O> requestListener) throws RemotingException {
final RemoteServiceEndpointLocalImpl<I, O> serviceEndpoint = new RemoteServiceEndpointLocalImpl<I, O>(executor, requestListener);
serviceEndpoint.addCloseHandler(remover);
serviceEndpoint.open();
return serviceEndpoint;
}
- public <I, O> Client<I, O> createClient(final RemoteClientEndpoint<I, O> endpoint) throws RemotingException {
+ public <I, O> Client<I, O> createClient(final RemoteClientEndpoint endpoint) throws RemotingException {
boolean ok = false;
- final Handle<RemoteClientEndpoint<I,O>> handle = endpoint.getHandle();
+ final Handle<RemoteClientEndpoint> handle = endpoint.getHandle();
try {
final ClientImpl<I, O> client = new ClientImpl<I, O>(endpoint, executor);
client.addCloseHandler(new CloseHandler<Client<I, O>>() {
@@ -154,9 +154,9 @@
}
}
- public <I, O> ClientSource<I, O> createClientSource(final RemoteServiceEndpoint<I, O> endpoint) throws RemotingException {
+ public <I, O> ClientSource<I, O> createClientSource(final RemoteServiceEndpoint endpoint) throws RemotingException {
boolean ok = false;
- final Handle<RemoteServiceEndpoint<I,O>> handle = endpoint.getHandle();
+ final Handle<RemoteServiceEndpoint> handle = endpoint.getHandle();
try {
final ClientSourceImpl<I, O> client = new ClientSourceImpl<I, O>(endpoint, this);
client.addCloseHandler(new CloseHandler<ClientSource<I, O>>() {
Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/FutureReplyImpl.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/FutureReplyImpl.java 2008-07-18 23:18:15 UTC (rev 4394)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/FutureReplyImpl.java 2008-07-18 23:18:52 UTC (rev 4395)
@@ -40,7 +40,7 @@
public final class FutureReplyImpl<O> implements FutureReply<O> {
private final Executor executor;
- private final ReplyHandler<O> replyHandler = new Handler();
+ private final ReplyHandler replyHandler = new Handler();
private final Object lock = new Object();
// @protectedby lock
private State state = State.NEW;
@@ -264,7 +264,7 @@
return this;
}
- ReplyHandler<O> getReplyHandler() {
+ ReplyHandler getReplyHandler() {
return replyHandler;
}
@@ -284,9 +284,10 @@
}
}
- private final class Handler implements ReplyHandler<O> {
+ private final class Handler implements ReplyHandler {
- public void handleReply(final O reply) {
+ @SuppressWarnings({ "unchecked" })
+ public void handleReply(final Object reply) {
synchronized (lock) {
while (state == State.NEW) {
boolean intr = false;
@@ -304,7 +305,7 @@
}
if (state == State.WAITING) {
state = State.DONE;
- result = reply;
+ result = (O) reply;
runCompletionHandlers();
lock.notifyAll();
}
Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RemoteClientEndpointLocalImpl.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RemoteClientEndpointLocalImpl.java 2008-07-18 23:18:15 UTC (rev 4394)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RemoteClientEndpointLocalImpl.java 2008-07-18 23:18:52 UTC (rev 4395)
@@ -37,7 +37,7 @@
/**
*
*/
-public final class RemoteClientEndpointLocalImpl<I, O> extends AbstractAutoCloseable<RemoteClientEndpoint<I, O>> implements RemoteClientEndpoint<I, O> {
+public final class RemoteClientEndpointLocalImpl<I, O> extends AbstractAutoCloseable<RemoteClientEndpoint> implements RemoteClientEndpoint {
private final RequestListener<I, O> requestListener;
private final Executor executor;
@@ -60,12 +60,13 @@
this(executor, requestListener, new ClientContextImpl(executor));
}
- public void receiveRequest(final I request) {
+ public void receiveRequest(final Object request) {
final RequestContextImpl<O> context = new RequestContextImpl<O>(clientContext);
executor.execute(new Runnable() {
+ @SuppressWarnings({ "unchecked" })
public void run() {
try {
- requestListener.handleRequest(context, request);
+ requestListener.handleRequest(context, (I) request);
} catch (Throwable t) {
log.error(t, "Unexpected exception in request listener");
}
@@ -73,12 +74,13 @@
});
}
- public RemoteRequestContext receiveRequest(final I request, final ReplyHandler<O> replyHandler) {
+ public RemoteRequestContext receiveRequest(final Object request, final ReplyHandler replyHandler) {
final RequestContextImpl<O> context = new RequestContextImpl<O>(replyHandler, clientContext);
executor.execute(new Runnable() {
+ @SuppressWarnings({ "unchecked" })
public void run() {
try {
- requestListener.handleRequest(context, request);
+ requestListener.handleRequest(context, (I) request);
} catch (RemoteExecutionException e) {
SpiUtils.safeHandleException(replyHandler, e.getMessage(), e.getCause());
} catch (Throwable t) {
@@ -96,8 +98,8 @@
void open() throws RemotingException {
try {
requestListener.handleClientOpen(clientContext);
- addCloseHandler(new CloseHandler<RemoteClientEndpoint<I, O>>() {
- public void handleClose(final RemoteClientEndpoint<I, O> closed) {
+ addCloseHandler(new CloseHandler<RemoteClientEndpoint>() {
+ public void handleClose(final RemoteClientEndpoint closed) {
try {
requestListener.handleClientClose(clientContext);
} catch (Throwable t) {
Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RemoteServiceEndpointLocalImpl.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RemoteServiceEndpointLocalImpl.java 2008-07-18 23:18:15 UTC (rev 4394)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RemoteServiceEndpointLocalImpl.java 2008-07-18 23:18:52 UTC (rev 4395)
@@ -34,7 +34,7 @@
/**
*
*/
-public final class RemoteServiceEndpointLocalImpl<I, O> extends AbstractAutoCloseable<RemoteServiceEndpoint<I, O>> implements RemoteServiceEndpoint<I, O> {
+public final class RemoteServiceEndpointLocalImpl<I, O> extends AbstractAutoCloseable<RemoteServiceEndpoint> implements RemoteServiceEndpoint {
private final RequestListener<I, O> requestListener;
private final ServiceContextImpl serviceContext;
@@ -49,7 +49,7 @@
serviceContext = new ServiceContextImpl(executor);
}
- public RemoteClientEndpoint<I, O> createClientEndpoint() throws RemotingException {
+ public RemoteClientEndpoint createClientEndpoint() throws RemotingException {
if (isOpen()) {
final RemoteClientEndpointLocalImpl<I, O> clientEndpoint = new RemoteClientEndpointLocalImpl<I, O>(executor, this, requestListener);
clientEndpoint.open();
@@ -62,8 +62,8 @@
void open() throws RemotingException {
try {
requestListener.handleServiceOpen(serviceContext);
- addCloseHandler(new CloseHandler<RemoteServiceEndpoint<I, O>>() {
- public void handleClose(final RemoteServiceEndpoint<I, O> closed) {
+ addCloseHandler(new CloseHandler<RemoteServiceEndpoint>() {
+ public void handleClose(final RemoteServiceEndpoint closed) {
try {
requestListener.handleServiceClose(serviceContext);
} catch (Throwable t) {
Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RequestContextImpl.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RequestContextImpl.java 2008-07-18 23:18:15 UTC (rev 4394)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RequestContextImpl.java 2008-07-18 23:18:52 UTC (rev 4395)
@@ -40,7 +40,7 @@
private final AtomicBoolean closed = new AtomicBoolean();
private final Object cancelLock = new Object();
- private final ReplyHandler<O> replyHandler;
+ private final ReplyHandler replyHandler;
private final ClientContextImpl clientContext;
private final AtomicBoolean cancelled = new AtomicBoolean();
@@ -48,7 +48,7 @@
private Set<RequestCancelHandler<O>> cancelHandlers;
private final TaggingExecutor executor;
- RequestContextImpl(final ReplyHandler<O> replyHandler, final ClientContextImpl clientContext) {
+ RequestContextImpl(final ReplyHandler replyHandler, final ClientContextImpl clientContext) {
this.replyHandler = replyHandler;
this.clientContext = clientContext;
executor = new TaggingExecutor(clientContext.getExecutor());
Modified: remoting3/trunk/core/src/test/java/org/jboss/cx/remoting/core/EndpointTestCase.java
===================================================================
--- remoting3/trunk/core/src/test/java/org/jboss/cx/remoting/core/EndpointTestCase.java 2008-07-18 23:18:15 UTC (rev 4394)
+++ remoting3/trunk/core/src/test/java/org/jboss/cx/remoting/core/EndpointTestCase.java 2008-07-18 23:18:52 UTC (rev 4395)
@@ -78,7 +78,7 @@
endpoint.setExecutor(executorService);
endpoint.start();
try {
- final RemoteClientEndpoint<Object,Object> clientEndpoint = endpoint.createClientEndpoint(new AbstractRequestListener<Object, Object>() {
+ final RemoteClientEndpoint clientEndpoint = endpoint.createClientEndpoint(new AbstractRequestListener<Object, Object>() {
public void handleRequest(final RequestContext<Object> context, final Object request) throws RemoteExecutionException {
assertEquals(request, requestObj);
try {
@@ -93,8 +93,8 @@
}
});
try {
- clientEndpoint.addCloseHandler(new CloseHandler<RemoteClientEndpoint<Object, Object>>() {
- public void handleClose(final RemoteClientEndpoint<Object, Object> closed) {
+ clientEndpoint.addCloseHandler(new CloseHandler<RemoteClientEndpoint>() {
+ public void handleClose(final RemoteClientEndpoint closed) {
clientEndpointClosed.set(true);
}
});
@@ -137,7 +137,7 @@
endpoint.setExecutor(executorService);
endpoint.start();
try {
- final RemoteClientEndpoint<Object,Object> clientEndpoint = endpoint.createClientEndpoint(new AbstractRequestListener<Object, Object>() {
+ final RemoteClientEndpoint clientEndpoint = endpoint.createClientEndpoint(new AbstractRequestListener<Object, Object>() {
public void handleRequest(final RequestContext<Object> context, final Object request) throws RemoteExecutionException {
assertEquals(request, requestObj);
try {
@@ -152,8 +152,8 @@
}
});
try {
- clientEndpoint.addCloseHandler(new CloseHandler<RemoteClientEndpoint<Object, Object>>() {
- public void handleClose(final RemoteClientEndpoint<Object, Object> closed) {
+ clientEndpoint.addCloseHandler(new CloseHandler<RemoteClientEndpoint>() {
+ public void handleClose(final RemoteClientEndpoint closed) {
clientEndpointClosed.set(true);
}
});
Modified: remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicHandler.java
===================================================================
--- remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicHandler.java 2008-07-18 23:18:15 UTC (rev 4394)
+++ remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicHandler.java 2008-07-18 23:18:52 UTC (rev 4395)
@@ -52,6 +52,7 @@
import static org.jboss.cx.remoting.protocol.basic.MessageType.REQUEST_FAILED;
import static org.jboss.cx.remoting.protocol.basic.MessageType.CANCEL_ACK;
import org.jboss.cx.remoting.RemotingException;
+import org.jboss.cx.remoting.CloseHandler;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
@@ -71,13 +72,13 @@
private static final Logger log = Logger.getLogger(BasicHandler.class);
// clients whose requests get forwarded to the remote side
- private final ConcurrentMap<Integer, RemoteClientEndpoint<?, ?>> remoteClients = concurrentMap();
+ private final ConcurrentMap<Integer, RemoteClientEndpoint> remoteClients = concurrentMap();
// running on remote node
- private final ConcurrentMap<Integer, ReplyHandler<?>> outstandingRequests = concurrentMap();
+ private final ConcurrentMap<Integer, ReplyHandler> outstandingRequests = concurrentMap();
// forwarded to remote side
- private final ConcurrentMap<Integer, Handle<RemoteClientEndpoint<?, ?>>> forwardedClients = concurrentMap();
+ private final ConcurrentMap<Integer, Handle<RemoteClientEndpoint>> forwardedClients = concurrentMap();
// forwarded to remote side
- private final ConcurrentMap<Integer, Handle<RemoteServiceEndpoint<?, ?>>> forwardedServices = concurrentMap();
+ private final ConcurrentMap<Integer, Handle<RemoteServiceEndpoint>> forwardedServices = concurrentMap();
private final boolean server;
private final BufferAllocator<ByteBuffer> allocator;
@@ -90,12 +91,12 @@
private final ClassLoader classLoader;
@SuppressWarnings({ "unchecked" })
- public <I, O> BasicHandler(final boolean server, final BufferAllocator<ByteBuffer> allocator, final RemoteClientEndpoint<I, O> root, final Executor executor, final RemoteClientEndpointListener remoteListener, final MarshallerFactory<ByteBuffer> marshallerFactory) throws RemotingException {
+ public BasicHandler(final boolean server, final BufferAllocator<ByteBuffer> allocator, final RemoteClientEndpoint root, final Executor executor, final RemoteClientEndpointListener remoteListener, final MarshallerFactory<ByteBuffer> marshallerFactory) throws RemotingException {
this.server = server;
this.allocator = allocator;
this.executor = executor;
forwardedClients.put(Integer.valueOf(0), ((RemoteClientEndpoint)root).getHandle());
- final RemoteClientEndpointImpl<Object, Object> endpoint = new RemoteClientEndpointImpl<Object, Object>(0, marshallerFactory, allocator);
+ final RemoteClientEndpointImpl endpoint = new RemoteClientEndpointImpl(0, marshallerFactory, allocator);
remoteClients.put(Integer.valueOf(0), endpoint);
if (remoteListener != null) {
remoteListener.notifyCreated(endpoint);
@@ -148,7 +149,7 @@
switch (msgType) {
case REQUEST_ONEWAY: {
final int clientId = buffer.getInt();
- final Handle<RemoteClientEndpoint<?, ?>> handle = getForwardedClient(clientId);
+ final Handle<RemoteClientEndpoint> handle = getForwardedClient(clientId);
if (handle == null) {
log.trace("Request on invalid client ID %d", Integer.valueOf(clientId));
return;
@@ -165,13 +166,13 @@
log.trace("Class not found in one-way request for client ID %d", Integer.valueOf(clientId));
break;
}
- final RemoteClientEndpoint<?, ?> clientEndpoint = handle.getResource();
- receiveRequest(clientEndpoint, payload);
+ final RemoteClientEndpoint clientEndpoint = handle.getResource();
+ clientEndpoint.receiveRequest(payload);
break;
}
case REQUEST: {
final int clientId = buffer.getInt();
- final Handle<RemoteClientEndpoint<?, ?>> handle = getForwardedClient(clientId);
+ final Handle<RemoteClientEndpoint> handle = getForwardedClient(clientId);
if (handle == null) {
log.trace("Request on invalid client ID %d", Integer.valueOf(clientId));
break;
@@ -190,13 +191,13 @@
log.trace("Class not found in request ID %d for client ID %d", Integer.valueOf(requestId), Integer.valueOf(clientId));
break;
}
- final RemoteClientEndpoint<?, ?> clientEndpoint = handle.getResource();
- receiveRequest(clientEndpoint, new ReplyHandlerImpl(channel, requestId, allocator), payload);
+ final RemoteClientEndpoint clientEndpoint = handle.getResource();
+ clientEndpoint.receiveRequest(payload, (ReplyHandler) new ReplyHandlerImpl(channel, requestId, allocator));
break;
}
case REPLY: {
final int requestId = buffer.getInt();
- final ReplyHandler<?> replyHandler = takeOutstandingReqeust(requestId);
+ final ReplyHandler replyHandler = takeOutstandingReqeust(requestId);
if (replyHandler == null) {
log.trace("Got reply to unknown request %d", Integer.valueOf(requestId));
break;
@@ -215,12 +216,12 @@
log.trace("Class not found in reply to request ID %d", Integer.valueOf(requestId));
break;
}
- handleReply(replyHandler, payload);
+ SpiUtils.safeHandleReply(replyHandler, payload);
break;
}
case REQUEST_FAILED: {
final int requestId = buffer.getInt();
- final ReplyHandler<?> replyHandler = takeOutstandingReqeust(requestId);
+ final ReplyHandler replyHandler = takeOutstandingReqeust(requestId);
if (replyHandler == null) {
log.trace("Got reply to unknown request %d", Integer.valueOf(requestId));
break;
@@ -247,26 +248,40 @@
log.trace("Class not found in exception reply to request ID %d", Integer.valueOf(requestId));
break;
}
- handleException(replyHandler, message, cause);
+ SpiUtils.safeHandleException(replyHandler, message == null ? null : message.toString(), cause instanceof Throwable ? (Throwable) cause : null);
break;
}
case CLIENT_CLOSE: {
+ final int clientId = buffer.getInt();
+ final Handle<RemoteClientEndpoint> handle = takeForwardedClient(clientId);
+ if (handle == null) {
+ log.warn("Got client close message for unknown client %d", Integer.valueOf(clientId));
+ break;
+ }
+ IoUtils.safeClose(handle);
break;
}
case CLIENT_OPEN: {
final int serviceId = buffer.getInt();
final int clientId = buffer.getInt();
- final Handle<RemoteServiceEndpoint<?, ?>> handle = getForwardedService(serviceId);
+ final Handle<RemoteServiceEndpoint> handle = getForwardedService(serviceId);
if (handle == null) {
// todo log invalid request
break;
}
- final RemoteServiceEndpoint<?, ?> serviceEndpoint = handle.getResource();
- final RemoteClientEndpoint<?, ?> clientEndpoint = serviceEndpoint.createClientEndpoint();
+ final RemoteServiceEndpoint serviceEndpoint = handle.getResource();
+ final RemoteClientEndpoint clientEndpoint = serviceEndpoint.createClientEndpoint();
break;
}
case SERVICE_CLOSE: {
+ final int serviceId = buffer.getInt();
+ final Handle<RemoteServiceEndpoint> handle = takeForwardedService(serviceId);
+ if (handle == null) {
+ log.warn("Got client close message for unknown service %d", Integer.valueOf(serviceId));
+ break;
+ }
+ IoUtils.safeClose(handle);
break;
}
default: {
@@ -309,15 +324,11 @@
public void handleClosed(final AllocatedMessageChannel channel) {
}
- private <I, O> ReplyHandler<O> createReplyHandler(final AllocatedMessageChannel channel, final int requestId) {
- return new ReplyHandlerImpl<O>(channel, requestId, allocator);
- }
-
- RemoteClientEndpoint<?, ?> getRemoteClient(final int i) {
+ RemoteClientEndpoint getRemoteClient(final int i) {
return remoteClients.get(Integer.valueOf(i));
}
- private final class ReplyHandlerImpl<O> implements ReplyHandler<O> {
+ private final class ReplyHandlerImpl implements ReplyHandler {
private final AllocatedMessageChannel channel;
private final int requestId;
@@ -335,7 +346,7 @@
this.allocator = allocator;
}
- public void handleReply(final O reply) {
+ public void handleReply(final Object reply) {
ByteBuffer buffer = allocator.allocate();
buffer.put((byte) REPLY);
buffer.putInt(requestId);
@@ -400,7 +411,7 @@
// Session mgmt
- public int openRequest(ReplyHandler<?> handler) {
+ public int openRequest(ReplyHandler handler) {
int id;
do {
id = localRequestIdSeq.getAndIncrement();
@@ -412,12 +423,12 @@
int id;
do {
id = remoteClientIdSeq.getAndIncrement() << 1 | (server ? 1 : 0);
- } while (remoteClients.putIfAbsent(Integer.valueOf(id), new RemoteClientEndpointImpl<Object, Object>(id, null, allocator)) != null);
+ } while (remoteClients.putIfAbsent(Integer.valueOf(id), new RemoteClientEndpointImpl(id, null, allocator)) != null);
return id;
}
@SuppressWarnings({ "unchecked" })
- public void openClientForForwardedService(int id, RemoteClientEndpoint<?, ?> clientEndpoint) {
+ public void openClientForForwardedService(int id, RemoteClientEndpoint clientEndpoint) {
try {
forwardedClients.put(Integer.valueOf(id), ((RemoteClientEndpoint)clientEndpoint).getHandle());
} catch (RemotingException e) {
@@ -425,37 +436,26 @@
}
}
- public Handle<RemoteClientEndpoint<?, ?>> getForwardedClient(int id) {
+ public Handle<RemoteClientEndpoint> getForwardedClient(int id) {
return forwardedClients.get(Integer.valueOf(id));
}
- public ReplyHandler<?> takeOutstandingReqeust(int id) {
+ private Handle<RemoteClientEndpoint> takeForwardedClient(final int id) {
+ return forwardedClients.remove(Integer.valueOf(id));
+ }
+
+ public ReplyHandler takeOutstandingReqeust(int id) {
return outstandingRequests.remove(Integer.valueOf(id));
}
- public Handle<RemoteServiceEndpoint<?, ?>> getForwardedService(final int id) {
+ public Handle<RemoteServiceEndpoint> getForwardedService(final int id) {
return forwardedServices.get(Integer.valueOf(id));
}
- @SuppressWarnings({ "unchecked" })
- private static <I, O> void receiveRequest(RemoteClientEndpoint<I, O> clientEndpoint, Object request) {
- clientEndpoint.receiveRequest((I) request);
+ public Handle<RemoteServiceEndpoint> takeForwardedService(final int id) {
+ return forwardedServices.remove(Integer.valueOf(id));
}
- @SuppressWarnings({ "unchecked" })
- private static <I, O> RemoteRequestContext receiveRequest(RemoteClientEndpoint<I, O> clientEndpoint, ReplyHandler<O> replyHandler, Object request) {
- return clientEndpoint.receiveRequest((I) request, replyHandler);
- }
-
- @SuppressWarnings({ "unchecked" })
- private static <O> void handleReply(final ReplyHandler<O> replyHandler, final Object reply) {
- SpiUtils.safeHandleReply(replyHandler, (O) reply);
- }
-
- private static void handleException(final ReplyHandler<?> handler, final Object message, final Object cause) {
- SpiUtils.safeHandleException(handler, message == null ? null : message.toString(), cause instanceof Throwable ? (Throwable) cause : null);
- }
-
// Writer members
private final BlockingQueue<WriteHandler> outputQueue = CollectionUtil.blockingQueue(64);
@@ -470,7 +470,7 @@
// client endpoint
- private final class RemoteClientEndpointImpl<I, O> extends AbstractAutoCloseable<RemoteClientEndpoint<I, O>> implements RemoteClientEndpoint<I, O> {
+ private final class RemoteClientEndpointImpl extends AbstractAutoCloseable<RemoteClientEndpoint> implements RemoteClientEndpoint {
private final int identifier;
private final MarshallerFactory<ByteBuffer> marshallerFactory;
@@ -487,9 +487,22 @@
this.identifier = identifier;
this.marshallerFactory = marshallerFactory;
this.allocator = allocator;
+ addCloseHandler(new CloseHandler<RemoteClientEndpoint>() {
+ public void handleClose(final RemoteClientEndpoint closed) {
+ ByteBuffer buffer = allocator.allocate();
+ buffer.put((byte) MessageType.CLIENT_CLOSE);
+ buffer.putInt(identifier);
+ buffer.flip();
+ try {
+ registerWriter(channel, new SimpleWriteHandler(allocator, buffer));
+ } catch (InterruptedException e) {
+ log.warn("Client close notification was interrupted before it could be sent");
+ }
+ }
+ });
}
- public void receiveRequest(final I request) {
+ public void receiveRequest(final Object request) {
log.trace("Received one-way request of type %s", request == null ? "null" : request.getClass());
try {
final Marshaller<ByteBuffer> marshaller = marshallerFactory.createMarshaller(null);
@@ -517,7 +530,7 @@
}
}
- public RemoteRequestContext receiveRequest(final I request, final ReplyHandler<O> handler) {
+ public RemoteRequestContext receiveRequest(final Object request, final ReplyHandler handler) {
log.trace("Received request of type %s", request == null ? "null" : request.getClass());
try {
final Marshaller<ByteBuffer> marshaller = marshallerFactory.createMarshaller(null);
@@ -579,7 +592,7 @@
}
}
- public final class RemoteServiceEndpointImpl<I, O> extends AbstractAutoCloseable<RemoteServiceEndpoint<I, O>> implements RemoteServiceEndpoint<I, O> {
+ public final class RemoteServiceEndpointImpl extends AbstractAutoCloseable<RemoteServiceEndpoint> implements RemoteServiceEndpoint {
private final MarshallerFactory<ByteBuffer> marshallerFactory;
private final BufferAllocator<ByteBuffer> allocator;
@@ -590,20 +603,34 @@
this.marshallerFactory = marshallerFactory;
this.allocator = allocator;
this.identifier = identifier;
+ addCloseHandler(new CloseHandler<RemoteServiceEndpoint>() {
+ public void handleClose(final RemoteServiceEndpoint closed) {
+ ByteBuffer buffer = allocator.allocate();
+ buffer.put((byte) MessageType.SERVICE_CLOSE);
+ buffer.putInt(identifier);
+ buffer.flip();
+ try {
+ registerWriter(channel, new SimpleWriteHandler(allocator, buffer));
+ } catch (InterruptedException e) {
+ log.warn("Service close notification was interrupted before it could be sent");
+ }
+ }
+ });
}
- public RemoteClientEndpoint<I, O> createClientEndpoint() throws RemotingException {
+ public RemoteClientEndpoint createClientEndpoint() throws RemotingException {
final int id = openClientFromService();
final ByteBuffer buffer = allocator.allocate();
buffer.putInt(identifier);
buffer.putInt(openClientFromService());
buffer.flip();
+ // todo - probably should bail out if we're interrupted?
boolean intr = false;
for (;;) {
try {
registerWriter(channel, new SimpleWriteHandler(allocator, buffer));
try {
- return new RemoteClientEndpointImpl<I,O>(id, marshallerFactory, allocator);
+ return new RemoteClientEndpointImpl(id, marshallerFactory, allocator);
} finally {
if (intr) {
Thread.currentThread().interrupt();
Modified: remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicProtocol.java
===================================================================
--- remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicProtocol.java 2008-07-18 23:18:15 UTC (rev 4394)
+++ remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicProtocol.java 2008-07-18 23:18:52 UTC (rev 4395)
@@ -26,7 +26,6 @@
import org.jboss.cx.remoting.spi.remote.RemoteServiceEndpoint;
import org.jboss.cx.remoting.spi.remote.RemoteClientEndpointListener;
import org.jboss.cx.remoting.RemotingException;
-import org.jboss.cx.remoting.core.marshal.JBossSerializationMarshallerFactory;
import org.jboss.cx.remoting.core.marshal.JavaSerializationMarshallerFactory;
import org.jboss.xnio.IoHandlerFactory;
import org.jboss.xnio.ChannelSource;
@@ -59,11 +58,11 @@
* @param remoteListener a listener which receives notification of the remote root client of the incoming connection
* @return a handler factory for passing to an XNIO server
*/
- public static IoHandlerFactory<AllocatedMessageChannel> createServer(final Executor executor, final RemoteServiceEndpoint<?, ?> localRootSource, final BufferAllocator<ByteBuffer> allocator, final RemoteClientEndpointListener remoteListener) {
+ public static IoHandlerFactory<AllocatedMessageChannel> createServer(final Executor executor, final RemoteServiceEndpoint localRootSource, final BufferAllocator<ByteBuffer> allocator, final RemoteClientEndpointListener remoteListener) {
return new IoHandlerFactory<AllocatedMessageChannel>() {
public IoHandler<? super AllocatedMessageChannel> createHandler() {
try {
- final RemoteClientEndpoint<?, ?> remoteClientEndpoint = localRootSource.createClientEndpoint();
+ final RemoteClientEndpoint remoteClientEndpoint = localRootSource.createClientEndpoint();
try {
return new BasicHandler(true, allocator, remoteClientEndpoint, executor, remoteListener, new JavaSerializationMarshallerFactory(executor));
} finally {
@@ -92,13 +91,13 @@
* @return the future client endpoint of the remote side's root client
* @throws IOException if an error occurs
*/
- public static <I, O> IoFuture<RemoteClientEndpoint<I, O>> connect(final Executor executor, final RemoteClientEndpoint<?, ?> localRoot, final ChannelSource<AllocatedMessageChannel> channelSource, final BufferAllocator<ByteBuffer> allocator) throws IOException {
+ public static <I, O> IoFuture<RemoteClientEndpoint> connect(final Executor executor, final RemoteClientEndpoint localRoot, final ChannelSource<AllocatedMessageChannel> channelSource, final BufferAllocator<ByteBuffer> allocator) throws IOException {
final BasicHandler basicHandler = new BasicHandler(false, allocator, localRoot, executor, null, new JavaSerializationMarshallerFactory(executor));
final IoFuture<AllocatedMessageChannel> futureChannel = channelSource.open(basicHandler);
- return new AbstractConvertingIoFuture<RemoteClientEndpoint<I, O>, AllocatedMessageChannel>(futureChannel) {
+ return new AbstractConvertingIoFuture<RemoteClientEndpoint, AllocatedMessageChannel>(futureChannel) {
@SuppressWarnings({ "unchecked" })
- protected RemoteClientEndpoint<I, O> convert(final AllocatedMessageChannel channel) throws RemotingException {
- final RemoteClientEndpoint<?, ?> remoteClientEndpoint = basicHandler.getRemoteClient(0);
+ protected RemoteClientEndpoint convert(final AllocatedMessageChannel channel) throws RemotingException {
+ final RemoteClientEndpoint remoteClientEndpoint = basicHandler.getRemoteClient(0);
return (RemoteClientEndpoint) remoteClientEndpoint;
}
};
Added: remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/ServiceRegistry.java
===================================================================
--- remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/ServiceRegistry.java (rev 0)
+++ remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/ServiceRegistry.java 2008-07-18 23:18:52 UTC (rev 4395)
@@ -0,0 +1,39 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.cx.remoting.protocol.basic;
+
+import org.jboss.cx.remoting.spi.remote.RemoteServiceEndpoint;
+import org.jboss.cx.remoting.RemotingException;
+
+/**
+ * Declares operations for binding {@link RemoteServiceEndpoint} instances to {@code int}
+ * identifiers and for unbinding them again.
+ * <p>
+ * Two {@code bind} overloads are declared: one takes only the endpoint and returns an
+ * {@code int}; the other takes the endpoint together with a caller-supplied {@code int id}
+ * and returns nothing. Presumably the first overload returns an identifier chosen by the
+ * registry - TODO confirm against an implementation. {@code unbind} takes an {@code int id}.
+ * All three methods declare {@link RemotingException}.
+ * <p>
+ * NOTE(review): the last declaration in the body consists only of the return type
+ * {@code RemoteServiceEndpoint}, with no method name, parameter list or terminating
+ * semicolon, so the interface does not compile as committed in this revision. It looks like
+ * an unfinished lookup-style method - confirm against later revisions of this file.
+ */
+public interface ServiceRegistry {
+ int bind(RemoteServiceEndpoint remoteServiceEndpoint) throws RemotingException;
+
+ void bind(RemoteServiceEndpoint remoteServiceEndpoint, int id) throws RemotingException;
+
+ void unbind(int id) throws RemotingException;
+
+ RemoteServiceEndpoint
+}
Modified: remoting3/trunk/protocol/basic/src/test/java/org/jboss/cx/remoting/protocol/basic/ConnectionTestCase.java
===================================================================
--- remoting3/trunk/protocol/basic/src/test/java/org/jboss/cx/remoting/protocol/basic/ConnectionTestCase.java 2008-07-18 23:18:15 UTC (rev 4394)
+++ remoting3/trunk/protocol/basic/src/test/java/org/jboss/cx/remoting/protocol/basic/ConnectionTestCase.java 2008-07-18 23:18:52 UTC (rev 4395)
@@ -50,6 +50,9 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.Collections;
+import java.util.List;
+import java.util.LinkedList;
import java.nio.ByteBuffer;
import java.net.InetSocketAddress;
import java.io.Closeable;
@@ -63,11 +66,13 @@
}
public void testConnection() throws Throwable {
+ final List<Throwable> problems = Collections.synchronizedList(new LinkedList<Throwable>());
final AtomicBoolean clientOpened = new AtomicBoolean(false);
+ final AtomicBoolean client2Opened = new AtomicBoolean(false);
final AtomicBoolean serviceOpened = new AtomicBoolean(false);
final AtomicBoolean clientClosed = new AtomicBoolean(false);
final AtomicBoolean serviceClosed = new AtomicBoolean(false);
- final CountDownLatch clientCloseLatch = new CountDownLatch(1);
+ final CountDownLatch cleanupLatch = new CountDownLatch(2);
final ExecutorService executorService = Executors.newCachedThreadPool();
try {
final BufferAllocator<ByteBuffer> allocator = new BufferAllocator<ByteBuffer>() {
@@ -84,13 +89,19 @@
endpoint.setExecutor(executorService);
endpoint.start();
try {
- final RemoteServiceEndpoint<Object,Object> serverServiceEndpoint = endpoint.createServiceEndpoint(new RequestListener<Object, Object>() {
+ final RemoteServiceEndpoint serverServiceEndpoint = endpoint.createServiceEndpoint(new RequestListener<Object, Object>() {
public void handleClientOpen(final ClientContext context) {
- clientOpened.set(true);
+ if (clientOpened.getAndSet(true)) {
+ if (client2Opened.getAndSet(true)) {
+ problems.add(new IllegalStateException("Too many client opens"));
+ }
+ }
}
public void handleServiceOpen(final ServiceContext context) {
- serviceOpened.set(true);
+ if (serviceOpened.getAndSet(true)) {
+ problems.add(new IllegalStateException("Multiple service opens"));
+ }
}
public void handleRequest(final RequestContext<Object> context, final Object request) throws RemoteExecutionException {
@@ -101,61 +112,78 @@
try {
context.sendFailure("failed", e);
} catch (RemotingException e1) {
- System.out.println("Double fault!");
+ problems.add(e1);
}
}
}
public void handleServiceClose(final ServiceContext context) {
- serviceClosed.set(true);
+ if (serviceClosed.getAndSet(true)) {
+ problems.add(new IllegalStateException("Multiple service closes"));
+ }
+ cleanupLatch.countDown();
}
public void handleClientClose(final ClientContext context) {
- clientClosed.set(true);
- clientCloseLatch.countDown();
+ if (clientClosed.getAndSet(true)) {
+ problems.add(new IllegalStateException("Multiple client closes"));
+ }
+ cleanupLatch.countDown();
}
});
try {
- final Handle<RemoteServiceEndpoint<Object,Object>> handle = serverServiceEndpoint.getHandle();
+ final Handle<RemoteServiceEndpoint> serviceHandle = serverServiceEndpoint.getHandle();
serverServiceEndpoint.autoClose();
try {
final RemoteClientEndpointListener remoteListener = new RemoteClientEndpointListener() {
-
- public <I, O> void notifyCreated(final RemoteClientEndpoint<I, O> endpoint) {
-
+ public <I, O> void notifyCreated(final RemoteClientEndpoint endpoint) {
}
};
final ConfigurableFactory<Closeable> tcpServer = xnio.createTcpServer(executorService, Channels.convertStreamToAllocatedMessage(BasicProtocol.createServer(executorService, serverServiceEndpoint, allocator, remoteListener), 32768, 32768), new InetSocketAddress(12345));
final Closeable tcpServerCloseable = tcpServer.create();
try {
// now create a client to connect to it
- final RemoteClientEndpoint<?,?> localRoot = serverServiceEndpoint.createClientEndpoint();
- final InetSocketAddress destAddr = new InetSocketAddress("localhost", 12345);
- final TcpClient tcpClient = xnio.createTcpConnector().create().createChannelSource(destAddr);
- final ChannelSource<AllocatedMessageChannel> messageChannelSource = Channels.convertStreamToAllocatedMessage(tcpClient, 32768, 32768);
- final IoFuture<RemoteClientEndpoint<Object,Object>> futureClient = BasicProtocol.connect(executorService, localRoot, messageChannelSource, allocator);
- final RemoteClientEndpoint<Object, Object> clientEndpoint = futureClient.get();
+ final RemoteClientEndpoint localRoot = ;
try {
- final Client<Object,Object> client = endpoint.createClient(clientEndpoint);
+ serverServiceEndpoint.autoClose();
+ final InetSocketAddress destAddr = new InetSocketAddress("localhost", 12345);
+ final TcpClient tcpClient = xnio.createTcpConnector().create().createChannelSource(destAddr);
try {
- clientEndpoint.autoClose();
- final Object result = client.send("Test").get();
- assertEquals("response", result);
- client.close();
- tcpServerCloseable.close();
- handle.close();
+ final ChannelSource<AllocatedMessageChannel> messageChannelSource = Channels.convertStreamToAllocatedMessage(tcpClient, 32768, 32768);
+ final IoFuture<RemoteClientEndpoint> futureClient = BasicProtocol.connect(executorService, localRoot, messageChannelSource, allocator);
+ final RemoteClientEndpoint clientEndpoint = futureClient.get();
+ try {
+ final Client<Object,Object> client = endpoint.createClient(clientEndpoint);
+ try {
+ clientEndpoint.autoClose();
+ final Object result = client.send("Test").get();
+ assertEquals("response", result);
+ client.close();
+ cleanupLatch.await(500L, TimeUnit.MILLISECONDS);
+ tcpServerCloseable.close();
+ serviceHandle.close();
+ assertTrue(serviceOpened.get());
+ assertTrue(clientOpened.get());
+ assertTrue(client2Opened.get());
+ assertTrue(clientClosed.get());
+ assertTrue(serviceClosed.get());
+ } finally {
+ IoUtils.safeClose(client);
+ }
+ } finally {
+ IoUtils.safeClose(clientEndpoint);
+ }
} finally {
- IoUtils.safeClose(client);
- clientCloseLatch.await(500L, TimeUnit.MILLISECONDS);
+ // todo close tcpClient
}
} finally {
- IoUtils.safeClose(clientEndpoint);
+ IoUtils.safeClose(localRoot);
}
} finally {
IoUtils.safeClose(tcpServerCloseable);
}
} finally {
- IoUtils.safeClose(handle);
+ IoUtils.safeClose(serviceHandle);
}
} finally {
IoUtils.safeClose(serverServiceEndpoint);
@@ -169,9 +197,8 @@
} finally {
executorService.shutdownNow();
}
- assertTrue(serviceOpened.get());
- assertTrue(clientOpened.get());
- assertTrue(clientClosed.get());
- assertTrue(serviceClosed.get());
+ for (Throwable t : problems) {
+ throw t;
+ }
}
}
Modified: remoting3/trunk/standalone/src/main/java/org/jboss/cx/remoting/Remoting.java
===================================================================
--- remoting3/trunk/standalone/src/main/java/org/jboss/cx/remoting/Remoting.java 2008-07-18 23:18:15 UTC (rev 4394)
+++ remoting3/trunk/standalone/src/main/java/org/jboss/cx/remoting/Remoting.java 2008-07-18 23:18:52 UTC (rev 4395)
@@ -31,7 +31,7 @@
}
public static <I, O> Client<I, O> createLocalClient(Endpoint endpoint, RequestListener<I, O> requestListener) throws RemotingException {
- final RemoteClientEndpoint<I, O> clientEndpoint = endpoint.createClientEndpoint(requestListener);
+ final RemoteClientEndpoint clientEndpoint = endpoint.createClientEndpoint(requestListener);
try {
return endpoint.createClient(clientEndpoint);
} finally {
@@ -40,7 +40,7 @@
}
public static <I, O> ClientSource<I, O> createLocalClientSource(Endpoint endpoint, RequestListener<I, O> requestListener) throws RemotingException {
- final RemoteServiceEndpoint<I, O> serviceEndpoint = endpoint.createServiceEndpoint(requestListener);
+ final RemoteServiceEndpoint serviceEndpoint = endpoint.createServiceEndpoint(requestListener);
try {
return endpoint.createClientSource(serviceEndpoint);
} finally {
16 years, 5 months
JBoss Remoting SVN: r4394 - remoting3/trunk/testing-support/src/main/resources.
by jboss-remoting-commits@lists.jboss.org
Author: david.lloyd(a)jboss.com
Date: 2008-07-18 19:18:15 -0400 (Fri, 18 Jul 2008)
New Revision: 4394
Modified:
remoting3/trunk/testing-support/src/main/resources/testing.policy
Log:
Permission fix for tests
Modified: remoting3/trunk/testing-support/src/main/resources/testing.policy
===================================================================
--- remoting3/trunk/testing-support/src/main/resources/testing.policy 2008-07-18 22:58:52 UTC (rev 4393)
+++ remoting3/trunk/testing-support/src/main/resources/testing.policy 2008-07-18 23:18:15 UTC (rev 4394)
@@ -1,6 +1,11 @@
-// Permissions for the core tests
+// Permissions for the tests
+grant codeBase "file:${build.home}/api/target/test/classes/-"
+{
+ permission java.lang.RuntimePermission "modifyThread"; // for executor control
+};
+
grant codeBase "file:${build.home}/core/target/test/classes/-"
{
permission java.lang.RuntimePermission "modifyThread"; // for executor control
16 years, 5 months