JBoss Remoting SVN: r4293 - remoting3/trunk/version/src/main/java/org/jboss/cx/remoting/version.
by jboss-remoting-commits@lists.jboss.org
Author: david.lloyd(a)jboss.com
Date: 2008-06-16 09:16:21 -0400 (Mon, 16 Jun 2008)
New Revision: 4293
Modified:
remoting3/trunk/version/src/main/java/org/jboss/cx/remoting/version/Version.java
Log:
Add constructor
Modified: remoting3/trunk/version/src/main/java/org/jboss/cx/remoting/version/Version.java
===================================================================
--- remoting3/trunk/version/src/main/java/org/jboss/cx/remoting/version/Version.java 2008-06-16 13:15:23 UTC (rev 4292)
+++ remoting3/trunk/version/src/main/java/org/jboss/cx/remoting/version/Version.java 2008-06-16 13:16:21 UTC (rev 4293)
@@ -4,6 +4,10 @@
*
*/
public final class Version {
+
+ private Version() {
+ }
+
public static final String VERSION = "3.0.0-M3";
public static void main(String[] args) {
16 years, 6 months
JBoss Remoting SVN: r4292 - remoting3/trunk.
by jboss-remoting-commits@lists.jboss.org
Author: david.lloyd(a)jboss.com
Date: 2008-06-16 09:15:23 -0400 (Mon, 16 Jun 2008)
New Revision: 4292
Modified:
remoting3/trunk/build.xml
Log:
Fix typo
Modified: remoting3/trunk/build.xml
===================================================================
--- remoting3/trunk/build.xml 2008-06-12 16:34:08 UTC (rev 4291)
+++ remoting3/trunk/build.xml 2008-06-16 13:15:23 UTC (rev 4292)
@@ -1131,7 +1131,7 @@
<path id="version.classpath">
<pathelement location="version/target/main/classes"/>
</path>
- <java classpathref="version.classpath" classname="org.jboss.cx.remoting.version.Verssion" outputproperty="version"/>
+ <java classpathref="version.classpath" classname="org.jboss.cx.remoting.version.Version" outputproperty="version"/>
<property name="version" value="UNKNOWN"/>
</target>
16 years, 6 months
JBoss Remoting SVN: r4291 - in remoting3/trunk: api/src/main/java/org/jboss/cx/remoting and 13 other directories.
by jboss-remoting-commits@lists.jboss.org
Author: david.lloyd(a)jboss.com
Date: 2008-06-12 12:34:08 -0400 (Thu, 12 Jun 2008)
New Revision: 4291
Added:
remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/impl/
remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/impl/Action.java
remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/impl/RemotingHttpSession.java
Removed:
remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/RemotingHttpSessionContext.java
Modified:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Endpoint.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/marshal/Marshaller.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/EndpointWrapper.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/stream/ObjectSink.java
remoting3/trunk/build.xml
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientMarker.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreEndpoint.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreInboundClient.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundClient.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundRequest.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundService.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreSession.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/protocol/LocalProtocolHandlerFactory.java
remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/JrppServer.java
remoting3/trunk/log-jul/src/main/java/org/jboss/cx/remoting/log/Logger.java
remoting3/trunk/mc-deployers/src/main/java/org/jboss/cx/remoting/beans/SessionBean.java
remoting3/trunk/samples/src/main/java/org/jboss/cx/remoting/samples/simple/JrppBasicExampleMain.java
remoting3/trunk/samples/src/main/java/org/jboss/cx/remoting/samples/simple/JrppStreamExampleMain.java
remoting3/trunk/samples/src/main/java/org/jboss/cx/remoting/samples/simple/LocalBasicExampleMain.java
remoting3/trunk/samples/src/main/java/org/jboss/cx/remoting/samples/simple/LocalStreamExampleMain.java
remoting3/trunk/standalone/src/main/java/org/jboss/cx/remoting/Remoting.java
remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/ObjectMessageInput.java
Log:
Point commit...
Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Endpoint.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Endpoint.java 2008-06-11 22:09:16 UTC (rev 4290)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Endpoint.java 2008-06-12 16:34:08 UTC (rev 4291)
@@ -32,12 +32,13 @@
*
* @param remoteUri the URI of the server to connect to
* @param attributeMap the attribute map to use to configure this session
+ * @param rootListener the root request listener for this end of the session
* @return a new session
*
* @throws RemotingException if there is a problem creating the session, or if the request or reply type does not
* match the remote service
*/
- Session openSession(URI remoteUri, AttributeMap attributeMap) throws RemotingException;
+ Session openSession(URI remoteUri, AttributeMap attributeMap, RequestListener<?, ?> rootListener) throws RemotingException;
/**
* Open an inbound session from another endpoint. Used by protocol handlers.
@@ -45,9 +46,10 @@
* You must have the TODO permission to invoke this method.
*
* @param handler the protocol handler to use
+ * @param rootListener the root request listener for this end of the session
* @return the protocol context
*/
- ProtocolContext openIncomingSession(ProtocolHandler handler) throws RemotingException;
+ ProtocolContext openSession(ProtocolHandler handler, RequestListener<?, ?> rootListener) throws RemotingException;
/**
* Get the name of this endpoint.
Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/marshal/Marshaller.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/marshal/Marshaller.java 2008-06-11 22:09:16 UTC (rev 4290)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/marshal/Marshaller.java 2008-06-12 16:34:08 UTC (rev 4291)
@@ -9,7 +9,8 @@
/**
* A marshaller/unmarshaller for transmitting data over a wire protocol of some sort. Each marshaller instance is
- * guaranteed to be used by only one thread. Marshallers are not pooled or reused in any way.
+ * guaranteed to be used by only one thread. Marshallers are not pooled or reused in any way. Any pooling of marshallers
+ * must be done by implementations of this class and/or {@link org.jboss.cx.remoting.spi.marshal.MarshallerFactory}.
*/
public interface Marshaller extends Serializable {
Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/EndpointWrapper.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/EndpointWrapper.java 2008-06-11 22:09:16 UTC (rev 4290)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/EndpointWrapper.java 2008-06-12 16:34:08 UTC (rev 4291)
@@ -29,12 +29,12 @@
return delegate.getAttributes();
}
- public Session openSession(final URI remoteUri, final AttributeMap attributeMap) throws RemotingException {
- return delegate.openSession(remoteUri, attributeMap);
+ public Session openSession(final URI remoteUri, final AttributeMap attributeMap, final RequestListener<?, ?> rootListener) throws RemotingException {
+ return delegate.openSession(remoteUri, attributeMap, rootListener);
}
- public ProtocolContext openIncomingSession(final ProtocolHandler handler) throws RemotingException {
- return delegate.openIncomingSession(handler);
+ public ProtocolContext openSession(final ProtocolHandler handler, final RequestListener<?, ?> rootListener) throws RemotingException {
+ return delegate.openSession(handler, rootListener);
}
public String getName() {
Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/stream/ObjectSink.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/stream/ObjectSink.java 2008-06-11 22:09:16 UTC (rev 4290)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/stream/ObjectSink.java 2008-06-12 16:34:08 UTC (rev 4291)
@@ -2,11 +2,12 @@
import java.io.Closeable;
import java.io.IOException;
+import java.io.Flushable;
/**
*
*/
-public interface ObjectSink<T> extends Closeable {
+public interface ObjectSink<T> extends Flushable, Closeable {
void accept(T instance) throws IOException;
/**
Modified: remoting3/trunk/build.xml
===================================================================
--- remoting3/trunk/build.xml 2008-06-11 22:09:16 UTC (rev 4290)
+++ remoting3/trunk/build.xml 2008-06-12 16:34:08 UTC (rev 4291)
@@ -1131,7 +1131,7 @@
<path id="version.classpath">
<pathelement location="version/target/main/classes"/>
</path>
- <java classpathref="version.classpath" classname="org.jboss.cx.remoting.version.Version" outputproperty="version"/>
+ <java classpathref="version.classpath" classname="org.jboss.cx.remoting.version.Verssion" outputproperty="version"/>
<property name="version" value="UNKNOWN"/>
</target>
Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientMarker.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientMarker.java 2008-06-11 22:09:16 UTC (rev 4290)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientMarker.java 2008-06-12 16:34:08 UTC (rev 4291)
@@ -18,7 +18,7 @@
this.clientIdentifer = clientIdentifer;
}
- public ClientIdentifier getContextIdentifer() {
+ public ClientIdentifier getClientIdentifer() {
return clientIdentifer;
}
Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreEndpoint.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreEndpoint.java 2008-06-11 22:09:16 UTC (rev 4290)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreEndpoint.java 2008-06-12 16:34:08 UTC (rev 4291)
@@ -54,7 +54,6 @@
}
private String name;
- private RequestListener<?, ?> rootListener;
private final AtomicStateMachine<State> state = AtomicStateMachine.start(State.INITIAL);
private final Set<SessionListener> sessionListeners = CollectionUtil.synchronizedSet(new LinkedHashSet<SessionListener>());
@@ -96,21 +95,10 @@
return name;
}
- public void setRootListener(final RequestListener<?, ?> rootListener) {
- this.rootListener = rootListener;
- }
-
- public RequestListener<?, ?> getRootListener() {
- return rootListener;
- }
-
// Lifecycle
public void create() {
// todo security check
- if (rootListener == null) {
- throw new NullPointerException("rootListener is null");
- }
}
public void start() {
@@ -132,7 +120,6 @@
}
public void destroy() {
- rootListener = null;
executor = null;
}
@@ -143,7 +130,7 @@
return endpointMap;
}
- public Session openSession(final URI uri, final AttributeMap attributeMap) throws RemotingException {
+ public Session openSession(final URI uri, final AttributeMap attributeMap, final RequestListener<?, ?> rootListener) throws RemotingException {
if (uri == null) {
throw new NullPointerException("uri is null");
}
@@ -184,11 +171,11 @@
}
}
- public ProtocolContext openIncomingSession(final ProtocolHandler handler) throws RemotingException {
+ public ProtocolContext openSession(final ProtocolHandler handler, final RequestListener<?, ?> rootListener) throws RemotingException {
state.requireHold(State.UP);
try {
final CoreSession session = new CoreSession(CoreEndpoint.this);
- session.initializeServer(handler, createClient(rootListener));
+ session.initializeServer(handler, rootListener == null ? null : createClient(rootListener));
sessions.add(session);
return session.getProtocolContext();
} finally {
@@ -216,7 +203,7 @@
public <I, O> Client<I, O> createClient(RequestListener<I, O> requestListener) {
final CoreInboundClient<I, O> inbound = new CoreInboundClient<I, O>(requestListener, executor);
final CoreOutboundClient<I, O> outbound = new CoreOutboundClient<I, O>(executor);
- inbound.initialize(outbound.getContextClient());
+ inbound.initialize(outbound.getClientInitiator());
outbound.initialize(inbound.getClientResponder());
return outbound.getUserContext();
}
Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreInboundClient.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreInboundClient.java 2008-06-11 22:09:16 UTC (rev 4290)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreInboundClient.java 2008-06-12 16:34:08 UTC (rev 4291)
@@ -62,7 +62,9 @@
this.clientInitiator = clientInitiator;
state.releaseDowngrade();
try {
- requestListener.handleClientOpen(clientContext);
+ if (requestListener != null) {
+ requestListener.handleClientOpen(clientContext);
+ }
} finally {
state.release();
}
Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundClient.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundClient.java 2008-06-11 22:09:16 UTC (rev 4290)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundClient.java 2008-06-12 16:34:08 UTC (rev 4291)
@@ -58,7 +58,7 @@
return userClient;
}
- ClientInitiator getContextClient() {
+ ClientInitiator getClientInitiator() {
return clientInitiator;
}
@@ -73,6 +73,13 @@
}
}
+ void invalidate() {
+ // An extra instance was mistakenly created; we'll kill this one and quietly step away
+ state.transition(State.DOWN);
+ userClient = null;
+ clientResponder = null;
+ }
+
public final class UserClient extends AbstractRealClient<I, O> {
private UserClient() {
@@ -128,9 +135,9 @@
try {
final QueueExecutor queueExecutor = new QueueExecutor();
final CoreOutboundRequest<I, O> outboundRequest = new CoreOutboundRequest<I, O>();
- final RequestResponder<I> requestTerminus = clientResponder.createNewRequest(outboundRequest.getReplier());
- outboundRequest.setRequester(requestTerminus);
- requestTerminus.handleRequest(request, queueExecutor);
+ final RequestResponder<I> requestResponder = clientResponder.createNewRequest(outboundRequest.getReplier());
+ outboundRequest.setRequestResponder(requestResponder);
+ requestResponder.handleRequest(request, queueExecutor);
final FutureReply<O> futureReply = outboundRequest.getFutureReply();
futureReply.addCompletionNotifier(new RequestCompletionHandler<O>() {
public void notifyComplete(final FutureReply<O> futureReply) {
@@ -149,7 +156,7 @@
try {
final CoreOutboundRequest<I, O> outboundRequest = new CoreOutboundRequest<I, O>();
final RequestResponder<I> requestTerminus = clientResponder.createNewRequest(outboundRequest.getReplier());
- outboundRequest.setRequester(requestTerminus);
+ outboundRequest.setRequestResponder(requestTerminus);
requestTerminus.handleRequest(request, executor);
return outboundRequest.getFutureReply();
} finally {
Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundRequest.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundRequest.java 2008-06-11 22:09:16 UTC (rev 4290)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundRequest.java 2008-06-12 16:34:08 UTC (rev 4291)
@@ -41,7 +41,7 @@
return requestResponder;
}
- public void setRequester(final RequestResponder<I> requestResponder) {
+ public void setRequestResponder(final RequestResponder<I> requestResponder) {
this.requestResponder = requestResponder;
}
Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundService.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundService.java 2008-06-11 22:09:16 UTC (rev 4290)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundService.java 2008-06-12 16:34:08 UTC (rev 4291)
@@ -107,7 +107,7 @@
public Client<I, O> createContext() throws RemotingException {
final CoreOutboundClient<I, O> client = new CoreOutboundClient<I, O>(executor);
- final ClientResponder<I, O> clientResponder = serviceResponder.createNewClient(client.getContextClient());
+ final ClientResponder<I, O> clientResponder = serviceResponder.createNewClient(client.getClientInitiator());
client.initialize(clientResponder);
return client.getUserContext();
}
Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreSession.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreSession.java 2008-06-11 22:09:16 UTC (rev 4290)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreSession.java 2008-06-12 16:34:08 UTC (rev 4291)
@@ -36,6 +36,8 @@
import org.jboss.cx.remoting.spi.stream.StreamDetector;
import org.jboss.cx.remoting.spi.stream.StreamSerializer;
import org.jboss.cx.remoting.spi.stream.StreamSerializerFactory;
+import org.jboss.cx.remoting.spi.marshal.ObjectResolver;
+import org.jboss.cx.remoting.spi.marshal.MarshallerFactory;
import org.jboss.cx.remoting.util.AtomicStateMachine;
import org.jboss.cx.remoting.util.AttributeMap;
import org.jboss.cx.remoting.util.CollectionUtil;
@@ -58,6 +60,8 @@
// stream serialization detectors - immutable (for now?)
private final List<StreamDetector> streamDetectors;
+ private List<ObjectResolver> resolvers = new ArrayList<ObjectResolver>();
+ private MarshallerFactory marshallerFactory;
// Contexts and services that are available on the remote end of this session
// In these paris, the Server points to the ProtocolHandler, and the Client points to...whatever
@@ -85,6 +89,7 @@
private Client<?, ?> rootClient;
private final AtomicStateMachine<State> state = AtomicStateMachine.start(State.NEW);
+ private ObjectResolver resolver; // todo - initialize to a composite resolver
// Constructors
@@ -107,6 +112,23 @@
}
}
+ public MarshallerFactory getMarshallerFactory() {
+ return marshallerFactory;
+ }
+
+ public void setMarshallerFactory(final MarshallerFactory marshallerFactory) {
+ this.marshallerFactory = marshallerFactory;
+ }
+
+ public void addFirstResolver(ObjectResolver resolver) {
+ resolvers.add(0, resolver);
+ }
+
+ public void addLastResolver(ObjectResolver resolver) {
+ resolvers.add(resolver);
+ }
+
+
// Initializers
private <I, O> void doInitialize(final ProtocolHandler protocolHandler, final Client<I, O> rootClient) {
@@ -132,7 +154,7 @@
}
final ProtocolClientResponderImpl<I, O> contextServer = new ProtocolClientResponderImpl<I,O>(remoteIdentifier);
final CoreOutboundClient<I, O> coreOutboundClient = new CoreOutboundClient<I, O>(executor);
- clientContexts.put(remoteIdentifier, new ClientContextPair<I, O>(coreOutboundClient.getContextClient(), contextServer, remoteIdentifier));
+ clientContexts.put(remoteIdentifier, new ClientContextPair<I, O>(coreOutboundClient.getClientInitiator(), contextServer, remoteIdentifier));
coreOutboundClient.initialize(contextServer);
this.rootClient = coreOutboundClient.getUserContext();
log.trace("Initialized session with remote context %s", remoteIdentifier);
@@ -291,7 +313,7 @@
if (target == null) {
throw new NullPointerException("target is null");
}
- return new ObjectMessageOutputImpl(target, streamDetectors, endpoint.getOrderedExecutor());
+ return marshallerFactory.createRootMarshaller(resolver, getClass().getClassLoader() /* todo this is WRONG */).getMessageOutput(target);
}
public ObjectMessageOutput getMessageOutput(ByteMessageOutput target, Executor streamExecutor) throws IOException {
@@ -301,14 +323,14 @@
if (streamExecutor == null) {
throw new NullPointerException("streamExecutor is null");
}
- return new ObjectMessageOutputImpl(target, streamDetectors, streamExecutor);
+ return marshallerFactory.createRootMarshaller(resolver, getClass().getClassLoader() /* todo this is WRONG */).getMessageOutput(target);
}
public ObjectMessageInput getMessageInput(ByteMessageInput source) throws IOException {
if (source == null) {
throw new NullPointerException("source is null");
}
- return new ObjectMessageInputImpl(source);
+ return marshallerFactory.createRootMarshaller(resolver, getClass().getClassLoader() /* todo this is WRONG */).getMessageInput(source);
}
public String getLocalEndpointName() {
@@ -548,223 +570,6 @@
}
}
- // message output
-
- private final class ObjectMessageOutputImpl extends JBossObjectOutputStream implements ObjectMessageOutput {
- private final ByteMessageOutput target;
- private final List<StreamDetector> streamDetectors;
- private final List<StreamSerializer> streamSerializers = new ArrayList<StreamSerializer>();
- private final Executor streamExecutor;
-
- private ObjectMessageOutputImpl(final ByteMessageOutput target, final List<StreamDetector> streamDetectors, final Executor streamExecutor) throws IOException {
- super(new OutputStream() {
- public void write(int b) throws IOException {
- target.write(b);
- }
-
- public void write(byte b[]) throws IOException {
- target.write(b);
- }
-
- public void write(byte b[], int off, int len) throws IOException {
- target.write(b, off, len);
- }
-
- public void flush() throws IOException {
- target.flush();
- }
-
- public void close() throws IOException {
- target.close();
- }
- }, true);
- if (target == null) {
- throw new NullPointerException("target is null");
- }
- if (streamDetectors == null) {
- throw new NullPointerException("streamDetectors is null");
- }
- if (streamExecutor == null) {
- throw new NullPointerException("streamExecutor is null");
- }
- enableReplaceObject(true);
- this.target = target;
- this.streamDetectors = streamDetectors;
- this.streamExecutor = streamExecutor;
- }
-
- public void commit() throws IOException {
- close();
- target.commit();
- for (StreamSerializer serializer : streamSerializers) {
- try {
- serializer.handleOpen();
- } catch (Exception ex) {
- // todo - log
- }
- }
- streamSerializers.clear();
- }
-
- public int getBytesWritten() throws IOException {
- flush();
- return target.getBytesWritten();
- }
-
- private final <I, O> ClientMarker doContextReplace(ClientResponder<I, O> clientResponder) throws IOException {
- final ClientIdentifier clientIdentifier = protocolHandler.openClient();
- final ProtocolClientInitiatorImpl<I, O> contextClient = new ProtocolClientInitiatorImpl<I, O>(clientIdentifier);
- new ServerContextPair<I, O>(contextClient, clientResponder);
- return new ClientMarker(clientIdentifier);
- }
-
- private final <I, O> ClientSourceMarker doContextSourceReplace(ServiceResponder<I, O> serviceResponder) throws IOException {
- final ServiceIdentifier serviceIdentifier = protocolHandler.openService();
- final ProtocolServiceInitiatorImpl serviceClient = new ProtocolServiceInitiatorImpl(serviceIdentifier);
- new ServerServicePair<I, O>(serviceClient, serviceResponder);
- return new ClientSourceMarker(serviceIdentifier);
- }
-
- protected Object replaceObject(Object obj) throws IOException {
- final Object testObject = super.replaceObject(obj);
- if (testObject instanceof AbstractRealClient) {
- return doContextReplace(((AbstractRealClient<?, ?>) obj).getContextServer());
- } else if (testObject instanceof AbstractRealClientSource) {
- return doContextSourceReplace(((AbstractRealClientSource<?, ?>) obj).getServiceServer());
- }
- for (StreamDetector detector : streamDetectors) {
- final StreamSerializerFactory factory = detector.detectStream(testObject);
- if (factory != null) {
- final StreamIdentifier streamIdentifier = protocolHandler.openStream();
- if (streamIdentifier == null) {
- throw new NullPointerException("streamIdentifier is null");
- }
- final CoreStream stream = new CoreStream(CoreSession.this, streamExecutor, streamIdentifier, factory, testObject);
- if (streams.putIfAbsent(streamIdentifier, stream) != null) {
- throw new IOException("Duplicate stream identifier encountered: " + streamIdentifier);
- }
- streamSerializers.add(stream.getStreamSerializer());
- log.trace("Writing stream marker for object: %s", testObject);
- return new StreamMarker(factory.getClass(), streamIdentifier);
- }
- }
- return testObject;
- }
- }
-
- // message input
-
- private final class ObjectInputImpl extends JBossObjectInputStream {
-
- private ClassLoader classLoader;
-
- public ObjectInputImpl(final InputStream is) throws IOException {
- super(is);
- enableResolveObject(true);
- }
-
- public Object resolveObject(Object obj) throws IOException {
- final Object testObject = super.resolveObject(obj);
- if (testObject instanceof StreamMarker) {
- StreamMarker marker = (StreamMarker) testObject;
- final StreamIdentifier streamIdentifier = marker.getStreamIdentifier();
- if (streamIdentifier == null) {
- throw new NullPointerException("streamIdentifier is null");
- }
- final StreamSerializerFactory streamSerializerFactory;
- try {
- streamSerializerFactory = marker.getFactoryClass().newInstance();
- } catch (InstantiationException e) {
- throw new IOException("Failed to instantiate a stream: " + e);
- } catch (IllegalAccessException e) {
- throw new IOException("Failed to instantiate a stream: " + e);
- }
- final CoreStream stream = new CoreStream(CoreSession.this, endpoint.getOrderedExecutor(), streamIdentifier, streamSerializerFactory);
- if (streams.putIfAbsent(streamIdentifier, stream) != null) {
- throw new IOException("Duplicate stream received");
- }
- return stream.getRemoteSerializer().getRemoteInstance();
- } else {
- return testObject;
- }
- }
-
- protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {
- final String name = desc.getName();
- if (classLoader != null) {
- if (primitiveTypes.containsKey(name)) {
- return primitiveTypes.get(name);
- } else {
- return Class.forName(name, false, classLoader);
- }
- } else {
- return super.resolveClass(desc);
- }
- }
-
- protected Class<?> resolveProxyClass(String[] interfaces) throws IOException, ClassNotFoundException {
- return super.resolveProxyClass(interfaces);
- }
-
- public Object readObject(final ClassLoader loader) throws ClassNotFoundException, IOException {
- classLoader = loader;
- try {
- return readObject();
- } finally {
- classLoader = null;
- }
- }
- }
-
- private final class ObjectMessageInputImpl extends DelegatingObjectInput implements ObjectMessageInput {
- private CoreSession.ObjectInputImpl objectInput;
-
- private ObjectMessageInputImpl(final ObjectInputImpl objectInput) throws IOException {
- super(objectInput);
- this.objectInput = objectInput;
- }
-
- private ObjectMessageInputImpl(final ByteMessageInput source) throws IOException {
- this(new ObjectInputImpl(new InputStream() {
- public int read(byte b[]) throws IOException {
- return source.read(b);
- }
-
- public int read(byte b[], int off, int len) throws IOException {
- return source.read(b, off, len);
- }
-
- public int read() throws IOException {
- return source.read();
- }
-
- public void close() throws IOException {
- source.close();
- }
-
- public int available() throws IOException {
- return source.remaining();
- }
- }));
- }
-
- public Object readObject() throws ClassNotFoundException, IOException {
- return objectInput.readObject();
- }
-
- public Object readObject(ClassLoader loader) throws ClassNotFoundException, IOException {
- return objectInput.readObject(loader);
- }
-
- public int remaining() {
- try {
- return objectInput.available();
- } catch (IOException e) {
- throw new IllegalStateException("Available failed", e);
- }
- }
- }
-
private final class WeakProtocolContextServerReference<I, O> extends WeakReference<ProtocolClientResponderImpl<I, O>> {
private final ClientContextPair<I, O> contextPair;
@@ -1034,22 +839,4 @@
}
}
}
-
- private static final Map<String, Class<?>> primitiveTypes = new HashMap<String, Class<?>>();
-
- private static <T> void add(Class<T> type) {
- primitiveTypes.put(type.getName(), type);
- }
-
- static {
- add(void.class);
- add(boolean.class);
- add(byte.class);
- add(short.class);
- add(int.class);
- add(long.class);
- add(float.class);
- add(double.class);
- add(char.class);
- }
}
Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/protocol/LocalProtocolHandlerFactory.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/protocol/LocalProtocolHandlerFactory.java 2008-06-11 22:09:16 UTC (rev 4290)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/protocol/LocalProtocolHandlerFactory.java 2008-06-12 16:34:08 UTC (rev 4291)
@@ -45,7 +45,7 @@
throw new RemotingException("No such local endpoint '" + otherEndpoint + "'");
}
final LocalProtocolHandler otherProtocolHandler = new LocalProtocolHandler(ourProtocolContext, otherEndpointName);
- final ProtocolContext otherProtocolContext = otherEndpoint.openIncomingSession(otherProtocolHandler);
+ final ProtocolContext otherProtocolContext = otherEndpoint.openSession(otherProtocolHandler, null);
final LocalProtocolHandler ourProtocolHandler = new LocalProtocolHandler(otherProtocolContext, endpointName);
otherProtocolContext.receiveRemoteSideReady(endpointName);
ourProtocolContext.receiveRemoteSideReady(otherEndpointName);
Deleted: remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/RemotingHttpSessionContext.java
===================================================================
--- remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/RemotingHttpSessionContext.java 2008-06-11 22:09:16 UTC (rev 4290)
+++ remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/RemotingHttpSessionContext.java 2008-06-12 16:34:08 UTC (rev 4291)
@@ -1,14 +0,0 @@
-package org.jboss.cx.remoting.http;
-
-/**
- *
- */
-public interface RemotingHttpSessionContext {
-
- /**
- * Get a channel context that can be used to transport HTTP messages for this session.
- *
- * @return the channel context
- */
- RemotingHttpChannelContext getChannelContext();
-}
Added: remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/impl/Action.java
===================================================================
--- remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/impl/Action.java (rev 0)
+++ remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/impl/Action.java 2008-06-12 16:34:08 UTC (rev 4291)
@@ -0,0 +1,8 @@
+package org.jboss.cx.remoting.http.impl;
+
+/**
+ *
+ */
+public interface Action {
+
+}
Added: remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/impl/RemotingHttpSession.java
===================================================================
--- remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/impl/RemotingHttpSession.java (rev 0)
+++ remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/impl/RemotingHttpSession.java 2008-06-12 16:34:08 UTC (rev 4291)
@@ -0,0 +1,31 @@
+package org.jboss.cx.remoting.http.impl;
+
+import java.util.Queue;
+import org.jboss.cx.remoting.util.CollectionUtil;
+import org.jboss.cx.remoting.util.ByteMessageInput;
+import org.jboss.cx.remoting.http.RemotingHttpChannelContext;
+import org.jboss.cx.remoting.http.HttpMessageWriter;
+
+/**
+ *
+ */
+public final class RemotingHttpSession {
+ private final Queue<Action> outboundQueue = CollectionUtil.linkedList();
+
+ private final class ChannelContext implements RemotingHttpChannelContext {
+
+ public void processInboundMessage(final ByteMessageInput input) {
+ }
+
+ public HttpMessageWriter waitForOutgoingHttpMessage(final int millis) {
+ synchronized (outboundQueue) {
+ if (outboundQueue.element() != null) {
+ while (! outboundQueue.isEmpty()) {
+ Action action = outboundQueue.remove();
+ }
+ }
+ }
+ return null;
+ }
+ }
+}
Modified: remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/JrppServer.java
===================================================================
--- remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/JrppServer.java 2008-06-11 22:09:16 UTC (rev 4290)
+++ remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/JrppServer.java 2008-06-12 16:34:08 UTC (rev 4291)
@@ -15,6 +15,7 @@
import org.apache.mina.handler.multiton.SingleSessionIoHandlerFactory;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
import org.jboss.cx.remoting.Endpoint;
+import org.jboss.cx.remoting.RequestListener;
import org.jboss.cx.remoting.jrpp.mina.FramingIoFilter;
import org.jboss.cx.remoting.spi.protocol.ProtocolContext;
import org.jboss.cx.remoting.util.AttributeMap;
@@ -31,6 +32,8 @@
private SocketAddress socketAddress;
/** Protocol support object. Set before {@code create}. */
private JrppProtocolSupport protocolSupport;
+ /** Root request listener. Set before {@code create}. */
+ private RequestListener<?, ?> rootListener;
// calculated properties
@@ -79,6 +82,14 @@
this.endpoint = endpoint;
}
+ public RequestListener<?, ?> getRootListener() {
+ return rootListener;
+ }
+
+ public void setRootListener(final RequestListener<?, ?> rootListener) {
+ this.rootListener = rootListener;
+ }
+
// Lifecycle
@SuppressWarnings ({"unchecked"})
@@ -118,7 +129,7 @@
public SingleSessionIoHandler getHandler(IoSession ioSession) throws IOException {
final JrppConnection connection = new JrppConnection(attributeMap);
connection.initializeServer(ioSession);
- final ProtocolContext protocolContext = endpoint.openIncomingSession(connection.getProtocolHandler());
+ final ProtocolContext protocolContext = endpoint.openSession(connection.getProtocolHandler(), rootListener);
connection.start(protocolContext);
return connection.getIoHandler();
}
Modified: remoting3/trunk/log-jul/src/main/java/org/jboss/cx/remoting/log/Logger.java
===================================================================
--- remoting3/trunk/log-jul/src/main/java/org/jboss/cx/remoting/log/Logger.java 2008-06-11 22:09:16 UTC (rev 4290)
+++ remoting3/trunk/log-jul/src/main/java/org/jboss/cx/remoting/log/Logger.java 2008-06-12 16:34:08 UTC (rev 4291)
@@ -8,6 +8,9 @@
*/
public final class Logger {
public static final class Level extends java.util.logging.Level {
+
+ private static final long serialVersionUID = 9150446594030531854L;
+
protected Level(final String name, final int value) {
super(name, value);
}
Modified: remoting3/trunk/mc-deployers/src/main/java/org/jboss/cx/remoting/beans/SessionBean.java
===================================================================
--- remoting3/trunk/mc-deployers/src/main/java/org/jboss/cx/remoting/beans/SessionBean.java 2008-06-11 22:09:16 UTC (rev 4290)
+++ remoting3/trunk/mc-deployers/src/main/java/org/jboss/cx/remoting/beans/SessionBean.java 2008-06-12 16:34:08 UTC (rev 4291)
@@ -47,7 +47,7 @@
}
public void start() throws RemotingException {
- session = endpoint.openSession(destination, attributeMap);
+ session = endpoint.openSession(destination, attributeMap, null);
}
public void stop() throws RemotingException {
Modified: remoting3/trunk/samples/src/main/java/org/jboss/cx/remoting/samples/simple/JrppBasicExampleMain.java
===================================================================
--- remoting3/trunk/samples/src/main/java/org/jboss/cx/remoting/samples/simple/JrppBasicExampleMain.java 2008-06-11 22:09:16 UTC (rev 4290)
+++ remoting3/trunk/samples/src/main/java/org/jboss/cx/remoting/samples/simple/JrppBasicExampleMain.java 2008-06-12 16:34:08 UTC (rev 4291)
@@ -23,11 +23,11 @@
public static void main(String[] args) throws IOException, RemoteExecutionException, URISyntaxException {
Security.addProvider(new Provider());
final StringRot13RequestListener listener = new StringRot13RequestListener();
- final Endpoint endpoint = Remoting.createEndpoint("simple", listener);
+ final Endpoint endpoint = Remoting.createEndpoint("simple");
try {
- final JrppServer jrppServer = Remoting.addJrppServer(endpoint, new InetSocketAddress(12345), AttributeMap.EMPTY);
+ final JrppServer jrppServer = Remoting.addJrppServer(endpoint, new InetSocketAddress(12345), listener, AttributeMap.EMPTY);
try {
- Session session = endpoint.openSession(new URI("jrpp://localhost:12345"), AttributeMap.EMPTY);
+ Session session = endpoint.openSession(new URI("jrpp://localhost:12345"), AttributeMap.EMPTY, null);
try {
final Client<String,String> client = session.getRootClient();
try {
Modified: remoting3/trunk/samples/src/main/java/org/jboss/cx/remoting/samples/simple/JrppStreamExampleMain.java
===================================================================
--- remoting3/trunk/samples/src/main/java/org/jboss/cx/remoting/samples/simple/JrppStreamExampleMain.java 2008-06-11 22:09:16 UTC (rev 4290)
+++ remoting3/trunk/samples/src/main/java/org/jboss/cx/remoting/samples/simple/JrppStreamExampleMain.java 2008-06-12 16:34:08 UTC (rev 4291)
@@ -26,11 +26,11 @@
public static void main(String[] args) throws IOException, RemoteExecutionException, URISyntaxException {
Security.addProvider(new Provider());
final StreamingRot13RequestListener listener = new StreamingRot13RequestListener();
- final Endpoint endpoint = Remoting.createEndpoint("simple", listener);
+ final Endpoint endpoint = Remoting.createEndpoint("simple");
try {
- final JrppServer jrppServer = Remoting.addJrppServer(endpoint, new InetSocketAddress(12345), AttributeMap.EMPTY);
+ final JrppServer jrppServer = Remoting.addJrppServer(endpoint, new InetSocketAddress(12345), listener, AttributeMap.EMPTY);
try {
- Session session = endpoint.openSession(new URI("jrpp://localhost:12345"), AttributeMap.EMPTY);
+ Session session = endpoint.openSession(new URI("jrpp://localhost:12345"), AttributeMap.EMPTY, listener);
try {
final Client<Reader,Reader> client = session.getRootClient();
try {
Modified: remoting3/trunk/samples/src/main/java/org/jboss/cx/remoting/samples/simple/LocalBasicExampleMain.java
===================================================================
--- remoting3/trunk/samples/src/main/java/org/jboss/cx/remoting/samples/simple/LocalBasicExampleMain.java 2008-06-11 22:09:16 UTC (rev 4290)
+++ remoting3/trunk/samples/src/main/java/org/jboss/cx/remoting/samples/simple/LocalBasicExampleMain.java 2008-06-12 16:34:08 UTC (rev 4291)
@@ -17,7 +17,7 @@
public static void main(String[] args) throws IOException, RemoteExecutionException {
Security.addProvider(new Provider());
final StringRot13RequestListener listener = new StringRot13RequestListener();
- final Endpoint endpoint = Remoting.createEndpoint("simple", listener);
+ final Endpoint endpoint = Remoting.createEndpoint("simple");
try {
final Client<String,String> client = endpoint.createClient(listener);
try {
Modified: remoting3/trunk/samples/src/main/java/org/jboss/cx/remoting/samples/simple/LocalStreamExampleMain.java
===================================================================
--- remoting3/trunk/samples/src/main/java/org/jboss/cx/remoting/samples/simple/LocalStreamExampleMain.java 2008-06-11 22:09:16 UTC (rev 4290)
+++ remoting3/trunk/samples/src/main/java/org/jboss/cx/remoting/samples/simple/LocalStreamExampleMain.java 2008-06-12 16:34:08 UTC (rev 4291)
@@ -20,7 +20,7 @@
public static void main(String[] args) throws IOException, RemoteExecutionException {
Security.addProvider(new Provider());
final StreamingRot13RequestListener listener = new StreamingRot13RequestListener();
- final Endpoint endpoint = Remoting.createEndpoint("simple", listener);
+ final Endpoint endpoint = Remoting.createEndpoint("simple");
try {
final Client<Reader,Reader> client = endpoint.createClient(listener);
try {
Modified: remoting3/trunk/standalone/src/main/java/org/jboss/cx/remoting/Remoting.java
===================================================================
--- remoting3/trunk/standalone/src/main/java/org/jboss/cx/remoting/Remoting.java 2008-06-11 22:09:16 UTC (rev 4290)
+++ remoting3/trunk/standalone/src/main/java/org/jboss/cx/remoting/Remoting.java 2008-06-12 16:34:08 UTC (rev 4291)
@@ -21,12 +21,11 @@
// lifecycle lock
private static final Object lifecycle = new Object();
- public static <I, O> Endpoint createEndpoint(String name, RequestListener<I, O> listener) throws IOException {
+ public static <I, O> Endpoint createEndpoint(String name) throws IOException {
synchronized (lifecycle) {
boolean ok = false;
final CoreEndpoint coreEndpoint = new CoreEndpoint();
coreEndpoint.setName(name);
- coreEndpoint.setRootListener(listener);
coreEndpoint.create();
try {
coreEndpoint.start();
@@ -81,7 +80,7 @@
}
}
- public static JrppServer addJrppServer(Endpoint endpoint, SocketAddress address, AttributeMap attributeMap) throws IOException {
+ public static JrppServer addJrppServer(Endpoint endpoint, SocketAddress address, RequestListener<?, ?> rootRequestListener, AttributeMap attributeMap) throws IOException {
synchronized (lifecycle) {
boolean ok = false;
final JrppServer jrppServer = new JrppServer();
@@ -89,6 +88,7 @@
jrppServer.setSocketAddress(address);
jrppServer.setAttributeMap(attributeMap);
jrppServer.setEndpoint(endpoint);
+ jrppServer.setRootListener(rootRequestListener);
jrppServer.create();
try {
jrppServer.start();
Modified: remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/ObjectMessageInput.java
===================================================================
--- remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/ObjectMessageInput.java 2008-06-11 22:09:16 UTC (rev 4290)
+++ remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/ObjectMessageInput.java 2008-06-12 16:34:08 UTC (rev 4291)
@@ -16,14 +16,4 @@
* @throws IOException if an I/O error occurs
*/
Object readObject() throws ClassNotFoundException, IOException;
-
- /**
- * Read an object using the given classloader.
- *
- * @param loader the classloader to use
- * @return the object from the message
- * @throws ClassNotFoundException if the class of the object could not be resolved by the classloader
- * @throws IOException if an I/O error occurs
- */
- Object readObject(ClassLoader loader) throws ClassNotFoundException, IOException;
}
16 years, 6 months
JBoss Remoting SVN: r4290 - remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util.
by jboss-remoting-commits@lists.jboss.org
Author: david.lloyd(a)jboss.com
Date: 2008-06-11 18:09:16 -0400 (Wed, 11 Jun 2008)
New Revision: 4290
Modified:
remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/ConcurrentReferenceHashMap.java
Log:
Merge -r5968:5969 from https://svn.jboss.org/repos/jbosscache/experimental/jsr166/src/jsr166y/Co...
Modified: remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/ConcurrentReferenceHashMap.java
===================================================================
--- remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/ConcurrentReferenceHashMap.java 2008-06-11 04:28:48 UTC (rev 4289)
+++ remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/ConcurrentReferenceHashMap.java 2008-06-11 22:09:16 UTC (rev 4290)
@@ -31,11 +31,11 @@
* An advanced hash table supporting configurable garbage collection semantics
* of keys and values, optional referential-equality, full concurrency of
* retrievals, and adjustable expected concurrency for updates.
- *
+ *
* This table is designed around specific advanced use-cases. If there is any
* doubt whether this table is for you, you most likely should be using
* {@link java.util.concurrent.ConcurrentHashMap} instead.
- *
+ *
* This table supports strong, weak, and soft keys and values. By default keys
* are weak, and values are strong. Such a configuration offers similar behavior
* to {@link java.util.WeakHashMap}, entries of this table are periodically
@@ -48,13 +48,13 @@
* <tt>isEmpty</tt> might return a value greater than the observed number of
* entries. In order to support a high level of concurrency, stale entries are
* only reclaimed during blocking (usually mutating) operations.
- *
+ *
* Enabling soft keys allows entries in this table to remain until their space
* is absolutely needed by the garbage collector. This is unlike weak keys which
* can be reclaimed as soon as they are no longer referenced by a normal strong
* reference. The primary use case for soft keys is a cache, which ideally
* occupies memory that is not in use for as long as possible.
- *
+ *
* By default, values are held using a normal strong reference. This provides
* the commonly desired guarantee that a value will always have at least the
* same life-span as it's key. For this reason, care should be taken to ensure
@@ -62,11 +62,11 @@
* preventing reclamation. If this is unavoidable, then it is recommended to use
* the same reference type in use for the key. However, it should be noted that
* non-strong values may disappear before their corresponding key.
- *
+ *
* While this table does allow the use of both strong keys and values, it is
* recommended to use {@link java.util.concurrent.ConcurrentHashMap} for such a
* configuration, since it is optimized for that case.
- *
+ *
* Just like {@link java.util.concurrent.ConcurrentHashMap}, this class obeys
* the same functional specification as {@link java.util.Hashtable}, and
* includes versions of methods corresponding to each method of
@@ -76,7 +76,7 @@
* prevents all access. This class is fully interoperable with
* <tt>Hashtable</tt> in programs that rely on its thread safety but not on
* its synchronization details.
- *
+ *
* <p>
* Retrieval operations (including <tt>get</tt>) generally do not block, so
* may overlap with update operations (including <tt>put</tt> and
@@ -89,7 +89,7 @@
* iterator/enumeration. They do <em>not</em> throw
* {@link ConcurrentModificationException}. However, iterators are designed to
* be used by only one thread at a time.
- *
+ *
* <p>
* The allowed concurrency among update operations is guided by the optional
* <tt>concurrencyLevel</tt> constructor argument (default <tt>16</tt>),
@@ -106,19 +106,19 @@
* any other kind of hash table is a relatively slow operation, so, when
* possible, it is a good idea to provide estimates of expected table sizes in
* constructors.
- *
+ *
* <p>
* This class and its views and iterators implement all of the <em>optional</em>
* methods of the {@link Map} and {@link Iterator} interfaces.
- *
+ *
* <p>
* Like {@link Hashtable} but unlike {@link HashMap}, this class does
* <em>not</em> allow <tt>null</tt> to be used as a key or value.
- *
+ *
* <p>
* This class is a member of the <a href="{@docRoot}/../technotes/guides/collections/index.html">
* Java Collections Framework</a>.
- *
+ *
* @author Doug Lea
* @author Jason T. Greene
* @param <K> the type of keys maintained by this map
@@ -139,27 +139,27 @@
*/
public static enum ReferenceType {
/** Indicates a normal Java strong reference should be used */
- STRONG,
+ STRONG,
/** Indicates a {@link WeakReference} should be used */
WEAK,
/** Indicates a {@link SoftReference} should be used */
SOFT
};
-
-
+
+
public static enum Option {
- /** Indicates that referential-equality (== instead of .equals()) should
+ /** Indicates that referential-equality (== instead of .equals()) should
* be used when locating keys. This offers similar behavior to {@link IdentityHashMap} */
IDENTITY_COMPARISONS
};
-
+
/* ---------------- Constants -------------- */
static final ReferenceType DEFAULT_KEY_TYPE = ReferenceType.WEAK;
-
+
static final ReferenceType DEFAULT_VALUE_TYPE = ReferenceType.STRONG;
-
-
+
+
/**
* The default initial capacity for this table,
* used when not otherwise specified in a constructor.
@@ -217,7 +217,7 @@
* The segments, each of which is a specialized hash table
*/
final Segment<K,V>[] segments;
-
+
boolean identityComparisons;
transient Set<K> keySet;
@@ -252,46 +252,90 @@
final Segment<K,V> segmentFor(int hash) {
return segments[(hash >>> segmentShift) & segmentMask];
}
-
+
private int hashOf(Object key) {
- return hash(identityComparisons ?
+ return hash(identityComparisons ?
System.identityHashCode(key) : key.hashCode());
}
-
+
/* ---------------- Inner Classes -------------- */
-
+
static interface KeyReference {
int keyHash();
+ Object keyRef();
}
-
+
/**
* A weak-key reference which stores the key hash needed for reclamation.
*/
static final class WeakKeyReference<K> extends WeakReference<K> implements KeyReference {
final int hash;
- WeakKeyReference(K key, int hash, ReferenceQueue<K> refQueue) {
+ WeakKeyReference(K key, int hash, ReferenceQueue<Object> refQueue) {
super(key, refQueue);
this.hash = hash;
}
public final int keyHash() {
return hash;
}
+
+ public final Object keyRef() {
+ return this;
+ }
}
-
+
/**
* A soft-key reference which stores the key hash needed for reclamation.
*/
static final class SoftKeyReference<K> extends SoftReference<K> implements KeyReference {
final int hash;
- SoftKeyReference(K key, int hash, ReferenceQueue<K> refQueue) {
+ SoftKeyReference(K key, int hash, ReferenceQueue<Object> refQueue) {
super(key, refQueue);
this.hash = hash;
}
public final int keyHash() {
return hash;
}
+
+ public final Object keyRef() {
+ return this;
+ }
}
-
+
+ static final class WeakValueReference<V> extends WeakReference<V> implements KeyReference {
+ final Object keyRef;
+ final int hash;
+ WeakValueReference(V value, Object keyRef, int hash, ReferenceQueue<Object> refQueue) {
+ super(value, refQueue);
+ this.keyRef = keyRef;
+ this.hash = hash;
+ }
+
+ public final int keyHash() {
+ return hash;
+ }
+
+ public final Object keyRef() {
+ return keyRef;
+ }
+ }
+
+ static final class SoftValueReference<V> extends SoftReference<V> implements KeyReference {
+ final Object keyRef;
+ final int hash;
+ SoftValueReference(V value, Object keyRef, int hash, ReferenceQueue<Object> refQueue) {
+ super(value, refQueue);
+ this.keyRef = keyRef;
+ this.hash = hash;
+ }
+ public final int keyHash() {
+ return hash;
+ }
+
+ public final Object keyRef() {
+ return keyRef;
+ }
+ }
+
/**
* ConcurrentReferenceHashMap list entry. Note that this is never exported
* out as a user-visible Map.Entry.
@@ -310,56 +354,57 @@
volatile Object valueRef;
final HashEntry<K,V> next;
- HashEntry(K key, int hash, HashEntry<K,V> next, V value,
- ReferenceType keyType, ReferenceType valueType,
- ReferenceQueue<K> refQueue) {
- this.keyRef = newKeyReference(key, keyType, hash, refQueue);
+ HashEntry(K key, int hash, HashEntry<K,V> next, V value,
+ ReferenceType keyType, ReferenceType valueType,
+ ReferenceQueue<Object> refQueue) {
this.hash = hash;
this.next = next;
- this.valueRef = newValueReference(value, valueType);
+ this.keyRef = newKeyReference(key, keyType, refQueue);
+ this.valueRef = newValueReference(value, valueType, refQueue);
}
-
- final Object newKeyReference(K key, ReferenceType keyType, int hash,
- ReferenceQueue<K> refQueue) {
+
+ final Object newKeyReference(K key, ReferenceType keyType,
+ ReferenceQueue<Object> refQueue) {
if (keyType == ReferenceType.WEAK)
return new WeakKeyReference<K>(key, hash, refQueue);
if (keyType == ReferenceType.SOFT)
return new SoftKeyReference<K>(key, hash, refQueue);
-
+
return key;
}
-
- final Object newValueReference(V value, ReferenceType valueType) {
+
+ final Object newValueReference(V value, ReferenceType valueType,
+ ReferenceQueue<Object> refQueue) {
if (valueType == ReferenceType.WEAK)
- return new WeakReference<V>(value);
+ return new WeakValueReference<V>(value, keyRef, hash, refQueue);
if (valueType == ReferenceType.SOFT)
- return new SoftReference<V>(value);
-
+ return new SoftValueReference<V>(value, keyRef, hash, refQueue);
+
return value;
}
-
+
@SuppressWarnings("unchecked")
final K key() {
if (keyRef instanceof Reference)
return ((Reference<K>)keyRef).get();
-
+
return (K) keyRef;
}
-
+
final V value() {
return dereferenceValue(valueRef);
}
-
+
@SuppressWarnings("unchecked")
final V dereferenceValue(Object value) {
if (value instanceof Reference)
return ((Reference<V>)value).get();
-
+
return (V) value;
}
-
- final void setValue(V value, ReferenceType valueType) {
- this.valueRef = newValueReference(value, valueType);
+
+ final void setValue(V value, ReferenceType valueType, ReferenceQueue<Object> refQueue) {
+ this.valueRef = newValueReference(value, valueType, refQueue);
}
@SuppressWarnings("unchecked")
@@ -449,18 +494,18 @@
final float loadFactor;
/**
- * The collected weak-key reference queue for this segment.
+ * The collected weak-key reference queue for this segment.
* This should be (re)initialized whenever table is assigned,
*/
- transient volatile ReferenceQueue<K> refQueue;
-
+ transient volatile ReferenceQueue<Object> refQueue;
+
final ReferenceType keyType;
-
+
final ReferenceType valueType;
-
+
final boolean identityComparisons;
-
- Segment(int initialCapacity, float lf, ReferenceType keyType,
+
+ Segment(int initialCapacity, float lf, ReferenceType keyType,
ReferenceType valueType, boolean identityComparisons) {
loadFactor = lf;
this.keyType = keyType;
@@ -473,11 +518,11 @@
static final <K,V> Segment<K,V>[] newArray(int i) {
return new Segment[i];
}
-
+
private boolean keyEq(Object src, Object dest) {
return identityComparisons ? src == dest : src.equals(dest);
}
-
+
/**
* Sets table to new HashEntry array.
* Call only while holding lock or in constructor.
@@ -485,7 +530,7 @@
void setTable(HashEntry<K,V>[] newTable) {
threshold = (int)(newTable.length * loadFactor);
table = newTable;
- refQueue = new ReferenceQueue<K>();
+ refQueue = new ReferenceQueue<Object>();
}
/**
@@ -495,7 +540,7 @@
HashEntry<K,V>[] tab = table;
return tab[hash & (tab.length - 1)];
}
-
+
HashEntry<K,V> newHashEntry(K key, int hash, HashEntry<K, V> next, V value) {
return new HashEntry<K,V>(key, hash, next, value, keyType, valueType, refQueue);
}
@@ -526,8 +571,8 @@
if (e.hash == hash && keyEq(key, e.key())) {
Object opaque = e.valueRef;
if (opaque != null)
- return e.dereferenceValue(opaque);
-
+ return e.dereferenceValue(opaque);
+
return readValueUnderLock(e); // recheck
}
e = e.next;
@@ -556,12 +601,12 @@
for (HashEntry<K,V> e = tab[i]; e != null; e = e.next) {
Object opaque = e.valueRef;
V v;
-
- if (opaque == null)
+
+ if (opaque == null)
v = readValueUnderLock(e); // recheck
- else
+ else
v = e.dereferenceValue(opaque);
-
+
if (value.equals(v))
return true;
}
@@ -581,7 +626,7 @@
boolean replaced = false;
if (e != null && oldValue.equals(e.value())) {
replaced = true;
- e.setValue(newValue, valueType);
+ e.setValue(newValue, valueType, refQueue);
}
return replaced;
} finally {
@@ -600,7 +645,7 @@
V oldValue = null;
if (e != null) {
oldValue = e.value();
- e.setValue(newValue, valueType);
+ e.setValue(newValue, valueType, refQueue);
}
return oldValue;
} finally {
@@ -617,9 +662,9 @@
if (c++ > threshold) {// ensure capacity
int reduced = rehash();
if (reduced > 0) // adjust from possible weak cleanups
- count = (c -= reduced) - 1; // write-volatile
+ count = (c -= reduced) - 1; // write-volatile
}
-
+
HashEntry<K,V>[] tab = table;
int index = hash & (tab.length - 1);
HashEntry<K,V> first = tab[index];
@@ -631,7 +676,7 @@
if (e != null) {
oldValue = e.value();
if (!onlyIfAbsent)
- e.setValue(value, valueType);
+ e.setValue(value, valueType, refQueue);
}
else {
oldValue = null;
@@ -718,19 +763,19 @@
/**
* Remove; match on key only if value null, else match both.
*/
- V remove(Object key, int hash, Object value, boolean weakRemove) {
+ V remove(Object key, int hash, Object value, boolean refRemove) {
lock();
try {
- if (!weakRemove)
+ if (!refRemove)
removeStale();
int c = count - 1;
HashEntry<K,V>[] tab = table;
int index = hash & (tab.length - 1);
HashEntry<K,V> first = tab[index];
HashEntry<K,V> e = first;
- // a weak remove operation compares the WeakReference instance
- while (e != null && (!weakRemove || key != e.keyRef)
- && (e.hash != hash || !keyEq(key, e.key())))
+ // a ref remove operation compares the Reference instance
+ while (e != null && key != e.keyRef
+ && (refRemove || hash != e.hash || !keyEq(key, e.key())))
e = e.next;
V oldValue = null;
@@ -749,7 +794,7 @@
c--;
continue;
}
-
+
newFirst = newHashEntry(pKey, p.hash, newFirst, p.value());
}
tab[index] = newFirst;
@@ -761,14 +806,11 @@
unlock();
}
}
-
+
final void removeStale() {
- if (keyType == ReferenceType.STRONG)
- return;
-
KeyReference ref;
while ((ref = (KeyReference) refQueue.poll()) != null) {
- remove(ref, ref.keyHash(), null, true);
+ remove(ref.keyRef(), ref.keyHash(), null, true);
}
}
@@ -781,7 +823,7 @@
tab[i] = null;
++modCount;
// replace the reference queue to avoid unnecessary stale cleanups
- refQueue = new ReferenceQueue<K>();
+ refQueue = new ReferenceQueue<Object>();
count = 0; // write-volatile
} finally {
unlock();
@@ -797,7 +839,7 @@
/**
* Creates a new, empty map with the specified initial
* capacity, reference types, load factor and concurrency level.
- *
+ *
* Behavioral changing options such as {@link Option#IDENTITY_COMPARISONS}
* can also be specified.
*
@@ -817,7 +859,7 @@
* nonpositive.
*/
public ConcurrentReferenceHashMap(int initialCapacity,
- float loadFactor, int concurrencyLevel,
+ float loadFactor, int concurrencyLevel,
ReferenceType keyType, ReferenceType valueType,
EnumSet<Option> options) {
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
@@ -845,11 +887,11 @@
int cap = 1;
while (cap < c)
cap <<= 1;
-
+
identityComparisons = options != null && options.contains(Option.IDENTITY_COMPARISONS);
for (int i = 0; i < this.segments.length; ++i)
- this.segments[i] = new Segment<K,V>(cap, loadFactor,
+ this.segments[i] = new Segment<K,V>(cap, loadFactor,
keyType, valueType, identityComparisons);
}
@@ -871,10 +913,10 @@
*/
public ConcurrentReferenceHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
- this(initialCapacity, loadFactor, concurrencyLevel,
+ this(initialCapacity, loadFactor, concurrencyLevel,
DEFAULT_KEY_TYPE, DEFAULT_VALUE_TYPE, null);
}
-
+
/**
* Creates a new, empty map with the specified initial capacity
* and load factor and with the default reference types (weak keys,
@@ -896,7 +938,7 @@
/**
- * Creates a new, empty map with the specified initial capacity,
+ * Creates a new, empty map with the specified initial capacity,
* reference types and with default load factor (0.75) and concurrencyLevel (16).
*
* @param initialCapacity the initial capacity. The implementation
@@ -906,12 +948,12 @@
* @throws IllegalArgumentException if the initial capacity of
* elements is negative.
*/
- public ConcurrentReferenceHashMap(int initialCapacity,
+ public ConcurrentReferenceHashMap(int initialCapacity,
ReferenceType keyType, ReferenceType valueType) {
- this(initialCapacity, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL,
+ this(initialCapacity, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL,
keyType, valueType, null);
}
-
+
/**
* Creates a new, empty map with the specified initial capacity,
* and with default reference types (weak keys, strong values),
@@ -1251,15 +1293,15 @@
for (int i = 0; i < segments.length; ++i)
segments[i].clear();
}
-
+
/**
* Removes any stale entries whose keys have been finalized. Use of this
* method is normally not necessary since stale entries are automatically
* removed lazily, when blocking operations are required. However, there
* are some cases where this operation should be performed eagerly, such
- * as cleaning up old references to a ClassLoader in a multi-classloader
+ * as cleaning up old references to a ClassLoader in a multi-classloader
* environment.
- *
+ *
* Note: this method will acquire locks, one at a time, across all segments
* of this table, so if it is to be used, it should be used sparingly.
*/
@@ -1267,8 +1309,8 @@
for (int i = 0; i < segments.length; ++i)
segments[i].removeStale();
}
-
+
/**
* Returns a {@link Set} view of the keys contained in this map.
* The set is backed by the map, so changes to the map are
@@ -1393,13 +1435,13 @@
}
}
- public boolean hasNext() {
+ public boolean hasNext() {
while (nextEntry != null) {
- if (nextEntry.key() != null)
+ if (nextEntry.key() != null)
return true;
advance();
}
-
+
return false;
}
@@ -1407,12 +1449,12 @@
do {
if (nextEntry == null)
throw new NoSuchElementException();
-
+
lastReturned = nextEntry;
currentKey = lastReturned.key();
advance();
} while (currentKey == null); // Skip GC'd keys
-
+
return lastReturned;
}
@@ -1627,7 +1669,7 @@
K key = e.key();
if (key == null) // Skip GC'd keys
continue;
-
+
s.writeObject(key);
s.writeObject(e.value());
}
16 years, 6 months
JBoss Remoting SVN: r4289 - remoting2/branches/2.2/src/main/org/jboss/remoting/transport/socket.
by jboss-remoting-commits@lists.jboss.org
Author: ron.sigal(a)jboss.com
Date: 2008-06-11 00:28:48 -0400 (Wed, 11 Jun 2008)
New Revision: 4289
Modified:
remoting2/branches/2.2/src/main/org/jboss/remoting/transport/socket/ClientSocketWrapper.java
Log:
JBREM-972, JBREM-973: In createStreams() took timeout reset out of finally clause.
Modified: remoting2/branches/2.2/src/main/org/jboss/remoting/transport/socket/ClientSocketWrapper.java
===================================================================
--- remoting2/branches/2.2/src/main/org/jboss/remoting/transport/socket/ClientSocketWrapper.java 2008-06-11 03:15:34 UTC (rev 4288)
+++ remoting2/branches/2.2/src/main/org/jboss/remoting/transport/socket/ClientSocketWrapper.java 2008-06-11 04:28:48 UTC (rev 4289)
@@ -158,16 +158,10 @@
}
}
- try
- {
- out = createOutputStream(serializationType, socket, marshaller);
- in = createInputStream(serializationType, socket, unmarshaller);
- }
- finally
- {
- setTimeout(savedTimeout);
- log.debug("reset timeout: " + savedTimeout);
- }
+ out = createOutputStream(serializationType, socket, marshaller);
+ in = createInputStream(serializationType, socket, unmarshaller);
+ setTimeout(savedTimeout);
+ log.debug("reset timeout: " + savedTimeout);
}
protected InputStream createInputStream(String serializationType, Socket socket, UnMarshaller unmarshaller)
16 years, 6 months
JBoss Remoting SVN: r4288 - remoting2/branches/2.2/src/tests/org/jboss/test/remoting/transport/socket/interrupt.
by jboss-remoting-commits@lists.jboss.org
Author: ron.sigal(a)jboss.com
Date: 2008-06-10 23:15:34 -0400 (Tue, 10 Jun 2008)
New Revision: 4288
Modified:
remoting2/branches/2.2/src/tests/org/jboss/test/remoting/transport/socket/interrupt/InterruptedExceptionTestCase.java
remoting2/branches/2.2/src/tests/org/jboss/test/remoting/transport/socket/interrupt/MockInvokerInterruptTestCase.java
Log:
JBREM-954: Test for wrapping InterruptedException in a RuntimeException configuration.
Modified: remoting2/branches/2.2/src/tests/org/jboss/test/remoting/transport/socket/interrupt/InterruptedExceptionTestCase.java
===================================================================
--- remoting2/branches/2.2/src/tests/org/jboss/test/remoting/transport/socket/interrupt/InterruptedExceptionTestCase.java 2008-06-11 03:14:38 UTC (rev 4287)
+++ remoting2/branches/2.2/src/tests/org/jboss/test/remoting/transport/socket/interrupt/InterruptedExceptionTestCase.java 2008-06-11 03:15:34 UTC (rev 4288)
@@ -90,7 +90,7 @@
}
- public void testInterruptedException() throws Throwable
+ public void testNotWrappedInterruptedExceptionDefault() throws Throwable
{
log.info("entering " + getName());
@@ -139,9 +139,133 @@
t2.wait(10000);
}
- // Verify exception is an InterruptedException wrapped in a RuntimeExceptio.
+ // Verify exception is a CannotConnectException (not wrapped in a RuntimeException).
Throwable t = t2.throwable;
log.info("throwable: " + t);
+ assertTrue(t instanceof CannotConnectException);
+ assertTrue(t.getCause() instanceof InterruptedException);
+
+ client.disconnect();
+ shutdownServer();
+ log.info(getName() + " PASSES");
+ }
+
+
+ public void testNotWrappedInterruptedExceptionConfigured() throws Throwable
+ {
+ log.info("entering " + getName());
+
+ // Start server.
+ setupServer();
+
+ // Create client.
+ InvokerLocator clientLocator = new InvokerLocator(locatorURI);
+ HashMap clientConfig = new HashMap();
+ clientConfig.put(InvokerLocator.FORCE_REMOTE, "true");
+ clientConfig.put(MicroSocketClientInvoker.MAX_POOL_SIZE_FLAG, "1");
+ clientConfig.put("numberOfCallRetries", "1");
+ clientConfig.put(MicroSocketClientInvoker.WRAP_INTERRUPTED_EXCEPTION, "false");
+ addExtraClientConfig(clientConfig);
+ Client client = new Client(clientLocator, clientConfig);
+ client.connect();
+ log.info("client is connected");
+
+ // Test connections.
+ assertEquals(FAST, client.invoke(FAST));
+ log.info("connection is good");
+
+ InvokerThread t1 = new InvokerThread(client, "abc");
+ InvokerThread t2 = new InvokerThread(client, "xyz");
+
+ // Start first invocation.
+ t1.start();
+ log.info("started first invocation");
+
+ // Give first invocation time to start.
+ Thread.sleep(5000);
+
+ // Start second invocation.
+ t2.start();
+ log.info("started second invocation");
+
+ // Give second invocation time to start.
+ Thread.sleep(5000);
+
+ // Interrupt second invocation as it waits for a semaphore.
+ t2.interrupt();
+ log.info("interrupted second invocation");
+
+ // Wait until second invocation throws an exception.
+ synchronized (t2)
+ {
+ t2.wait(10000);
+ }
+
+ // Verify exception is a CannotConnectException (not wrapped in a RuntimeException).
+ Throwable t = t2.throwable;
+ log.info("throwable: " + t);
+ assertTrue(t instanceof CannotConnectException);
+ assertTrue(t.getCause() instanceof InterruptedException);
+
+ client.disconnect();
+ shutdownServer();
+ log.info(getName() + " PASSES");
+ }
+
+
+ public void testWrappedInterruptedException() throws Throwable
+ {
+ log.info("entering " + getName());
+
+ // Start server.
+ setupServer();
+
+ // Create client.
+ InvokerLocator clientLocator = new InvokerLocator(locatorURI);
+ HashMap clientConfig = new HashMap();
+ clientConfig.put(InvokerLocator.FORCE_REMOTE, "true");
+ clientConfig.put(MicroSocketClientInvoker.MAX_POOL_SIZE_FLAG, "1");
+ clientConfig.put("numberOfCallRetries", "1");
+ clientConfig.put(MicroSocketClientInvoker.WRAP_INTERRUPTED_EXCEPTION, "true");
+ addExtraClientConfig(clientConfig);
+ Client client = new Client(clientLocator, clientConfig);
+ client.connect();
+ log.info("client is connected");
+
+ // Test connections.
+ assertEquals(FAST, client.invoke(FAST));
+ log.info("connection is good");
+
+ InvokerThread t1 = new InvokerThread(client, "abc");
+ InvokerThread t2 = new InvokerThread(client, "xyz");
+
+ // Start first invocation.
+ t1.start();
+ log.info("started first invocation");
+
+ // Give first invocation time to start.
+ Thread.sleep(5000);
+
+ // Start second invocation.
+ t2.start();
+ log.info("started second invocation");
+
+ // Give second invocation time to start.
+ Thread.sleep(5000);
+
+ // Interrupt second invocation as it waits for a semaphore.
+ t2.interrupt();
+ log.info("interrupted second invocation");
+
+ // Wait until second invocation throws an exception.
+ synchronized (t2)
+ {
+ t2.wait(10000);
+ }
+
+ // Verify exception is an InterruptedException wrapped in a RuntimeException.
+ Throwable t = t2.throwable;
+ log.info("throwable: " + t);
assertTrue(t instanceof RuntimeException);
assertFalse(t instanceof CannotConnectException);
assertTrue(t.getCause() instanceof InterruptedException);
Modified: remoting2/branches/2.2/src/tests/org/jboss/test/remoting/transport/socket/interrupt/MockInvokerInterruptTestCase.java
===================================================================
--- remoting2/branches/2.2/src/tests/org/jboss/test/remoting/transport/socket/interrupt/MockInvokerInterruptTestCase.java 2008-06-11 03:14:38 UTC (rev 4287)
+++ remoting2/branches/2.2/src/tests/org/jboss/test/remoting/transport/socket/interrupt/MockInvokerInterruptTestCase.java 2008-06-11 03:15:34 UTC (rev 4288)
@@ -21,6 +21,9 @@
*/
package org.jboss.test.remoting.transport.socket.interrupt;
+import java.util.HashMap;
+import java.util.Map;
+
import junit.framework.TestCase;
import org.apache.log4j.Logger;
@@ -43,9 +46,11 @@
{
private static final Logger log = Logger.getLogger(MockInvokerInterruptTestCase.class);
- public void test000() throws Throwable
+ public void testWrappedException() throws Throwable
{
- InvokerLocator il = new InvokerLocator("unittest", "127.0.0.1", 9999, "mock", null);
+ Map parameters = new HashMap();
+ parameters.put(MicroSocketClientInvoker.WRAP_INTERRUPTED_EXCEPTION, "true");
+ InvokerLocator il = new InvokerLocator("unittest", "127.0.0.1", 9999, "mock", parameters);
CountDown startGate = new CountDown(1);
MockMicroSocketClientInvoker ci = new MockMicroSocketClientInvoker(il, startGate);
InvocationRequest ir = new InvocationRequest("", "", null, null, null, il);
@@ -70,7 +75,63 @@
assertTrue(re.getCause() instanceof InterruptedException);
}
}
+
+ public void testNotWrappedExceptionDefault() throws Throwable
+ {
+ InvokerLocator il = new InvokerLocator("unittest", "127.0.0.1", 9999, "mock", null);
+ CountDown startGate = new CountDown(1);
+ MockMicroSocketClientInvoker ci = new MockMicroSocketClientInvoker(il, startGate);
+ InvocationRequest ir = new InvocationRequest("", "", null, null, null, il);
+
+ Runnable interrupterRunnable = new ThreadInterrupter(Thread.currentThread(), startGate);
+ Thread interrupter = new Thread(interrupterRunnable);
+ interrupter.start();
+
+ ci.setMaxPoolSize(0);
+ ci.connect();
+ try
+ {
+ ci.invoke(ir);
+ }
+ catch(CannotConnectException cce)
+ {
+ log.info("got expected CannotConnectException");
+ }
+ catch (RuntimeException re)
+ {
+ fail("expected CannotConnectException");
+ }
+ }
+ public void testNotWrappedExceptionConfigured() throws Throwable
+ {
+ Map parameters = new HashMap();
+ parameters.put(MicroSocketClientInvoker.WRAP_INTERRUPTED_EXCEPTION, "false");
+ InvokerLocator il = new InvokerLocator("unittest", "127.0.0.1", 9999, "mock", parameters);
+ CountDown startGate = new CountDown(1);
+ MockMicroSocketClientInvoker ci = new MockMicroSocketClientInvoker(il, startGate);
+ InvocationRequest ir = new InvocationRequest("", "", null, null, null, il);
+
+ Runnable interrupterRunnable = new ThreadInterrupter(Thread.currentThread(), startGate);
+ Thread interrupter = new Thread(interrupterRunnable);
+ interrupter.start();
+
+ ci.setMaxPoolSize(0);
+ ci.connect();
+ try
+ {
+ ci.invoke(ir);
+ }
+ catch(CannotConnectException cce)
+ {
+ log.info("got expected CannotConnectException");
+ }
+ catch (RuntimeException re)
+ {
+ fail("expected CannotConnectException");
+ }
+ }
+
class MockMicroSocketClientInvoker extends MicroSocketClientInvoker
{
private CountDown startGate;
16 years, 6 months
JBoss Remoting SVN: r4287 - remoting2/branches/2.2/src/main/org/jboss/remoting/transport/socket.
by jboss-remoting-commits@lists.jboss.org
Author: ron.sigal(a)jboss.com
Date: 2008-06-10 23:14:38 -0400 (Tue, 10 Jun 2008)
New Revision: 4287
Modified:
remoting2/branches/2.2/src/main/org/jboss/remoting/transport/socket/MicroSocketClientInvoker.java
remoting2/branches/2.2/src/main/org/jboss/remoting/transport/socket/SocketClientInvoker.java
Log:
JBREM-954: Made wrapping InterruptedException in a RuntimeException configurable.
Modified: remoting2/branches/2.2/src/main/org/jboss/remoting/transport/socket/MicroSocketClientInvoker.java
===================================================================
--- remoting2/branches/2.2/src/main/org/jboss/remoting/transport/socket/MicroSocketClientInvoker.java 2008-06-11 01:17:12 UTC (rev 4286)
+++ remoting2/branches/2.2/src/main/org/jboss/remoting/transport/socket/MicroSocketClientInvoker.java 2008-06-11 03:14:38 UTC (rev 4287)
@@ -70,6 +70,12 @@
* used, which may also be a requirement.
*/
public static final String CLIENT_SOCKET_CLASS_FLAG = "clientSocketClass";
+
+ /**
+ * Configuration key for determining if an InterruptedException should be rethrown
+ * or wrapped in a RuntimeException.
+ */
+ public static final String WRAP_INTERRUPTED_EXCEPTION = "wrapInterruptedException";
/**
* Default value for enable TCP nodelay. Value is false.
@@ -215,6 +221,8 @@
//public long usedPooled;
public Object usedPoolLock;
+
+ protected boolean wrapInterruptedException = false;
// Constructors ---------------------------------------------------------------------------------
@@ -349,6 +357,16 @@
return numberOfRetries;
}
+ public boolean isWrapInterruptedException()
+ {
+ return wrapInterruptedException;
+ }
+
+ public void setWrapInterruptedException(boolean wrapInterruptedException)
+ {
+ this.wrapInterruptedException = wrapInterruptedException;
+ }
+
/**
* The name of of the server.
*/
@@ -550,18 +568,14 @@
{
socketWrapper = getConnection(marshaller, unmarshaller, timeLeft);
}
- catch (InterruptedException e)
- {
- semaphore.release();
- if (trace) log.trace(this + " released semaphore: " + semaphore.permits(), e);
- throw new RuntimeException(e);
- }
catch (Exception e)
{
// if (bailOut)
// return null;
semaphore.release();
if (trace) log.trace(this + " released semaphore: " + semaphore.permits());
+ if (e instanceof InterruptedException && isWrapInterruptedException())
+ throw new RuntimeException(e);
throw new CannotConnectException(
"Can not get connection to server. Problem establishing " +
"socket connection for " + locator, e);
@@ -724,7 +738,7 @@
throw (ClassNotFoundException)ex;
}
- if (ex instanceof InterruptedException)
+ if (ex instanceof InterruptedException && isWrapInterruptedException())
{
log.debug(this, ex);
throw new RuntimeException(ex);
Modified: remoting2/branches/2.2/src/main/org/jboss/remoting/transport/socket/SocketClientInvoker.java
===================================================================
--- remoting2/branches/2.2/src/main/org/jboss/remoting/transport/socket/SocketClientInvoker.java 2008-06-11 01:17:12 UTC (rev 4286)
+++ remoting2/branches/2.2/src/main/org/jboss/remoting/transport/socket/SocketClientInvoker.java 2008-06-11 03:14:38 UTC (rev 4287)
@@ -118,7 +118,7 @@
throw new MarshalException("Socket timed out. Waited " + socketWrapper.getTimeout() + " milliseconds for response while calling on " +
getLocator(), ex);
}
- else if (ex instanceof InterruptedException)
+ else if (ex instanceof InterruptedException && isWrapInterruptedException())
{
log.debug(this, ex);
throw new RuntimeException(ex);
16 years, 6 months
JBoss Remoting SVN: r4286 - remoting2/branches/2.2/src/tests/org/jboss/test/remoting/transport/bisocket.
by jboss-remoting-commits@lists.jboss.org
Author: ron.sigal(a)jboss.com
Date: 2008-06-10 21:17:12 -0400 (Tue, 10 Jun 2008)
New Revision: 4286
Modified:
remoting2/branches/2.2/src/tests/org/jboss/test/remoting/transport/bisocket/CloseControlSocketTestCase.java
Log:
JBREM-995: Added 2000 ms wait in testControlSocketClosedWhenReplaced().
Modified: remoting2/branches/2.2/src/tests/org/jboss/test/remoting/transport/bisocket/CloseControlSocketTestCase.java
===================================================================
--- remoting2/branches/2.2/src/tests/org/jboss/test/remoting/transport/bisocket/CloseControlSocketTestCase.java 2008-06-11 01:16:26 UTC (rev 4285)
+++ remoting2/branches/2.2/src/tests/org/jboss/test/remoting/transport/bisocket/CloseControlSocketTestCase.java 2008-06-11 01:17:12 UTC (rev 4286)
@@ -271,6 +271,7 @@
callbackInvoker.createControlConnection(listenerId, false);
// Verify old control socket is closed and has been replaced.
+ Thread.sleep(2000);
assertTrue(oldControlSocket.isClosed());
field = BisocketClientInvoker.class.getDeclaredField("controlSocket");
field.setAccessible(true);
16 years, 6 months
JBoss Remoting SVN: r4285 - remoting2/branches/2.2/src/tests/org/jboss/test/remoting/callback/asynch.
by jboss-remoting-commits@lists.jboss.org
Author: ron.sigal(a)jboss.com
Date: 2008-06-10 21:16:26 -0400 (Tue, 10 Jun 2008)
New Revision: 4285
Modified:
remoting2/branches/2.2/src/tests/org/jboss/test/remoting/callback/asynch/AsynchCallbackTestClientRoot.java
Log:
JBREM-995: Extended wait times.
Modified: remoting2/branches/2.2/src/tests/org/jboss/test/remoting/callback/asynch/AsynchCallbackTestClientRoot.java
===================================================================
--- remoting2/branches/2.2/src/tests/org/jboss/test/remoting/callback/asynch/AsynchCallbackTestClientRoot.java 2008-06-10 05:48:55 UTC (rev 4284)
+++ remoting2/branches/2.2/src/tests/org/jboss/test/remoting/callback/asynch/AsynchCallbackTestClientRoot.java 2008-06-11 01:16:26 UTC (rev 4285)
@@ -51,6 +51,7 @@
public void testSynchronousCallback() throws Throwable
{
+ log.info("entering " + getName());
String transport = getTransport();
String host = InetAddress.getLocalHost().getHostName();
int port = AsynchCallbackTestServerRoot.port;
@@ -71,7 +72,7 @@
Boolean done = (Boolean) client.invoke(AsynchCallbackTestServerRoot.GET_STATUS);
log.info("done 1: " + done);
assertFalse(done.booleanValue());
- Thread.sleep(2000);
+ Thread.sleep(8000);
done = (Boolean) client.invoke(AsynchCallbackTestServerRoot.GET_STATUS);
log.info("done 2: " + done);
assertTrue(done.booleanValue());
@@ -81,11 +82,13 @@
log.info("disconnecting");
client.disconnect();
log.info("disconnected");
+ log.info(getName() + " PASSES");
}
public void testASynchronousCallbackClientSide() throws Throwable
{
+ log.info("entering " + getName());
String transport = getTransport();
String host = InetAddress.getLocalHost().getHostName();
int port = AsynchCallbackTestServerRoot.port;
@@ -101,7 +104,7 @@
client.addListener(callbackHandler, null, null, true);
log.info("client added callback handler");
client.invokeOneway(AsynchCallbackTestServerRoot.ASYNCHRONOUS_CLIENT_SIDE_TEST);
- Thread.sleep(500);
+ Thread.sleep(4000);
// Should have returned.
Boolean done = (Boolean) client.invoke(AsynchCallbackTestServerRoot.GET_STATUS);
assertTrue(done.booleanValue());
@@ -109,11 +112,13 @@
client.invoke(AsynchCallbackTestServerRoot.RESET);
client.removeListener(callbackHandler);
client.disconnect();
+ log.info(getName() + " PASSES");
}
public void testASynchronousCallbackServerSide() throws Throwable
{
+ log.info("entering " + getName());
String transport = getTransport();
String host = InetAddress.getLocalHost().getHostName();
int port = AsynchCallbackTestServerRoot.port;
@@ -129,11 +134,17 @@
client.addListener(callbackHandler, null, null, true);
log.info("client added callback handler");
client.invokeOneway(AsynchCallbackTestServerRoot.ASYNCHRONOUS_SERVER_SIDE_TEST);
- Thread.sleep(500);
+ Thread.sleep(4000);
+
// Should have returned.
Boolean done = (Boolean) client.invoke(AsynchCallbackTestServerRoot.GET_STATUS);
assertTrue(done.booleanValue());
assertTrue(callbackHandler.receivedCallback);
+ Thread.sleep(5000);
+
+ // Callback should be handled.
+ assertTrue(callbackHandler.done);
+
String threadCount = (String) client.invoke(AsynchCallbackTestServerRoot.GET_THREAD_COUNT);
assertEquals(AsynchCallbackTestServerRoot.THREAD_COUNT, threadCount);
String queueSize = (String) client.invoke(AsynchCallbackTestServerRoot.GET_QUEUE_SIZE);
@@ -141,6 +152,7 @@
client.invoke(AsynchCallbackTestServerRoot.RESET);
client.removeListener(callbackHandler);
client.disconnect();
+ log.info(getName() + " PASSES");
}
@@ -154,6 +166,7 @@
static class SampleCallbackHandler implements InvokerCallbackHandler
{
boolean receivedCallback;
+ boolean done;
public void handleCallback(Callback callback) throws HandleCallbackException
{
@@ -161,11 +174,12 @@
receivedCallback = true;
try
{
- Thread.sleep(2000);
+ Thread.sleep(5000);
}
catch (InterruptedException e)
{
}
+ done = true;
}
}
}
16 years, 6 months
JBoss Remoting SVN: r4284 - remoting2/branches/2.2/src/main/org/jboss/remoting/transport/servlet/web.
by jboss-remoting-commits@lists.jboss.org
Author: ron.sigal(a)jboss.com
Date: 2008-06-10 01:48:55 -0400 (Tue, 10 Jun 2008)
New Revision: 4284
Modified:
remoting2/branches/2.2/src/main/org/jboss/remoting/transport/servlet/web/ServerInvokerServlet.java
Log:
JBREM-981: getInvokerFromInvokerUrl() calls InvokerLocator.validateLocator().
Modified: remoting2/branches/2.2/src/main/org/jboss/remoting/transport/servlet/web/ServerInvokerServlet.java
===================================================================
--- remoting2/branches/2.2/src/main/org/jboss/remoting/transport/servlet/web/ServerInvokerServlet.java 2008-06-10 05:47:14 UTC (rev 4283)
+++ remoting2/branches/2.2/src/main/org/jboss/remoting/transport/servlet/web/ServerInvokerServlet.java 2008-06-10 05:48:55 UTC (rev 4284)
@@ -42,6 +42,7 @@
import javax.servlet.http.HttpServletResponse;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.net.MalformedURLException;
import java.util.Iterator;
/**
@@ -181,6 +182,16 @@
{
return null;
}
+ try
+ {
+ InvokerLocator validatedLocator = new InvokerLocator(locatorUrl);
+ locatorUrl = InvokerLocator.validateLocator(validatedLocator).getLocatorURI();
+ }
+ catch (MalformedURLException e)
+ {
+ log.warn("malformed URL: " + locatorUrl);
+ return null;
+ }
ServerInvoker[] serverInvokers = InvokerRegistry.getServerInvokers();
if (serverInvokers != null && serverInvokers.length > 0)
@@ -189,7 +200,7 @@
{
ServerInvoker svrInvoker = serverInvokers[x];
InvokerLocator locator = svrInvoker.getLocator();
- if (locatorUrl.equalsIgnoreCase(locator.getOriginalURI()))
+ if (locatorUrl.equalsIgnoreCase(locator.getLocatorURI()))
{
return (ServletServerInvokerMBean) svrInvoker;
}
16 years, 6 months