[jboss-remoting-commits] JBoss Remoting SVN: r5814 - in remoting3/trunk/jboss-remoting/src: test/java/org/jboss/remoting3/test and 1 other directory.

jboss-remoting-commits at lists.jboss.org jboss-remoting-commits at lists.jboss.org
Tue Mar 9 14:55:39 EST 2010


Author: david.lloyd at jboss.com
Date: 2010-03-09 14:55:38 -0500 (Tue, 09 Mar 2010)
New Revision: 5814

Modified:
   remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RemoteConnectionHandler.java
   remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RemoteMessageHandler.java
   remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RemoteProtocol.java
   remoting3/trunk/jboss-remoting/src/test/java/org/jboss/remoting3/test/InvocationTestBase.java
Log:
JBREM-1208 - make sure client open options are sent across

Modified: remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RemoteConnectionHandler.java
===================================================================
--- remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RemoteConnectionHandler.java	2010-03-09 14:53:39 UTC (rev 5813)
+++ remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RemoteConnectionHandler.java	2010-03-09 19:55:38 UTC (rev 5814)
@@ -25,7 +25,8 @@
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Random;
-import java.util.concurrent.atomic.AtomicBoolean;
+import org.jboss.marshalling.ByteOutput;
+import org.jboss.marshalling.Marshaller;
 import org.jboss.marshalling.MarshallerFactory;
 import org.jboss.marshalling.Marshalling;
 import org.jboss.marshalling.MarshallingConfiguration;
@@ -67,8 +68,6 @@
     private final IntKeyMap<OutboundStream> outboundStreams = new IntKeyMap<OutboundStream>();
     private final IntKeyMap<InboundStream> inboundStreams = new IntKeyMap<InboundStream>();
 
