JBoss Remoting SVN: r3506 - remoting2/branches/2.2/src/main/org/jboss/remoting.
by jboss-remoting-commits@lists.jboss.org
Author: ron.sigal(a)jboss.com
Date: 2008-02-25 20:37:17 -0500 (Mon, 25 Feb 2008)
New Revision: 3506
Modified:
remoting2/branches/2.2/src/main/org/jboss/remoting/Version.java
Log:
JBREM-913: Updated version.
Modified: remoting2/branches/2.2/src/main/org/jboss/remoting/Version.java
===================================================================
--- remoting2/branches/2.2/src/main/org/jboss/remoting/Version.java 2008-02-26 01:36:09 UTC (rev 3505)
+++ remoting2/branches/2.2/src/main/org/jboss/remoting/Version.java 2008-02-26 01:37:17 UTC (rev 3506)
@@ -32,7 +32,7 @@
public static final byte VERSION_2 = 2;
public static final byte VERSION_2_2 = 22;
- public static final String VERSION = "2.2.2.SP4 (Bluto)";
+ public static final String VERSION = "2.2.2.SP5 (Bluto)";
private static final byte byteVersion = VERSION_2_2;
private static byte defaultByteVersion = byteVersion;
private static boolean performVersioning = true;
17 years, 8 months
JBoss Remoting SVN: r3505 - remoting2/branches/2.2/src/main/org/jboss/remoting.
by jboss-remoting-commits@lists.jboss.org
Author: ron.sigal(a)jboss.com
Date: 2008-02-25 20:36:09 -0500 (Mon, 25 Feb 2008)
New Revision: 3505
Modified:
remoting2/branches/2.2/src/main/org/jboss/remoting/ServerInvoker.java
Log:
JBREM-892: Added facility for responding to ConnectionValidator based on status of Lease..
Modified: remoting2/branches/2.2/src/main/org/jboss/remoting/ServerInvoker.java
===================================================================
--- remoting2/branches/2.2/src/main/org/jboss/remoting/ServerInvoker.java 2008-02-26 00:50:56 UTC (rev 3504)
+++ remoting2/branches/2.2/src/main/org/jboss/remoting/ServerInvoker.java 2008-02-26 01:36:09 UTC (rev 3505)
@@ -211,6 +211,8 @@
* registered as ConnectionListeners.
*/
public static final String REGISTER_CALLBACK_LISTENER = "registerCallbackListener";
+
+ public static final String INVOKER_SESSION_ID = "invokerSessionId";
// Static ---------------------------------------------------------------------------------------
@@ -699,11 +701,23 @@
// check to see if this is a is alive ping
if ("$PING$".equals(param))
{
- // if checking lease, need to update lease flag
+ Map metadata = invocation.getRequestPayload();
+ if (metadata != null)
+ {
+ String invokerSessionId = (String) metadata.get(INVOKER_SESSION_ID);
+ if (invokerSessionId != null)
+ {
+ // Comes from ConnectionValidator configured to tie validation with lease.
+ boolean response = checkForClientLease(invokerSessionId);
+ if (trace) log.trace(this + " responding " + response + " to $PING$ for invoker sessionId " + invokerSessionId);
+ return new Boolean(response);
+ }
+ }
+
if (leaseManagement)
{
- //NOTE we only update the lease when we receive a PING, not for
- //all invocations
+ // Otherwise, it's a normal PING. NOTE we only update the lease when we
+ // receive a PING, not for all invocations.
updateClientLease(invocation);
}
@@ -1754,6 +1768,28 @@
}
}
+ private boolean checkForClientLease(String invokerSessionId)
+ {
+ if(leaseManagement && invokerSessionId != null)
+ {
+ if(trace) { log.trace("Checking lease for client session id: " + invokerSessionId); }
+
+ Lease clientLease = (Lease)clientLeases.get(invokerSessionId);
+ if(clientLease == null)
+ {
+ if(trace) { log.trace("No lease established for invoker session id (" + invokerSessionId + ")"); }
+ return false;
+ }
+ else
+ {
+ if(trace) { log.trace("Found lease for invoker session id (" + invokerSessionId + ")"); }
+ return true;
+ }
+ }
+
+ return false;
+ }
+
/**
* Takes the real invocation from the client out of the OnewayInvocation and then executes the
* invoke() with the real invocation on a seperate thread.
17 years, 8 months
JBoss Remoting SVN: r3504 - remoting2/branches/2.2/src/main/org/jboss/remoting.
by jboss-remoting-commits@lists.jboss.org
Author: ron.sigal(a)jboss.com
Date: 2008-02-25 19:50:56 -0500 (Mon, 25 Feb 2008)
New Revision: 3504
Modified:
remoting2/branches/2.2/src/main/org/jboss/remoting/MicroRemoteClientInvoker.java
Log:
JBREM-892: Added facility for forcing shut down of LeasePinger.
Modified: remoting2/branches/2.2/src/main/org/jboss/remoting/MicroRemoteClientInvoker.java
===================================================================
--- remoting2/branches/2.2/src/main/org/jboss/remoting/MicroRemoteClientInvoker.java 2008-02-26 00:48:33 UTC (rev 3503)
+++ remoting2/branches/2.2/src/main/org/jboss/remoting/MicroRemoteClientInvoker.java 2008-02-26 00:50:56 UTC (rev 3504)
@@ -308,6 +308,11 @@
{
return this.unmarshaller;
}
+
+ public String getSessionId()
+ {
+ return this.invokerSessionID;
+ }
public void terminateLease(String sessionId, int disconnectTimeout)
{
@@ -316,19 +321,39 @@
if(leasePinger != null)
{
leasePinger.setDisconnectTimeout(disconnectTimeout);
- boolean isLastClientLease = leasePinger.removeClient(sessionId);
- if(isLastClientLease)
+
+ if (sessionId == null)
{
+ // Independent of any particular Client - force LeasePinger shutdown.
+ // Should be called only if there is a reasonable belief that the lease
+ // has already stopped on the server side.
try
{
leasePinger.stopPing();
}
catch (Exception e)
{
- log.error("error shutting down lease pinger");
+ log.debug("error shutting down lease pinger");
}
leasePinger = null;
}
+ else
+ {
+ // Remove a particular Client.
+ boolean isLastClientLease = leasePinger.removeClient(sessionId);
+ if(isLastClientLease)
+ {
+ try
+ {
+ leasePinger.stopPing();
+ }
+ catch (Exception e)
+ {
+ log.debug("error shutting down lease pinger");
+ }
+ leasePinger = null;
+ }
+ }
}
}
}
17 years, 8 months
JBoss Remoting SVN: r3503 - remoting2/branches/2.2/src/main/org/jboss/remoting.
by jboss-remoting-commits@lists.jboss.org
Author: ron.sigal(a)jboss.com
Date: 2008-02-25 19:48:33 -0500 (Mon, 25 Feb 2008)
New Revision: 3503
Modified:
remoting2/branches/2.2/src/main/org/jboss/remoting/ConnectionValidator.java
Log:
JBREM-892: Added facility for tying ConnectionValidator to Lease and LeasePinger.
Modified: remoting2/branches/2.2/src/main/org/jboss/remoting/ConnectionValidator.java
===================================================================
--- remoting2/branches/2.2/src/main/org/jboss/remoting/ConnectionValidator.java 2008-02-26 00:40:52 UTC (rev 3502)
+++ remoting2/branches/2.2/src/main/org/jboss/remoting/ConnectionValidator.java 2008-02-26 00:48:33 UTC (rev 3503)
@@ -68,6 +68,17 @@
*/
public static final String DEFAULT_NUMBER_OF_CONNECTION_RETRIES = "1";
+ /**
+ * Key to determine if ConnectionValidator should tie failure to presence
+ * of active lease on server side. Default value is "true".
+ */
+ public static final String TIE_TO_LEASE = "tieToLease";
+ /**
+ * Key to determine whether to stop ConnectionValidator when PING fails.
+ * Default value is "true".
+ */
+ public static final String STOP_LEASE_ON_FAILURE = "stopLeaseOnFailure";
+
// Static ---------------------------------------------------------------------------------------
private static boolean trace = log.isTraceEnabled();
@@ -197,6 +208,9 @@
private ClientInvoker clientInvoker;
private Object lock = new Object();
private volatile boolean stopped;
+ private String invokerSessionId;
+ private boolean tieToLease = true;
+ private boolean stopLeaseOnFailure = true;
// Constructors ---------------------------------------------------------------------------------
@@ -211,7 +225,7 @@
this.pingPeriod = pingPeriod;
this.listeners = new ArrayList();
this.stopped = false;
-
+ getParameters(client, new HashMap());
log.debug(this + " created");
}
@@ -221,60 +235,8 @@
this.pingPeriod = DEFAULT_PING_PERIOD;
this.listeners = new ArrayList();
this.stopped = false;
-
- Map config = client.getConfiguration();
- if (config != null)
- {
- Object o = config.get(VALIDATOR_PING_PERIOD);
- if (o != null)
- {
- if (o instanceof String)
- {
- try
- {
- pingPeriod = Long.parseLong((String)o);
- }
- catch (Exception e)
- {
- log.warn(this + " could not convert " + VALIDATOR_PING_PERIOD +
- " value of " + o + " to a long value");
- }
- }
- else
- {
- log.warn(this + " could not convert " + VALIDATOR_PING_PERIOD +
- " value of " + o + " to a long value: must be a String");
- }
- }
- }
-
- if (metadata != null)
- {
- this.metadata = new HashMap(metadata);
-
- Object o = metadata.get(VALIDATOR_PING_PERIOD);
- if (o != null)
- {
- if (o instanceof String)
- {
- try
- {
- pingPeriod = Long.parseLong((String)o);
- }
- catch (Exception e)
- {
- log.warn(this + " could not convert " + VALIDATOR_PING_PERIOD +
- " value of " + o + " to a long value");
- }
- }
- else
- {
- log.warn(this + " could not convert " + VALIDATOR_PING_PERIOD +
- " value of " + o + " to a long value: must be a String");
- }
- }
- }
-
+ this.metadata = new HashMap(metadata);
+ getParameters(client, metadata);
log.debug(this + " created");
}
@@ -304,20 +266,44 @@
try
{
if (trace) { log.trace(this + " pinging ..."); }
+
+ boolean isValid = false;
- boolean isValid = doCheckConnection(clientInvoker);
+ if (tieToLease && client.getLeasePeriod() > 0)
+ {
+ if (trace) log.trace(this + " sending PING tied to lease");
+ isValid = doCheckConnectionWithLease();
+ }
+ else
+ {
+ isValid = doCheckConnection(clientInvoker);
+ }
if (!isValid)
{
log.debug(this + "'s connections is invalid");
notifyListeners(new Exception("Could not connect to server!"));
+
+ if (stopLeaseOnFailure)
+ {
+ log.debug(this + " detected connection failure: stopping LeasePinger");
+ MicroRemoteClientInvoker invoker = (MicroRemoteClientInvoker) client.getInvoker();
+ invoker.terminateLease(null, client.getDisconnectTimeout());
+ log.debug(this + " shut down lease pinger");
+ }
}
}
catch (Throwable thr)
{
log.debug(this + " got throwable while pinging", thr);
notifyListeners(thr);
+
+ if (stopLeaseOnFailure)
+ {
+ log.debug(this + " detected connection failure: stopping");
+ cancel();
+ }
}
}
}
@@ -383,6 +369,94 @@
// Private --------------------------------------------------------------------------------------
+ private void getParameters(Client client, Map metadata)
+ {
+ getParametersFromMap(client.getConfiguration());
+ getParametersFromMap(metadata);
+
+ ClientInvoker clientInvoker = client.getInvoker();
+ if (clientInvoker instanceof MicroRemoteClientInvoker)
+ {
+ invokerSessionId = ((MicroRemoteClientInvoker) clientInvoker).getSessionId();
+ }
+ else
+ {
+ throw new RuntimeException("creating a ConnectionValidator on a local connection");
+ }
+ }
+
+ private void getParametersFromMap(Map config)
+ {
+ if (config != null)
+ {
+ Object o = config.get(VALIDATOR_PING_PERIOD);
+ if (o != null)
+ {
+ if (o instanceof String)
+ {
+ try
+ {
+ pingPeriod = Long.parseLong((String)o);
+ }
+ catch (Exception e)
+ {
+ log.warn(this + " could not convert " + VALIDATOR_PING_PERIOD +
+ " value of " + o + " to a long value");
+ }
+ }
+ else
+ {
+ log.warn(this + " could not convert " + VALIDATOR_PING_PERIOD +
+ " value of " + o + " to a long value: must be a String");
+ }
+ }
+
+ o = config.get(TIE_TO_LEASE);
+ if (o != null)
+ {
+ if (o instanceof String)
+ {
+ try
+ {
+ tieToLease = Boolean.valueOf(((String) o)).booleanValue();
+ }
+ catch (Exception e)
+ {
+ log.warn(this + " could not convert " + TIE_TO_LEASE + " value" +
+ " to a boolean: " + o);
+ }
+ }
+ else
+ {
+ log.warn(this + " could not convert " + TIE_TO_LEASE + " value" +
+ " to a boolean: must be a String");
+ }
+ }
+
+ o = config.get(STOP_LEASE_ON_FAILURE);
+ if (o != null)
+ {
+ if (o instanceof String)
+ {
+ try
+ {
+ stopLeaseOnFailure = Boolean.valueOf(((String) o)).booleanValue();
+ }
+ catch (Exception e)
+ {
+ log.warn(this + " could not convert " + STOP_LEASE_ON_FAILURE + " value" +
+ " to a boolean: " + o);
+ }
+ }
+ else
+ {
+ log.warn(this + " could not convert " + STOP_LEASE_ON_FAILURE + " value" +
+ " to a boolean: must be a String");
+ }
+ }
+ }
+ }
+
private void start()
{
configMap = createPingConfig(client.getConfiguration(), metadata);
@@ -412,11 +486,46 @@
log.debug(this + " started");
}
+
+ private boolean doCheckConnectionWithLease() throws Throwable
+ {
+ boolean pingWorked = false;
+ try
+ {
+ Map metadata = new HashMap();
+ metadata.put(ServerInvoker.INVOKER_SESSION_ID, this.invokerSessionId);
+ InvocationRequest ir =
+ new InvocationRequest(null, Subsystem.SELF, "$PING$", metadata, null, null);
+
+ if (trace) { log.trace("pinging, sending " + ir + " over " + clientInvoker); }
+
+ Object o = clientInvoker.invoke(ir);
+ if (o instanceof Boolean && !((Boolean) o).booleanValue())
+ {
+ // Server indicates lease has stopped.
+ throw new Exception();
+ }
+
+ if (trace) { log.trace("ConnectionValidator got successful ping using " + clientInvoker);}
+
+ pingWorked = true;
+ }
+ catch (Throwable t)
+ {
+ log.debug("ConnectionValidator failed to ping via " + clientInvoker, t);
+ }
+
+ return pingWorked;
+ }
+
private boolean doStop()
{
synchronized(lock)
{
+ if (stopped)
+ return false;
+
if (!listeners.isEmpty())
{
listeners.clear();
17 years, 8 months
JBoss Remoting SVN: r3502 - in remoting3/trunk: api/src/main/java/org/jboss/cx/remoting/service and 8 other directories.
by jboss-remoting-commits@lists.jboss.org
Author: david.lloyd(a)jboss.com
Date: 2008-02-25 19:40:52 -0500 (Mon, 25 Feb 2008)
New Revision: 3502
Added:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/service/ClassReply.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/service/ClassRequest.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/service/ServiceReply.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/service/ServiceRequest.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ServiceLocatorListener.java
Removed:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/InterceptorDeploymentSpec.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/ServiceDeploymentSpec.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/ServiceLocator.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/service/SecurityService.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/service/TxnService.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractClientInterceptor.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractInterceptor.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractServerInterceptor.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/ClientInterceptor.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/ClientInterceptorFactory.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/Discovery.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/EndpointProvider.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/Interceptor.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/InterceptorContext.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/InterceptorSpec.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/ListenerFactoryContext.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/ServerInterceptor.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/ServerInterceptorFactory.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreEndpointProvider.java
remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/id/IdentifierManager.java
Modified:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Context.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Endpoint.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/RemotingException.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/RequestContext.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Session.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/protocol/ProtocolContext.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/protocol/ProtocolHandler.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/protocol/ProtocolHandlerFactory.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/EndpointWrapper.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/RequestContextWrapper.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/SessionWrapper.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreEndpoint.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreInboundContext.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreInboundRequest.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreInboundService.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundService.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreSession.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/LocalProtocol.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/StreamMarker.java
remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/RemotingHttpSessionImpl.java
remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/JrppConnection.java
remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/id/JrppContextIdentifier.java
remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/id/JrppRequestIdentifier.java
remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/id/JrppServiceIdentifier.java
remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/id/JrppStreamIdentifier.java
remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/id/JrppSubChannelIdentifier.java
remoting3/trunk/standalone/src/main/java/org/jboss/cx/remoting/Remoting.java
Log:
Major overhaul of the service location system. Now Context and ContextSource instances can be sent in requests and replies to facilitate service location using the standard request mechanism - with the added benefit of providing secondary service such as classloading etc. using the exact same mechanism.
Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Context.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Context.java 2008-02-25 13:35:25 UTC (rev 3501)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Context.java 2008-02-26 00:40:52 UTC (rev 3502)
@@ -1,12 +1,13 @@
package org.jboss.cx.remoting;
import java.util.concurrent.ConcurrentMap;
+import java.io.Closeable;
/**
* A communications context. The context may be associated with a security/authentication state and a transactional
* state, as well as other state maintained by the remote side.
*/
-public interface Context<I, O> {
+public interface Context<I, O> extends Closeable {
void close() throws RemotingException;
Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Endpoint.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Endpoint.java 2008-02-25 13:35:25 UTC (rev 3501)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Endpoint.java 2008-02-26 00:40:52 UTC (rev 3502)
@@ -3,8 +3,6 @@
import java.net.URI;
import java.util.concurrent.ConcurrentMap;
import org.jboss.cx.remoting.util.AttributeMap;
-import org.jboss.cx.remoting.spi.Discovery;
-import org.jboss.cx.remoting.spi.Registration;
import org.jboss.cx.remoting.spi.protocol.ProtocolRegistration;
import org.jboss.cx.remoting.spi.protocol.ProtocolRegistrationSpec;
@@ -21,29 +19,10 @@
ConcurrentMap<Object, Object> getAttributes();
/**
- * Shut down this endpoint. Cancel any outstanding requests, tear down thread pools.
- */
- void shutdown();
-
- /**
- * Add a shutdown listener. This listener will be called after shutdown has been initiated.
- *
- * @param listener the listener
- */
- void addShutdownListener(EndpointShutdownListener listener);
-
- /**
- * Remove a previously added shutdown listener.
- *
- * @param listener the listener
- */
- void removeShutdownListener(EndpointShutdownListener listener);
-
- /**
* Open a session with another endpoint. The protocol used is determined by the URI scheme. The URI user-info part
* must be {@code null} unless the specific protocol has an additional authentication scheme (e.g. HTTP BASIC). The
- * authority is used to locate the server (the exact interpretation is dependent upon the protocol). The URI path is
- * the service to connect to. The path may be relative to a protocol-specific deployment path.
+ * authority is used to locate the server (the exact interpretation is dependent upon the protocol). The path may be
+ * relative to a protocol-specific deployment path.
*
* @param remoteUri the URI of the server to connect to
* @param attributeMap the attribute map to use to configure this session
@@ -62,18 +41,6 @@
String getName();
/**
- * Deploy a service into this endpoint.
- *
- * @param spec the specification for this service deployment
- *
- * @return a registration that may be used to control this deployment
- *
- * @throws RemotingException if the registration failed
- * @throws IllegalArgumentException if the specification failed validation
- */
- <I, O> Registration deployService(ServiceDeploymentSpec<I, O> spec) throws RemotingException, IllegalArgumentException;
-
- /**
* Register a protocol specification for this endpoint.
*
* @param spec the protocol specification
@@ -86,32 +53,21 @@
ProtocolRegistration registerProtocol(ProtocolRegistrationSpec spec) throws RemotingException, IllegalArgumentException;
/**
- * Deploy a context interceptor type into this endpoint. Subsequent sessions may negotiate to use this context
- * service.
+ * Create a context that can be used to invoke a request listener on this endpoint. The context may be passed to a
+ * remote endpoint as part of a request or a reply, or it may be used locally.
*
- * @param spec the deployment specification
- *
- * @return a registration that may be used to control this deployment
- *
- * @throws RemotingException if the registration failed
- * @throws IllegalArgumentException if the specification failed validation
+ * @param requestListener the request listener
+ * @return the context
*/
- Registration deployInterceptorType(InterceptorDeploymentSpec spec) throws RemotingException, IllegalArgumentException;
+ <I, O> Context<I, O> createContext(RequestListener<I, O> requestListener);
/**
- * Discover a remote endpoint. Adds the host to the internal routing table of the endpoint. Higher cost indicates
- * a less desirable route.
- * <p/>
- * The next hop URI should also include a path component, if the target endpoint is deployed relative to a base path
- * (e.g. a servlet).
+ * Create a context source that can be used to acquire contexts associated with a request listener on this endpoint.
+ * The context source may be passed to a remote endpoint as part of a request or a reply, or it may be used locally.
+ * The objects that are produced by this method may be used to mass-produce {@code Context} instances.
*
- * @param endpointName the name of the discovered endpoint
- * @param nextHop the URI of the means to connect to the next "hop" towards the named endpoint
- * @param cost the "cost" associated with traversing this route
- *
- * @return an obejct representing the discovery
- *
- * @throws RemotingException if there is a problem with the discovery parameters
+ * @param requestListener the request listener
+ * @return the context source
*/
- Discovery discover(String endpointName, URI nextHop, int cost) throws RemotingException;
+ <I, O> ContextSource<I, O> createService(RequestListener<I, O> requestListener);
}
Deleted: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/InterceptorDeploymentSpec.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/InterceptorDeploymentSpec.java 2008-02-25 13:35:25 UTC (rev 3501)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/InterceptorDeploymentSpec.java 2008-02-26 00:40:52 UTC (rev 3502)
@@ -1,40 +0,0 @@
-package org.jboss.cx.remoting;
-
-import org.jboss.cx.remoting.spi.ClientInterceptorFactory;
-
-/**
- *
- */
-public final class InterceptorDeploymentSpec {
- private final String identifier;
- private final int preference;
- private final ClientInterceptorFactory contextInterceptorFactory;
-
- public InterceptorDeploymentSpec(final String identifier, final int preference, final ClientInterceptorFactory contextInterceptorFactory) {
- this.identifier = identifier;
- this.preference = preference;
- this.contextInterceptorFactory = contextInterceptorFactory;
- }
-
- public static final InterceptorDeploymentSpec DEFAULT = new InterceptorDeploymentSpec(null, 1000, null);
-
- public String getIdentifier() {
- return identifier;
- }
-
- public ClientInterceptorFactory getContextInterceptorFactory() {
- return contextInterceptorFactory;
- }
-
- public InterceptorDeploymentSpec setIdentifier(String identifier) {
- return new InterceptorDeploymentSpec(identifier, preference, contextInterceptorFactory);
- }
-
- public InterceptorDeploymentSpec setPreference(int preference) {
- return new InterceptorDeploymentSpec(identifier, preference, contextInterceptorFactory);
- }
-
- public InterceptorDeploymentSpec setContextInterceptorFactory(ClientInterceptorFactory contextInterceptorFactory) {
- return new InterceptorDeploymentSpec(identifier, preference, contextInterceptorFactory);
- }
-}
Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/RemotingException.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/RemotingException.java 2008-02-25 13:35:25 UTC (rev 3501)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/RemotingException.java 2008-02-26 00:40:52 UTC (rev 3502)
@@ -1,9 +1,11 @@
package org.jboss.cx.remoting;
+import java.io.IOException;
+
/**
*
*/
-public class RemotingException extends Exception {
+public class RemotingException extends IOException {
public RemotingException() {
}
Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/RequestContext.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/RequestContext.java 2008-02-25 13:35:25 UTC (rev 3501)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/RequestContext.java 2008-02-26 00:40:52 UTC (rev 3502)
@@ -1,9 +1,11 @@
package org.jboss.cx.remoting;
+import java.util.concurrent.Executor;
+
/**
* The context of a single request.
*/
-public interface RequestContext<O> {
+public interface RequestContext<O> extends Executor {
/**
* Determine whether the current request was cancelled.
*
@@ -46,4 +48,12 @@
* @param handler
*/
void addCancelHandler(RequestCancelHandler<O> handler);
+
+ /**
+ * Execute a task in the context of this request. This method can be used to continue execution of a request. Any
+ * tasks submitted to this executor will be interruptable in the event of cancellation.
+ *
+ * @param command the task to execute
+ */
+ void execute(Runnable command);
}
Deleted: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/ServiceDeploymentSpec.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/ServiceDeploymentSpec.java 2008-02-25 13:35:25 UTC (rev 3501)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/ServiceDeploymentSpec.java 2008-02-26 00:40:52 UTC (rev 3502)
@@ -1,123 +0,0 @@
-package org.jboss.cx.remoting;
-
-import java.util.Collections;
-import java.util.List;
-import org.jboss.cx.remoting.util.CollectionUtil;
-import org.jboss.cx.remoting.spi.InterceptorSpec;
-
-/**
- *
- */
-public final class ServiceDeploymentSpec<I, O> {
- private final List<InterceptorSpec> interceptorSpecs;
- private final String serviceName;
- private final String serviceType;
- private final Class<I> requestType;
- private final Class<O> replyType;
- private final RequestListener<I, O> requestListener;
-
- public static final ServiceDeploymentSpec<Void, Void> DEFAULT = new ServiceDeploymentSpec<Void, Void>(Collections.<InterceptorSpec>emptyList(), null, null, Void.class, Void.class, null);
-
- private ServiceDeploymentSpec(final List<InterceptorSpec> interceptorSpecs, final String serviceName, final String serviceType, final Class<I> requestType, final Class<O> replyType, final RequestListener<I, O> requestListener) {
- this.interceptorSpecs = interceptorSpecs;
- this.serviceName = serviceName;
- this.serviceType = serviceType;
- this.requestType = requestType;
- this.replyType = replyType;
- this.requestListener = requestListener;
- }
-
- public List<InterceptorSpec> getInterceptorSpecs() {
- return interceptorSpecs;
- }
-
- public String getServiceName() {
- return serviceName;
- }
-
- public String getServiceType() {
- return serviceType;
- }
-
- public Class<I> getRequestType() {
- return requestType;
- }
-
- public Class<O> getReplyType() {
- return replyType;
- }
-
- public RequestListener<I, O> getRequestListener() {
- return requestListener;
- }
-
- public ServiceDeploymentSpec<I, O> setInterceptorSpecs(InterceptorSpec... specs) {
- if (specs == null) {
- throw new NullPointerException("specs is null");
- }
- return new ServiceDeploymentSpec<I, O>(CollectionUtil.unmodifiableList(specs.clone()), serviceName, serviceType, requestType, replyType, requestListener);
- }
-
- public ServiceDeploymentSpec<I, O> setServiceGroupName(String serviceName) {
- if (serviceName == null) {
- throw new NullPointerException("serviceName is null");
- }
- return new ServiceDeploymentSpec<I, O>(interceptorSpecs, serviceName, serviceType, requestType, replyType, requestListener);
- }
-
- public ServiceDeploymentSpec<I, O> setServiceType(final String serviceType) {
- if (serviceType == null) {
- throw new NullPointerException("serviceType is null");
- }
- return new ServiceDeploymentSpec<I, O>(interceptorSpecs, serviceName, serviceType, requestType, replyType, requestListener);
- }
-
- public <T> ServiceDeploymentSpec<T, O> setRequestType(Class<T> requestType) {
- if (requestType == null) {
- throw new NullPointerException("requestType is null");
- }
- return new ServiceDeploymentSpec<T, O>(interceptorSpecs, serviceName, serviceType, requestType, replyType, null);
- }
-
- public <T> ServiceDeploymentSpec<I, T> setReplyType(Class<T> replyType) {
- if (replyType == null) {
- throw new NullPointerException("replyType is null");
- }
- return new ServiceDeploymentSpec<I, T>(interceptorSpecs, serviceName, serviceType, requestType, replyType, null);
- }
-
- public ServiceDeploymentSpec<I, O> setRequestListener(RequestListener<I, O> requestListener) {
- if (requestListener == null) {
- throw new NullPointerException("requestListener is null");
- }
- return new ServiceDeploymentSpec<I, O>(interceptorSpecs, serviceName, serviceType, requestType, replyType, requestListener);
- }
-
- public String toString() {
- StringBuilder builder = new StringBuilder();
- builder.append("Service specification for ");
- if (serviceType == null) {
- builder.append("untyped ");
- } else {
- builder.append("typed (\"");
- builder.append(serviceType);
- builder.append("\") ");
- }
- if (serviceName == null) {
- builder.append(", unnamed service ");
- } else {
- builder.append("service named \"");
- builder.append(serviceName);
- builder.append("\" ");
- }
- builder.append(": Request type is ");
- builder.append(requestType.getName());
- builder.append(", reply type is ");
- builder.append(replyType.getName());
- builder.append(", interceptors are ");
- builder.append(interceptorSpecs.toString());
- builder.append(", request listener is ");
- builder.append(requestListener.toString());
- return builder.toString();
- }
-}
Deleted: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/ServiceLocator.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/ServiceLocator.java 2008-02-25 13:35:25 UTC (rev 3501)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/ServiceLocator.java 2008-02-26 00:40:52 UTC (rev 3502)
@@ -1,189 +0,0 @@
-package org.jboss.cx.remoting;
-
-import java.util.Collections;
-import java.util.Set;
-
-/**
- *
- */
-public final class ServiceLocator<I, O> {
-
- /**
- * A basic service locator. Use this instance to create more specific locators.
- */
- public static final ServiceLocator<Void, Void> DEFAULT = new ServiceLocator<Void, Void>(Void.class, Void.class, null, "*", "*", Collections.<String>emptySet());
-
- private final Class<I> requestType;
- private final Class<O> replyType;
- private final String serviceType;
- private final String serviceGroupName;
- private final String endpointName;
- private final Set<String> availableInterceptors;
-
- private ServiceLocator(final Class<I> requestType, final Class<O> replyType, final String serviceType, final String serviceGroupName, final String endpointName, final Set<String> availableInterceptors) {
- if (requestType == null) {
- throw new NullPointerException("requestType is null");
- }
- if (replyType == null) {
- throw new NullPointerException("replyType is null");
- }
- if (availableInterceptors == null) {
- throw new NullPointerException("availableInterceptors is null");
- }
- this.requestType = requestType;
- this.replyType = replyType;
- this.serviceType = serviceType;
- this.serviceGroupName = serviceGroupName;
- this.endpointName = endpointName;
- this.availableInterceptors = availableInterceptors;
- }
-
- /**
- * Get the request type for this service locator. The remote service will match this request type if the actual
- * service accepts this type, or a superclass or superinterface thereof.
- *
- * @return the request type
- */
- public Class<I> getRequestType() {
- return requestType;
- }
-
- /**
- * Get the reply type for this service locator. The remote service will match this reply type if the actual
- * service returns this type, or a subtype thereof.
- *
- * @return the reply type
- */
- public Class<O> getReplyType() {
- return replyType;
- }
-
- /**
- * Get the name of the service group for this service locator.
- *
- * @return the service group name
- */
- public String getServiceGroupName() {
- return serviceGroupName;
- }
-
- /**
- * Get the type of the service for this service locator.
- *
- * @return the service type
- */
- public String getServiceType() {
- return serviceType;
- }
-
- /**
- * Get the name of the endpoitn for this service locator.
- *
- * @return the endpoint name
- */
- public String getEndpointName() {
- return endpointName;
- }
-
- /**
- * Get the names of the interceptors that the client has available.
- *
- * @return the names
- */
- public Set<String> getAvailableInterceptors() {
- return availableInterceptors;
- }
-
- /**
- * Change the request type. This method does not modify this object; instead, it returns a new modified instance.
- *
- * @param requestType the new request type
- *
- * @return an updated service locator
- */
- public <T> ServiceLocator<T, O> setRequestType(Class<T> requestType) {
- return new ServiceLocator<T, O>(requestType, replyType, serviceType, serviceGroupName, endpointName, availableInterceptors);
- }
-
- /**
- * Change the request type. This method does not modify this object; instead, it returns a new modified instance.
- *
- * @param replyType the new request type
- *
- * @return an updated service locator
- */
- public <T> ServiceLocator<I, T> setReplyType(Class<T> replyType) {
- return new ServiceLocator<I, T>(requestType, replyType, serviceType, serviceGroupName, endpointName, availableInterceptors);
- }
-
- /**
- * Change the service type. The service type is a string that identifies the "kind" of service provided. All
- * services of a given type should accept the same request and reply types as well.
- * <p/>
- * The service type should be a dot-separated name (like an Internet host name).
- * <p/>
- * This method does not modify this object; instead, it returns a new modified instance.
- *
- * @param serviceType the new service type; may not be {@code null}
- *
- * @return an updated service locator
- */
- public ServiceLocator<I, O> setServiceType(String serviceType) {
- if (serviceType == null) {
- throw new NullPointerException("serviceType is null");
- }
- return new ServiceLocator<I, O>(requestType, replyType, serviceType, serviceGroupName, endpointName, availableInterceptors);
- }
-
- /**
- * Change the service group name. The service group name is a string that identifies a group of endpoints that are
- * all providing the same service, for load-balancing or clustering purposes.
- *
- * @param serviceGroupName
- *
- * @return an updated service locator
- */
- public ServiceLocator<I, O> setServiceGroupName(String serviceGroupName) {
- if (serviceGroupName == null) {
- throw new NullPointerException("serviceGroupName is null");
- }
- return new ServiceLocator<I, O>(requestType, replyType, serviceType, serviceGroupName, endpointName, availableInterceptors);
- }
-
- /**
- * Change the endpoint name.
- * <p/>
- * The endpoint name should be a dot-separated name (like an Internet host name). A {@code "*"}
- * character can be used as a wildcard to match any name. So, the name {@code "foo.*"} would match {@code
- * "foo.bar"} and {@code "foo.bar.two"} but not {@code "foobar"}.
- * <p/>
- * If no endpoint name is specified, then this value defaults to {@code "*"} (match all endpoints).
- * <p/>
- * This method does not modify this object; instead, it returns a new modified instance.
- *
- * @param endpointName the new endpoint name; may not be {@code null}
- *
- * @return an updated service locator
- */
- public ServiceLocator<I, O> setEndpointName(String endpointName) {
- if (endpointName == null) {
- throw new NullPointerException("endpointName is null");
- }
- return new ServiceLocator<I, O>(requestType, replyType, serviceType, serviceGroupName, endpointName, availableInterceptors);
- }
-
- /**
- * Change the set of locally available interceptors.
- *
- * @param availableInterceptors the set of interceptors
- *
- * @return an updated service locator
- */
- public ServiceLocator<I, O> setAvailableInterceptors(Set<String> availableInterceptors) {
- if (availableInterceptors == null) {
- throw new NullPointerException("availableInterceptors is null");
- }
- return new ServiceLocator<I, O>(requestType, replyType, serviceType, serviceGroupName, endpointName, availableInterceptors);
- }
-
-}
Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Session.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Session.java 2008-02-25 13:35:25 UTC (rev 3501)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Session.java 2008-02-26 00:40:52 UTC (rev 3502)
@@ -1,6 +1,7 @@
package org.jboss.cx.remoting;
import java.util.concurrent.ConcurrentMap;
+import java.io.Closeable;
/**
* Represents a point-to-point relationship with another endpoint.
@@ -9,7 +10,7 @@
* <p/>
* A session may be shared safely among multiple threads.
*/
-public interface Session {
+public interface Session extends Closeable {
/**
* Close this session. Any associated connection(s) will be closed. Calling this method multiple times has no
* effect.
@@ -38,10 +39,9 @@
String getRemoteEndpointName();
/**
- * Establish an agreement to communicate with a service on the remote side.
+ * Get the root context for this session.
*
- * @param locator the locator for the service
- * @return a context source which may be used to create communication contexts
+ * @return the root context
*/
- <I, O> ContextSource<I, O> openService(ServiceLocator<I, O> locator) throws RemotingException;
+ <I, O> Context<I, O> getRootContext();
}
Added: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/service/ClassReply.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/service/ClassReply.java (rev 0)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/service/ClassReply.java 2008-02-26 00:40:52 UTC (rev 3502)
@@ -0,0 +1,23 @@
+package org.jboss.cx.remoting.service;
+
+import java.io.Serializable;
+
+/**
+ *
+ */
+public final class ClassReply implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private byte[] classBytes;
+
+ public ClassReply() {
+ }
+
+ public byte[] getClassBytes() {
+ return classBytes;
+ }
+
+ public void setClassBytes(final byte[] classBytes) {
+ this.classBytes = classBytes;
+ }
+}
Added: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/service/ClassRequest.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/service/ClassRequest.java (rev 0)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/service/ClassRequest.java 2008-02-26 00:40:52 UTC (rev 3502)
@@ -0,0 +1,23 @@
+package org.jboss.cx.remoting.service;
+
+import java.io.Serializable;
+
+/**
+ *
+ */
+public final class ClassRequest implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private String name;
+
+ public ClassRequest() {
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(final String name) {
+ this.name = name;
+ }
+}
Deleted: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/service/SecurityService.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/service/SecurityService.java 2008-02-25 13:35:25 UTC (rev 3501)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/service/SecurityService.java 2008-02-26 00:40:52 UTC (rev 3502)
@@ -1,34 +0,0 @@
-package org.jboss.cx.remoting.service;
-
-import org.jboss.cx.remoting.RemotingException;
-
-import javax.security.auth.callback.CallbackHandler;
-
-/**
- *
- */
-public interface SecurityService {
-
- /**
- * @param userName
- *
- * @throws RemotingException
- */
- void changeUser(String userName) throws RemotingException;
-
- /**
- * @param userName
- * @param password
- *
- * @throws RemotingException
- */
- void changeUser(String userName, char[] password) throws RemotingException;
-
- /**
- * @param clientCallbackHandler
- *
- * @throws RemotingException
- */
- void changeUser(CallbackHandler clientCallbackHandler) throws RemotingException;
-
-}
Added: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/service/ServiceReply.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/service/ServiceReply.java (rev 0)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/service/ServiceReply.java 2008-02-26 00:40:52 UTC (rev 3502)
@@ -0,0 +1,34 @@
+package org.jboss.cx.remoting.service;
+
+import org.jboss.cx.remoting.ContextSource;
+import org.jboss.cx.remoting.Context;
+import java.io.Serializable;
+
+/**
+ *
+ */
+public final class ServiceReply<I, O> implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private ContextSource<I, O> serviceContextSource;
+ private Context<ClassRequest, ClassReply> classLoadingContext;
+
+ public ServiceReply() {
+ }
+
+ public ContextSource<I, O> getServiceContextSource() {
+ return serviceContextSource;
+ }
+
+ public void setServiceContextSource(final ContextSource<I, O> serviceContextSource) {
+ this.serviceContextSource = serviceContextSource;
+ }
+
+ public Context<ClassRequest, ClassReply> getClassLoadingContext() {
+ return classLoadingContext;
+ }
+
+ public void setClassLoadingContext(final Context<ClassRequest, ClassReply> classLoadingContext) {
+ this.classLoadingContext = classLoadingContext;
+ }
+}
Added: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/service/ServiceRequest.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/service/ServiceRequest.java (rev 0)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/service/ServiceRequest.java 2008-02-26 00:40:52 UTC (rev 3502)
@@ -0,0 +1,50 @@
+package org.jboss.cx.remoting.service;
+
+import java.net.URI;
+import java.io.Serializable;
+
+/**
+ *
+ */
+public final class ServiceRequest<I, O> implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private URI uri;
+ private Class<I> requestType;
+ private Class<O> replyType;
+
+ public ServiceRequest() {
+ }
+
+ public static <I, O> ServiceRequest<I, O> create(Class<I> requestType, Class<O> replyType, URI uri) {
+ ServiceRequest<I, O> serviceRequest = new ServiceRequest<I, O>();
+ serviceRequest.setRequestType(requestType);
+ serviceRequest.setReplyType(replyType);
+ serviceRequest.setUri(uri);
+ return serviceRequest;
+ }
+
+ public URI getUri() {
+ return uri;
+ }
+
+ public void setUri(final URI uri) {
+ this.uri = uri;
+ }
+
+ public Class<I> getRequestType() {
+ return requestType;
+ }
+
+ public void setRequestType(final Class<I> requestType) {
+ this.requestType = requestType;
+ }
+
+ public Class<O> getReplyType() {
+ return replyType;
+ }
+
+ public void setReplyType(final Class<O> replyType) {
+ this.replyType = replyType;
+ }
+}
Deleted: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/service/TxnService.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/service/TxnService.java 2008-02-25 13:35:25 UTC (rev 3501)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/service/TxnService.java 2008-02-26 00:40:52 UTC (rev 3502)
@@ -1,40 +0,0 @@
-package org.jboss.cx.remoting.service;
-
-import org.jboss.cx.remoting.RemotingException;
-
-import javax.transaction.xa.XAResource;
-
-/**
- *
- */
-public interface TxnService {
- /**
- * Begin a transaction on the remote side.
- *
- * @throws org.jboss.cx.remoting.RemotingException if the transaction could not be started
- */
- void begin() throws RemotingException;
-
- /**
- * Commit the current transaction on the remote side.
- *
- * @throws RemotingException if the transaction could not be committed.
- */
- void commit() throws RemotingException;
-
- /**
- * Roll back the current transaction on the remote side.
- *
- * @throws RemotingException if the transaction could not be rolled back
- */
- void rollback() throws RemotingException;
-
- /**
- * Get an XA resource to control transactions on the remote side for this context.
- *
- * @return the XA resource
- *
- * @throws RemotingException if the XA resource could not be acquired
- */
- XAResource getXAResource() throws RemotingException;
-}
Deleted: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractClientInterceptor.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractClientInterceptor.java 2008-02-25 13:35:25 UTC (rev 3501)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractClientInterceptor.java 2008-02-26 00:40:52 UTC (rev 3502)
@@ -1,14 +0,0 @@
-package org.jboss.cx.remoting.spi;
-
-/**
- * A simple base implementation of {@code ContextServiceInterceptor}. Use this class as a base for simple
- * implementations of that interface.
- */
-public abstract class AbstractClientInterceptor<T> extends AbstractInterceptor implements ClientInterceptor<T> {
- protected AbstractClientInterceptor() {
- }
-
- public T getContextService(InterceptorContext context) {
- return null;
- }
-}
Deleted: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractInterceptor.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractInterceptor.java 2008-02-25 13:35:25 UTC (rev 3501)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractInterceptor.java 2008-02-26 00:40:52 UTC (rev 3502)
@@ -1,34 +0,0 @@
-package org.jboss.cx.remoting.spi;
-
-import org.jboss.cx.remoting.RemoteExecutionException;
-
-/**
- *
- */
-public abstract class AbstractInterceptor implements Interceptor {
- protected AbstractInterceptor() {
- }
-
- public void processRequest(final InterceptorContext context, final Object request) {
- context.nextRequest(request);
- }
-
- public void processReply(final InterceptorContext context, final Object reply) {
- context.nextReply(reply);
- }
-
- public void processCancelRequest(final InterceptorContext context, final boolean mayInterrupt) {
- context.nextCancelRequest(mayInterrupt);
- }
-
- public void processCancelAcknowledge(final InterceptorContext context) {
- context.nextCancelAcknowledge();
- }
-
- public void processException(final InterceptorContext context, final RemoteExecutionException exception) {
- context.nextException(exception);
- }
-
- public void close() {
- }
-}
Deleted: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractServerInterceptor.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractServerInterceptor.java 2008-02-25 13:35:25 UTC (rev 3501)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractServerInterceptor.java 2008-02-26 00:40:52 UTC (rev 3502)
@@ -1,10 +0,0 @@
-package org.jboss.cx.remoting.spi;
-
-/**
- * A simple base implementation of {@code ContextServiceInterceptor}. Use this class as a base for simple
- * implementations of that interface.
- */
-public abstract class AbstractServerInterceptor extends AbstractInterceptor implements ServerInterceptor {
- protected AbstractServerInterceptor() {
- }
-}
Deleted: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/ClientInterceptor.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/ClientInterceptor.java 2008-02-25 13:35:25 UTC (rev 3501)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/ClientInterceptor.java 2008-02-26 00:40:52 UTC (rev 3502)
@@ -1,16 +0,0 @@
-package org.jboss.cx.remoting.spi;
-
-/**
- * An interceptor that provides an additional service to a {@code Context}. A context service interceptor is created
- * for every context service for each context.
- */
-public interface ClientInterceptor<T> extends Interceptor {
-
- /**
- * Get the context service object associated with this handler. This instance is the end-user's interface into this
- * service. If no interface is available for this context service, return {@code null}.
- *
- * @return the context service object
- */
- T getContextService(InterceptorContext context);
-}
Deleted: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/ClientInterceptorFactory.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/ClientInterceptorFactory.java 2008-02-25 13:35:25 UTC (rev 3501)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/ClientInterceptorFactory.java 2008-02-26 00:40:52 UTC (rev 3502)
@@ -1,11 +0,0 @@
-package org.jboss.cx.remoting.spi;
-
-import org.jboss.cx.remoting.Context;
-import org.jboss.cx.remoting.spi.protocol.ContextIdentifier;
-
-/**
- *
- */
-public interface ClientInterceptorFactory {
- ClientInterceptor createInstance(Context<?, ?> context, ContextIdentifier contextIdentifier);
-}
Deleted: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/Discovery.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/Discovery.java 2008-02-25 13:35:25 UTC (rev 3501)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/Discovery.java 2008-02-26 00:40:52 UTC (rev 3502)
@@ -1,18 +0,0 @@
-package org.jboss.cx.remoting.spi;
-
-/**
- *
- */
-public interface Discovery {
- /**
- * Signal that the discovered route has gone offline.
- */
- void remove();
-
- /**
- * Change the cost of this route.
- *
- * @param cost the new cost
- */
- void updateCost(int cost);
-}
Deleted: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/EndpointProvider.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/EndpointProvider.java 2008-02-25 13:35:25 UTC (rev 3501)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/EndpointProvider.java 2008-02-26 00:40:52 UTC (rev 3502)
@@ -1,10 +0,0 @@
-package org.jboss.cx.remoting.spi;
-
-import org.jboss.cx.remoting.Endpoint;
-
-/**
- *
- */
-public interface EndpointProvider {
- Endpoint createEndpoint(String name);
-}
Deleted: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/Interceptor.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/Interceptor.java 2008-02-25 13:35:25 UTC (rev 3501)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/Interceptor.java 2008-02-26 00:40:52 UTC (rev 3502)
@@ -1,52 +0,0 @@
-package org.jboss.cx.remoting.spi;
-
-import org.jboss.cx.remoting.RemoteExecutionException;
-
-/**
- *
- */
-public interface Interceptor {
- /**
- * Process a request.
- *
- * @param context the context service interceptor context
- * @param request the outbound request
- */
- void processRequest(InterceptorContext context, Object request);
-
- /**
- * Process a request reply.
- *
- * @param context the context service interceptor context
- * @param reply the inbound reply
- */
- void processReply(InterceptorContext context, Object reply);
-
- /**
- * Process a request exception.
- *
- * @param context the context service interceptor context
- * @param exception the inbound exception
- */
- void processException(InterceptorContext context, RemoteExecutionException exception);
-
- /**
- * Process a cancellation request.
- *
- * @param context the context service interceptor context
- * @param mayInterrupt {@code true} if the operation can be interrupted
- */
- void processCancelRequest(InterceptorContext context, boolean mayInterrupt);
-
- /**
- * Process a cancellation acknowledgement.
- *
- * @param context the context service interceptor context
- */
- void processCancelAcknowledge(InterceptorContext context);
-
- /**
- * Close this interceptor.
- */
- void close();
-}
Deleted: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/InterceptorContext.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/InterceptorContext.java 2008-02-25 13:35:25 UTC (rev 3501)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/InterceptorContext.java 2008-02-26 00:40:52 UTC (rev 3502)
@@ -1,18 +0,0 @@
-package org.jboss.cx.remoting.spi;
-
-import org.jboss.cx.remoting.RemoteExecutionException;
-
-/**
- *
- */
-public interface InterceptorContext {
- void nextRequest(Object request);
-
- void nextReply(Object reply);
-
- void nextException(RemoteExecutionException exception);
-
- void nextCancelRequest(boolean mayInterrupt);
-
- void nextCancelAcknowledge();
-}
Deleted: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/InterceptorSpec.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/InterceptorSpec.java 2008-02-25 13:35:25 UTC (rev 3501)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/InterceptorSpec.java 2008-02-26 00:40:52 UTC (rev 3502)
@@ -1,78 +0,0 @@
-package org.jboss.cx.remoting.spi;
-
-import java.io.Serializable;
-import java.util.Comparator;
-
-/**
- *
- */
-public final class InterceptorSpec implements Serializable {
- private static final long serialVersionUID = 1L;
-
- private final int slot;
- private final Class<?> interceptorClass;
- private final Type type;
-
- public enum Type {
- PRIVATE,
- OPTIONAL,
- REQUIRED
- }
-
- public InterceptorSpec(final int slot, final Class<?> interceptorClass, final Type type) {
- if (type == null) throw new NullPointerException("'type' parameter is null");
- if (interceptorClass == null) throw new NullPointerException("'interceptorClass' parameter is null");
- if (slot < 0) throw new IllegalArgumentException("'slot' parameter must not be negative");
- this.slot = slot;
- this.interceptorClass = interceptorClass;
- this.type = type;
- }
-
- public int getSlot() {
- return slot;
- }
-
- public Class<?> getInterceptorClass() {
- return interceptorClass;
- }
-
- public Type getType() {
- return type;
- }
-
- private transient int hashCode;
- private transient boolean hashCodeDone;
-
- public int hashCode() {
- if (! hashCodeDone) {
- hashCode = slot ^ 37 * interceptorClass.hashCode() ^ 97 * type.hashCode();
- hashCodeDone = true;
- }
- return hashCode;
- }
-
- public boolean equals(Object obj) {
- if (! (obj instanceof InterceptorSpec)) {
- return false;
- }
- InterceptorSpec other = (InterceptorSpec) obj;
- return other.slot == slot && other.interceptorClass.equals(interceptorClass) && other.type == type;
- }
-
- public static final class ComparatorImpl implements Comparator<InterceptorSpec>, Serializable {
- private static final long serialVersionUID = 1L;
-
- private ComparatorImpl() {
- }
-
- public int compare(final InterceptorSpec o1, final InterceptorSpec o2) {
- return o2.slot - o1.slot;
- }
- }
-
- private static final Comparator<InterceptorSpec> comparator = new ComparatorImpl();
-
- public static Comparator<InterceptorSpec> getComparator() {
- return comparator;
- }
-}
Deleted: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/ListenerFactoryContext.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/ListenerFactoryContext.java 2008-02-25 13:35:25 UTC (rev 3501)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/ListenerFactoryContext.java 2008-02-26 00:40:52 UTC (rev 3502)
@@ -1,31 +0,0 @@
-package org.jboss.cx.remoting.spi;
-
-import org.jboss.cx.remoting.RemotingException;
-
-/**
- *
- */
-public interface ListenerFactoryContext {
- /**
- * Add an interceptor that the remote side is required to acknowledge.
- *
- * @param name
- */
- <T> T requireInterceptor(String name, Class<T> interceptorType) throws RemotingException;
-
- /**
- * Add an interceptor if the remote side agrees.
- *
- * @param name
- */
- <T> T offerInterceptor(String name, Class<T> interceptorType) throws RemotingException;
-
- /**
- * Add an interceptor to the local side. The remote side is not notified.
- *
- * @param name
- */
- <T> T addPrivateInterceptor(String name, Class<T> interceptorType) throws RemotingException;
-
-
-}
Deleted: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/ServerInterceptor.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/ServerInterceptor.java 2008-02-25 13:35:25 UTC (rev 3501)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/ServerInterceptor.java 2008-02-26 00:40:52 UTC (rev 3502)
@@ -1,8 +0,0 @@
-package org.jboss.cx.remoting.spi;
-
-/**
- * An interceptor that provides an additional service to a {@code Context}. A context service interceptor is created
- * for every context service for each context.
- */
-public interface ServerInterceptor extends Interceptor {
-}
Deleted: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/ServerInterceptorFactory.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/ServerInterceptorFactory.java 2008-02-25 13:35:25 UTC (rev 3501)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/ServerInterceptorFactory.java 2008-02-26 00:40:52 UTC (rev 3502)
@@ -1,10 +0,0 @@
-package org.jboss.cx.remoting.spi;
-
-import org.jboss.cx.remoting.spi.protocol.ContextIdentifier;
-
-/**
- *
- */
-public interface ServerInterceptorFactory {
- ServerInterceptor createInstance(ContextIdentifier contextIdentifier);
-}
Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/protocol/ProtocolContext.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/protocol/ProtocolContext.java 2008-02-25 13:35:25 UTC (rev 3501)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/protocol/ProtocolContext.java 2008-02-26 00:40:52 UTC (rev 3502)
@@ -3,7 +3,6 @@
import java.io.IOException;
import java.util.concurrent.Executor;
import org.jboss.cx.remoting.RemoteExecutionException;
-import org.jboss.cx.remoting.ServiceLocator;
import org.jboss.cx.remoting.util.ByteInput;
import org.jboss.cx.remoting.util.ByteOutput;
import org.jboss.cx.remoting.util.MessageInput;
@@ -16,8 +15,6 @@
/* CLIENT methods */
- void receiveServiceActivate(ServiceIdentifier serviceIdentifier);
-
void receiveReply(ContextIdentifier contextIdentifier, RequestIdentifier requestIdentifier, Object reply);
void receiveException(ContextIdentifier contextIdentifier, RequestIdentifier requestIdentifier, RemoteExecutionException exception);
@@ -30,8 +27,6 @@
void closeContext(ContextIdentifier remoteContextIdentifier);
- void receiveServiceRequest(ServiceIdentifier remoteServiceIdentifier, ServiceLocator<?, ?> locator);
-
void closeService(ServiceIdentifier remoteServiceIdentifier);
void receiveOpenedContext(ServiceIdentifier remoteServiceIdentifier, ContextIdentifier remoteContextIdentifier);
Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/protocol/ProtocolHandler.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/protocol/ProtocolHandler.java 2008-02-25 13:35:25 UTC (rev 3501)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/protocol/ProtocolHandler.java 2008-02-26 00:40:52 UTC (rev 3502)
@@ -1,11 +1,8 @@
package org.jboss.cx.remoting.spi.protocol;
import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
import java.util.concurrent.Executor;
import org.jboss.cx.remoting.RemoteExecutionException;
-import org.jboss.cx.remoting.ServiceLocator;
import org.jboss.cx.remoting.util.MessageOutput;
/**
@@ -33,15 +30,6 @@
/* SERVER methods */
/**
- * Send a service activation response to the remote side. The service identifier will have been produced by
- * the protocol handler on the remote side.
- *
- * @param remoteServiceIdentifier the remote service identifier
- * @throws IOException if an I/O error occurs
- */
- void sendServiceActivate(ServiceIdentifier remoteServiceIdentifier) throws IOException;
-
- /**
* Send the reply to a request.
*
* @param remoteContextIdentifier the context that the request was received under
@@ -82,9 +70,27 @@
/* CLIENT methods */
/**
+ * Get the identifier for the root context for this session. The root context lives as long as the session is up.
+ * This identifier is used to invoke the root context listener from the local side to the remote side.
+ *
+ * @return the identifier for the root context
+ * @throws IOException if an I/O error occurs
+ */
+ ContextIdentifier getLocalRootContextIdentifier();
+
+ /**
+ * Get the identifier for the root context for this session. The root context lives as long as the session is up.
+ * This identifier is used to invoke the root context listener from the remote side to the local side.
+ *
+ * @return the identifier for the root context
+ * @throws IOException if an I/O error occurs
+ */
+ ContextIdentifier getRemoteRootContextIdentifier();
+
+ /**
* Get a new context identifier that will be used to send requests to the remote side. The service identifier
- * was previously acquired from the {@link #openService()} method. Should send a message to the remote side such
- * that the {@link ProtocolContext#receiveOpenedContext(ServiceIdentifier, ContextIdentifier)} method is called with
+ * was received from the remote side. Should send a message to the remote side such that the
+ * {@link ProtocolContext#receiveOpenedContext(ServiceIdentifier, ContextIdentifier)} method is called with
* the new service and context identifiers.
*
* @param serviceIdentifier the service identifier
@@ -113,7 +119,7 @@
RequestIdentifier openRequest(ContextIdentifier contextIdentifier) throws IOException;
/**
- * Get a new service identifier that will be used to request a service from the remote side.
+ * Get a new service identifier that may be transmitted to the remote side.
*
* @return the new service identifier
* @throws IOException if an I/O error occurs
@@ -121,16 +127,6 @@
ServiceIdentifier openService() throws IOException;
/**
- * Send a service activation request to the remote side. The service identifier will have been obtained from
- * the {@link #openService()} method on this {@code ProtocolHandler}.
- *
- * @param serviceIdentifier the service identifier
- * @param locator the locator for the new service
- * @throws IOException if an I/O error occurs
- */
- void sendServiceRequest(ServiceIdentifier serviceIdentifier, ServiceLocator<?, ?> locator) throws IOException;
-
- /**
* Send a notification that the client is no longer using the given service.
*
* @param serviceIdentifier the service identifier
@@ -162,6 +158,14 @@
/* SESSION methods */
/**
+ * Open a serviceless context. The context identifier may be transmitted to the remote side.
+ *
+ * @return a context identifier
+ * @throws IOException if an I/O error occurs
+ */
+ ContextIdentifier openContext() throws IOException;
+
+ /**
* Open a stream on this session. Since either side may open a stream, it is important that the client and
* server side take precautions to ensure that both the client and server will not initiate the same stream at
* the same time.
@@ -182,27 +186,9 @@
void closeStream(StreamIdentifier streamIdentifier) throws IOException;
/**
- * Read a stream identifier from a message.
- *
- * @param input
- * @return the new stream identifier
- * @throws IOException if an I/O error occurs
- */
- StreamIdentifier readStreamIdentifier(ObjectInput input) throws IOException;
-
- /**
- * Write a stream identifier to an object output stream.
- *
- * @param output the output to write to
- * @param identifier the identifier to write
- * @throws IOException if an I/O error occurs
- */
- void writeStreamIdentifier(ObjectOutput output, StreamIdentifier identifier) throws IOException;
-
- /**
* Send data over a stream. Returns a message output buffer that the message is written into. When the message
- * is fully written, the {@link org.jboss.cx.remoting.util.MessageOutput#commit()} method will be called to perform the transmission. The
- * supplied executor should be passed in to
+ * is fully written, the {@link org.jboss.cx.remoting.util.MessageOutput#commit()} method will be called to perform
+ * the transmission. The supplied executor should be passed in to
* {@link org.jboss.cx.remoting.spi.protocol.ProtocolContext#getMessageOutput(org.jboss.cx.remoting.util.ByteOutput, java.util.concurrent.Executor)},
* if that method is used for serialization.
*
Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/protocol/ProtocolHandlerFactory.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/protocol/ProtocolHandlerFactory.java 2008-02-25 13:35:25 UTC (rev 3501)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/protocol/ProtocolHandlerFactory.java 2008-02-26 00:40:52 UTC (rev 3502)
@@ -25,7 +25,7 @@
*
* @param context the protocol context to use for inbound data
* @param remoteUri the URI of the remote side
- * @param attributeMap
+ * @param attributeMap the attributes for the underlying protocol to apply
* @return the protocol handler for outbound data
*
* @throws IOException if the handler could not be created
Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/EndpointWrapper.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/EndpointWrapper.java 2008-02-25 13:35:25 UTC (rev 3501)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/EndpointWrapper.java 2008-02-26 00:40:52 UTC (rev 3502)
@@ -3,14 +3,12 @@
import java.net.URI;
import java.util.concurrent.ConcurrentMap;
import org.jboss.cx.remoting.Endpoint;
-import org.jboss.cx.remoting.EndpointShutdownListener;
-import org.jboss.cx.remoting.InterceptorDeploymentSpec;
import org.jboss.cx.remoting.RemotingException;
-import org.jboss.cx.remoting.ServiceDeploymentSpec;
import org.jboss.cx.remoting.Session;
+import org.jboss.cx.remoting.Context;
+import org.jboss.cx.remoting.RequestListener;
+import org.jboss.cx.remoting.ContextSource;
import org.jboss.cx.remoting.util.AttributeMap;
-import org.jboss.cx.remoting.spi.Discovery;
-import org.jboss.cx.remoting.spi.Registration;
import org.jboss.cx.remoting.spi.protocol.ProtocolRegistration;
import org.jboss.cx.remoting.spi.protocol.ProtocolRegistrationSpec;
@@ -28,39 +26,23 @@
return delegate.getAttributes();
}
- public void shutdown() {
- delegate.shutdown();
- }
-
- public Session openSession(final URI remoteUri, AttributeMap attributeMap) throws RemotingException {
+ public Session openSession(final URI remoteUri, final AttributeMap attributeMap) throws RemotingException {
return delegate.openSession(remoteUri, attributeMap);
}
- public Discovery discover(final String endpointName, final URI nextHop, final int cost) throws RemotingException {
- return delegate.discover(endpointName, nextHop, cost);
- }
-
- public Registration deployInterceptorType(final InterceptorDeploymentSpec spec) throws RemotingException {
- return delegate.deployInterceptorType(spec);
- }
-
public String getName() {
return delegate.getName();
}
- public <I, O> Registration deployService(ServiceDeploymentSpec<I, O> spec) throws RemotingException, IllegalArgumentException {
- return delegate.deployService(spec);
- }
-
public ProtocolRegistration registerProtocol(final ProtocolRegistrationSpec spec) throws RemotingException, IllegalArgumentException {
return delegate.registerProtocol(spec);
}
- public void addShutdownListener(final EndpointShutdownListener listener) {
- delegate.addShutdownListener(listener);
+ public <I, O> Context<I, O> createContext(final RequestListener<I, O> requestListener) {
+ return delegate.createContext(requestListener);
}
- public void removeShutdownListener(final EndpointShutdownListener listener) {
- delegate.removeShutdownListener(listener);
+ public <I, O> ContextSource<I, O> createService(final RequestListener<I, O> requestListener) {
+ return delegate.createService(requestListener);
}
}
Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/RequestContextWrapper.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/RequestContextWrapper.java 2008-02-25 13:35:25 UTC (rev 3501)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/RequestContextWrapper.java 2008-02-26 00:40:52 UTC (rev 3502)
@@ -34,4 +34,7 @@
delegate.addCancelHandler(requestCancelHandler);
}
+ public void execute(final Runnable command) {
+ delegate.execute(command);
+ }
}
Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/SessionWrapper.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/SessionWrapper.java 2008-02-25 13:35:25 UTC (rev 3501)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/SessionWrapper.java 2008-02-26 00:40:52 UTC (rev 3502)
@@ -1,10 +1,9 @@
package org.jboss.cx.remoting.spi.wrapper;
import java.util.concurrent.ConcurrentMap;
-import org.jboss.cx.remoting.ContextSource;
import org.jboss.cx.remoting.RemotingException;
-import org.jboss.cx.remoting.ServiceLocator;
import org.jboss.cx.remoting.Session;
+import org.jboss.cx.remoting.Context;
/**
*
@@ -32,7 +31,7 @@
return delegate.getRemoteEndpointName();
}
- public <I, O> ContextSource<I, O> openService(final ServiceLocator<I, O> locator) throws RemotingException {
- return delegate.openService(locator);
+ public <I, O> Context<I, O> getRootContext() {
+ return delegate.getRootContext();
}
}
Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreEndpoint.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreEndpoint.java 2008-02-25 13:35:25 UTC (rev 3501)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreEndpoint.java 2008-02-26 00:40:52 UTC (rev 3502)
@@ -2,30 +2,23 @@
import java.io.IOException;
import java.net.URI;
-import java.util.Iterator;
import java.util.List;
-import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
import org.jboss.cx.remoting.Endpoint;
import org.jboss.cx.remoting.EndpointShutdownListener;
-import org.jboss.cx.remoting.InterceptorDeploymentSpec;
import org.jboss.cx.remoting.RemotingException;
-import org.jboss.cx.remoting.ServiceDeploymentSpec;
-import org.jboss.cx.remoting.ServiceLocator;
import org.jboss.cx.remoting.Session;
+import org.jboss.cx.remoting.Context;
+import org.jboss.cx.remoting.RequestListener;
+import org.jboss.cx.remoting.ContextSource;
import org.jboss.cx.remoting.util.CollectionUtil;
import org.jboss.cx.remoting.version.Version;
import org.jboss.cx.remoting.log.Logger;
import org.jboss.cx.remoting.util.AtomicStateMachine;
import org.jboss.cx.remoting.util.AttributeMap;
-import org.jboss.cx.remoting.spi.Discovery;
-import org.jboss.cx.remoting.spi.Registration;
import org.jboss.cx.remoting.spi.protocol.ProtocolContext;
import org.jboss.cx.remoting.spi.protocol.ProtocolHandler;
import org.jboss.cx.remoting.spi.protocol.ProtocolHandlerFactory;
@@ -48,7 +41,8 @@
private final Endpoint userEndpoint = new UserEndpoint();
private final OrderedExecutorFactory orderedExecutorFactory;
private final AtomicStateMachine<State> state = AtomicStateMachine.start(State.UP);
- private final ExecutorService executor;
+ private final Executor executor;
+ private final RequestListener<?, ?> rootRequestListener;
static {
Logger.getLogger("org.jboss.cx.remoting").info("JBoss Remoting version %s", Version.VERSION);
@@ -59,16 +53,16 @@
DOWN,
}
- protected CoreEndpoint(final String name) {
+ protected CoreEndpoint(final String name, final RequestListener<?, ?> rootRequestListener) {
this.name = name;
// todo - make this configurable
executor = Executors.newCachedThreadPool();
orderedExecutorFactory = new OrderedExecutorFactory(executor);
+ this.rootRequestListener = rootRequestListener;
}
private final ConcurrentMap<Object, Object> endpointMap = CollectionUtil.concurrentMap();
private final ConcurrentMap<String, CoreProtocolRegistration> protocolMap = CollectionUtil.concurrentMap();
- private final ConcurrentMap<ServiceKey, CoreDeployedService<?, ?>> services = CollectionUtil.concurrentMap();
private final Set<CoreSession> sessions = CollectionUtil.synchronizedSet(CollectionUtil.<CoreSession>hashSet());
// accesses protected by {@code shutdownListeners} - always lock AFTER {@code state}
private final List<EndpointShutdownListener> shutdownListeners = CollectionUtil.arrayList();
@@ -94,89 +88,10 @@
sessions.notifyAll();
}
- @SuppressWarnings ({"unchecked"})
- <I, O> CoreDeployedService<I, O> locateDeployedService(ServiceLocator<I, O> locator) {
- state.requireHold(State.UP);
- try {
- final String name = locator.getServiceGroupName();
- final String type = locator.getServiceType();
- // first try the quick (exact) lookup
- if (name.indexOf('*') == -1) {
- final CoreDeployedService<I, O> service = (CoreDeployedService<I, O>) services.get(new ServiceKey(name, type));
- if (service != null) {
- return service;
- } else {
- return null;
- }
- }
- final Pattern pattern = createWildcardPattern(name);
- for (Map.Entry<ServiceKey,CoreDeployedService<?,?>> entry : services.entrySet()) {
- final CoreEndpoint.ServiceKey key = entry.getKey();
- final String entryName = key.getName();
- final String entryType = key.getType();
- if (entryType.equals(type) && pattern.matcher(entryName).matches()) {
- return (CoreDeployedService<I, O>) entry.getValue();
- }
- }
- return null;
- } finally {
- state.release();
- }
+ RequestListener<?, ?> getRootRequestListener() {
+ return rootRequestListener;
}
- private static final Pattern wildcardPattern = Pattern.compile("^([^*]+|\\*)+$");
-
- private static Pattern createWildcardPattern(final String string) {
- final Matcher matcher = wildcardPattern.matcher(string);
- final StringBuilder target = new StringBuilder(string.length() * 2);
- while (matcher.find()) {
- final String val = matcher.group(1);
- if ("*".equals(val)) {
- target.append(".*");
- } else {
- target.append(Pattern.quote(val));
- }
- }
- return Pattern.compile(target.toString());
- }
-
- private final class ServiceKey {
- private final String name;
- private final String type;
-
- private ServiceKey(final String name, final String type) {
- this.name = name;
- this.type = type;
- }
-
- private String getName() {
- return name;
- }
-
- private String getType() {
- return type;
- }
-
- public boolean equals(final Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
-
- final ServiceKey that = (ServiceKey) o;
-
- if (name != null ? !name.equals(that.name) : that.name != null) return false;
- if (type != null ? !type.equals(that.type) : that.type != null) return false;
-
- return true;
- }
-
- public int hashCode() {
- int result;
- result = (name != null ? name.hashCode() : 0);
- result = 31 * result + (type != null ? type.hashCode() : 0);
- return result;
- }
- }
-
public final class CoreProtocolServerContext implements ProtocolServerContext {
private CoreProtocolServerContext() {
}
@@ -184,8 +99,6 @@
public ProtocolContext establishSession(ProtocolHandler handler) {
final CoreSession session = new CoreSession(CoreEndpoint.this);
session.initializeServer(handler);
-
- //, handler);
return session.getProtocolContext();
}
}
@@ -262,38 +175,8 @@
coreSession.shutdown();
}
sessions.clear();
- executor.shutdown();
}
- public void addShutdownListener(EndpointShutdownListener listener) {
- final State currentState = state.getStateHold();
- try {
- switch (currentState) {
- case UP:
- synchronized(shutdownListeners) {
- shutdownListeners.add(listener);
- return;
- }
- default:
- // must be shut down!
- listener.handleShutdown(this);
- }
- } finally {
- state.release();
- }
- }
-
- public void removeShutdownListener(EndpointShutdownListener listener) {
- synchronized(shutdownListeners) {
- final Iterator<EndpointShutdownListener> i = shutdownListeners.iterator();
- while (i.hasNext()) {
- if (i.next() == listener) {
- i.remove();
- }
- }
- }
- }
-
public Session openSession(final URI uri, final AttributeMap attributeMap) throws RemotingException {
final String scheme = uri.getScheme();
if (scheme == null) {
@@ -325,29 +208,6 @@
return name;
}
- public <I, O> Registration deployService(final ServiceDeploymentSpec<I, O> spec) throws RemotingException {
- if (spec.getServiceName() == null) {
- throw new NullPointerException("spec.getServiceName() is null");
- }
- if (spec.getServiceType() == null) {
- throw new NullPointerException("spec.getServiceType() is null");
- }
- if (spec.getRequestListener() == null) {
- throw new NullPointerException("spec.getRequestListener() is null");
- }
- state.requireHold(State.UP);
- try {
- final CoreDeployedService<I, O> service = new CoreDeployedService<I, O>(spec.getServiceName(), spec.getServiceType(), spec.getRequestListener());
- if (services.putIfAbsent(new ServiceKey(spec.getServiceName(), spec.getServiceType()), service) != null) {
- throw new RemotingException("A service with the same name is already deployed");
- }
- // todo - return a registration instance
- return null;
- } finally {
- state.release();
- }
- }
-
public ProtocolRegistration registerProtocol(ProtocolRegistrationSpec spec) throws RemotingException, IllegalArgumentException {
if (spec.getScheme() == null) {
throw new NullPointerException("spec.getScheme() is null");
@@ -365,14 +225,13 @@
}
}
- public Registration deployInterceptorType(final InterceptorDeploymentSpec spec) throws RemotingException {
- // todo - interceptors
+ public <I, O> Context<I, O> createContext(RequestListener<I, O> requestListener) {
return null;
}
- public Discovery discover(String endpointName, URI nextHop, int cost) throws RemotingException {
- // todo - implement
+ public <I, O> ContextSource<I, O> createService(RequestListener<I, O> requestListener) {
return null;
}
+
}
}
Deleted: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreEndpointProvider.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreEndpointProvider.java 2008-02-25 13:35:25 UTC (rev 3501)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreEndpointProvider.java 2008-02-26 00:40:52 UTC (rev 3502)
@@ -1,30 +0,0 @@
-package org.jboss.cx.remoting.core;
-
-import java.util.HashSet;
-import java.util.Set;
-import org.jboss.cx.remoting.Endpoint;
-import org.jboss.cx.remoting.RemotingException;
-import org.jboss.cx.remoting.util.CollectionUtil;
-import org.jboss.cx.remoting.spi.EndpointProvider;
-
-/**
- *
- */
-public final class CoreEndpointProvider implements EndpointProvider {
- private final LocalProtocol localProtocol = new LocalProtocol();
- private final Set<String> endpointNames = CollectionUtil.synchronizedSet(new HashSet<String>());
-
- public Endpoint createEndpoint(String name) {
- // todo - need a way to signal the removal of an endpoint
- if (! endpointNames.add(name)) {
- throw new IllegalArgumentException("Failed to create endpoint (endpoint with the same name already exists");
- }
- final Endpoint userEndpoint = new CoreEndpoint(name).getUserEndpoint();
- try {
- localProtocol.addToEndpoint(userEndpoint);
- } catch (RemotingException e) {
- throw new IllegalStateException("Cannot create endpoint", e);
- }
- return userEndpoint;
- }
-}
Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreInboundContext.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreInboundContext.java 2008-02-25 13:35:25 UTC (rev 3501)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreInboundContext.java 2008-02-26 00:40:52 UTC (rev 3502)
@@ -1,12 +1,10 @@
package org.jboss.cx.remoting.core;
-import java.util.List;
import java.util.concurrent.ConcurrentMap;
import org.jboss.cx.remoting.RemoteExecutionException;
import org.jboss.cx.remoting.RemotingException;
import org.jboss.cx.remoting.RequestListener;
import org.jboss.cx.remoting.util.CollectionUtil;
-import org.jboss.cx.remoting.spi.ServerInterceptorFactory;
import org.jboss.cx.remoting.spi.protocol.ContextIdentifier;
import org.jboss.cx.remoting.spi.protocol.RequestIdentifier;
@@ -21,7 +19,7 @@
private final ConcurrentMap<RequestIdentifier,CoreInboundRequest<I, O>> requests = CollectionUtil.concurrentMap();
- public CoreInboundContext(final ContextIdentifier contextIdentifier, final CoreSession coreSession, final RequestListener<I, O> requestListener, final List<ServerInterceptorFactory> factoryList) {
+ public CoreInboundContext(final ContextIdentifier contextIdentifier, final CoreSession coreSession, final RequestListener<I, O> requestListener) {
this.contextIdentifier = contextIdentifier;
this.coreSession = coreSession;
this.requestListener = requestListener;
@@ -64,7 +62,7 @@
// Request mgmt
CoreInboundRequest<I, O> createInboundRequest(final RequestIdentifier requestIdentifier, final I request) {
- return new CoreInboundRequest<I, O>(requestIdentifier, request, this, requestListener);
+ return new CoreInboundRequest<I, O>(requestIdentifier, this, requestListener, coreSession.getExecutor());
}
CoreInboundRequest<I, O> getInboundRequest(RequestIdentifier requestIdentifier) {
Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreInboundRequest.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreInboundRequest.java 2008-02-25 13:35:25 UTC (rev 3501)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreInboundRequest.java 2008-02-26 00:40:52 UTC (rev 3502)
@@ -8,6 +8,12 @@
import org.jboss.cx.remoting.util.AtomicStateMachine;
import org.jboss.cx.remoting.log.Logger;
import org.jboss.cx.remoting.spi.protocol.RequestIdentifier;
+import java.util.concurrent.Executor;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Iterator;
+import java.util.LinkedList;
/**
*
@@ -16,18 +22,23 @@
private static final Logger log = Logger.getLogger(CoreInboundRequest.class);
private final RequestIdentifier requestIdentifier;
- private final I request;
private final CoreInboundContext<I, O> context;
private final RequestListener<I,O> requestListener;
+ private final Executor executor;
private final AtomicStateMachine<State> state = AtomicStateMachine.start(State.INITIAL);
private final UserRequestContext userRequestContext = new UserRequestContext();
- public CoreInboundRequest(final RequestIdentifier requestIdentifier, final I request, final CoreInboundContext<I, O> context, final RequestListener<I, O> requestListener) {
+ private boolean mayInterrupt;
+ private boolean cancel;
+ private Set<Thread> tasks;
+ private List<RequestCancelHandler<O>> cancelHandlers;
+
+ public CoreInboundRequest(final RequestIdentifier requestIdentifier, final CoreInboundContext<I, O> context, final RequestListener<I, O> requestListener, final Executor executor) {
this.requestIdentifier = requestIdentifier;
- this.request = request;
this.context = context;
this.requestListener = requestListener;
+ this.executor = executor;
}
private enum State {
@@ -61,6 +72,29 @@
}
public void receiveCancelRequest(final boolean mayInterrupt) {
+ synchronized(this) {
+ if (! cancel) {
+ cancel = true;
+ this.mayInterrupt = mayInterrupt;
+ if (mayInterrupt) {
+ for (Thread t : tasks) {
+ t.interrupt();
+ }
+ }
+ if (cancelHandlers != null) {
+ final Iterator<RequestCancelHandler<O>> i = cancelHandlers.iterator();
+ while (i.hasNext()) {
+ final RequestCancelHandler<O> handler = i.next();
+ i.remove();
+ executor.execute(new Runnable() {
+ public void run() {
+ handler.notifyCancel(userRequestContext, mayInterrupt);
+ }
+ });
+ }
+ }
+ }
+ }
}
public final class UserRequestContext implements RequestContext<O> {
@@ -88,7 +122,40 @@
}
public void addCancelHandler(final RequestCancelHandler<O> requestCancelHandler) {
- // todo - should be a list
+ final boolean mayInterrupt;
+ synchronized(CoreInboundRequest.this) {
+ if (!cancel) {
+ if (cancelHandlers == null) {
+ cancelHandlers = new LinkedList<RequestCancelHandler<O>>();
+ }
+ cancelHandlers.add(requestCancelHandler);
+ return;
+ }
+ // otherwise, unlock and notify now
+ mayInterrupt = CoreInboundRequest.this.mayInterrupt;
+ }
+ requestCancelHandler.notifyCancel(this, mayInterrupt);
}
+
+ public void execute(final Runnable command) {
+ executor.execute(new Runnable() {
+ public void run() {
+ final Thread thread = Thread.currentThread();
+ synchronized(CoreInboundRequest.this) {
+ if (tasks == null) {
+ tasks = new HashSet<Thread>();
+ }
+ tasks.add(thread);
+ }
+ try {
+ command.run();
+ } finally {
+ synchronized(CoreInboundRequest.this) {
+ tasks.remove(thread);
+ }
+ }
+ }
+ });
+ }
}
}
Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreInboundService.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreInboundService.java 2008-02-25 13:35:25 UTC (rev 3501)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreInboundService.java 2008-02-26 00:40:52 UTC (rev 3502)
@@ -2,7 +2,6 @@
import org.jboss.cx.remoting.RemotingException;
import org.jboss.cx.remoting.RequestListener;
-import org.jboss.cx.remoting.ServiceLocator;
import org.jboss.cx.remoting.log.Logger;
import org.jboss.cx.remoting.spi.protocol.ContextIdentifier;
import org.jboss.cx.remoting.spi.protocol.ServiceIdentifier;
@@ -17,14 +16,13 @@
private final ServiceIdentifier serviceIdentifier;
private final RequestListener<I, O> requestListener;
- public CoreInboundService(final CoreEndpoint coreEndpoint, final CoreSession coreSession, final ServiceIdentifier serviceIdentifier, final ServiceLocator<I, O> locator) throws RemotingException {
+ public CoreInboundService(final CoreSession coreSession, final ServiceIdentifier serviceIdentifier, final RequestListener<I, O> requestListener) throws RemotingException {
this.coreSession = coreSession;
this.serviceIdentifier = serviceIdentifier;
- final CoreDeployedService<I, O> service = coreEndpoint.locateDeployedService(locator);
- requestListener = service.getRequestListener();
+ this.requestListener = requestListener;
}
- public void receivedOpenedContext(final ContextIdentifier remoteContextIdentifier) {
+ void receivedOpenedContext(final ContextIdentifier remoteContextIdentifier) {
coreSession.createServerContext(serviceIdentifier, remoteContextIdentifier, requestListener);
}
}
Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundService.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundService.java 2008-02-25 13:35:25 UTC (rev 3501)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundService.java 2008-02-26 00:40:52 UTC (rev 3502)
@@ -1,14 +1,10 @@
package org.jboss.cx.remoting.core;
-import java.util.List;
import org.jboss.cx.remoting.Context;
import org.jboss.cx.remoting.ContextSource;
import org.jboss.cx.remoting.RemotingException;
-import org.jboss.cx.remoting.ServiceLocator;
import org.jboss.cx.remoting.util.AtomicStateMachine;
-import org.jboss.cx.remoting.util.CollectionUtil;
import org.jboss.cx.remoting.log.Logger;
-import org.jboss.cx.remoting.spi.ClientInterceptorFactory;
import org.jboss.cx.remoting.spi.protocol.ServiceIdentifier;
/**
@@ -21,8 +17,6 @@
private CoreSession coreSession;
private final AtomicStateMachine<State> state = AtomicStateMachine.start(State.WAITING_FOR_REPLY);
private final ContextSource<I, O> userContextSource = new UserContextSource();
- private final List<ClientInterceptorFactory> interceptorFactories = CollectionUtil.arrayList();
- private final ServiceLocator<I,O> locator;
private enum State {
WAITING_FOR_REPLY,
@@ -31,10 +25,9 @@
DOWN
}
- protected CoreOutboundService(final CoreSession coreSession, final ServiceIdentifier serviceIdentifier, final ServiceLocator<I, O> locator) {
+ protected CoreOutboundService(final CoreSession coreSession, final ServiceIdentifier serviceIdentifier) {
this.coreSession = coreSession;
this.serviceIdentifier = serviceIdentifier;
- this.locator = locator;
}
// State mgmt
@@ -48,7 +41,6 @@
// Outbound protocol messages
void sendServiceRequest() throws RemotingException {
- coreSession.sendServiceRequest(serviceIdentifier, locator);
}
// Inbound protocol messages
Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreSession.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreSession.java 2008-02-25 13:35:25 UTC (rev 3501)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreSession.java 2008-02-26 00:40:52 UTC (rev 3502)
@@ -14,12 +14,11 @@
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
-import org.jboss.cx.remoting.ContextSource;
import org.jboss.cx.remoting.RemoteExecutionException;
import org.jboss.cx.remoting.RemotingException;
import org.jboss.cx.remoting.RequestListener;
-import org.jboss.cx.remoting.ServiceLocator;
import org.jboss.cx.remoting.Session;
+import org.jboss.cx.remoting.Context;
import org.jboss.cx.remoting.core.stream.DefaultStreamDetector;
import org.jboss.cx.remoting.util.AtomicStateMachine;
import org.jboss.cx.remoting.util.AttributeMap;
@@ -39,6 +38,7 @@
import org.jboss.cx.remoting.spi.stream.StreamDetector;
import org.jboss.cx.remoting.spi.stream.StreamSerializer;
import org.jboss.cx.remoting.spi.stream.StreamSerializerFactory;
+import org.jboss.cx.remoting.spi.wrapper.ContextWrapper;
/**
@@ -75,6 +75,8 @@
private ProtocolHandler protocolHandler;
/** The remote endpoint name. Set on CONNECTING -> UP */
private String remoteEndpointName;
+ /** The root client context. Set on CONNECTING -> UP */
+ private Context<?, ?> rootContext;
private final AtomicStateMachine<State> state = AtomicStateMachine.start(State.NEW);
@@ -91,15 +93,25 @@
// Initializers
+ @SuppressWarnings ({"unchecked"})
void initializeServer(final ProtocolHandler protocolHandler) {
if (protocolHandler == null) {
throw new NullPointerException("protocolHandler is null");
}
state.requireTransitionExclusive(State.NEW, State.CONNECTING);
- this.protocolHandler = protocolHandler;
- state.releaseExclusive();
+ try {
+ this.protocolHandler = protocolHandler;
+ final RequestListener<?, ?> listener = endpoint.getRootRequestListener();
+ if (listener != null) {
+ final ContextIdentifier contextIdentifier = protocolHandler.getRemoteRootContextIdentifier();
+ serverContexts.put(contextIdentifier, new CoreInboundContext(contextIdentifier, this, listener));
+ }
+ } finally {
+ state.releaseExclusive();
+ }
}
+ @SuppressWarnings ({"unchecked"})
void initializeClient(final ProtocolHandlerFactory protocolHandlerFactory, final URI remoteUri, final AttributeMap attributeMap) throws IOException {
if (protocolHandlerFactory == null) {
throw new NullPointerException("protocolHandlerFactory is null");
@@ -107,6 +119,11 @@
state.requireTransitionExclusive(State.NEW, State.CONNECTING);
try {
protocolHandler = protocolHandlerFactory.createHandler(protocolContext, remoteUri, attributeMap);
+ final RequestListener<?, ?> listener = endpoint.getRootRequestListener();
+ if (listener != null) {
+ final ContextIdentifier contextIdentifier = protocolHandler.getRemoteRootContextIdentifier();
+ serverContexts.put(contextIdentifier, new CoreInboundContext(contextIdentifier, this, listener));
+ }
} finally {
state.releaseExclusive();
}
@@ -132,22 +149,6 @@
return true;
}
- void sendServiceRequest(final ServiceIdentifier serviceIdentifier, final ServiceLocator<?,?> locator) throws RemotingException {
- try {
- protocolHandler.sendServiceRequest(serviceIdentifier, locator);
- } catch (IOException e) {
- throw new RemotingException("Failed to send a service request: " + e);
- }
- }
-
- void sendServiceActivate(final ServiceIdentifier serviceIdentifier) throws RemotingException {
- try {
- protocolHandler.sendServiceActivate(serviceIdentifier);
- } catch (IOException e) {
- throw new RemotingException("Failed to send a service activate: " + e);
- }
- }
-
void sendReply(final ContextIdentifier contextIdentifier, final RequestIdentifier requestIdentifier, final Object reply) throws RemotingException {
try {
protocolHandler.sendReply(contextIdentifier, requestIdentifier, reply);
@@ -210,6 +211,10 @@
return protocolHandler;
}
+ Executor getExecutor() {
+ return endpoint.getExecutor();
+ }
+
// Thread-local instance
private static final ThreadLocal<CoreSession> instance = new ThreadLocal<CoreSession>();
@@ -285,7 +290,7 @@
}
state.requireHold(State.UP);
try {
- final CoreInboundContext<I, O> context = new CoreInboundContext<I, O>(remoteContextIdentifier, this, requestListener, null);
+ final CoreInboundContext<I, O> context = new CoreInboundContext<I, O>(remoteContextIdentifier, this, requestListener);
log.trace("Adding new server (inbound) context, ID = %s", remoteContextIdentifier);
serverContexts.put(remoteContextIdentifier, context);
return context;
@@ -326,69 +331,6 @@
// Service mgmt
- <I, O> CoreOutboundService<I, O> createService(final ServiceLocator<I, O> locator) throws RemotingException {
- if (locator == null) {
- throw new NullPointerException("locator is null");
- }
- state.requireHold(State.UP);
- try {
- final ServiceIdentifier serviceIdentifier;
- try {
- serviceIdentifier = protocolHandler.openService();
- } catch (IOException e) {
- throw new RemotingException("Failed to open service: " + e.toString());
- }
- final CoreOutboundService<I, O> service = new CoreOutboundService<I, O>(this, serviceIdentifier, locator);
- log.trace("Adding new client service, ID = %s", serviceIdentifier);
- services.put(serviceIdentifier, new WeakReference<CoreOutboundService>(service));
- return service;
- } finally {
- state.release();
- }
- }
-
- <I, O> CoreInboundService<I, O> createServerService(final ServiceIdentifier serviceIdentifier, final ServiceLocator<I, O> locator) {
- if (serviceIdentifier == null) {
- throw new NullPointerException("serviceIdentifier is null");
- }
- if (locator == null) {
- throw new NullPointerException("locator is null");
- }
- state.requireHold(State.UP);
- try {
- final CoreInboundService<I, O> service;
- try {
- service = new CoreInboundService<I, O>(endpoint, this, serviceIdentifier, locator);
- } catch (RemotingException e) {
- try {
- sendServiceTerminate(serviceIdentifier);
- } catch (RemotingException e1) {
- log.trace("Failed to notify client of service termination: %s", e);
- }
- return null;
- }
- try {
- sendServiceActivate(serviceIdentifier);
- } catch (RemotingException e) {
- log.trace("Failed to notify client of service activation: %s", e);
- return null;
- }
- log.trace("Adding new server service, ID = %s", serviceIdentifier);
- serverServices.put(serviceIdentifier, service);
- return service;
- } finally {
- state.release();
- }
- }
-
- private void sendServiceTerminate(final ServiceIdentifier serviceIdentifier) throws RemotingException {
- try {
- protocolHandler.sendServiceTerminate(serviceIdentifier);
- } catch (IOException e) {
- throw new RemotingException("Failed to send service terminate: " + e.toString());
- }
- }
-
CoreOutboundService getService(final ServiceIdentifier serviceIdentifier) {
if (serviceIdentifier == null) {
throw new NullPointerException("serviceIdentifier is null");
@@ -445,23 +387,9 @@
return remoteEndpointName;
}
- public <I, O> ContextSource<I, O> openService(ServiceLocator<I, O> locator) throws RemotingException {
- if (locator == null) {
- throw new NullPointerException("locator is null");
- }
- if (locator.getServiceType() == null) {
- throw new NullPointerException("locator.getServiceType() is null");
- }
- state.waitForNotHold(State.CONNECTING);
- try {
- state.require(State.UP);
- final CoreOutboundService<I, O> service = createService(locator);
- service.sendServiceRequest();
- service.await();
- return service.getUserContextSource();
- } finally {
- state.release();
- }
+ @SuppressWarnings ({"unchecked"})
+ public <I, O> Context<I, O> getRootContext() {
+ return (Context<I, O>) rootContext;
}
}
@@ -511,20 +439,6 @@
}
}
- @SuppressWarnings({"unchecked"})
- public void receiveServiceRequest(ServiceIdentifier serviceIdentifier, ServiceLocator<?, ?> locator) {
- createServerService(serviceIdentifier, locator);
- }
-
- public void receiveServiceActivate(ServiceIdentifier serviceIdentifier) {
- final CoreOutboundService service = getService(serviceIdentifier);
- if (service != null) {
- service.receiveServiceActivate();
- } else {
- log.trace("Got service activate for an unknown service (%s)", serviceIdentifier);
- }
- }
-
public void receiveServiceTerminate(ServiceIdentifier serviceIdentifier) {
final CoreOutboundService service = getService(serviceIdentifier);
if (service != null) {
@@ -572,11 +486,20 @@
coreStream.receiveStreamData(data);
}
+ @SuppressWarnings ({"unchecked"})
public void openSession(String remoteEndpointName) {
state.waitForNotExclusive(State.NEW);
try {
state.requireTransition(State.CONNECTING, State.UP);
CoreSession.this.remoteEndpointName = remoteEndpointName;
+ final ContextIdentifier rootContextIdentifier = protocolHandler.getLocalRootContextIdentifier();
+ final CoreOutboundContext outboundContext = new CoreOutboundContext(CoreSession.this, rootContextIdentifier);
+ rootContext = new ContextWrapper(outboundContext.getUserContext()) {
+ public void close() throws RemotingException {
+ throw new RemotingException("close() not allowed on root context");
+ }
+ };
+ contexts.put(rootContextIdentifier, new WeakReference<CoreOutboundContext>(outboundContext));
} finally {
state.releaseExclusive();
}
Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/LocalProtocol.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/LocalProtocol.java 2008-02-25 13:35:25 UTC (rev 3501)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/LocalProtocol.java 2008-02-26 00:40:52 UTC (rev 3502)
@@ -9,7 +9,6 @@
import org.jboss.cx.remoting.Endpoint;
import org.jboss.cx.remoting.RemoteExecutionException;
import org.jboss.cx.remoting.RemotingException;
-import org.jboss.cx.remoting.ServiceLocator;
import org.jboss.cx.remoting.util.MessageOutput;
import org.jboss.cx.remoting.util.AttributeMap;
import org.jboss.cx.remoting.util.CollectionUtil;
@@ -121,32 +120,6 @@
log.trace("Closing stream for local protocol");
}
- public StreamIdentifier readStreamIdentifier(ObjectInput input) throws IOException {
- throw new UnsupportedOperationException("streams");
- }
-
- public void writeStreamIdentifier(ObjectOutput output, StreamIdentifier identifier) throws IOException {
- throw new UnsupportedOperationException("streams");
- }
-
- public StreamIdentifier readStreamIdentifier(MessageInput input) throws IOException {
- throw new UnsupportedOperationException("streams");
- }
-
- public void writeStreamIdentifier(MessageOutput output, StreamIdentifier identifier) throws IOException {
- throw new UnsupportedOperationException("streams");
- }
-
- public void sendServiceRequest(ServiceIdentifier serviceIdentifier, ServiceLocator<?, ?> locator) throws IOException {
- log.trace("Sending service request for local protocol");
- remoteContext.receiveServiceRequest(serviceIdentifier, locator);
- }
-
- public void sendServiceActivate(ServiceIdentifier serviceIdentifier) throws IOException {
- log.trace("Sending service activation for local protocol");
- remoteContext.receiveServiceActivate(serviceIdentifier);
- }
-
public void sendReply(ContextIdentifier remoteContextIdentifier, RequestIdentifier requestIdentifier, Object reply) throws IOException {
log.trace("Sending stream for local protocol");
remoteContext.receiveReply(remoteContextIdentifier, requestIdentifier, reply);
@@ -170,11 +143,23 @@
public void sendServiceTerminate(ServiceIdentifier remoteServiceIdentifier) throws IOException {
}
+ public ContextIdentifier getLocalRootContextIdentifier() {
+ return null;
+ }
+
+ public ContextIdentifier getRemoteRootContextIdentifier() {
+ return null;
+ }
+
public void sendCancelRequest(ContextIdentifier contextIdentifier, RequestIdentifier requestIdentifier, boolean mayInterrupt) throws IOException {
log.trace("Sending cancel request for local protocol");
remoteContext.receiveCancelRequest(contextIdentifier, requestIdentifier, mayInterrupt);
}
+ public ContextIdentifier openContext() throws IOException {
+ return null;
+ }
+
public MessageOutput sendStreamData(StreamIdentifier streamIdentifier, final Executor streamExeceutor) throws IOException {
throw new UnsupportedOperationException("streams");
}
Added: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ServiceLocatorListener.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ServiceLocatorListener.java (rev 0)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ServiceLocatorListener.java 2008-02-26 00:40:52 UTC (rev 3502)
@@ -0,0 +1,25 @@
+package org.jboss.cx.remoting.core;
+
+import org.jboss.cx.remoting.RequestListener;
+import org.jboss.cx.remoting.RequestContext;
+import org.jboss.cx.remoting.RemoteExecutionException;
+import org.jboss.cx.remoting.util.ServiceURI;
+import org.jboss.cx.remoting.service.ServiceRequest;
+import org.jboss.cx.remoting.service.ServiceReply;
+import java.net.URI;
+
+/**
+ *
+ */
+public final class ServiceLocatorListener<I, O> implements RequestListener<ServiceRequest<I, O>, ServiceReply<I, O>> {
+
+
+ public void handleRequest(final RequestContext<ServiceReply<I, O>> requestContext, final ServiceRequest<I, O> request) throws RemoteExecutionException, InterruptedException {
+ final URI uri = request.getUri();
+ final ServiceURI serviceURI = new ServiceURI(uri);
+ final String endpointName = serviceURI.getEndpointName();
+ final String groupName = serviceURI.getGroupName();
+ final String serviceType = serviceURI.getServiceType();
+
+ }
+}
Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/StreamMarker.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/StreamMarker.java 2008-02-25 13:35:25 UTC (rev 3501)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/StreamMarker.java 2008-02-26 00:40:52 UTC (rev 3502)
@@ -44,13 +44,13 @@
public void writeExternal(ObjectOutput out) throws IOException {
out.writeObject(factoryClass);
- coreSession.getProtocolHandler().writeStreamIdentifier(out, streamIdentifier);
+ out.writeObject(streamIdentifier);
}
@SuppressWarnings ({"unchecked"})
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
coreSession = CoreSession.getInstance();
factoryClass = (Class<? extends StreamSerializerFactory>) in.readObject();
- streamIdentifier = coreSession.getProtocolHandler().readStreamIdentifier(in);
+ streamIdentifier = (StreamIdentifier) in.readObject();
}
}
Modified: remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/RemotingHttpSessionImpl.java
===================================================================
--- remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/RemotingHttpSessionImpl.java 2008-02-25 13:35:25 UTC (rev 3501)
+++ remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/RemotingHttpSessionImpl.java 2008-02-26 00:40:52 UTC (rev 3502)
@@ -15,7 +15,6 @@
import org.jboss.cx.remoting.spi.protocol.RequestIdentifier;
import org.jboss.cx.remoting.spi.protocol.StreamIdentifier;
import org.jboss.cx.remoting.RemoteExecutionException;
-import org.jboss.cx.remoting.ServiceLocator;
import java.util.LinkedList;
import java.util.Set;
@@ -185,6 +184,14 @@
});
}
+ public ContextIdentifier getLocalRootContextIdentifier() {
+ return null;
+ }
+
+ public ContextIdentifier getRemoteRootContextIdentifier() {
+ return null;
+ }
+
public ContextIdentifier openContext(final ServiceIdentifier serviceIdentifier) throws IOException {
final ContextIdentifier contextIdentifier = null;
outgoingQueue.add(new OutputAction() {
@@ -218,26 +225,6 @@
return null;
}
- public void sendServiceRequest(final ServiceIdentifier serviceIdentifier, final ServiceLocator<?, ?> locator) throws IOException {
- outgoingQueue.add(new OutputAction() {
- public void run(ByteOutput target) throws IOException {
- final MessageOutput msgOutput = protocolContext.getMessageOutput(target);
- write(msgOutput, MsgType.SERVICE_REQUEST);
- write(msgOutput, serviceIdentifier);
- msgOutput.writeObject(locator.getRequestType());
- msgOutput.writeObject(locator.getReplyType());
- msgOutput.writeUTF(locator.getServiceType());
- msgOutput.writeUTF(locator.getServiceGroupName());
- final Set<String> interceptors = locator.getAvailableInterceptors();
- msgOutput.writeInt(interceptors.size());
- for (String name : interceptors) {
- msgOutput.writeUTF(name);
- }
- msgOutput.commit();
- }
- });
- }
-
public void closeService(final ServiceIdentifier serviceIdentifier) throws IOException {
outgoingQueue.add(new OutputAction() {
public void run(ByteOutput target) throws IOException {
@@ -273,6 +260,10 @@
});
}
+ public ContextIdentifier openContext() throws IOException {
+ return null;
+ }
+
public StreamIdentifier openStream() throws IOException {
return null;
}
Modified: remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/JrppConnection.java
===================================================================
--- remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/JrppConnection.java 2008-02-25 13:35:25 UTC (rev 3501)
+++ remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/JrppConnection.java 2008-02-26 00:40:52 UTC (rev 3502)
@@ -1,7 +1,6 @@
package org.jboss.cx.remoting.jrpp;
import java.io.IOException;
-import java.io.ObjectInput;
import java.io.ObjectOutput;
import static java.lang.Math.min;
import java.util.Enumeration;
@@ -11,6 +10,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.mina.common.AttributeKey;
import org.apache.mina.common.IdleStatus;
import org.apache.mina.common.IoBuffer;
@@ -20,13 +20,12 @@
import org.apache.mina.handler.multiton.SingleSessionIoHandler;
import org.jboss.cx.remoting.CommonKeys;
import org.jboss.cx.remoting.RemoteExecutionException;
-import org.jboss.cx.remoting.ServiceLocator;
import org.jboss.cx.remoting.util.AtomicStateMachine;
import org.jboss.cx.remoting.util.AttributeMap;
import org.jboss.cx.remoting.util.MessageOutput;
import org.jboss.cx.remoting.util.MessageInput;
import org.jboss.cx.remoting.util.CollectionUtil;
-import org.jboss.cx.remoting.jrpp.id.IdentifierManager;
+import org.jboss.cx.remoting.util.WeakHashSet;
import org.jboss.cx.remoting.jrpp.id.JrppContextIdentifier;
import org.jboss.cx.remoting.jrpp.id.JrppRequestIdentifier;
import org.jboss.cx.remoting.jrpp.id.JrppServiceIdentifier;
@@ -71,7 +70,6 @@
private final ProtocolHandler protocolHandler;
private final SingleSessionIoHandler ioHandler;
- private final IdentifierManager identifierManager;
private final AttributeMap attributeMap;
private IoSession ioSession;
@@ -79,6 +77,18 @@
private ProtocolContext protocolContext;
private IOException failureReason;
+ private boolean client;
+
+ private final AtomicInteger streamIdSequence = new AtomicInteger(0);
+ private final AtomicInteger contextIdSequence = new AtomicInteger(1);
+ private final AtomicInteger serviceIdSequence = new AtomicInteger(0);
+ private final AtomicInteger requestIdSequence = new AtomicInteger(0);
+
+ private final Set<StreamIdentifier> liveStreamSet = CollectionUtil.synchronizedSet(new WeakHashSet<StreamIdentifier>());
+ private final Set<ContextIdentifier> liveContextSet = CollectionUtil.synchronizedSet(new WeakHashSet<ContextIdentifier>());
+ private final Set<RequestIdentifier> liveRequestSet = CollectionUtil.synchronizedSet(new WeakHashSet<RequestIdentifier>());
+ private final Set<ServiceIdentifier> liveServiceSet = CollectionUtil.synchronizedSet(new WeakHashSet<ServiceIdentifier>());
+
/**
* The negotiated protocol version. Value is set to {@code min(PROTOCOL_VERSION, remote PROTOCOL_VERSION)}.
*/
@@ -118,7 +128,6 @@
public JrppConnection(final AttributeMap attributeMap) {
this.attributeMap = attributeMap;
ioHandler = new IoHandlerImpl();
- identifierManager = new IdentifierManager();
protocolHandler = new RemotingProtocolHandler();
}
@@ -128,6 +137,7 @@
ioSession.setAttribute(JRPP_CONNECTION, this);
this.ioSession = ioSession;
this.protocolContext = protocolContext;
+ client = true;
} finally {
state.releaseExclusive();
}
@@ -140,6 +150,7 @@
this.ioSession = ioSession;
final ProtocolContext protocolContext = protocolServerContext.establishSession(protocolHandler);
this.protocolContext = protocolContext;
+ client = false;
} finally {
state.releaseExclusive();
}
@@ -361,6 +372,42 @@
}
}
+ private JrppContextIdentifier getNewContextIdentifier() {
+ for (;;) {
+ final JrppContextIdentifier contextIdentifier = new JrppContextIdentifier(client, contextIdSequence.getAndIncrement());
+ if (liveContextSet.add(contextIdentifier)) {
+ return contextIdentifier;
+ }
+ }
+ }
+
+ private JrppRequestIdentifier getNewRequestIdentifier() {
+ for (;;) {
+ final JrppRequestIdentifier requestIdentifier = new JrppRequestIdentifier(client, requestIdSequence.getAndIncrement());
+ if (liveRequestSet.add(requestIdentifier)) {
+ return requestIdentifier;
+ }
+ }
+ }
+
+ private JrppStreamIdentifier getNewStreamIdentifier() {
+ for (;;) {
+ final JrppStreamIdentifier streamIdentifier = new JrppStreamIdentifier(client, streamIdSequence.getAndIncrement());
+ if (liveStreamSet.add(streamIdentifier)) {
+ return streamIdentifier;
+ }
+ }
+ }
+
+ private JrppServiceIdentifier getNewServiceIdentifier() {
+ for (;;) {
+ final JrppServiceIdentifier serviceIdentifier = new JrppServiceIdentifier(client, serviceIdSequence.getAndIncrement());
+ if (liveServiceSet.add(serviceIdentifier)) {
+ return serviceIdentifier;
+ }
+ }
+ }
+
private static IoBuffer newBuffer(final int initialSize, final boolean autoexpand) {
return IoBuffer.allocate(initialSize + 4).setAutoExpand(autoexpand).skip(4);
}
@@ -368,7 +415,7 @@
public final class RemotingProtocolHandler implements ProtocolHandler {
public ContextIdentifier openContext(ServiceIdentifier serviceIdentifier) throws IOException {
- final ContextIdentifier contextIdentifier = new JrppContextIdentifier(identifierManager.getIdentifier());
+ final ContextIdentifier contextIdentifier = getNewContextIdentifier();
final IoBuffer buffer = newBuffer(60, false);
final MessageOutput output = protocolContext.getMessageOutput(new IoBufferByteOutput(buffer, ioSession));
write(output, MessageType.OPEN_CONTEXT);
@@ -379,15 +426,15 @@
}
public RequestIdentifier openRequest(ContextIdentifier contextIdentifier) throws IOException {
- return new JrppRequestIdentifier(identifierManager.getIdentifier());
+ return getNewRequestIdentifier();
}
public StreamIdentifier openStream() throws IOException {
- return new JrppStreamIdentifier(identifierManager.getIdentifier());
+ return getNewStreamIdentifier();
}
public ServiceIdentifier openService() throws IOException {
- return new JrppServiceIdentifier(identifierManager.getIdentifier());
+ return getNewServiceIdentifier();
}
public void closeSession() throws IOException {
@@ -437,52 +484,6 @@
}
}
- public StreamIdentifier readStreamIdentifier(ObjectInput input) throws IOException {
- return new JrppStreamIdentifier(input);
- }
-
- public void writeStreamIdentifier(ObjectOutput output, StreamIdentifier identifier) throws IOException {
- write(output, identifier);
- }
-
- public void sendServiceRequest(ServiceIdentifier serviceIdentifier, ServiceLocator<?, ?> locator) throws IOException {
- if (serviceIdentifier == null) {
- throw new NullPointerException("serviceIdentifier is null");
- }
- if (locator == null) {
- throw new NullPointerException("locator is null");
- }
- if (! state.in(State.UP)) {
- throw new IllegalStateException("JrppConnection is not in the UP state!");
- }
- final IoBuffer buffer = newBuffer(500, true);
- final MessageOutput output = protocolContext.getMessageOutput(new IoBufferByteOutput(buffer, ioSession));
- write(output, MessageType.SERVICE_REQUEST);
- write(output, serviceIdentifier);
- output.writeObject(locator.getRequestType());
- output.writeObject(locator.getReplyType());
- output.writeUTF(locator.getServiceType());
- output.writeUTF(locator.getServiceGroupName());
- final Set<String> interceptors = locator.getAvailableInterceptors();
- final int cnt = interceptors.size();
- output.writeInt(cnt);
- for (String name : interceptors) {
- output.writeUTF(name);
- }
- output.commit();
- }
-
- public void sendServiceActivate(ServiceIdentifier serviceIdentifier) throws IOException {
- if (serviceIdentifier == null) {
- throw new NullPointerException("serviceIdentifier is null");
- }
- final IoBuffer buffer = newBuffer(60, false);
- final MessageOutput output = protocolContext.getMessageOutput(new IoBufferByteOutput(buffer, ioSession));
- write(output, MessageType.SERVICE_ACTIVATE);
- write(output, serviceIdentifier);
- output.commit();
- }
-
public void sendReply(ContextIdentifier remoteContextIdentifier, RequestIdentifier requestIdentifier, Object reply) throws IOException {
if (remoteContextIdentifier == null) {
throw new NullPointerException("remoteContextIdentifier is null");
@@ -560,6 +561,14 @@
output.commit();
}
+ public ContextIdentifier getLocalRootContextIdentifier() {
+ return null;
+ }
+
+ public ContextIdentifier getRemoteRootContextIdentifier() {
+ return null;
+ }
+
public void sendCancelRequest(ContextIdentifier contextIdentifier, RequestIdentifier requestIdentifier, boolean mayInterrupt) throws IOException {
if (contextIdentifier == null) {
throw new NullPointerException("contextIdentifier is null");
@@ -576,6 +585,10 @@
output.commit();
}
+ public ContextIdentifier openContext() throws IOException {
+ return null;
+ }
+
public MessageOutput sendStreamData(StreamIdentifier streamIdentifier, Executor streamExecutor) throws IOException {
if (streamIdentifier == null) {
throw new NullPointerException("streamIdentifier is null");
@@ -633,22 +646,6 @@
}
}
- private ContextIdentifier readCtxtId(MessageInput input) throws IOException {
- return new JrppContextIdentifier(input.readShort());
- }
-
- private ServiceIdentifier readSvcId(MessageInput input) throws IOException {
- return new JrppServiceIdentifier(input.readShort());
- }
-
- private StreamIdentifier readStrId(MessageInput input) throws IOException {
- return new JrppStreamIdentifier(input.readShort());
- }
-
- private RequestIdentifier readReqId(MessageInput input) throws IOException {
- return new JrppRequestIdentifier(input.readShort());
- }
-
public void messageReceived(Object message) throws Exception {
final boolean trace = log.isTrace();
final MessageInput input = protocolContext.getMessageInput(new IoBufferByteInput((IoBuffer) message));
@@ -809,56 +806,56 @@
case UP: {
switch (type) {
case OPEN_CONTEXT: {
- final ServiceIdentifier serviceIdentifier = readSvcId(input);
- final ContextIdentifier contextIdentifier = readCtxtId(input);
+ final ServiceIdentifier serviceIdentifier = (ServiceIdentifier) input.readObject();
+ final ContextIdentifier contextIdentifier = (ContextIdentifier) input.readObject();
protocolContext.receiveOpenedContext(serviceIdentifier, contextIdentifier);
return;
}
case CANCEL_ACK: {
- final ContextIdentifier contextIdentifier = readCtxtId(input);
- final RequestIdentifier requestIdentifier = readReqId(input);
+ final ContextIdentifier contextIdentifier = (ContextIdentifier) input.readObject();
+ final RequestIdentifier requestIdentifier = (RequestIdentifier) input.readObject();
protocolContext.receiveCancelAcknowledge(contextIdentifier, requestIdentifier);
return;
}
case CANCEL_REQ: {
- final ContextIdentifier contextIdentifier = readCtxtId(input);
- final RequestIdentifier requestIdentifier = readReqId(input);
+ final ContextIdentifier contextIdentifier = (ContextIdentifier) input.readObject();
+ final RequestIdentifier requestIdentifier = (RequestIdentifier) input.readObject();
final boolean mayInterrupt = input.readBoolean();
protocolContext.receiveCancelRequest(contextIdentifier, requestIdentifier, mayInterrupt);
return;
}
case CLOSE_CONTEXT: {
- final ContextIdentifier contextIdentifier = readCtxtId(input);
+ final ContextIdentifier contextIdentifier = (ContextIdentifier) input.readObject();
protocolContext.closeContext(contextIdentifier);
return;
}
case CLOSE_SERVICE: {
- final ServiceIdentifier serviceIdentifier = readSvcId(input);
+ final ServiceIdentifier serviceIdentifier = (ServiceIdentifier) input.readObject();
protocolContext.closeService(serviceIdentifier);
return;
}
case CLOSE_STREAM: {
- final StreamIdentifier streamIdentifier = readStrId(input);
+ final StreamIdentifier streamIdentifier = (StreamIdentifier) input.readObject();
protocolContext.closeStream(streamIdentifier);
return;
}
case EXCEPTION: {
- final ContextIdentifier contextIdentifier = readCtxtId(input);
- final RequestIdentifier requestIdentifier = readReqId(input);
+ final ContextIdentifier contextIdentifier = (ContextIdentifier) input.readObject();
+ final RequestIdentifier requestIdentifier = (RequestIdentifier) input.readObject();
final RemoteExecutionException exception = (RemoteExecutionException) input.readObject();
protocolContext.receiveException(contextIdentifier, requestIdentifier, exception);
return;
}
case REPLY: {
- final ContextIdentifier contextIdentifier = readCtxtId(input);
- final RequestIdentifier requestIdentifier = readReqId(input);
+ final ContextIdentifier contextIdentifier = (ContextIdentifier) input.readObject();
+ final RequestIdentifier requestIdentifier = (RequestIdentifier) input.readObject();
final Object reply = input.readObject();
protocolContext.receiveReply(contextIdentifier, requestIdentifier, reply);
return;
}
case REQUEST: {
- final ContextIdentifier contextIdentifier = readCtxtId(input);
- final RequestIdentifier requestIdentifier = readReqId(input);
+ final ContextIdentifier contextIdentifier = (ContextIdentifier) input.readObject();
+ final RequestIdentifier requestIdentifier = (RequestIdentifier) input.readObject();
final Object request = input.readObject();
if (trace) {
log.trace("Received request - body is %s", request);
@@ -866,38 +863,13 @@
protocolContext.receiveRequest(contextIdentifier, requestIdentifier, request);
return;
}
- case SERVICE_ACTIVATE: {
- final ServiceIdentifier serviceIdentifier = readSvcId(input);
- protocolContext.receiveServiceActivate(serviceIdentifier);
- return;
- }
- case SERVICE_REQUEST: {
- final ServiceIdentifier serviceIdentifier = readSvcId(input);
- final Class<?> requestType = (Class<?>) input.readObject();
- final Class<?> replyType = (Class<?>) input.readObject();
- final String serviceType = input.readUTF();
- final String serviceGroupName = input.readUTF();
- final Set<String> interceptors = CollectionUtil.hashSet();
- int c = input.readInt();
- for (int i = 0; i < c; i ++) {
- interceptors.add(input.readUTF());
- }
- final ServiceLocator<?, ?> locator = ServiceLocator.DEFAULT
- .setRequestType(requestType)
- .setReplyType(replyType)
- .setServiceType(serviceType)
- .setServiceGroupName(serviceGroupName)
- .setAvailableInterceptors(interceptors);
- protocolContext.receiveServiceRequest(serviceIdentifier, locator);
- return;
- }
- case SERVICE_TERMINATE: {
- final ServiceIdentifier serviceIdentifier = readSvcId(input);
+ case SERVICE_TERMINATE: {
+ final ServiceIdentifier serviceIdentifier = (ServiceIdentifier) input.readObject();
protocolContext.receiveServiceTerminate(serviceIdentifier);
return;
}
case STREAM_DATA: {
- final StreamIdentifier streamIdentifier = readStrId(input);
+ final StreamIdentifier streamIdentifier = (StreamIdentifier) input.readObject();
protocolContext.receiveStreamData(streamIdentifier, input);
return;
}
@@ -931,8 +903,6 @@
EXCEPTION,
REPLY,
REQUEST,
- SERVICE_ACTIVATE,
- SERVICE_REQUEST,
SERVICE_TERMINATE,
STREAM_DATA,
}
Deleted: remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/id/IdentifierManager.java
===================================================================
--- remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/id/IdentifierManager.java 2008-02-25 13:35:25 UTC (rev 3501)
+++ remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/id/IdentifierManager.java 2008-02-26 00:40:52 UTC (rev 3502)
@@ -1,27 +0,0 @@
-package org.jboss.cx.remoting.jrpp.id;
-
-import java.util.BitSet;
-
-/**
- *
- */
-public final class IdentifierManager {
- private final BitSet bitSet = new BitSet(64);
-
- public synchronized short getIdentifier() {
- final int id = bitSet.nextClearBit(1);
- if (id > 0xffff) {
- return 0;
- } else {
- return (short) id;
- }
- }
-
- public synchronized void freeIdentifier(short id) {
- bitSet.clear(id & 0xffff);
- }
-
- public void getIdentifier(final short id) {
-
- }
-}
Modified: remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/id/JrppContextIdentifier.java
===================================================================
--- remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/id/JrppContextIdentifier.java 2008-02-25 13:35:25 UTC (rev 3501)
+++ remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/id/JrppContextIdentifier.java 2008-02-26 00:40:52 UTC (rev 3502)
@@ -9,12 +9,11 @@
*/
@SuppressWarnings ({"EqualsAndHashcode"})
public final class JrppContextIdentifier extends JrppSubChannelIdentifier implements ContextIdentifier {
- public JrppContextIdentifier(short id) throws IOException {
- super(id);
+ public JrppContextIdentifier() {
}
- public JrppContextIdentifier(ObjectInputStream ois) throws IOException {
- super(ois);
+ public JrppContextIdentifier(final boolean client, final int id) {
+ super(client, id);
}
public boolean equals(Object obj) {
Modified: remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/id/JrppRequestIdentifier.java
===================================================================
--- remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/id/JrppRequestIdentifier.java 2008-02-25 13:35:25 UTC (rev 3501)
+++ remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/id/JrppRequestIdentifier.java 2008-02-26 00:40:52 UTC (rev 3502)
@@ -9,12 +9,11 @@
*/
@SuppressWarnings ({"EqualsAndHashcode"})
public final class JrppRequestIdentifier extends JrppSubChannelIdentifier implements RequestIdentifier {
- public JrppRequestIdentifier(short id) throws IOException {
- super(id);
+ public JrppRequestIdentifier() {
}
- public JrppRequestIdentifier(MessageInput input) throws IOException {
- super(input);
+ public JrppRequestIdentifier(final boolean client, final int id) {
+ super(client, id);
}
public boolean equals(Object obj) {
Modified: remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/id/JrppServiceIdentifier.java
===================================================================
--- remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/id/JrppServiceIdentifier.java 2008-02-25 13:35:25 UTC (rev 3501)
+++ remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/id/JrppServiceIdentifier.java 2008-02-26 00:40:52 UTC (rev 3502)
@@ -9,12 +9,11 @@
*/
@SuppressWarnings ({"EqualsAndHashcode"})
public final class JrppServiceIdentifier extends JrppSubChannelIdentifier implements ServiceIdentifier {
- public JrppServiceIdentifier(short id) throws IOException {
- super(id);
+ public JrppServiceIdentifier() {
}
- public JrppServiceIdentifier(MessageInput input) throws IOException {
- super(input);
+ public JrppServiceIdentifier(final boolean client, final int id) {
+ super(client, id);
}
public boolean equals(Object obj) {
Modified: remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/id/JrppStreamIdentifier.java
===================================================================
--- remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/id/JrppStreamIdentifier.java 2008-02-25 13:35:25 UTC (rev 3501)
+++ remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/id/JrppStreamIdentifier.java 2008-02-26 00:40:52 UTC (rev 3502)
@@ -9,12 +9,11 @@
*/
@SuppressWarnings ({"EqualsAndHashcode"})
public final class JrppStreamIdentifier extends JrppSubChannelIdentifier implements StreamIdentifier {
- public JrppStreamIdentifier(short id) throws IOException {
- super(id);
+ public JrppStreamIdentifier() {
}
- public JrppStreamIdentifier(ObjectInput input) throws IOException {
- super(input);
+ public JrppStreamIdentifier(final boolean client, final int id) {
+ super(client, id);
}
public boolean equals(Object obj) {
Modified: remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/id/JrppSubChannelIdentifier.java
===================================================================
--- remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/id/JrppSubChannelIdentifier.java 2008-02-25 13:35:25 UTC (rev 3501)
+++ remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/id/JrppSubChannelIdentifier.java 2008-02-26 00:40:52 UTC (rev 3502)
@@ -2,43 +2,48 @@
import java.io.IOException;
import java.io.ObjectInput;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.io.Externalizable;
+import java.io.ObjectOutput;
/**
*
*/
-public abstract class JrppSubChannelIdentifier {
- private final short id;
- private final AtomicBoolean dead = new AtomicBoolean(false);
+public abstract class JrppSubChannelIdentifier implements Externalizable {
+ private /*final*/ boolean client;
+ private /*final*/ int id;
- public JrppSubChannelIdentifier(short id) throws IOException {
+ protected JrppSubChannelIdentifier() {
+ }
+
+ protected JrppSubChannelIdentifier(final boolean client, final int id) {
+ if (id < 0) {
+ throw new IllegalArgumentException("id must be >= 0");
+ }
+ this.client = client;
this.id = id;
}
- public JrppSubChannelIdentifier(ObjectInput input) throws IOException {
- id = input.readShort();
+ public final void writeExternal(ObjectOutput out) throws IOException {
+ out.writeInt(id << 1 | (client ? 0 : 1));
}
- public void release(IdentifierManager manager) {
- if (!dead.getAndSet(true)) {
- manager.freeIdentifier(id);
- }
+ public final void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ int i = in.readInt();
+ id = i >>> 1;
+ client = (i & 1) == 0;
}
- public short getId() {
- if (dead.get()) {
- throw new IllegalStateException("Read channel ID after close");
- }
+ public int getId() {
return id;
}
public boolean equals(Object obj) {
if (!(obj instanceof JrppSubChannelIdentifier)) return false;
JrppSubChannelIdentifier other = (JrppSubChannelIdentifier) obj;
- return !(dead.get() || other.dead.get()) && other.id == id;
+ return other.id == id && other.client == client;
}
public int hashCode() {
- return id;
+ return id << 1 | (client ? 0 : 1);
}
}
Modified: remoting3/trunk/standalone/src/main/java/org/jboss/cx/remoting/Remoting.java
===================================================================
--- remoting3/trunk/standalone/src/main/java/org/jboss/cx/remoting/Remoting.java 2008-02-25 13:35:25 UTC (rev 3501)
+++ remoting3/trunk/standalone/src/main/java/org/jboss/cx/remoting/Remoting.java 2008-02-26 00:40:52 UTC (rev 3502)
@@ -2,13 +2,12 @@
import java.io.IOException;
import java.net.URI;
-import org.jboss.cx.remoting.core.CoreEndpointProvider;
import org.jboss.cx.remoting.util.AttributeHashMap;
import org.jboss.cx.remoting.util.AttributeMap;
import org.jboss.cx.remoting.log.Logger;
-import org.jboss.cx.remoting.spi.EndpointProvider;
import org.jboss.cx.remoting.spi.wrapper.ContextSourceWrapper;
import org.jboss.cx.remoting.spi.wrapper.SessionWrapper;
+import org.jboss.cx.remoting.core.CoreEndpoint;
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.CallbackHandler;
@@ -22,75 +21,16 @@
public final class Remoting {
private static final Logger log = Logger.getLogger(Remoting.class);
- private static final class EndpointProviderHolder {
- private static final EndpointProvider provider = new CoreEndpointProvider();
- }
-
public static Endpoint createEndpoint(String name) {
- return EndpointProviderHolder.provider.createEndpoint(name);
+ return null;
}
public static Session createEndpointAndSession(String endpointName, URI remoteUri, final String userName, final char[] password) throws RemotingException {
- final Endpoint endpoint = createEndpoint(endpointName);
- boolean ok = false;
- final CallbackHandler callbackHandler = new CallbackHandler() {
- public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
- for (Callback callback : callbacks) {
- if (callback instanceof NameCallback) {
- ((NameCallback)callback).setName(userName);
- } else if (callback instanceof PasswordCallback) {
- ((PasswordCallback)callback).setPassword(password);
- } else {
- throw new UnsupportedCallbackException(callback);
- }
- }
- }
- };
- final AttributeMap attributeMap = new AttributeHashMap();
- attributeMap.put(CommonKeys.AUTH_CALLBACK_HANDLER, callbackHandler);
- try {
- final Session session = new SessionWrapper(endpoint.openSession(remoteUri, attributeMap)) {
- public void close() throws RemotingException {
- try {
- super.close();
- } finally {
- endpoint.shutdown();
- }
- }
- };
- ok = true;
- return session;
- } finally {
- if (! ok) {
- endpoint.shutdown();
- }
- }
+ return null;
}
public static <I, O> ContextSource<I, O> createEndpointAndOpenService(String endpointName, URI remoteUri, String userName, char[] password, Class<I> requestType, Class<O> replyType, String serviceType, String serviceGroupName) throws RemotingException {
- final Session session = createEndpointAndSession(endpointName, remoteUri, userName, password);
- boolean ok = false;
- try {
- final ContextSource<I, O> service = new ContextSourceWrapper<I, O>(session.openService(ServiceLocator.DEFAULT.setRequestType(requestType).setReplyType(replyType).setServiceGroupName(serviceGroupName).setServiceType(serviceType))) {
- public void close() {
- try {
- super.close();
- } finally {
- try {
- session.close();
- } catch (RemotingException e) {
- log.error(e, "Failed to close Remoting session");
- }
- }
- }
- };
- ok = true;
- return service;
- } finally {
- if (! ok) {
- session.close();
- }
- }
+ return null;
}
// privates
17 years, 8 months
JBoss Remoting SVN: r3501 - remoting2/branches/2.x/src/main/org/jboss/remoting/callback.
by jboss-remoting-commits@lists.jboss.org
Author: david.lloyd(a)jboss.com
Date: 2008-02-25 08:35:25 -0500 (Mon, 25 Feb 2008)
New Revision: 3501
Modified:
remoting2/branches/2.x/src/main/org/jboss/remoting/callback/CallbackStore.java
Log:
Fix non-UTF-8 characters
Modified: remoting2/branches/2.x/src/main/org/jboss/remoting/callback/CallbackStore.java
===================================================================
--- remoting2/branches/2.x/src/main/org/jboss/remoting/callback/CallbackStore.java 2008-02-25 03:02:04 UTC (rev 3500)
+++ remoting2/branches/2.x/src/main/org/jboss/remoting/callback/CallbackStore.java 2008-02-25 13:35:25 UTC (rev 3501)
@@ -76,7 +76,7 @@
public static final String FILE_PATH_KEY = "StoreFilePath";
/**
- * Key for setting the file suffix to use for the callback objects written to disk. The default value is �ser�.
+ * Key for setting the file suffix to use for the callback objects written to disk. The default value is "ser".
*/
public static final String FILE_SUFFIX_KEY = "StoreFileSuffix";
17 years, 8 months
JBoss Remoting SVN: r3500 - remoting2/branches/2.x/src/tests/org/jboss/test/remoting/transport/socket/shutdown.
by jboss-remoting-commits@lists.jboss.org
Author: ron.sigal(a)jboss.com
Date: 2008-02-24 22:02:04 -0500 (Sun, 24 Feb 2008)
New Revision: 3500
Modified:
remoting2/branches/2.x/src/tests/org/jboss/test/remoting/transport/socket/shutdown/SocketShutdownTestCase.java
Log:
JBREM-674: Corrected typo: "socket.check_connection=trye".
Modified: remoting2/branches/2.x/src/tests/org/jboss/test/remoting/transport/socket/shutdown/SocketShutdownTestCase.java
===================================================================
--- remoting2/branches/2.x/src/tests/org/jboss/test/remoting/transport/socket/shutdown/SocketShutdownTestCase.java 2008-02-25 02:56:06 UTC (rev 3499)
+++ remoting2/branches/2.x/src/tests/org/jboss/test/remoting/transport/socket/shutdown/SocketShutdownTestCase.java 2008-02-25 03:02:04 UTC (rev 3500)
@@ -47,7 +47,7 @@
String serverCommand = command + ShutdownTestServer.class.getName() + " " + getTransport();
serverCommand += " socket.check_connection=true&";
String clientCommand = command + OpenClient.class.getName() + " " + getTransport();
- clientCommand += " socket.check_connection=trye&";
+ clientCommand += " socket.check_connection=true&";
Executor serverExecutor = new Executor(serverCommand, true);
log.info("starting server");
serverExecutor.start();
17 years, 8 months
JBoss Remoting SVN: r3499 - remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core.
by jboss-remoting-commits@lists.jboss.org
Author: david.lloyd(a)jboss.com
Date: 2008-02-24 21:56:06 -0500 (Sun, 24 Feb 2008)
New Revision: 3499
Removed:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/AbstractRegistration.java
Log:
Remove unused class
Deleted: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/AbstractRegistration.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/AbstractRegistration.java 2008-02-24 08:03:37 UTC (rev 3498)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/AbstractRegistration.java 2008-02-25 02:56:06 UTC (rev 3499)
@@ -1,112 +0,0 @@
-package org.jboss.cx.remoting.core;
-
-import org.jboss.cx.remoting.spi.Registration;
-
-/**
- *
- */
-public abstract class AbstractRegistration implements Registration {
- protected final Object sync = new Object();
- private int runCount;
- private boolean started;
- private boolean dead;
- private boolean stopping;
-
- protected AbstractRegistration() {
- }
-
- public final void start() {
- synchronized(sync) {
- if (started || stopping) {
- throw new IllegalStateException("Registration not stopped");
- }
- if (dead) {
- throw new IllegalStateException("Registration has been unregistered and may not be started again");
- }
- started = true;
- runCount = 0;
- }
- }
-
- public final void stop() {
- synchronized(sync) {
- if (! started) {
- throw new IllegalStateException("Registration not started");
- }
- started = false;
- if (runCount > 0) {
- stopping = true;
- }
- }
- signalShutdown();
- synchronized(sync) {
- if (stopping) {
- boolean intr = Thread.interrupted();
- try {
- while (runCount > 0) {
- try {
- sync.wait();
- } catch (InterruptedException e) {
- intr = Thread.interrupted();
- }
- }
- } finally {
- if (intr) {
- Thread.currentThread().interrupt();
- }
- }
- stopping = false;
- }
- }
- }
-
- public final void unregister() {
- synchronized(sync) {
- if (dead) {
- throw new IllegalStateException("Registration already unregistered");
- }
- if (started) {
- stop();
- }
- dead = true;
- }
- remove();
- }
-
- /**
- * Signal the shutdown of this registration.
- */
- protected abstract void signalShutdown();
-
- /**
- * Remove this registration after successful unregister.
- */
- protected abstract void remove();
-
- protected final void acquireForRun() {
- synchronized(sync) {
- if (! started) {
- throw new IllegalStateException("Registration not started");
- }
- runCount++;
- }
- }
-
- protected final void freeForRun() {
- synchronized(sync) {
- if (runCount == 0) {
- throw new IllegalStateException("More frees than acquires for registration");
- }
- runCount--;
- if (stopping) {
- sync.notify();
- }
- }
- }
-
- protected final boolean isStopped() {
- synchronized(sync) {
- return ! started && ! stopping && ! dead;
- }
- }
-}
17 years, 8 months
JBoss Remoting SVN: r3498 - remoting2/branches/2.x/src/tests/org/jboss/test/remoting/soak.
by jboss-remoting-commits@lists.jboss.org
Author: ron.sigal(a)jboss.com
Date: 2008-02-24 03:03:37 -0500 (Sun, 24 Feb 2008)
New Revision: 3498
Modified:
remoting2/branches/2.x/src/tests/org/jboss/test/remoting/soak/ProducerConsumerSoakTestParent.java
Log:
JBREM-911: Prints free memory.
Modified: remoting2/branches/2.x/src/tests/org/jboss/test/remoting/soak/ProducerConsumerSoakTestParent.java
===================================================================
--- remoting2/branches/2.x/src/tests/org/jboss/test/remoting/soak/ProducerConsumerSoakTestParent.java 2008-02-24 07:49:49 UTC (rev 3497)
+++ remoting2/branches/2.x/src/tests/org/jboss/test/remoting/soak/ProducerConsumerSoakTestParent.java 2008-02-24 08:03:37 UTC (rev 3498)
@@ -88,6 +88,8 @@
public void testSoak() throws Throwable
{
log.info("entering " + getName());
+ System.gc();
+ log.info("free space: " + Runtime.getRuntime().freeMemory());
// Start server.
setupServer();
@@ -152,6 +154,7 @@
shutdownServer();
System.gc();
+ log.info("free space: " + Runtime.getRuntime().freeMemory());
log.info(getName() + " PASSES");
}
17 years, 8 months
JBoss Remoting SVN: r3497 - in remoting2/branches/2.x/src/tests/org/jboss/test/remoting: soak and 1 other directories.
by jboss-remoting-commits@lists.jboss.org
Author: ron.sigal(a)jboss.com
Date: 2008-02-24 02:49:49 -0500 (Sun, 24 Feb 2008)
New Revision: 3497
Added:
remoting2/branches/2.x/src/tests/org/jboss/test/remoting/soak/
remoting2/branches/2.x/src/tests/org/jboss/test/remoting/soak/ProducerConsumerSoakTestParent.java
remoting2/branches/2.x/src/tests/org/jboss/test/remoting/soak/SoakConstants.java
remoting2/branches/2.x/src/tests/org/jboss/test/remoting/soak/bisocket/
remoting2/branches/2.x/src/tests/org/jboss/test/remoting/soak/bisocket/BisocketProducerConsumerSoakTest.java
Log:
JBREM-911: Simple soak test.
Added: remoting2/branches/2.x/src/tests/org/jboss/test/remoting/soak/ProducerConsumerSoakTestParent.java
===================================================================
--- remoting2/branches/2.x/src/tests/org/jboss/test/remoting/soak/ProducerConsumerSoakTestParent.java (rev 0)
+++ remoting2/branches/2.x/src/tests/org/jboss/test/remoting/soak/ProducerConsumerSoakTestParent.java 2008-02-24 07:49:49 UTC (rev 3497)
@@ -0,0 +1,290 @@
+/*
+* JBoss, Home of Professional Open Source
+* Copyright 2005, JBoss Inc., and individual contributors as indicated
+* by the @authors tag. See the copyright.txt in the distribution for a
+* full listing of individual contributors.
+*
+* This is free software; you can redistribute it and/or modify it
+* under the terms of the GNU Lesser General Public License as
+* published by the Free Software Foundation; either version 2.1 of
+* the License, or (at your option) any later version.
+*
+* This software is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this software; if not, write to the Free
+* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+*/
+package org.jboss.test.remoting.soak;
+
+import java.net.InetAddress;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import javax.management.MBeanServer;
+
+import junit.framework.TestCase;
+
+import org.apache.log4j.ConsoleAppender;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.log4j.PatternLayout;
+import org.jboss.logging.XLevel;
+import org.jboss.remoting.Client;
+import org.jboss.remoting.InvocationRequest;
+import org.jboss.remoting.InvokerLocator;
+import org.jboss.remoting.ServerInvocationHandler;
+import org.jboss.remoting.ServerInvoker;
+import org.jboss.remoting.callback.Callback;
+import org.jboss.remoting.callback.HandleCallbackException;
+import org.jboss.remoting.callback.InvokerCallbackHandler;
+import org.jboss.remoting.transport.Connector;
+import org.jboss.remoting.transport.PortUtil;
+import org.jboss.remoting.transport.bisocket.Bisocket;
+
+
+public abstract class ProducerConsumerSoakTestParent extends TestCase
+{
+ private static Logger log = Logger.getLogger(ProducerConsumerSoakTestParent.class);
+
+ private static boolean firstTime = true;
+
+ protected String host;
+ protected int port;
+ protected String locatorURI;
+ protected InvokerLocator serverLocator;
+ protected Connector connector;
+ protected TestInvocationHandler invocationHandler;
+
+
+ public void setUp() throws Exception
+ {
+ if (firstTime)
+ {
+ firstTime = false;
+ Logger.getLogger("org.jboss.remoting").setLevel(XLevel.INFO);
+ Logger.getLogger("org.jboss.test.remoting").setLevel(Level.INFO);
+ String pattern = "[%d{ABSOLUTE}] [%t] %5p (%F:%L) - %m%n";
+ PatternLayout layout = new PatternLayout(pattern);
+ ConsoleAppender consoleAppender = new ConsoleAppender(layout);
+ Logger.getRootLogger().addAppender(consoleAppender);
+ }
+ }
+
+
+ public void tearDown()
+ {
+ }
+
+
+ public void testSoak() throws Throwable
+ {
+ log.info("entering " + getName());
+
+ // Start server.
+ setupServer();
+
+ // Create clients.
+ InvokerLocator clientLocator = new InvokerLocator(locatorURI);
+ HashMap clientConfig = new HashMap();
+ clientConfig.put(InvokerLocator.FORCE_REMOTE, "true");
+ addExtraClientConfig(clientConfig);
+ Client clients[] = new Client[SoakConstants.SENDERS];
+ TestCallbackHandler callbackHandler = new TestCallbackHandler();
+ Sender senders[] = new Sender[SoakConstants.SENDERS];
+
+ for (int i = 0; i < SoakConstants.SENDERS; i++)
+ {
+ clients[i] = new Client(clientLocator, clientConfig);
+ clients[i].connect();
+ log.info("client " + i + " is connected");
+ HashMap callbackMetadata = new HashMap();
+
+ for (int j = 0; j < SoakConstants.CALLBACK_LISTENERS; j++)
+ {
+ callbackMetadata.clear();
+ addExtraCallbackConfig(callbackMetadata);
+ clients[i].addListener(callbackHandler, callbackMetadata, null, true);
+ }
+ log.info("callback handlers installed for client " + i);
+
+ senders[i] = new Sender(i, clients[i]);
+ }
+
+ Timer timer = new Timer(true);
+ timer.schedule(new IntervalTimerTask(), 60000, 60000);
+
+ for (int i = 0; i < SoakConstants.SENDERS; i++)
+ {
+ senders[i].start();
+ }
+ log.info("senders atarted");
+
+ int invocations = 0;
+ for (int i = 0; i < SoakConstants.SENDERS; i++)
+ {
+ senders[i].join();
+ invocations += senders[i].counter;
+ }
+
+ log.info("senders done");
+ log.info("invocations made: " + invocations);
+ log.info("invocations received: " + invocationHandler.counter);
+ log.info("callbacks received: " + callbackHandler.counter);
+
+ assertEquals(invocations, invocationHandler.counter);
+ assertEquals(invocations * SoakConstants.SENDERS * SoakConstants.CALLBACK_LISTENERS,
+ callbackHandler.counter);
+
+ for (int i = 0; i < SoakConstants.SENDERS; i++)
+ {
+ clients[i].removeListener(callbackHandler);
+ clients[i].disconnect();
+ }
+
+ shutdownServer();
+ System.gc();
+ log.info(getName() + " PASSES");
+ }
+
+
+ protected abstract String getTransport();
+
+
+ protected void addExtraClientConfig(Map config) {}
+ protected void addExtraServerConfig(Map config) {}
+ protected void addExtraCallbackConfig(Map config) {}
+
+
+ protected void setupServer() throws Exception
+ {
+ host = InetAddress.getLocalHost().getHostAddress();
+ port = PortUtil.findFreePort(host);
+ locatorURI = getTransport() + "://" + host + ":" + port;
+ serverLocator = new InvokerLocator(locatorURI);
+ log.info("Starting remoting server with locator uri of: " + locatorURI);
+ HashMap config = new HashMap();
+ config.put(InvokerLocator.FORCE_REMOTE, "true");
+ addExtraServerConfig(config);
+ connector = new Connector(serverLocator, config);
+ connector.create();
+ invocationHandler = new TestInvocationHandler();
+ connector.addInvocationHandler("test", invocationHandler);
+ connector.start();
+ }
+
+
+ protected void shutdownServer() throws Exception
+ {
+ if (connector != null)
+ connector.stop();
+ }
+
+
+ static class TestInvocationHandler implements ServerInvocationHandler
+ {
+ HashSet listeners = new HashSet();
+ int counter;
+ Object lock = new Object();
+
+ public void addListener(InvokerCallbackHandler callbackHandler)
+ {
+ listeners.add(callbackHandler);
+ }
+
+ public Object invoke(final InvocationRequest invocation) throws Throwable
+ {
+ synchronized (lock)
+ {
+ counter++;
+ if ((counter + 1) % SoakConstants.INTERVAL == 0)
+ log.info("invocations received: " + (counter + 1));
+ }
+
+ Object o = invocation.getParameter();
+ Callback c = new Callback(o);
+ Iterator it = listeners.iterator();
+ while(it.hasNext())
+ {
+ InvokerCallbackHandler handler = (InvokerCallbackHandler) it.next();
+ handler.handleCallback(c);
+ }
+
+ return o;
+ }
+
+ public void removeListener(InvokerCallbackHandler callbackHandler) {}
+ public void setMBeanServer(MBeanServer server) {}
+ public void setInvoker(ServerInvoker invoker) {}
+ }
+
+
+ static class TestCallbackHandler implements InvokerCallbackHandler
+ {
+ int counter;
+ Object lock = new Object();
+
+ public void handleCallback(Callback callback) throws HandleCallbackException
+ {
+ synchronized (lock)
+ {
+ counter++;
+ if ((counter + 1) % SoakConstants.INTERVAL == 0)
+ log.info("callbacks received: " + (counter + 1));
+ }
+ }
+ }
+
+ static class Sender extends Thread
+ {
+ String name;
+ Client client;
+ int counter;
+ Object lock = new Object();
+ long start;
+
+ public Sender(int id, Client client)
+ {
+ name = "sender:" + id;
+ this.client = client;
+ }
+
+ public void run()
+ {
+ try
+ {
+ start = System.currentTimeMillis();
+ while (System.currentTimeMillis() - start <= SoakConstants.DURATION)
+ {
+ counter++;
+ if ((counter + 1) % SoakConstants.INTERVAL == 0)
+ log.info(name + " invcations made: : " + (counter + 1));
+ client.invoke(name + ":" + counter);
+ }
+ }
+ catch (Throwable t)
+ {
+ log.error(this, t);
+ }
+ }
+ }
+
+
+ static class IntervalTimerTask extends TimerTask
+ {
+ int counter;
+
+ public void run()
+ {
+ log.info("MINUTES ELAPSED: " + ++counter);
+ }
+ }
+}
\ No newline at end of file
Added: remoting2/branches/2.x/src/tests/org/jboss/test/remoting/soak/SoakConstants.java
===================================================================
--- remoting2/branches/2.x/src/tests/org/jboss/test/remoting/soak/SoakConstants.java (rev 0)
+++ remoting2/branches/2.x/src/tests/org/jboss/test/remoting/soak/SoakConstants.java 2008-02-24 07:49:49 UTC (rev 3497)
@@ -0,0 +1,42 @@
+/*
+* JBoss, Home of Professional Open Source
+* Copyright 2005, JBoss Inc., and individual contributors as indicated
+* by the @authors tag. See the copyright.txt in the distribution for a
+* full listing of individual contributors.
+*
+* This is free software; you can redistribute it and/or modify it
+* under the terms of the GNU Lesser General Public License as
+* published by the Free Software Foundation; either version 2.1 of
+* the License, or (at your option) any later version.
+*
+* This software is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this software; if not, write to the Free
+* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+*/
+package org.jboss.test.remoting.soak;
+
+/**
+ *
+ * @author <a href="ron.sigal(a)jboss.com">Ron Sigal</a>
+ * @version $Revision: 1.1 $
+ * <p>
+ * Copyright Feb 24, 2008
+ * </p>
+ */
+public class SoakConstants
+{
+ public static final int SENDERS = 5;
+
+ public static final int CALLBACK_LISTENERS = 5;
+
+ public static final int DURATION = 30000;
+
+ public static final int INTERVAL = 1000;
+}
+
Added: remoting2/branches/2.x/src/tests/org/jboss/test/remoting/soak/bisocket/BisocketProducerConsumerSoakTest.java
===================================================================
--- remoting2/branches/2.x/src/tests/org/jboss/test/remoting/soak/bisocket/BisocketProducerConsumerSoakTest.java (rev 0)
+++ remoting2/branches/2.x/src/tests/org/jboss/test/remoting/soak/bisocket/BisocketProducerConsumerSoakTest.java 2008-02-24 07:49:49 UTC (rev 3497)
@@ -0,0 +1,45 @@
+
+/*
+* JBoss, Home of Professional Open Source
+* Copyright 2005, JBoss Inc., and individual contributors as indicated
+* by the @authors tag. See the copyright.txt in the distribution for a
+* full listing of individual contributors.
+*
+* This is free software; you can redistribute it and/or modify it
+* under the terms of the GNU Lesser General Public License as
+* published by the Free Software Foundation; either version 2.1 of
+* the License, or (at your option) any later version.
+*
+* This software is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this software; if not, write to the Free
+* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+*/
+package org.jboss.test.remoting.soak.bisocket;
+
+import java.util.Map;
+
+import org.jboss.remoting.transport.bisocket.Bisocket;
+import org.jboss.test.remoting.soak.ProducerConsumerSoakTestParent;
+
+public class BisocketProducerConsumerSoakTest extends ProducerConsumerSoakTestParent
+{
+ int counter;
+
+ protected String getTransport()
+ {
+ return "bisocket";
+ }
+
+ protected void addExtraCallbackConfig(Map config)
+ {
+ config.put(Bisocket.IS_CALLBACK_SERVER, "true");
+ config.put("dummy", Integer.toString(counter++));
+ }
+}
+
17 years, 8 months