Author: david.lloyd(a)jboss.com
Date: 2010-03-09 14:55:38 -0500 (Tue, 09 Mar 2010)
New Revision: 5814
Modified:
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RemoteConnectionHandler.java
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RemoteMessageHandler.java
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RemoteProtocol.java
remoting3/trunk/jboss-remoting/src/test/java/org/jboss/remoting3/test/InvocationTestBase.java
Log:
JBREM-1208 - make sure client open options are sent across
Modified:
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RemoteConnectionHandler.java
===================================================================
---
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RemoteConnectionHandler.java 2010-03-09
14:53:39 UTC (rev 5813)
+++
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RemoteConnectionHandler.java 2010-03-09
19:55:38 UTC (rev 5814)
@@ -25,7 +25,8 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Random;
-import java.util.concurrent.atomic.AtomicBoolean;
+import org.jboss.marshalling.ByteOutput;
+import org.jboss.marshalling.Marshaller;
import org.jboss.marshalling.MarshallerFactory;
import org.jboss.marshalling.Marshalling;
import org.jboss.marshalling.MarshallingConfiguration;
@@ -67,8 +68,6 @@
private final IntKeyMap<OutboundStream> outboundStreams = new
IntKeyMap<OutboundStream>();
private final IntKeyMap<InboundStream> inboundStreams = new
IntKeyMap<InboundStream>();
- private final AtomicBoolean closed = new AtomicBoolean();
-
RemoteConnectionHandler(final ConnectionHandlerContext connectionContext, final
RemoteConnection remoteConnection, final MarshallerFactory marshallerFactory) {
super(connectionContext.getConnectionProviderContext().getExecutor());
this.connectionContext = connectionContext;
@@ -101,8 +100,17 @@
buffer.put((byte) 0);
Buffers.putModifiedUtf8(buffer, groupName);
buffer.put((byte) 0);
- buffer.flip();
- remoteConnection.sendBlocking(buffer, true);
+ final ByteOutput output = Marshalling.createByteOutput(buffer);
+ final Marshaller marshaller =
marshallerFactory.createMarshaller(marshallingConfiguration);
+ try {
+ marshaller.start(output);
+ marshaller.writeObject(optionMap);
+ marshaller.finish();
+ buffer.flip();
+ remoteConnection.sendBlocking(buffer, true);
+ } finally {
+ IoUtils.safeClose(marshaller);
+ }
} catch (IOException e) {
result.setException(e);
} catch (Throwable e) {
Modified:
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RemoteMessageHandler.java
===================================================================
---
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RemoteMessageHandler.java 2010-03-09
14:53:39 UTC (rev 5813)
+++
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RemoteMessageHandler.java 2010-03-09
19:55:38 UTC (rev 5814)
@@ -25,10 +25,14 @@
import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
+import org.jboss.marshalling.ByteInput;
+import org.jboss.marshalling.Marshalling;
import org.jboss.marshalling.NioByteInput;
+import org.jboss.marshalling.Unmarshaller;
import org.jboss.marshalling.util.IntKeyMap;
import org.jboss.remoting3.ReplyException;
import org.jboss.remoting3.ServiceNotFoundException;
+import org.jboss.remoting3.ServiceOpenException;
import org.jboss.remoting3.ServiceURI;
import org.jboss.remoting3.spi.LocalReplyHandler;
import org.jboss.remoting3.spi.LocalRequestHandler;
@@ -60,11 +64,36 @@
final int id = buffer.getInt();
final String serviceType = Buffers.getModifiedUtf8Z(buffer);
final String groupName = Buffers.getModifiedUtf8Z(buffer);
- final LocalRequestHandler handler;
- handler =
connectionHandler.getConnectionContext().openService(serviceType, groupName,
OptionMap.EMPTY);
final Pool<ByteBuffer> bufferPool =
connectionHandler.getBufferPool();
+ final ByteInput input = Marshalling.createByteInput(buffer);
+ final OptionMap optionMap;
final ByteBuffer outBuf = bufferPool.allocate();
try {
+ try {
+ final Unmarshaller unmarshaller =
remoteConnectionHandler.getMarshallerFactory().createUnmarshaller(remoteConnectionHandler.getMarshallingConfiguration());
+ try {
+ unmarshaller.start(input);
+ optionMap = (OptionMap) unmarshaller.readObject();
+ unmarshaller.finish();
+ } finally {
+ IoUtils.safeClose(unmarshaller);
+ }
+ } catch (Exception e) {
+ log.error("Failed to unmarshall service request option map:
%s", e);
+ outBuf.putInt(RemoteConnectionHandler.LENGTH_PLACEHOLDER);
+ outBuf.put(RemoteProtocol.SERVICE_ERROR);
+ outBuf.putInt(id);
+ outBuf.flip();
+ try {
+ connection.sendBlocking(outBuf, true);
+ } catch (IOException e1) {
+ // the channel has suddenly failed; trace the send failure itself (e1), not the unmarshalling error (e) already logged above
+ log.trace("Send failed: %s", e1);
+ }
+ return;
+ }
+ final LocalRequestHandler handler;
+ handler =
connectionHandler.getConnectionContext().openService(serviceType, groupName, optionMap);
outBuf.putInt(RemoteConnectionHandler.LENGTH_PLACEHOLDER);
if (handler == null) {
// no matching service found
@@ -110,6 +139,24 @@
}
return;
}
+ case RemoteProtocol.SERVICE_ERROR: {
+ final int id = buffer.getInt();
+ final OutboundClient client;
+ final IntKeyMap<OutboundClient> outboundClients =
connectionHandler.getOutboundClients();
+ synchronized (outboundClients) {
+ client = outboundClients.remove(id);
+ }
+ if (client == null) {
+ log.trace("Received service-error for unknown client %d",
Integer.valueOf(id));
+ return;
+ }
+ synchronized (client) {
+ // todo assert client state == waiting
+ client.getResult().setException(new ServiceOpenException("Remote
side failed to open service"));
+ client.setState(OutboundClient.State.CLOSED);
+ }
+ return;
+ }
case RemoteProtocol.SERVICE_CLIENT_OPENED: {
final int id = buffer.getInt();
final OutboundClient client;
Modified:
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RemoteProtocol.java
===================================================================
---
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RemoteProtocol.java 2010-03-09
14:53:39 UTC (rev 5813)
+++
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RemoteProtocol.java 2010-03-09
19:55:38 UTC (rev 5814)
@@ -57,8 +57,9 @@
static final byte SERVICE_REQUEST = 16;
static final byte SERVICE_NOT_FOUND = 17;
static final byte SERVICE_CLIENT_OPENED = 18;
- static final byte CLIENT_CLOSE = 19;
- static final byte CLIENT_ASYNC_CLOSE = 20; // close from the server side
+ static final byte SERVICE_ERROR = 19;
+ static final byte CLIENT_CLOSE = 20;
+ static final byte CLIENT_ASYNC_CLOSE = 21; // close from the server side
static final byte STREAM_DATA = 32; // from source -> sink side
static final byte STREAM_EXCEPTION = 33; // from source -> sink side
Modified:
remoting3/trunk/jboss-remoting/src/test/java/org/jboss/remoting3/test/InvocationTestBase.java
===================================================================
---
remoting3/trunk/jboss-remoting/src/test/java/org/jboss/remoting3/test/InvocationTestBase.java 2010-03-09
14:53:39 UTC (rev 5813)
+++
remoting3/trunk/jboss-remoting/src/test/java/org/jboss/remoting3/test/InvocationTestBase.java 2010-03-09
19:55:38 UTC (rev 5814)
@@ -23,6 +23,7 @@
package org.jboss.remoting3.test;
import java.io.IOException;
+import java.util.concurrent.atomic.AtomicReference;
import org.jboss.remoting3.Client;
import org.jboss.remoting3.ClientConnector;
import org.jboss.remoting3.ClientContext;
@@ -38,6 +39,7 @@
import org.jboss.remoting3.ServiceNotFoundException;
import org.jboss.xnio.IoUtils;
import org.jboss.xnio.OptionMap;
+import org.jboss.xnio.Options;
import org.jboss.xnio.Xnio;
import org.jboss.xnio.log.Logger;
import org.testng.SkipException;
@@ -45,8 +47,7 @@
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.fail;
+import static org.testng.Assert.*;
@Test
public abstract class InvocationTestBase {
@@ -265,6 +266,55 @@
}
}
+ public void testOptions() throws Throwable { // checks that the OptionMap passed to openClient() is the one handed to the service's ClientListener
+ enter();
+ try {
+ final OptionMap optionMap = OptionMap.builder().set(Options.BROADCAST,
true).getMap();
+ final AtomicReference<OptionMap> receivedOptions = new
AtomicReference<OptionMap>();
+ final Registration registration =
endpoint.serviceBuilder().setGroupName("foo").setServiceType("test1").setRequestType(InvocationTestObject.class).
+ setReplyType(InvocationTestObject.class).setClientListener(new
ClientListener<InvocationTestObject, InvocationTestObject>() {
+ public RequestListener<InvocationTestObject, InvocationTestObject>
handleClientOpen(final ClientContext clientContext, final OptionMap optionMap) {
+ receivedOptions.set(optionMap); // record what the listener received so the assertions below can inspect it
+ clientContext.addCloseHandler(new CloseHandler<ClientContext>()
{
+ public void handleClose(final ClientContext closed) {
+ log.info("Client closed");
+ }
+ });
+ return new RequestListener<InvocationTestObject,
InvocationTestObject>() {
+ public void handleRequest(final
RequestContext<InvocationTestObject> objectRequestContext, final
InvocationTestObject request) throws RemoteExecutionException {
+ // not invoked: the test opens and closes the client without sending a request
+ }
+ };
+ }
+ }).register();
+ try {
+ final Connection connection = getConnection();
+ try {
+ final Client<InvocationTestObject, InvocationTestObject> client
= connection.openClient("test1", "*", InvocationTestObject.class,
InvocationTestObject.class, getClass().getClassLoader(), optionMap).get();
+ try {
+ assertTrue(optionMap.contains(Options.BROADCAST), "Option
disappeared from original map");
+ assertTrue(optionMap.get(Options.BROADCAST).booleanValue(),
"Option changed value from original map");
+ final OptionMap map2 = receivedOptions.get();
+ assertNotNull(map2, "Option map was not received");
+ assertTrue(map2.contains(Options.BROADCAST), "Option does
not appear in destination map");
+ assertTrue(map2.get(Options.BROADCAST).booleanValue(),
"Option changed value in destination map");
+ } finally {
+ IoUtils.safeClose(client);
+ client.awaitClosedUninterruptibly();
+ }
+ } finally {
+ IoUtils.safeClose(connection);
+ connection.awaitClosedUninterruptibly();
+ }
+ } finally {
+ IoUtils.safeClose(registration);
+ registration.awaitClosedUninterruptibly();
+ }
+ } finally {
+ exit();
+ }
+ }
+
@AfterTest
public void tearDown() throws IOException {
enter();