JBoss Remoting SVN: r3456 - in remoting3/trunk: api/src/main/java/org/jboss/cx/remoting/service and 3 other directories.
by jboss-remoting-commits@lists.jboss.org
Author: david.lloyd(a)jboss.com
Date: 2008-02-18 21:42:17 -0500 (Mon, 18 Feb 2008)
New Revision: 3456
Added:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractInterceptor.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/Interceptor.java
Removed:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/ContextService.java
Modified:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Context.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/service/SecurityService.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/service/TxnService.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractClientInterceptor.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractServerInterceptor.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/ClientInterceptor.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/InterceptorContext.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/ServerInterceptor.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/ContextWrapper.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundContext.java
Log:
Improve the interceptor API
Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Context.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Context.java 2008-02-19 02:40:46 UTC (rev 3455)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Context.java 2008-02-19 02:42:17 UTC (rev 3456)
@@ -1,7 +1,6 @@
package org.jboss.cx.remoting;
import java.util.concurrent.ConcurrentMap;
-import org.jboss.cx.remoting.spi.ContextService;
/**
* A communications context. The context may be associated with a security/authentication state and a transactional
@@ -60,7 +59,7 @@
* @return an instance of the given interface
* @throws RemotingException if the service is not valid or is not available
*/
- <T extends ContextService> T getService(Class<T> serviceType) throws RemotingException;
+ <T> T getService(Class<T> serviceType) throws RemotingException;
/**
* Determine whether this context supports a service with the given client interface.
@@ -68,5 +67,5 @@
* @param serviceType the service interface type
* @return {@code true} if the given service type is supported
*/
- <T extends ContextService> boolean hasService(Class<T> serviceType);
+ <T> boolean hasService(Class<T> serviceType);
}
Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/service/SecurityService.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/service/SecurityService.java 2008-02-19 02:40:46 UTC (rev 3455)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/service/SecurityService.java 2008-02-19 02:42:17 UTC (rev 3456)
@@ -1,14 +1,13 @@
package org.jboss.cx.remoting.service;
import org.jboss.cx.remoting.RemotingException;
-import org.jboss.cx.remoting.spi.ContextService;
import javax.security.auth.callback.CallbackHandler;
/**
*
*/
-public interface SecurityService extends ContextService {
+public interface SecurityService {
/**
* @param userName
Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/service/TxnService.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/service/TxnService.java 2008-02-19 02:40:46 UTC (rev 3455)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/service/TxnService.java 2008-02-19 02:42:17 UTC (rev 3456)
@@ -1,14 +1,13 @@
package org.jboss.cx.remoting.service;
import org.jboss.cx.remoting.RemotingException;
-import org.jboss.cx.remoting.spi.ContextService;
import javax.transaction.xa.XAResource;
/**
*
*/
-public interface TxnService extends ContextService {
+public interface TxnService {
/**
* Begin a transaction on the remote side.
*
Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractClientInterceptor.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractClientInterceptor.java 2008-02-19 02:40:46 UTC (rev 3455)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractClientInterceptor.java 2008-02-19 02:42:17 UTC (rev 3456)
@@ -1,68 +1,14 @@
package org.jboss.cx.remoting.spi;
-import org.jboss.cx.remoting.Context;
-import org.jboss.cx.remoting.RemoteExecutionException;
-import org.jboss.cx.remoting.spi.protocol.RequestIdentifier;
-
/**
* A simple base implementation of {@code ContextServiceInterceptor}. Use this class as a base for simple
* implementations of that interface.
*/
-public abstract class AbstractClientInterceptor implements ClientInterceptor {
- protected final Context<?, ?> context;
- protected ClientInterceptor next, prev;
-
- protected AbstractClientInterceptor(final Context<?, ?> context) {
- this.context = context;
+public abstract class AbstractClientInterceptor<T> extends AbstractInterceptor implements ClientInterceptor<T> {
+ protected AbstractClientInterceptor() {
}
- public final void setNext(final ClientInterceptor next) {
- this.next = next;
- }
-
- public final void setPrevious(final ClientInterceptor prev) {
- this.prev = prev;
- }
-
- public void processOutboundRequest(final InterceptorContext context, final RequestIdentifier requestIdentifier, final Object request) {
- next.processOutboundRequest(context, requestIdentifier, request);
- }
-
- public void processInboundReply(final InterceptorContext context, final RequestIdentifier requestIdentifier, final Object reply) {
- prev.processInboundReply(context, requestIdentifier, reply);
- }
-
- public void processInboundException(final InterceptorContext context, final RequestIdentifier requestIdentifier, final RemoteExecutionException exception) {
- prev.processInboundException(context, requestIdentifier, exception);
- }
-
- public <T extends ContextService> T getContextService(InterceptorContext context) {
+ public T getContextService(InterceptorContext context) {
return null;
}
-
- public void processOutboundCancelRequest(final InterceptorContext context, final RequestIdentifier requestIdentifier, final boolean mayInterrupt) {
- next.processOutboundCancelRequest(context, requestIdentifier, true);
- }
-
- public void processInboundCancelAcknowledge(final InterceptorContext context, final RequestIdentifier requestIdentifier) {
- prev.processInboundCancelAcknowledge(context, requestIdentifier);
- }
-
- public final void close() {
- try {
- doClose();
- } catch (RuntimeException ex) {
- // todo - log the exception
- // consume
- } finally {
- next.close();
- }
- }
-
- /**
- * Actually perform the close operation. No delegation is necessary.
- */
- protected void doClose() {
- // do nothing by default; user should override
- }
}
Added: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractInterceptor.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractInterceptor.java (rev 0)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractInterceptor.java 2008-02-19 02:42:17 UTC (rev 3456)
@@ -0,0 +1,34 @@
+package org.jboss.cx.remoting.spi;
+
+import org.jboss.cx.remoting.RemoteExecutionException;
+
+/**
+ *
+ */
+public abstract class AbstractInterceptor implements Interceptor {
+ protected AbstractInterceptor() {
+ }
+
+ public void processRequest(final InterceptorContext context, final Object request) {
+ context.nextRequest(request);
+ }
+
+ public void processReply(final InterceptorContext context, final Object reply) {
+ context.nextReply(reply);
+ }
+
+ public void processCancelRequest(final InterceptorContext context, final boolean mayInterrupt) {
+ context.nextCancelRequest(mayInterrupt);
+ }
+
+ public void processCancelAcknowledge(final InterceptorContext context) {
+ context.nextCancelAcknowledge();
+ }
+
+ public void processException(final InterceptorContext context, final RemoteExecutionException exception) {
+ context.nextException(exception);
+ }
+
+ public void close() {
+ }
+}
Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractServerInterceptor.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractServerInterceptor.java 2008-02-19 02:40:46 UTC (rev 3455)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractServerInterceptor.java 2008-02-19 02:42:17 UTC (rev 3456)
@@ -1,61 +1,10 @@
package org.jboss.cx.remoting.spi;
-import org.jboss.cx.remoting.RemoteExecutionException;
-import org.jboss.cx.remoting.spi.protocol.RequestIdentifier;
-
/**
* A simple base implementation of {@code ContextServiceInterceptor}. Use this class as a base for simple
* implementations of that interface.
*/
-public abstract class AbstractServerInterceptor implements ServerInterceptor {
- protected ServerInterceptor next, prev;
-
+public abstract class AbstractServerInterceptor extends AbstractInterceptor implements ServerInterceptor {
protected AbstractServerInterceptor() {
}
-
- public final void setNext(final ServerInterceptor next) {
- this.next = next;
- }
-
- public final void setPrevious(final ServerInterceptor prev) {
- this.prev = prev;
- }
-
- public void processInboundCancelRequest(final InterceptorContext context, final RequestIdentifier requestIdentifier, final boolean mayInterruptIfRunning) {
- prev.processInboundCancelRequest(context, requestIdentifier, true);
- }
-
- public void processOutboundCancelAcknowledge(final InterceptorContext context, final RequestIdentifier requestIdentifier) {
- next.processOutboundCancelAcknowledge(context, requestIdentifier);
- }
-
- public void processInboundRequest(final InterceptorContext context, final RequestIdentifier requestIdentifier, final Object request) {
- prev.processInboundRequest(context, requestIdentifier, request);
- }
-
- public void processOutboundReply(final InterceptorContext context, final RequestIdentifier requestIdentifier, final Object reply) {
- next.processOutboundReply(context, requestIdentifier, reply);
- }
-
- public void processOutboundException(final InterceptorContext context, final RequestIdentifier requestIdentifier, final RemoteExecutionException exception) {
- next.processOutboundException(context, requestIdentifier, exception);
- }
-
- public final void close() {
- try {
- doClose();
- } catch (RuntimeException ex) {
- // todo - log the exception
- // consume
- } finally {
- next.close();
- }
- }
-
- /**
- * Actually perform the close operation. No delegation is necessary.
- */
- protected void doClose() {
- // do nothing by default; user should override
- }
}
Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/ClientInterceptor.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/ClientInterceptor.java 2008-02-19 02:40:46 UTC (rev 3455)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/ClientInterceptor.java 2008-02-19 02:42:17 UTC (rev 3456)
@@ -1,100 +1,16 @@
package org.jboss.cx.remoting.spi;
-import org.jboss.cx.remoting.RemoteExecutionException;
-import org.jboss.cx.remoting.spi.protocol.RequestIdentifier;
-
/**
* An interceptor that provides an additional service to a {@code Context}. A context service interceptor is created
- * for every context service for each context. Upon creation, the interceptors are tied together using the {@code
- * setNext}/{@code setPrevious} methods. Afterwards, the {@code processXXX} methods are invoked to handle data coming
- * in or going out through the context.
- * <p/>
- * The interceptor {@code processXXX} methods are expected to delegate to the next or previous interceptor after
- * performing the required processing. This diagram illustrates the relationship between interceptors and the Remoting
- * core: <p><img src="Interceptors.png" alt="Diagram depicting the relationship between interceptors and the Remoting
- * core"/></p>
- * <p/>
- * The general rule is that outbound process methods delegate to the next handler, and inbound process methods delegate
- * to the previous handler. The methods may make exceptions in certain circumstances, as described in the method
- * documentation, in order to "short-circuit" the request mechanism or to affect message delivery in a service-specific
- * way.
- * <p/>
- * The methods {@code processOutboundRequest}, {@code processOutboundMessage}, {@code processInboundReply}, and {@code
- * processInboundException} are all executed on the requesting ("client") side of the context.
+ * for every context service for each context.
*/
-public interface ClientInterceptor {
- /**
- * Set the next context service handler. When requests are processed, each handler delegates to the next handler in
- * the chain. Called once after the context service hander is created.
- *
- * @param nextInterceptor the next interceptor
- */
- void setNext(ClientInterceptor nextInterceptor);
+public interface ClientInterceptor<T> extends Interceptor {
/**
- * Set the previous context service handler. When replies are processed, each handler delegates to the previous
- * handler in the chain. Called once after the context service hander is created.
- *
- * @param previousInterceptor the previous interceptor
- */
- void setPrevious(ClientInterceptor previousInterceptor);
-
- /**
* Get the context service object associated with this handler. This instance is the end-user's interface into this
* service. If no interface is available for this context service, return {@code null}.
*
* @return the context service object
*/
- <T extends ContextService> T getContextService(InterceptorContext context);
-
- /**
- * Process an outbound request.
- *
- * @param context the context service interceptor context
- * @param requestIdentifier the request identifier
- * @param request the outbound request
- */
- void processOutboundRequest(InterceptorContext context, RequestIdentifier requestIdentifier, Object request);
-
- /**
- * Process an inbound request reply.
- *
- * @param context the context service interceptor context
- * @param requestIdentifier the request identifier
- * @param reply the inbound reply
- */
- void processInboundReply(InterceptorContext context, RequestIdentifier requestIdentifier, Object reply);
-
- /**
- * Process an inbound request exception.
- *
- * @param context the context service interceptor context
- * @param requestIdentifier the request identifier
- * @param exception the inbound exception
- */
- void processInboundException(InterceptorContext context, RequestIdentifier requestIdentifier, RemoteExecutionException exception);
-
- /**
- * Process an outbound cancellation request.
- *
- * @param context the context service interceptor context
- * @param requestIdentifier the request identifier
- * @param mayInterrupt {@code true} if the operation can be interrupted
- */
- void processOutboundCancelRequest(InterceptorContext context, RequestIdentifier requestIdentifier, boolean mayInterrupt);
-
- /**
- * Process an inbound cancellation acknowledgement.
- *
- * @param context the context service interceptor context
- * @param requestIdentifier the request identifier
- */
- void processInboundCancelAcknowledge(InterceptorContext context, RequestIdentifier requestIdentifier);
-
- /**
- * Close this interceptor. The handler MUST subsequently close the NEXT interceptor in the chain (i.e. in a {@code
- * finally} block). The handler may not access the previous interceptor in the chain, since it will already have
- * been closed.
- */
- void close();
+ T getContextService(InterceptorContext context);
}
Deleted: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/ContextService.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/ContextService.java 2008-02-19 02:40:46 UTC (rev 3455)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/ContextService.java 2008-02-19 02:42:17 UTC (rev 3456)
@@ -1,7 +0,0 @@
-package org.jboss.cx.remoting.spi;
-
-/**
- *
- */
-public interface ContextService {
-}
Added: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/Interceptor.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/Interceptor.java (rev 0)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/Interceptor.java 2008-02-19 02:42:17 UTC (rev 3456)
@@ -0,0 +1,52 @@
+package org.jboss.cx.remoting.spi;
+
+import org.jboss.cx.remoting.RemoteExecutionException;
+
+/**
+ *
+ */
+public interface Interceptor {
+ /**
+ * Process a request.
+ *
+ * @param context the context service interceptor context
+ * @param request the outbound request
+ */
+ void processRequest(InterceptorContext context, Object request);
+
+ /**
+ * Process a request reply.
+ *
+ * @param context the context service interceptor context
+ * @param reply the inbound reply
+ */
+ void processReply(InterceptorContext context, Object reply);
+
+ /**
+ * Process a request exception.
+ *
+ * @param context the context service interceptor context
+ * @param exception the inbound exception
+ */
+ void processException(InterceptorContext context, RemoteExecutionException exception);
+
+ /**
+ * Process a cancellation request.
+ *
+ * @param context the context service interceptor context
+ * @param mayInterrupt {@code true} if the operation can be interrupted
+ */
+ void processCancelRequest(InterceptorContext context, boolean mayInterrupt);
+
+ /**
+ * Process a cancellation acknowledgement.
+ *
+ * @param context the context service interceptor context
+ */
+ void processCancelAcknowledge(InterceptorContext context);
+
+ /**
+ * Close this interceptor.
+ */
+ void close();
+}
Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/InterceptorContext.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/InterceptorContext.java 2008-02-19 02:40:46 UTC (rev 3455)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/InterceptorContext.java 2008-02-19 02:42:17 UTC (rev 3456)
@@ -1,7 +1,18 @@
package org.jboss.cx.remoting.spi;
+import org.jboss.cx.remoting.RemoteExecutionException;
+
/**
*
*/
public interface InterceptorContext {
+ void nextRequest(Object request);
+
+ void nextReply(Object reply);
+
+ void nextException(RemoteExecutionException exception);
+
+ void nextCancelRequest(boolean mayInterrupt);
+
+ void nextCancelAcknowledge();
}
Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/ServerInterceptor.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/ServerInterceptor.java 2008-02-19 02:40:46 UTC (rev 3455)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/ServerInterceptor.java 2008-02-19 02:42:17 UTC (rev 3456)
@@ -1,92 +1,8 @@
package org.jboss.cx.remoting.spi;
-import org.jboss.cx.remoting.RemoteExecutionException;
-import org.jboss.cx.remoting.spi.protocol.RequestIdentifier;
-
/**
* An interceptor that provides an additional service to a {@code Context}. A context service interceptor is created
- * for every context service for each context. Upon creation, the interceptors are tied together using the {@code
- * setNext}/{@code setPrevious} methods. Afterwards, the {@code processXXX} methods are invoked to handle data coming
- * in or going out through the context.
- * <p/>
- * The interceptor {@code processXXX} methods are expected to delegate to the next or previous interceptor after
- * performing the required processing. This diagram illustrates the relationship between interceptors and the Remoting
- * core: <p><img src="Interceptors.png" alt="Diagram depicting the relationship between interceptors and the Remoting
- * core"/></p>
- * <p/>
- * The general rule is that outbound process methods delegate to the next handler, and inbound process methods delegate
- * to the previous handler. The methods may make exceptions in certain circumstances, as described in the method
- * documentation, in order to "short-circuit" the request mechanism or to affect message delivery in a service-specific
- * way.
- * <p/>
- * The methods {@code processInboundRequest}, {@code processInboundMessage}, {@code processOutboundReply}, and {@code
- * processOutboundException} all operate on the responding ("server") side of the context.
+ * for every context service for each context.
*/
-public interface ServerInterceptor {
- /**
- * Set the next context service handler. When requests are processed, each handler delegates to the next handler in
- * the chain. Called once after the context service hander is created.
- *
- * @param nextInterceptor the next interceptor
- */
- void setNext(ServerInterceptor nextInterceptor);
-
- /**
- * Set the previous context service handler. When replies are processed, each handler delegates to the previous
- * handler in the chain. Called once after the context service hander is created.
- *
- * @param previousInterceptor the previous interceptor
- */
- void setPrevious(ServerInterceptor previousInterceptor);
-
- /**
- * Process an inbound request.
- *
- * @param context the context service interceptor context
- * @param requestIdentifier the request identifier
- * @param request the inbound request
- */
- void processInboundRequest(InterceptorContext context, RequestIdentifier requestIdentifier, Object request);
-
- /**
- * Process an outbound reply.
- *
- * @param context the context service interceptor context
- * @param requestIdentifier the request identifier
- * @param reply the outbound reply
- */
- void processOutboundReply(InterceptorContext context, RequestIdentifier requestIdentifier, Object reply);
-
- /**
- * Process an outbound request exception.
- *
- * @param context the context service interceptor context
- * @param requestIdentifier the request identifier
- * @param exception the exception that was thrown
- */
- void processOutboundException(InterceptorContext context, RequestIdentifier requestIdentifier, RemoteExecutionException exception);
-
- /**
- * Process an inbound cancellation request.
- *
- * @param context the context service interceptor context
- * @param requestIdentifier the request identifier
- * @param mayInterruptIfRunning {@code true} if the operation can be interrupted
- */
- void processInboundCancelRequest(InterceptorContext context, RequestIdentifier requestIdentifier, boolean mayInterruptIfRunning);
-
- /**
- * Process an outbound cancellation acknowledgement.
- *
- * @param context the context service interceptor context
- * @param requestIdentifier the request identifier
- */
- void processOutboundCancelAcknowledge(InterceptorContext context, RequestIdentifier requestIdentifier);
-
- /**
- * Close this interceptor. The handler MUST subsequently close the NEXT interceptor in the chain (i.e. in a {@code
- * finally} block). The handler may not access the previous interceptor in the chain, since it will already have
- * been closed.
- */
- void close();
+public interface ServerInterceptor extends Interceptor {
}
Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/ContextWrapper.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/ContextWrapper.java 2008-02-19 02:40:46 UTC (rev 3455)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/ContextWrapper.java 2008-02-19 02:42:17 UTC (rev 3456)
@@ -5,7 +5,6 @@
import org.jboss.cx.remoting.FutureReply;
import org.jboss.cx.remoting.RemoteExecutionException;
import org.jboss.cx.remoting.RemotingException;
-import org.jboss.cx.remoting.spi.ContextService;
/**
*
@@ -33,11 +32,11 @@
return delegate.getAttributes();
}
- public <T extends ContextService> T getService(Class<T> serviceType) throws RemotingException {
+ public <T> T getService(Class<T> serviceType) throws RemotingException {
return delegate.getService(serviceType);
}
- public <T extends ContextService> boolean hasService(Class<T> serviceType) {
+ public <T> boolean hasService(Class<T> serviceType) {
return delegate.hasService(serviceType);
}
}
Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundContext.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundContext.java 2008-02-19 02:40:46 UTC (rev 3455)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundContext.java 2008-02-19 02:42:17 UTC (rev 3456)
@@ -10,7 +10,6 @@
import org.jboss.cx.remoting.log.Logger;
import org.jboss.cx.remoting.core.util.AtomicStateMachine;
import org.jboss.cx.remoting.core.util.CollectionUtil;
-import org.jboss.cx.remoting.spi.ContextService;
import org.jboss.cx.remoting.spi.protocol.ContextIdentifier;
import org.jboss.cx.remoting.spi.protocol.RequestIdentifier;
@@ -175,12 +174,12 @@
return contextMap;
}
- public <T extends ContextService> T getService(final Class<T> serviceType) throws RemotingException {
+ public <T> T getService(final Class<T> serviceType) throws RemotingException {
// todo interceptors
return null;
}
- public <T extends ContextService> boolean hasService(final Class<T> serviceType) {
+ public <T> boolean hasService(final Class<T> serviceType) {
// todo interceptors
return false;
}
16 years, 9 months
JBoss Remoting SVN: r3455 - remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core.
by jboss-remoting-commits@lists.jboss.org
Author: david.lloyd(a)jboss.com
Date: 2008-02-18 21:40:46 -0500 (Mon, 18 Feb 2008)
New Revision: 3455
Modified:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/QueueExecutor.java
Log:
javadoc update
Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/QueueExecutor.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/QueueExecutor.java 2008-02-19 02:39:22 UTC (rev 3454)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/QueueExecutor.java 2008-02-19 02:40:46 UTC (rev 3455)
@@ -6,7 +6,10 @@
import org.jboss.cx.remoting.log.Logger;
/**
- *
+ * An executor designed to run all submitted tasks in the current thread. The queue is run continuously
+ * until the {@code shutdown()} method is invoked. Jobs may be submitted to the queue from any thread.
+ * Only one thread should invoke the {@code runQueue()} method, which will run until the executor is
+ * shut down.
*/
public final class QueueExecutor implements Executor {
private static final Logger log = Logger.getLogger(QueueExecutor.class);
16 years, 9 months
JBoss Remoting SVN: r3454 - remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core.
by jboss-remoting-commits@lists.jboss.org
Author: david.lloyd(a)jboss.com
Date: 2008-02-18 21:39:22 -0500 (Mon, 18 Feb 2008)
New Revision: 3454
Removed:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ThreadPoolInterceptor.java
Log:
Remove useless interceptor implementation
Deleted: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ThreadPoolInterceptor.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ThreadPoolInterceptor.java 2008-02-19 02:38:59 UTC (rev 3453)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ThreadPoolInterceptor.java 2008-02-19 02:39:22 UTC (rev 3454)
@@ -1,68 +0,0 @@
-package org.jboss.cx.remoting.core;
-
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Future;
-import java.util.concurrent.FutureTask;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ThreadPoolExecutor;
-import org.jboss.cx.remoting.RemoteExecutionException;
-import org.jboss.cx.remoting.core.util.CollectionUtil;
-import org.jboss.cx.remoting.log.Logger;
-import org.jboss.cx.remoting.spi.AbstractServerInterceptor;
-import org.jboss.cx.remoting.spi.InterceptorContext;
-import org.jboss.cx.remoting.spi.protocol.RequestIdentifier;
-
-/**
- *
- */
-public final class ThreadPoolInterceptor extends AbstractServerInterceptor {
- private static final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(30, 50, 0, null, new ArrayBlockingQueue<Runnable>(100, true));
- private final ConcurrentMap<RequestIdentifier, Future<Void>> requests = CollectionUtil.concurrentWeakHashMap();
-
- private static final Logger log = Logger.getLogger(ThreadPoolInterceptor.class);
-
- public void processInboundCancelRequest(final InterceptorContext context, final RequestIdentifier requestIdentifier, final boolean mayInterruptIfRunning) {
- try {
- super.processInboundCancelRequest(context, requestIdentifier, false);
- } finally {
- final Future<Void> future = requests.get(requestIdentifier);
- if (future != null) {
- if (future.cancel(mayInterruptIfRunning)) {
-
- }
- }
- }
- }
-
- public void processInboundRequest(final InterceptorContext context, final RequestIdentifier requestIdentifier, final Object request) {
- try {
- // Use FutureTask so that we get a Future<> before the task actually starts
- FutureTask<Void> task = new FutureTask<Void>(new Runnable() {
- public void run() {
- ThreadPoolInterceptor.super.processInboundRequest(context, requestIdentifier, request);
- }
- }, null);
- requests.put(requestIdentifier, task);
- threadPoolExecutor.execute(task);
- } catch (RejectedExecutionException e) {
- processOutboundException(context, requestIdentifier, new RemoteExecutionException("Request job submission rejected (the server may be too busy to service this request)", e));
- }
- }
-
- public void processOutboundCancelAcknowledge(final InterceptorContext context, final RequestIdentifier requestIdentifier) {
- requests.remove(requestIdentifier);
- super.processOutboundCancelAcknowledge(context, requestIdentifier);
- }
-
- public void processOutboundReply(final InterceptorContext context, final RequestIdentifier requestIdentifier, final Object reply) {
- requests.remove(requestIdentifier);
- super.processOutboundReply(context, requestIdentifier, reply);
- }
-
- public void processOutboundException(final InterceptorContext context, final RequestIdentifier requestIdentifier, final RemoteExecutionException exception) {
- requests.remove(requestIdentifier);
- super.processOutboundException(context, requestIdentifier, exception);
- }
-
-}
16 years, 9 months
JBoss Remoting SVN: r3453 - remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/core/util.
by jboss-remoting-commits@lists.jboss.org
Author: david.lloyd(a)jboss.com
Date: 2008-02-18 21:38:59 -0500 (Mon, 18 Feb 2008)
New Revision: 3453
Added:
remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/core/util/ServiceURI.java
Log:
Add service URI utility class
Added: remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/core/util/ServiceURI.java
===================================================================
--- remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/core/util/ServiceURI.java (rev 0)
+++ remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/core/util/ServiceURI.java 2008-02-19 02:38:59 UTC (rev 3453)
@@ -0,0 +1,87 @@
+package org.jboss.cx.remoting.core.util;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.regex.Pattern;
+import java.util.regex.Matcher;
+
+/**
+ *
+ */
+public final class ServiceURI {
+ public static final String SCHEME = "jrs";
+
+ private static final String FIRST_CHAR = "[$_a-zA-Z]";
+ private static final String SUBSEQUENT_CHAR = "[-+$_a-zA-Z0-9]*";
+ private static final String ID = FIRST_CHAR + SUBSEQUENT_CHAR;
+ private static final String SEPARATOR = "[./]";
+
+ private static final Pattern VALID_PATTERN = Pattern.compile("^(?:" + ID + "(?:" + SEPARATOR + ID + ")*)*$");
+
+ private final URI uri;
+ private final String serviceType;
+ private final String groupName;
+ private final String endpointName;
+
+ public ServiceURI(final String str) throws URISyntaxException {
+ this(new URI(str));
+ }
+
+ public ServiceURI(final URI uri) {
+ this.uri = uri;
+ if (! uri.getScheme().equals(SCHEME)) {
+ throw new IllegalArgumentException("Invalid URI scheme for service");
+ }
+ final String ssp = uri.getSchemeSpecificPart();
+ final int stcp = ssp.indexOf(':');
+ if (stcp == -1) {
+ serviceType = ssp;
+ groupName = "";
+ endpointName = "";
+ } else {
+ serviceType = ssp.substring(0, stcp).trim();
+ final int gncp = ssp.indexOf(':', stcp + 1);
+ if (gncp == -1) {
+ groupName = ssp.substring(stcp + 1).trim();
+ endpointName = "";
+ } else {
+ groupName = ssp.substring(stcp + 1, gncp).trim();
+ // ignore everything after the last :
+ final int encp = ssp.indexOf(':', gncp + 1);
+ if (encp == -1) {
+ endpointName = ssp.substring(gncp + 1).trim();
+ } else {
+ endpointName = ssp.substring(gncp + 1, encp).trim();
+ }
+ }
+ }
+ final Matcher matcher = VALID_PATTERN.matcher(serviceType);
+ if (! matcher.matches()) {
+ throw new IllegalArgumentException("Syntax error in service type URI part");
+ }
+ matcher.reset(groupName);
+ if (! matcher.matches()) {
+ throw new IllegalArgumentException("Syntax error in group name URI part");
+ }
+ matcher.reset(endpointName);
+ if (! matcher.matches()) {
+ throw new IllegalArgumentException("Syntax error in endpoint name URI part");
+ }
+ }
+
+ public URI getUri() {
+ return uri;
+ }
+
+ public String getServiceType() {
+ return serviceType;
+ }
+
+ public String getGroupName() {
+ return groupName;
+ }
+
+ public String getEndpointName() {
+ return endpointName;
+ }
+}
16 years, 9 months
JBoss Remoting SVN: r3452 - in remoting3/branches/protocol-context-rework/jrpp/src/main/java/org/jboss/cx/remoting/jrpp: id and 1 other directory.
by jboss-remoting-commits@lists.jboss.org
Author: david.lloyd(a)jboss.com
Date: 2008-02-18 21:10:12 -0500 (Mon, 18 Feb 2008)
New Revision: 3452
Added:
remoting3/branches/protocol-context-rework/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/JrppEndpointProtocolHandler.java
remoting3/branches/protocol-context-rework/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/JrppIoHandler.java
remoting3/branches/protocol-context-rework/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/JrppIoHandlerFactory.java
remoting3/branches/protocol-context-rework/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/JrppIoProvider.java
remoting3/branches/protocol-context-rework/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/JrppMessageType.java
remoting3/branches/protocol-context-rework/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/JrppProtocolBean.java
remoting3/branches/protocol-context-rework/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/JrppServerBean.java
remoting3/branches/protocol-context-rework/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/JrppSession.java
remoting3/branches/protocol-context-rework/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/JrppSessionProtocolHandler.java
Modified:
remoting3/branches/protocol-context-rework/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/JrppConnection.java
remoting3/branches/protocol-context-rework/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/id/IdentifierManager.java
Log:
OK, this branch was a bad idea. Just commit the half-working stuff and I'll just merge back the one or two good ideas by hand...
Modified: remoting3/branches/protocol-context-rework/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/JrppConnection.java
===================================================================
--- remoting3/branches/protocol-context-rework/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/JrppConnection.java 2008-02-17 19:15:34 UTC (rev 3451)
+++ remoting3/branches/protocol-context-rework/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/JrppConnection.java 2008-02-19 02:10:12 UTC (rev 3452)
@@ -37,13 +37,6 @@
import org.jboss.cx.remoting.jrpp.id.JrppStreamIdentifier;
import org.jboss.cx.remoting.jrpp.mina.IoBufferByteInput;
import org.jboss.cx.remoting.jrpp.mina.IoBufferByteOutput;
-import org.jboss.cx.remoting.spi.protocol.ContextIdentifier;
-import org.jboss.cx.remoting.spi.protocol.ProtocolContext;
-import org.jboss.cx.remoting.spi.protocol.ProtocolHandler;
-import org.jboss.cx.remoting.spi.protocol.ProtocolServerContext;
-import org.jboss.cx.remoting.spi.protocol.RequestIdentifier;
-import org.jboss.cx.remoting.spi.protocol.ServiceIdentifier;
-import org.jboss.cx.remoting.spi.protocol.StreamIdentifier;
import javax.security.auth.callback.CallbackHandler;
import javax.security.sasl.Sasl;
@@ -66,8 +59,6 @@
private static final String SASL_SERVER_FILTER_NAME = "SASL server filter";
private IoSession ioSession;
- private final ProtocolHandler protocolHandler;
- private final ProtocolContext protocolContext;
private final SingleSessionIoHandler ioHandler;
private final IdentifierManager identifierManager;
@@ -194,34 +185,10 @@
return ioHandler;
}
- public ProtocolHandler getProtocolHandler() {
- return protocolHandler;
- }
-
- public ProtocolContext getProtocolContext() {
- return protocolContext;
- }
-
private void write(ObjectOutput output, MessageType messageType) throws IOException {
output.writeByte(messageType.ordinal());
}
- private void write(ObjectOutput output, ServiceIdentifier serviceIdentifier) throws IOException {
- output.writeShort(((JrppServiceIdentifier)serviceIdentifier).getId());
- }
-
- private void write(ObjectOutput output, ContextIdentifier contextIdentifier) throws IOException {
- output.writeShort(((JrppContextIdentifier)contextIdentifier).getId());
- }
-
- private void write(ObjectOutput output, StreamIdentifier streamIdentifier) throws IOException {
- output.writeShort(((JrppStreamIdentifier)streamIdentifier).getId());
- }
-
- private void write(ObjectOutput output, RequestIdentifier requestIdentifier) throws IOException {
- output.writeShort(((JrppRequestIdentifier)requestIdentifier).getId());
- }
-
public void sendResponse(byte[] rawMsgData) throws IOException {
final IoBuffer buffer = newBuffer(rawMsgData.length + 100, false);
final MessageOutput output = protocolContext.getMessageOutput(new IoBufferByteOutput(buffer, ioSession));
Added: remoting3/branches/protocol-context-rework/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/JrppEndpointProtocolHandler.java
===================================================================
--- remoting3/branches/protocol-context-rework/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/JrppEndpointProtocolHandler.java (rev 0)
+++ remoting3/branches/protocol-context-rework/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/JrppEndpointProtocolHandler.java 2008-02-19 02:10:12 UTC (rev 3452)
@@ -0,0 +1,32 @@
+package org.jboss.cx.remoting.jrpp;
+
+import org.jboss.cx.remoting.spi.protocol.EndpointProtocolHandler;
+import org.jboss.cx.remoting.spi.protocol.SessionProtocolHandler;
+import org.jboss.cx.remoting.spi.protocol.SessionProtocolContext;
+import org.jboss.cx.remoting.spi.protocol.EndpointProtocolContext;
+import java.net.URI;
+import java.io.IOException;
+
+/**
+ *
+ */
+public final class JrppEndpointProtocolHandler implements EndpointProtocolHandler {
+ private EndpointProtocolContext endpointProtocolContext;
+
+ public JrppEndpointProtocolHandler() {
+ }
+
+ public SessionProtocolHandler establishSession(SessionProtocolContext context, URI remoteURI) throws IOException {
+ final JrppSessionProtocolHandler sessionProtocolHandler = new JrppSessionProtocolHandler(context);
+ sessionProtocolHandler.connect(remoteURI);
+ return sessionProtocolHandler;
+ }
+
+ public EndpointProtocolContext getEndpointProtocolContext() {
+ return endpointProtocolContext;
+ }
+
+ public void setEndpointProtocolContext(final EndpointProtocolContext endpointProtocolContext) {
+ this.endpointProtocolContext = endpointProtocolContext;
+ }
+}
Added: remoting3/branches/protocol-context-rework/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/JrppIoHandler.java
===================================================================
--- remoting3/branches/protocol-context-rework/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/JrppIoHandler.java (rev 0)
+++ remoting3/branches/protocol-context-rework/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/JrppIoHandler.java 2008-02-19 02:10:12 UTC (rev 3452)
@@ -0,0 +1,14 @@
+package org.jboss.cx.remoting.jrpp;
+
+import org.jboss.cx.remoting.core.util.MessageInput;
+import org.jboss.cx.remoting.core.util.MessageOutput;
+import java.io.IOException;
+
+/**
+ *
+ */
+public interface JrppIoHandler {
+ void handleMessage(MessageInput message) throws IOException;
+
+ MessageOutput getSaslMessageOutput();
+}
Added: remoting3/branches/protocol-context-rework/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/JrppIoHandlerFactory.java
===================================================================
--- remoting3/branches/protocol-context-rework/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/JrppIoHandlerFactory.java (rev 0)
+++ remoting3/branches/protocol-context-rework/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/JrppIoHandlerFactory.java 2008-02-19 02:10:12 UTC (rev 3452)
@@ -0,0 +1,25 @@
+package org.jboss.cx.remoting.jrpp;
+
+import org.apache.mina.handler.multiton.SingleSessionIoHandlerFactory;
+import org.apache.mina.handler.multiton.SingleSessionIoHandler;
+import org.apache.mina.common.IoSession;
+import org.jboss.cx.remoting.spi.protocol.EndpointProtocolContext;
+
+/**
+ *
+ */
+public final class JrppIoHandlerFactory implements SingleSessionIoHandlerFactory {
+ private final EndpointProtocolContext endpointProtocolContext;
+
+ public JrppIoHandlerFactory(final EndpointProtocolContext endpointProtocolContext) {
+ this.endpointProtocolContext = endpointProtocolContext;
+ }
+
+ public SingleSessionIoHandler getHandler(IoSession ioSession) throws Exception {
+ final JrppSession jrppSession = new JrppSession();
+ jrppSession.setIoProvider(ioSession);
+ jrppSession.establishServer();
+ endpointProtocolContext.esablishSession(jrppSession.getSessionProtocolHandler());
+ return jrppSession.getSingleSessionIoHandler();
+ }
+}
Added: remoting3/branches/protocol-context-rework/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/JrppIoProvider.java
===================================================================
--- remoting3/branches/protocol-context-rework/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/JrppIoProvider.java (rev 0)
+++ remoting3/branches/protocol-context-rework/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/JrppIoProvider.java 2008-02-19 02:10:12 UTC (rev 3452)
@@ -0,0 +1,24 @@
+package org.jboss.cx.remoting.jrpp;
+
+import org.jboss.cx.remoting.core.util.MessageOutput;
+import org.jboss.cx.remoting.core.util.MessageInput;
+import java.io.IOException;
+
+/**
+ *
+ */
+public interface JrppIoProvider {
+ MessageOutput createNewMessage(int estimatedSize) throws IOException;
+
+ boolean receiveSaslChallenge(MessageInput data) throws IOException;
+
+ boolean receiveSaslResponse(MessageInput data) throws IOException;
+
+ boolean sendSaslInitialChallenge() throws IOException;
+
+ boolean sendSaslInitialResponse() throws IOException;
+
+ void close();
+
+ void startEncryption() throws IOException;
+}
Added: remoting3/branches/protocol-context-rework/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/JrppMessageType.java
===================================================================
--- remoting3/branches/protocol-context-rework/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/JrppMessageType.java (rev 0)
+++ remoting3/branches/protocol-context-rework/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/JrppMessageType.java 2008-02-19 02:10:12 UTC (rev 3452)
@@ -0,0 +1,25 @@
+package org.jboss.cx.remoting.jrpp;
+
+/**
+ *
+ */
+public enum JrppMessageType {
+ VERSION, /* 0 */
+ SASL_CHALLENGE,
+ SASL_RESPONSE,
+ AUTH_SUCCESS,
+ AUTH_FAILED,
+ OPEN_CONTEXT, /* 5 */
+ CANCEL_ACK,
+ CANCEL_REQ,
+ CLOSE_CONTEXT,
+ CLOSE_SERVICE,
+ CLOSE_STREAM, /* 10 */
+ EXCEPTION,
+ REPLY,
+ REQUEST,
+ SERVICE_ACTIVATE,
+ SERVICE_REQUEST, /* 15 */
+ SERVICE_TERMINATE,
+ STREAM_DATA,
+}
Added: remoting3/branches/protocol-context-rework/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/JrppProtocolBean.java
===================================================================
--- remoting3/branches/protocol-context-rework/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/JrppProtocolBean.java (rev 0)
+++ remoting3/branches/protocol-context-rework/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/JrppProtocolBean.java 2008-02-19 02:10:12 UTC (rev 3452)
@@ -0,0 +1,94 @@
+package org.jboss.cx.remoting.jrpp;
+
+import org.jboss.cx.remoting.Endpoint;
+import org.jboss.cx.remoting.RemotingException;
+import org.jboss.cx.remoting.spi.protocol.ProtocolRegistration;
+import org.jboss.cx.remoting.spi.protocol.ProtocolRegistrationSpec;
+import org.jboss.cx.remoting.spi.protocol.EndpointProtocolContext;
+
+/**
+ *
+ */
+public final class JrppProtocolBean {
+ /**
+ * The endpoint.
+ *
+ * @protectedby {@code this}
+ */
+ private Endpoint endpoint;
+
+ /**
+ * The protocol registration.
+ *
+ * @protectedby {@code this}
+ */
+ private ProtocolRegistration registration;
+
+ /**
+ * The endpoint protocol context.
+ *
+ * @protectedby {@code this}
+ */
+ private EndpointProtocolContext endpointProtocolContext;
+
+ public Endpoint getEndpoint() {
+ synchronized(this) {
+ return endpoint;
+ }
+ }
+
+ public void setEndpoint(final Endpoint endpoint) {
+ synchronized(this) {
+ this.endpoint = endpoint;
+ }
+ }
+
+ public ProtocolRegistration getRegistration() {
+ return registration;
+ }
+
+ public EndpointProtocolContext getEndpointProtocolContext() {
+ return endpointProtocolContext;
+ }
+
+ // Lifecycle methods
+
+ public void create() throws RemotingException {
+ final JrppEndpointProtocolHandler endpointProtocolHandler = new JrppEndpointProtocolHandler();
+ final ProtocolRegistrationSpec spec = ProtocolRegistrationSpec.DEFAULT.setScheme("jrpp").setEndpointProtocolHandler(endpointProtocolHandler);
+ final ProtocolRegistration registration;
+ final EndpointProtocolContext endpointProtocolContext;
+ synchronized(this) {
+ registration = endpoint.registerProtocol(spec);
+ this.registration = registration;
+ }
+ endpointProtocolContext = registration.getEndpointProtocolContext();
+ this.endpointProtocolContext = endpointProtocolContext;
+ endpointProtocolHandler.setEndpointProtocolContext(endpointProtocolContext);
+ }
+
+ public void start() {
+ final ProtocolRegistration registration;
+ synchronized(this) {
+ registration = this.registration;
+ }
+ registration.start();
+ }
+
+ public void stop() {
+ final ProtocolRegistration registration;
+ synchronized(this) {
+ registration = this.registration;
+ }
+ registration.stop();
+ }
+
+ public void destroy() {
+ final ProtocolRegistration registration;
+ synchronized(this) {
+ registration = this.registration;
+ this.registration = null;
+ }
+ registration.unregister();
+ }
+}
Added: remoting3/branches/protocol-context-rework/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/JrppServerBean.java
===================================================================
--- remoting3/branches/protocol-context-rework/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/JrppServerBean.java (rev 0)
+++ remoting3/branches/protocol-context-rework/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/JrppServerBean.java 2008-02-19 02:10:12 UTC (rev 3452)
@@ -0,0 +1,98 @@
+package org.jboss.cx.remoting.jrpp;
+
+import org.apache.mina.common.IoAcceptor;
+import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
+import org.apache.mina.transport.socket.nio.NioProcessor;
+import org.apache.mina.handler.multiton.SingleSessionIoHandlerDelegate;
+import org.jboss.cx.remoting.jrpp.mina.FramingIoFilter;
+import java.util.concurrent.ExecutorService;
+import java.net.SocketAddress;
+import java.io.IOException;
+
+/**
+ *
+ */
+public final class JrppServerBean {
+ /**
+ * @protectedby {@code this}
+ */
+ private JrppProtocolBean protocolBean;
+ /**
+ * @protectedby {@code this}
+ */
+ private ExecutorService threadPool;
+ /**
+ * @protectedby {@code this}
+ */
+ private IoAcceptor acceptor;
+ /**
+ * @protectedby {@code this}
+ */
+ private SocketAddress address;
+
+ public JrppProtocolBean getProtocolBean() {
+ synchronized(this) {
+ return protocolBean;
+ }
+ }
+
+ public void setProtocolBean(final JrppProtocolBean protocolBean) {
+ synchronized(this) {
+ this.protocolBean = protocolBean;
+ }
+ }
+
+ public ExecutorService getThreadPool() {
+ synchronized(this) {
+ return threadPool;
+ }
+ }
+
+ public void setThreadPool(final ExecutorService threadPool) {
+ synchronized(this) {
+ this.threadPool = threadPool;
+ }
+ }
+
+ public SocketAddress getAddress() {
+ synchronized(this) {
+ return address;
+ }
+ }
+
+ public void setAddress(final SocketAddress address) {
+ synchronized(this) {
+ this.address = address;
+ }
+ }
+
+ // Lifecycle methods
+
+ public void create() {
+ synchronized(this) {
+ final NioProcessor nioProcessor = new NioProcessor(threadPool);
+ final IoAcceptor acceptor = new NioSocketAcceptor(threadPool, nioProcessor);
+ acceptor.setDefaultLocalAddress(address);
+ acceptor.setHandler(new SingleSessionIoHandlerDelegate(new JrppIoHandlerFactory(protocolBean.getEndpointProtocolContext())));
+ acceptor.getFilterChain().addLast("framing filter", new FramingIoFilter());
+ this.acceptor = acceptor;
+ }
+ }
+
+ public void start() throws IOException {
+ acceptor.bind(address);
+ }
+
+ public void stop() {
+ acceptor.unbind();
+ }
+
+ public void destroy() {
+ final IoAcceptor acceptor;
+ synchronized(this) {
+ acceptor = this.acceptor;
+ this.acceptor = null;
+ }
+ acceptor.dispose();
+ }
+}
Added: remoting3/branches/protocol-context-rework/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/JrppSession.java
===================================================================
--- remoting3/branches/protocol-context-rework/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/JrppSession.java (rev 0)
+++ remoting3/branches/protocol-context-rework/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/JrppSession.java 2008-02-19 02:10:12 UTC (rev 3452)
@@ -0,0 +1,343 @@
+package org.jboss.cx.remoting.jrpp;
+
+import org.jboss.cx.remoting.spi.protocol.SessionProtocolHandler;
+import org.jboss.cx.remoting.spi.protocol.ServiceClientProtocolHandler;
+import org.jboss.cx.remoting.spi.protocol.ServiceClientProtocolContext;
+import org.jboss.cx.remoting.spi.protocol.ServiceProtocolContext;
+import org.jboss.cx.remoting.spi.protocol.ServerContextProtocolContext;
+import org.jboss.cx.remoting.spi.protocol.ClientRequestProtocolContext;
+import org.jboss.cx.remoting.core.util.MessageInput;
+import org.jboss.cx.remoting.core.util.MessageOutput;
+import org.jboss.cx.remoting.core.util.AtomicStateMachine;
+import org.jboss.cx.remoting.core.util.CollectionUtil;
+import org.jboss.cx.remoting.ServiceLocator;
+import org.jboss.cx.remoting.Endpoint;
+import org.jboss.cx.remoting.RemoteExecutionException;
+import org.jboss.cx.remoting.log.Logger;
+import java.io.IOException;
+import static java.lang.Math.min;
+import java.util.Set;
+
+import javax.security.sasl.SaslException;
+
+/**
+ *
+ */
+public final class JrppSession {
+ /**
+ * The protocol version used by this version of Remoting. Value is transmitted as an unsigned short.
+ */
+ private static final int PROTOCOL_VERSION = 0x0000;
+
+ private static final Logger log = Logger.getLogger(JrppSession.class);
+
+ private final Endpoint localEndpoint;
+
+ private JrppIoProvider ioProvider;
+ private SessionProtocolHandler sessionProtocolHandler;
+ private JrppIoHandler ioHandler;
+
+ private int authRetries = 3;
+
+ private int protocolVersion;
+ private String remoteName;
+
+ private enum State {
+ /** Not yet connected */
+ INITIAL,
+ /** Client side, waiting to receive protocol version info */
+ AWAITING_SERVER_VERSION,
+ /** Server side, waiting to receive protocol version info */
+ AWAITING_CLIENT_VERSION,
+ /** Client side, auth phase */
+ AWAITING_SERVER_CHALLENGE,
+ /** Server side, auth phase */
+ AWAITING_CLIENT_RESPONSE,
+ /** Connection is up */
+ UP,
+ /** Session is shutting down or closed */
+ CLOSED,
+ }
+
+ private final AtomicStateMachine<State> state = AtomicStateMachine.start(State.INITIAL);
+
+ public JrppSession(final Endpoint localEndpoint) {
+ this.localEndpoint = localEndpoint;
+ }
+
+ // accessors
+
+ public void setIoProvider(final JrppIoProvider ioProvider) {
+ this.ioProvider = ioProvider;
+ }
+
+ public JrppIoProvider getIoProvider() {
+ return ioProvider;
+ }
+
+ // message util methods - write
+
+ private void write(MessageOutput messageOutput, JrppMessageType type) throws IOException {
+ messageOutput.writeByte(type.ordinal());
+ }
+
+ // message util methods - read
+
+ private JrppMessageType readMessageType(MessageInput input) throws IOException {
+ final int typeId = input.readByte() & 0xff;
+ final JrppMessageType[] types = JrppMessageType.values();
+ if (types.length < typeId) {
+ throw new IOException("Received an invalid message type: " + typeId);
+ }
+ return types[typeId];
+ }
+
+ // init methods
+
+ private void writeVersionMessage(State newState) throws IOException {
+ state.requireTransitionExclusive(State.INITIAL, newState);
+ boolean ok = false;
+ try {
+ final MessageOutput message = ioProvider.createNewMessage(40);
+ write(message, JrppMessageType.VERSION);
+ message.writeShort(PROTOCOL_VERSION);
+ final String name = localEndpoint.getName();
+ message.writeUTF(name != null ? name : "");
+ message.commit();
+ ok = true;
+ } finally {
+ try {
+ if (! ok) close();
+ } finally {
+ state.releaseExclusive();
+ }
+ }
+ }
+
+ public void establishServer() throws IOException {
+ writeVersionMessage(State.AWAITING_CLIENT_VERSION);
+ }
+
+ public void establishClient() throws IOException {
+ writeVersionMessage(State.AWAITING_SERVER_VERSION);
+ }
+
+ private void close() throws IOException {
+ if (state.transition(State.CLOSED)) {
+ ioProvider.close();
+ }
+ }
+
+ public SessionProtocolHandler getSessionProtocolHandler() {
+ return sessionProtocolHandler;
+ }
+
+ public JrppIoHandler getIoHandler() {
+ return ioHandler;
+ }
+
+ public final class Handler implements JrppIoHandler {
+ public void handleMessage(MessageInput message) throws IOException {
+ final JrppMessageType type = readMessageType(message);
+ final boolean trace = log.isTrace();
+ final State current = state.getState();
+ OUT: switch (current) {
+ case AWAITING_CLIENT_VERSION: {
+ switch (type) {
+ case VERSION: {
+ protocolVersion = min(message.readUnsignedShort(), PROTOCOL_VERSION);
+ if (trace) {
+ log.trace("Server negotiated protocol version " + protocolVersion);
+ }
+ final String name = message.readUTF();
+ remoteName = name.length() > 0 ? name : null;
+ state.requireTransition(current, State.AWAITING_CLIENT_RESPONSE);
+ return;
+ }
+ default: break OUT;
+ }
+ }
+ case AWAITING_SERVER_VERSION: {
+ switch (type) {
+ case VERSION: {
+ protocolVersion = min(message.readUnsignedShort(), PROTOCOL_VERSION);
+ if (trace) {
+ log.trace("Client negotiated protocol version " + protocolVersion);
+ }
+ final String name = message.readUTF();
+ remoteName = name.length() > 0 ? name : null;
+ state.requireTransition(current, State.AWAITING_SERVER_CHALLENGE);
+ ioProvider.sendSaslInitialResponse();
+ return;
+ }
+ default: break OUT;
+ }
+ }
+ case AWAITING_CLIENT_RESPONSE: {
+ switch (type) {
+ case SASL_RESPONSE: {
+ if (trace) {
+ log.trace("Recevied SASL response from client");
+ }
+ try {
+ if (ioProvider.receiveSaslResponse(message)) {
+ final MessageOutput output = ioProvider.createNewMessage(60);
+ write(output, JrppMessageType.AUTH_SUCCESS);
+ output.commit();
+ ioProvider.startEncryption();
+ }
+ } catch (SaslException ex) {
+ final MessageOutput output = ioProvider.createNewMessage(60);
+ write(output, JrppMessageType.AUTH_FAILED);
+ output.commit();
+ log.info("Client authentication failed (" + ex.getMessage() + ")");
+ }
+ return;
+ }
+ default: break OUT;
+ }
+ }
+ case AWAITING_SERVER_CHALLENGE: {
+ switch (type) {
+ case SASL_CHALLENGE: {
+ ioProvider.receiveSaslChallenge(message);
+ return;
+ }
+ case AUTH_SUCCESS: {
+ ioProvider.startEncryption();
+ state.requireTransition(current, State.UP);
+ return;
+ }
+ case AUTH_FAILED: {
+ log.debug("JRPP client authentication failed");
+ if (--authRetries > 0) {
+ ioProvider.sendSaslInitialResponse();
+ } else {
+ close();
+ }
+ return;
+ }
+ default: break OUT;
+ }
+ }
+ case UP: {
+ switch (type) {
+ case OPEN_CONTEXT: {
+ final int serviceId = message.readInt();
+ ServiceProtocolContext serviceProtocolContext = null; // todo look it up
+ final int contextId = message.readInt();
+ final ServerContextProtocolContext serverContextProtocolContext = serviceProtocolContext.openContext();
+ // todo put the new context in the map
+ return;
+ }
+ case CANCEL_ACK: {
+ final int contextId = message.readInt();
+ final int requestId = message.readInt();
+ ClientRequestProtocolContext clientRequestProtocolContext = null; // todo look it up
+ clientRequestProtocolContext.receiveCancelled();
+ // todo drop the request
+ return;
+ }
+ case CANCEL_REQ: {
+ final ContextIdentifier contextIdentifier = readCtxtId(message);
+ final RequestIdentifier requestIdentifier = readReqId(message);
+ final boolean mayInterrupt = message.readBoolean();
+ protocolContext.receiveCancelRequest(contextIdentifier, requestIdentifier, mayInterrupt);
+ return;
+ }
+ case CLOSE_CONTEXT: {
+ final ContextIdentifier contextIdentifier = readCtxtId(message);
+ protocolContext.closeContext(contextIdentifier);
+ return;
+ }
+ case CLOSE_SERVICE: {
+ final ServiceIdentifier serviceIdentifier = readSvcId(message);
+ protocolContext.closeService(serviceIdentifier);
+ return;
+ }
+ case CLOSE_STREAM: {
+ final StreamIdentifier streamIdentifier = readStrId(message);
+ protocolContext.closeStream(streamIdentifier);
+ return;
+ }
+ case EXCEPTION: {
+ final ContextIdentifier contextIdentifier = readCtxtId(message);
+ final RequestIdentifier requestIdentifier = readReqId(message);
+ final RemoteExecutionException exception = (RemoteExecutionException) message.readObject();
+ protocolContext.receiveException(contextIdentifier, requestIdentifier, exception);
+ return;
+ }
+ case REPLY: {
+ final ContextIdentifier contextIdentifier = readCtxtId(message);
+ final RequestIdentifier requestIdentifier = readReqId(message);
+ final Object reply = message.readObject();
+ protocolContext.receiveReply(contextIdentifier, requestIdentifier, reply);
+ return;
+ }
+ case REQUEST: {
+ final ContextIdentifier contextIdentifier = readCtxtId(message);
+ final RequestIdentifier requestIdentifier = readReqId(message);
+ final Object request = message.readObject();
+ if (trace) {
+ log.trace("Received request - body is %s", request);
+ }
+ protocolContext.receiveRequest(contextIdentifier, requestIdentifier, request);
+ return;
+ }
+ case SERVICE_ACTIVATE: {
+ final ServiceIdentifier serviceIdentifier = readSvcId(message);
+ protocolContext.receiveServiceActivate(serviceIdentifier);
+ return;
+ }
+ case SERVICE_REQUEST: {
+ final ServiceIdentifier serviceIdentifier = readSvcId(message);
+ final Class<?> requestType = (Class<?>) message.readObject();
+ final Class<?> replyType = (Class<?>) message.readObject();
+ final String serviceType = message.readUTF();
+ final String serviceGroupName = message.readUTF();
+ final Set<String> interceptors = CollectionUtil.hashSet();
+ int c = message.readInt();
+ for (int i = 0; i < c; i ++) {
+ interceptors.add(message.readUTF());
+ }
+ final ServiceLocator<?, ?> locator = ServiceLocator.DEFAULT
+ .setRequestType(requestType)
+ .setReplyType(replyType)
+ .setServiceType(serviceType)
+ .setServiceGroupName(serviceGroupName)
+ .setAvailableInterceptors(interceptors);
+ protocolContext.receiveServiceRequest(serviceIdentifier, locator);
+ return;
+ }
+ case SERVICE_TERMINATE: {
+ final ServiceIdentifier serviceIdentifier = readSvcId(message);
+ protocolContext.receiveServiceTerminate(serviceIdentifier);
+ return;
+ }
+ case STREAM_DATA: {
+ final StreamIdentifier streamIdentifier = readStrId(message);
+ protocolContext.receiveStreamData(streamIdentifier, message);
+ return;
+ }
+ default: break OUT;
+ }
+ }
+ }
+ throw new IllegalStateException("Got message " + type + " during " + currentState);
+ }
+
+ public MessageOutput getSaslMessageOutput() {
+ return null;
+ }
+ }
+
+ public final class WritingHandler implements SessionProtocolHandler {
+ public ServiceClientProtocolHandler sendServiceRequest(ServiceLocator<?, ?> locator, ServiceClientProtocolContext context) throws IOException {
+ return null;
+ }
+
+ public String getRemoteName() {
+ return null;
+ }
+ }
+}
Added: remoting3/branches/protocol-context-rework/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/JrppSessionProtocolHandler.java
===================================================================
--- remoting3/branches/protocol-context-rework/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/JrppSessionProtocolHandler.java (rev 0)
+++ remoting3/branches/protocol-context-rework/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/JrppSessionProtocolHandler.java 2008-02-19 02:10:12 UTC (rev 3452)
@@ -0,0 +1,30 @@
+package org.jboss.cx.remoting.jrpp;
+
+import org.jboss.cx.remoting.spi.protocol.SessionProtocolHandler;
+import org.jboss.cx.remoting.spi.protocol.SessionProtocolContext;
+import org.jboss.cx.remoting.spi.protocol.ServiceClientProtocolHandler;
+import org.jboss.cx.remoting.spi.protocol.ServiceClientProtocolContext;
+import org.jboss.cx.remoting.ServiceLocator;
+import java.net.URI;
+import java.io.IOException;
+
+/**
+ *
+ */
+public final class JrppSessionProtocolHandler implements SessionProtocolHandler {
+ public JrppSessionProtocolHandler(final SessionProtocolContext context) {
+
+ }
+
+ public ServiceClientProtocolHandler sendServiceRequest(ServiceLocator<?, ?> locator, ServiceClientProtocolContext context) throws IOException {
+ return null;
+ }
+
+ public String getRemoteName() {
+ return null;
+ }
+
+ public void connect(final URI remoteURI) throws IOException {
+
+ }
+}
Modified: remoting3/branches/protocol-context-rework/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/id/IdentifierManager.java
===================================================================
--- remoting3/branches/protocol-context-rework/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/id/IdentifierManager.java 2008-02-17 19:15:34 UTC (rev 3451)
+++ remoting3/branches/protocol-context-rework/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/id/IdentifierManager.java 2008-02-19 02:10:12 UTC (rev 3452)
@@ -6,8 +6,10 @@
*
*/
public final class IdentifierManager {
- private final BitSet bitSet = new BitSet(64);
+ private static final int INITIAL_BIT_COUNT = 64;
+ private final BitSet bitSet = new BitSet(INITIAL_BIT_COUNT);
+
public synchronized short getIdentifier() {
final int id = bitSet.nextClearBit(1);
if (id > 0xffff) {
@@ -20,8 +22,4 @@
public synchronized void freeIdentifier(short id) {
bitSet.clear(id & 0xffff);
}
-
- public void getIdentifier(final short id) {
-
- }
}
16 years, 9 months
JBoss Remoting SVN: r3451 - remoting2/branches/2.x/src/tests/org/jboss/test/remoting/oneway.
by jboss-remoting-commits@lists.jboss.org
Author: ron.sigal(a)jboss.com
Date: 2008-02-17 14:15:34 -0500 (Sun, 17 Feb 2008)
New Revision: 3451
Modified:
remoting2/branches/2.x/src/tests/org/jboss/test/remoting/oneway/OnewayThreadPoolTestCase.java
Log:
JBREM-658: (1) In testHeavyLoadClientSideHttp() gave invocations extra time; (2) renamed test cases accidentally named xtest...().
Modified: remoting2/branches/2.x/src/tests/org/jboss/test/remoting/oneway/OnewayThreadPoolTestCase.java
===================================================================
--- remoting2/branches/2.x/src/tests/org/jboss/test/remoting/oneway/OnewayThreadPoolTestCase.java 2008-02-17 19:10:08 UTC (rev 3450)
+++ remoting2/branches/2.x/src/tests/org/jboss/test/remoting/oneway/OnewayThreadPoolTestCase.java 2008-02-17 19:15:34 UTC (rev 3451)
@@ -89,7 +89,7 @@
* This test verifies that thread and queue size are correctly set
* on the server side and client side.
*/
- public void xtestConfiguration() throws Throwable
+ public void testConfiguration() throws Throwable
{
log.info("entering " + getName());
String host = InetAddress.getLocalHost().getHostAddress();
@@ -140,7 +140,7 @@
* The http client invoker does not return until after the invocation has
* returned a response.
*/
- public void xtestThreadPoolHttpClientSide() throws Throwable
+ public void testThreadPoolHttpClientSide() throws Throwable
{
log.info("entering " + getName());
String host = InetAddress.getLocalHost().getHostAddress();
@@ -204,7 +204,7 @@
* This test exercises the client side thread pool using the socket transport
* The socket client invoker waits for a response. See JBREM-706.
*/
- public void xtestThreadPoolSocketClientSide() throws Throwable
+ public void testThreadPoolSocketClientSide() throws Throwable
{
log.info("entering " + getName());
String host = InetAddress.getLocalHost().getHostAddress();
@@ -351,7 +351,7 @@
}
poolCounter++;
- Thread.sleep(10000);
+ Thread.sleep(15000);
// Verify INVOCATIONS invocations were received.
assertEquals(INVOCATIONS, handler.startedCount);
@@ -379,7 +379,7 @@
* This test verifies that the client side thread pool can function under
* a heavy load. It uses the socket transport.
*/
- public void xtestHeavyLoadClientSideSocket() throws Throwable
+ public void testHeavyLoadClientSideSocket() throws Throwable
{
log.info("entering " + getName());
String host = InetAddress.getLocalHost().getHostAddress();
@@ -442,7 +442,7 @@
/**
* This test exercises the server side thread pool using the http transport
*/
- public void xtestThreadPoolHttpServerSide() throws Throwable
+ public void testThreadPoolHttpServerSide() throws Throwable
{
log.info("entering " + getName());
String host = InetAddress.getLocalHost().getHostAddress();
@@ -516,7 +516,7 @@
/**
* This test exercises the server side thread pool using the socket transport
*/
- public void xtestThreadPoolSocketServerSide() throws Throwable
+ public void testThreadPoolSocketServerSide() throws Throwable
{
log.info("entering " + getName());
String host = InetAddress.getLocalHost().getHostAddress();
@@ -597,7 +597,7 @@
* This test verifies that the server side thread pool can function under
* a heavy load. It uses the http transport.
*/
- public void xtestHeavyLoadServerSideHttp() throws Throwable
+ public void testHeavyLoadServerSideHttp() throws Throwable
{
log.info("entering " + getName());
String host = InetAddress.getLocalHost().getHostAddress();
@@ -655,7 +655,7 @@
* This test verifies that the server side thread pool can function under
* a heavy load. It uses the socket transport.
*/
- public void xtestHeavyLoadServerSideSocket() throws Throwable
+ public void testHeavyLoadServerSideSocket() throws Throwable
{
log.info("entering " + getName());
String host = InetAddress.getLocalHost().getHostAddress();
16 years, 9 months