-    private final AtomicBoolean closed = new AtomicBoolean();
-
     RemoteConnectionHandler(final ConnectionHandlerContext connectionContext, final RemoteConnection remoteConnection, final MarshallerFactory marshallerFactory) {
         super(connectionContext.getConnectionProviderContext().getExecutor());
         this.connectionContext = connectionContext;
@@ -101,8 +100,17 @@
             buffer.put((byte) 0);
             Buffers.putModifiedUtf8(buffer, groupName);
             buffer.put((byte) 0);
-            buffer.flip();
-            remoteConnection.sendBlocking(buffer, true);
+            final ByteOutput output = Marshalling.createByteOutput(buffer);
+            final Marshaller marshaller = marshallerFactory.createMarshaller(marshallingConfiguration);
+            try {
+                marshaller.start(output);
+                marshaller.writeObject(optionMap);
+                marshaller.finish();
+                buffer.flip();
+                remoteConnection.sendBlocking(buffer, true);
+            } finally {
+                IoUtils.safeClose(marshaller);
+            }
         } catch (IOException e) {
             result.setException(e);
         } catch (Throwable e) {

Modified: remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RemoteMessageHandler.java
===================================================================
--- remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RemoteMessageHandler.java	2010-03-09 14:53:39 UTC (rev 5813)
+++ remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RemoteMessageHandler.java	2010-03-09 19:55:38 UTC (rev 5814)
@@ -25,10 +25,14 @@
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.nio.ByteBuffer;
+import org.jboss.marshalling.ByteInput;
+import org.jboss.marshalling.Marshalling;
 import org.jboss.marshalling.NioByteInput;
+import org.jboss.marshalling.Unmarshaller;
 import org.jboss.marshalling.util.IntKeyMap;
 import org.jboss.remoting3.ReplyException;
 import org.jboss.remoting3.ServiceNotFoundException;
+import org.jboss.remoting3.ServiceOpenException;
 import org.jboss.remoting3.ServiceURI;
 import org.jboss.remoting3.spi.LocalReplyHandler;
 import org.jboss.remoting3.spi.LocalRequestHandler;
@@ -60,11 +64,36 @@
                 final int id = buffer.getInt();
                 final String serviceType = Buffers.getModifiedUtf8Z(buffer);
                 final String groupName = Buffers.getModifiedUtf8Z(buffer);
-                final LocalRequestHandler handler;
-                handler = connectionHandler.getConnectionContext().openService(serviceType, groupName, OptionMap.EMPTY);
                 final Pool<ByteBuffer> bufferPool = connectionHandler.getBufferPool();
+                final ByteInput input = Marshalling.createByteInput(buffer);
+                final OptionMap optionMap;
                 final ByteBuffer outBuf = bufferPool.allocate();
                 try {
+                    try {
+                        final Unmarshaller unmarshaller = remoteConnectionHandler.getMarshallerFactory().createUnmarshaller(remoteConnectionHandler.getMarshallingConfiguration());
+                        try {
+                            unmarshaller.start(input);
+                            optionMap = (OptionMap) unmarshaller.readObject();
+                            unmarshaller.finish();
+                        } finally {
+                            IoUtils.safeClose(unmarshaller);
+                        }
+                    } catch (Exception e) {
+                        log.error("Failed to unmarshall service request option map: %s", e);
+                        outBuf.putInt(RemoteConnectionHandler.LENGTH_PLACEHOLDER);
+                        outBuf.put(RemoteProtocol.SERVICE_ERROR);
+                        outBuf.putInt(id);
+                        outBuf.flip();
+                        try {
+                            connection.sendBlocking(outBuf, true);
+                        } catch (IOException e1) {
+                            // the channel has suddenly failed
+                            log.trace("Send failed: %s", e);
+                        }
+                        return;
+                    }
+                    final LocalRequestHandler handler;
+                    handler = connectionHandler.getConnectionContext().openService(serviceType, groupName, optionMap);
                     outBuf.putInt(RemoteConnectionHandler.LENGTH_PLACEHOLDER);
                     if (handler == null) {
                         // no matching service found
@@ -110,6 +139,24 @@
                 }
                 return;
             }
+            case RemoteProtocol.SERVICE_ERROR: {
+                final int id = buffer.getInt();
+                final OutboundClient client;
+                final IntKeyMap<OutboundClient> outboundClients = connectionHandler.getOutboundClients();
+                synchronized (outboundClients) {
+                    client = outboundClients.remove(id);
+                }
+                if (client == null) {
+                    log.trace("Received service-error for unknown client %d", Integer.valueOf(id));
+                    return;
+                }
+                synchronized (client) {
+                    // todo assert client state == waiting
+                    client.getResult().setException(new ServiceOpenException("Remote side failed to open service"));
+                    client.setState(OutboundClient.State.CLOSED);
+                }
+                return;
+            }
             case RemoteProtocol.SERVICE_CLIENT_OPENED: {
                 final int id = buffer.getInt();
                 final OutboundClient client;

Modified: remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RemoteProtocol.java
===================================================================
--- remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RemoteProtocol.java	2010-03-09 14:53:39 UTC (rev 5813)
+++ remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RemoteProtocol.java	2010-03-09 19:55:38 UTC (rev 5814)
@@ -57,8 +57,9 @@
     static final byte SERVICE_REQUEST = 16;
     static final byte SERVICE_NOT_FOUND = 17;
     static final byte SERVICE_CLIENT_OPENED = 18;
-    static final byte CLIENT_CLOSE = 19;
-    static final byte CLIENT_ASYNC_CLOSE = 20; // close from the server side
+    static final byte SERVICE_ERROR = 19;
+    static final byte CLIENT_CLOSE = 20;
+    static final byte CLIENT_ASYNC_CLOSE = 21; // close from the server side
 
     static final byte STREAM_DATA = 32; // from source -> sink side
     static final byte STREAM_EXCEPTION = 33; // from source -> sink side

Modified: remoting3/trunk/jboss-remoting/src/test/java/org/jboss/remoting3/test/InvocationTestBase.java
===================================================================
--- remoting3/trunk/jboss-remoting/src/test/java/org/jboss/remoting3/test/InvocationTestBase.java	2010-03-09 14:53:39 UTC (rev 5813)
+++ remoting3/trunk/jboss-remoting/src/test/java/org/jboss/remoting3/test/InvocationTestBase.java	2010-03-09 19:55:38 UTC (rev 5814)
@@ -23,6 +23,7 @@
 package org.jboss.remoting3.test;
 
 import java.io.IOException;
+import java.util.concurrent.atomic.AtomicReference;
 import org.jboss.remoting3.Client;
 import org.jboss.remoting3.ClientConnector;
 import org.jboss.remoting3.ClientContext;
@@ -38,6 +39,7 @@
 import org.jboss.remoting3.ServiceNotFoundException;
 import org.jboss.xnio.IoUtils;
 import org.jboss.xnio.OptionMap;
+import org.jboss.xnio.Options;
 import org.jboss.xnio.Xnio;
 import org.jboss.xnio.log.Logger;
 import org.testng.SkipException;
@@ -45,8 +47,7 @@
 import org.testng.annotations.BeforeTest;
 import org.testng.annotations.Test;
 
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.fail;
+import static org.testng.Assert.*;
 
 @Test
 public abstract class InvocationTestBase {
@@ -265,6 +266,55 @@
         }
     }
 
+    public void testOptions() throws Throwable {
+        enter();
+        try {
+            final OptionMap optionMap = OptionMap.builder().set(Options.BROADCAST, true).getMap();
+            final AtomicReference<OptionMap> receivedOptions = new AtomicReference<OptionMap>();
+            final Registration registration = endpoint.serviceBuilder().setGroupName("foo").setServiceType("test1").setRequestType(InvocationTestObject.class).
+                    setReplyType(InvocationTestObject.class).setClientListener(new ClientListener<InvocationTestObject, InvocationTestObject>() {
+                public RequestListener<InvocationTestObject, InvocationTestObject> handleClientOpen(final ClientContext clientContext, final OptionMap optionMap) {
+                    receivedOptions.set(optionMap);
+                    clientContext.addCloseHandler(new CloseHandler<ClientContext>() {
+                        public void handleClose(final ClientContext closed) {
+                            log.info("Client closed");
+                        }
+                    });
+                    return new RequestListener<InvocationTestObject, InvocationTestObject>() {
+                        public void handleRequest(final RequestContext<InvocationTestObject> objectRequestContext, final InvocationTestObject request) throws RemoteExecutionException {
+                            // not invoked
+                        }
+                    };
+                }
+            }).register();
+            try {
+                final Connection connection = getConnection();
+                try {
+                    final Client<InvocationTestObject, InvocationTestObject> client = connection.openClient("test1", "*", InvocationTestObject.class, InvocationTestObject.class, getClass().getClassLoader(), optionMap).get();
+                    try {
+                        assertTrue(optionMap.contains(Options.BROADCAST), "Option disappeared from original map");
+                        assertTrue(optionMap.get(Options.BROADCAST).booleanValue(), "Option changed value from original map");
+                        final OptionMap map2 = receivedOptions.get();
+                        assertNotNull(map2, "Option map was not received");
+                        assertTrue(map2.contains(Options.BROADCAST), "Option does not appear in destination map");
+                        assertTrue(map2.get(Options.BROADCAST).booleanValue(), "Option changed value in destination map");
+                    } finally {
+                        IoUtils.safeClose(client);
+                        client.awaitClosedUninterruptibly();
+                    }
+                } finally {
+                    IoUtils.safeClose(connection);
+                    connection.awaitClosedUninterruptibly();
+                }
+            } finally {
+                IoUtils.safeClose(registration);
+                registration.awaitClosedUninterruptibly();
+            }
+        } finally {
+            exit();
+        }
+    }
+
     @AfterTest
     public void tearDown() throws IOException {
         enter();



More information about the jboss-remoting-commits mailing list