[jboss-remoting-commits] JBoss Remoting SVN: r4515 - in remoting3/trunk: protocol/basic/src/test/java/org/jboss/cx/remoting/protocol/basic and 1 other directories.

jboss-remoting-commits at lists.jboss.org jboss-remoting-commits at lists.jboss.org
Wed Aug 27 11:30:55 EDT 2008


Author: david.lloyd at jboss.com
Date: 2008-08-27 11:30:54 -0400 (Wed, 27 Aug 2008)
New Revision: 4515

Removed:
   remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/Connection.java
   remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/ServiceRegistry.java
   remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/ServiceRegistryImpl.java
Modified:
   remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/AbstractConnection.java
   remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicHandler.java
   remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicProtocol.java
   remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/ConnectionListener.java
   remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/MessageType.java
   remoting3/trunk/protocol/basic/src/test/java/org/jboss/cx/remoting/protocol/basic/ConnectionTestCase.java
   remoting3/trunk/standalone/src/main/java/org/jboss/cx/remoting/Remoting.java
Log:
Pause here for a moment before ripping out marshalling stuff...

Modified: remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/AbstractConnection.java
===================================================================
--- remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/AbstractConnection.java	2008-08-13 06:02:45 UTC (rev 4514)
+++ remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/AbstractConnection.java	2008-08-27 15:30:54 UTC (rev 4515)
@@ -22,13 +22,13 @@
 
 package org.jboss.cx.remoting.protocol.basic;
 
-import org.jboss.cx.remoting.spi.AbstractHandleableCloseable;
+import org.jboss.cx.remoting.spi.AbstractSimpleCloseable;
 import java.util.concurrent.Executor;
 
 /**
  *
  */
-public abstract class AbstractConnection extends AbstractHandleableCloseable<Connection> implements Connection {
+public abstract class AbstractConnection extends AbstractSimpleCloseable {
     /**
      * Basic constructor.
      *

Modified: remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicHandler.java
===================================================================
--- remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicHandler.java	2008-08-13 06:02:45 UTC (rev 4514)
+++ remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicHandler.java	2008-08-27 15:30:54 UTC (rev 4515)
@@ -52,7 +52,10 @@
 import static org.jboss.cx.remoting.protocol.basic.MessageType.REQUEST_FAILED;
 import static org.jboss.cx.remoting.protocol.basic.MessageType.CANCEL_ACK;
 import static org.jboss.cx.remoting.protocol.basic.MessageType.VERSION;
+import static org.jboss.cx.remoting.protocol.basic.MessageType.SERVICE_ADVERTISE;
+import static org.jboss.cx.remoting.protocol.basic.MessageType.SERVICE_UNADVERTISE;
 import org.jboss.cx.remoting.CloseHandler;
+import org.jboss.cx.remoting.Endpoint;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Executor;
@@ -71,16 +74,20 @@
 public final class BasicHandler implements IoHandler<AllocatedMessageChannel> {
 
     private static final Logger log = Logger.getLogger(BasicHandler.class);
-    private static final int LOCAL_VERSION = 1;
+    private static final int LOCAL_VERSION = 0x00000100;
 
+    // running on remote node
+    private final ConcurrentMap<Integer, ReplyHandler> outstandingRequests = concurrentMap();
+
     // clients whose requests get forwarded to the remote side
     private final ConcurrentMap<Integer, RequestHandler> remoteClients = concurrentMap();
-    // running on remote node
-    private final ConcurrentMap<Integer, ReplyHandler> outstandingRequests = concurrentMap();
     // forwarded to remote side (handled on this side)
     private final ConcurrentMap<Integer, Handle<RequestHandler>> forwardedClients = concurrentMap();
 
-    private final ServiceRegistry registry;
+    // services forwarded to us
+    private final ConcurrentMap<Integer, RequestHandlerSource> remoteServices = concurrentMap();
+    // forwarded to remote side (handled on this side)
+    private final ConcurrentMap<Integer, Handle<RequestHandlerSource>> forwardedServices = concurrentMap();
 
     private final boolean server;
     private final BufferAllocator<ByteBuffer> allocator;
@@ -94,13 +101,13 @@
     private final ClassLoader classLoader;
     private List<String> localMarshallerList = Collections.singletonList("java-serialization");
     private volatile String marshallerType;
+    private volatile int metric;
     private final AtomicBoolean initialized = new AtomicBoolean(false);
 
-    public BasicHandler(final boolean server, final BufferAllocator<ByteBuffer> allocator, final Executor executor, final MarshallerFactory<ByteBuffer> marshallerFactory, final ServiceRegistry registry) {
+    public BasicHandler(final boolean server, final BufferAllocator<ByteBuffer> allocator, final Executor executor, final MarshallerFactory<ByteBuffer> marshallerFactory) {
         this.server = server;
         this.allocator = allocator;
         this.executor = executor;
-        this.registry = registry;
         final RequestHandlerImpl endpoint = new RequestHandlerImpl(0, allocator);
         remoteClients.put(Integer.valueOf(0), endpoint);
         this.marshallerFactory = marshallerFactory;
@@ -116,11 +123,11 @@
     /**
      * Sequence number of local clients forwarded to the remote side.
      */
-    private final AtomicInteger localClientIdSeq = new AtomicInteger(1);
+    private final AtomicInteger localClientIdSeq = new AtomicInteger();
     /**
      * Sequence number of remote clients opened locally from services from the remote side.
      */
-    private final AtomicInteger remoteClientIdSeq = new AtomicInteger(1);
+    private final AtomicInteger remoteClientIdSeq = new AtomicInteger();
 
     public void handleOpened(final AllocatedMessageChannel channel) {
         if (isnew.getAndSet(false)) {
@@ -335,6 +342,26 @@
                     registry.unbind(buffer.getInt());
                     break;
                 }
+                case SERVICE_ADVERTISE: {
+                    final int serviceId = buffer.getInt();
+                    final String serviceType = readUTFZ(buffer);
+                    final String groupName = readUTFZ(buffer);
+                    final String endpointName = readUTFZ(buffer);
+                    final int baseMetric = buffer.getInt();
+                    Endpoint endpoint;
+                    int id;
+                    final RequestHandlerSource handlerSource = new RequestHandlerSourceImpl(allocator, id);
+                    final int calcMetric = baseMetric + metric;
+                    if (calcMetric > 0) {
+                        endpoint.registerRemoteService(serviceType, groupName, endpointName, handlerSource, calcMetric);
+                    }
+                    break;
+                }
+                case SERVICE_UNADVERTISE: {
+                    final int serviceId = buffer.getInt();
+                    IoUtils.safeClose(remoteServices.get(Integer.valueOf(serviceId)));
+                    break;
+                }
                 default: {
                     log.trace("Received invalid message type %d", Integer.valueOf(msgType));
                 }

Modified: remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicProtocol.java
===================================================================
--- remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicProtocol.java	2008-08-13 06:02:45 UTC (rev 4514)
+++ remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicProtocol.java	2008-08-27 15:30:54 UTC (rev 4515)
@@ -23,6 +23,7 @@
 package org.jboss.cx.remoting.protocol.basic;
 
 import org.jboss.cx.remoting.RemotingException;
+import org.jboss.cx.remoting.SimpleCloseable;
 import org.jboss.cx.remoting.spi.remote.RequestHandlerSource;
 import org.jboss.cx.remoting.spi.remote.Handle;
 import org.jboss.cx.remoting.core.marshal.JavaSerializationMarshallerFactory;
@@ -55,10 +56,10 @@
      * @param allocator the buffer allocator to use
      * @return a handler factory for passing to an XNIO server
      */
-    public static IoHandlerFactory<AllocatedMessageChannel> createServer(final Executor executor, final BufferAllocator<ByteBuffer> allocator, final ServiceRegistry serviceRegistry) {
+    public static IoHandlerFactory<AllocatedMessageChannel> createServer(final Executor executor, final BufferAllocator<ByteBuffer> allocator) {
         return new IoHandlerFactory<AllocatedMessageChannel>() {
             public IoHandler<? super AllocatedMessageChannel> createHandler() {
-                return new BasicHandler(true, allocator, executor, new JavaSerializationMarshallerFactory(executor), serviceRegistry);
+                return new BasicHandler(true, allocator, executor, new JavaSerializationMarshallerFactory(executor));
             }
         };
     }
@@ -69,15 +70,14 @@
      * @param executor the executor to use for invocations
      * @param channelSource the XNIO channel source to use to establish the connection
      * @param allocator the buffer allocator to use
-     * @param serviceRegistry the service registry
      * @return a handle which may be used to close the connection
      * @throws IOException if an error occurs
      */
-    public static IoFuture<Connection> connect(final Executor executor, final ChannelSource<AllocatedMessageChannel> channelSource, final BufferAllocator<ByteBuffer> allocator, final ServiceRegistry serviceRegistry) throws IOException {
-        final BasicHandler basicHandler = new BasicHandler(false, allocator, executor, new JavaSerializationMarshallerFactory(executor), serviceRegistry);
+    public static IoFuture<SimpleCloseable> connect(final Executor executor, final ChannelSource<AllocatedMessageChannel> channelSource, final BufferAllocator<ByteBuffer> allocator) throws IOException {
+        final BasicHandler basicHandler = new BasicHandler(false, allocator, executor, new JavaSerializationMarshallerFactory(executor));
         final IoFuture<AllocatedMessageChannel> futureChannel = channelSource.open(basicHandler);
-        return new AbstractConvertingIoFuture<Connection, AllocatedMessageChannel>(futureChannel) {
-            protected Connection convert(final AllocatedMessageChannel channel) throws RemotingException {
+        return new AbstractConvertingIoFuture<SimpleCloseable, AllocatedMessageChannel>(futureChannel) {
+            protected SimpleCloseable convert(final AllocatedMessageChannel channel) throws RemotingException {
                 return new AbstractConnection(executor) {
                     public Handle<RequestHandlerSource> getServiceForId(final int id) throws IOException {
                         return basicHandler.getRemoteService(id).getHandle();

Deleted: remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/Connection.java
===================================================================
--- remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/Connection.java	2008-08-13 06:02:45 UTC (rev 4514)
+++ remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/Connection.java	2008-08-27 15:30:54 UTC (rev 4515)
@@ -1,36 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2008, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.cx.remoting.protocol.basic;
-
-import org.jboss.cx.remoting.spi.remote.RequestHandlerSource;
-import org.jboss.cx.remoting.spi.remote.Handle;
-import org.jboss.cx.remoting.HandleableCloseable;
-import org.jboss.cx.remoting.RemotingException;
-import java.io.IOException;
-
-/**
- *
- */
-public interface Connection extends HandleableCloseable<Connection> {
-    Handle<RequestHandlerSource> getServiceForId(int id) throws IOException;
-}

Modified: remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/ConnectionListener.java
===================================================================
--- remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/ConnectionListener.java	2008-08-13 06:02:45 UTC (rev 4514)
+++ remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/ConnectionListener.java	2008-08-27 15:30:54 UTC (rev 4515)
@@ -22,9 +22,11 @@
 
 package org.jboss.cx.remoting.protocol.basic;
 
+import org.jboss.cx.remoting.SimpleCloseable;
+
 /**
  *
  */
 public interface ConnectionListener {
-    void handleOpened(Connection connection);
+    void handleOpened(SimpleCloseable connection);
 }

Modified: remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/MessageType.java
===================================================================
--- remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/MessageType.java	2008-08-13 06:02:45 UTC (rev 4514)
+++ remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/MessageType.java	2008-08-27 15:30:54 UTC (rev 4515)
@@ -26,12 +26,19 @@
  *
  */
 public final class MessageType {
+    // Initial version & marshaller negotiation
     public static final int VERSION            = 0;
+    // One-way request, no return value may be sent
     public static final int REQUEST_ONEWAY     = 1;
+    // Two-way request, return value is expected
     public static final int REQUEST            = 2;
+    // Reply
     public static final int REPLY              = 3;
+    // Attempt to cancel a request
     public static final int CANCEL_REQUEST     = 4;
+    // Acknowledge that a request was cancelled
     public static final int CANCEL_ACK         = 5;
+    // Request failed due to exception
     public static final int REQUEST_FAILED     = 6;
     // Remote side called .close() on a forwarded RequestHandler
     public static final int CLIENT_CLOSE       = 7;

Deleted: remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/ServiceRegistry.java
===================================================================
--- remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/ServiceRegistry.java	2008-08-13 06:02:45 UTC (rev 4514)
+++ remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/ServiceRegistry.java	2008-08-27 15:30:54 UTC (rev 4515)
@@ -1,43 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2008, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.cx.remoting.protocol.basic;
-
-import org.jboss.cx.remoting.spi.remote.RequestHandlerSource;
-import org.jboss.cx.remoting.spi.remote.Handle;
-import org.jboss.cx.remoting.RemotingException;
-import java.io.IOException;
-
-/**
- *
- */
-public interface ServiceRegistry {
-    int bind(RequestHandlerSource requestHandlerSource) throws IOException;
-
-    void bind(RequestHandlerSource requestHandlerSource, int id) throws IOException;
-
-    void unbind(int id) throws RemotingException;
-
-    void clear();
-
-    Handle<RequestHandlerSource> lookup(int id) throws IOException;
-}

Deleted: remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/ServiceRegistryImpl.java
===================================================================
--- remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/ServiceRegistryImpl.java	2008-08-13 06:02:45 UTC (rev 4514)
+++ remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/ServiceRegistryImpl.java	2008-08-27 15:30:54 UTC (rev 4515)
@@ -1,112 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2008, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.cx.remoting.protocol.basic;
-
-import org.jboss.cx.remoting.spi.remote.RequestHandlerSource;
-import org.jboss.cx.remoting.spi.remote.Handle;
-import org.jboss.cx.remoting.RemotingException;
-import org.jboss.cx.remoting.util.CollectionUtil;
-import org.jboss.xnio.IoUtils;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.Iterator;
-import java.io.IOException;
-
-/**
- *
- */
-public final class ServiceRegistryImpl implements ServiceRegistry {
-
-    private static final int START = 32768;
-
-    private final ConcurrentMap<Integer, Handle<RequestHandlerSource>> map = CollectionUtil.concurrentMap();
-    private final AtomicInteger dynamicSequence = new AtomicInteger(START);
-    private final ServiceRegistry parent;
-
-    public ServiceRegistryImpl(final ServiceRegistry parent) {
-        this.parent = parent;
-    }
-
-    public ServiceRegistryImpl() {
-        parent = null;
-    }
-
-    public int bind(final RequestHandlerSource requestHandlerSource) throws IOException {
-        final Handle<RequestHandlerSource> handle = requestHandlerSource.getHandle();
-        boolean ok = false;
-        try {
-            for (;;) {
-                final int seqVal = dynamicSequence.getAndIncrement();
-                if (seqVal < 0) {
-                    dynamicSequence.compareAndSet(seqVal, START);
-                    continue;
-                }
-                if (map.putIfAbsent(Integer.valueOf(seqVal), handle) != null) {
-                    ok = true;
-                    return seqVal;
-                }
-            }
-        } finally {
-            if (! ok) {
-                IoUtils.safeClose(handle);
-            }
-        }
-    }
-
-    public void bind(final RequestHandlerSource requestHandlerSource, final int id) throws IOException {
-        final Handle<RequestHandlerSource> handle = requestHandlerSource.getHandle();
-        boolean ok = false;
-        try {
-            if (map.putIfAbsent(Integer.valueOf(id), handle) != null) {
-                throw new RemotingException("Service already bound to that ID");
-            }
-            ok = true;
-        } finally {
-            if (! ok) {
-                IoUtils.safeClose(handle);
-            }
-        }
-    }
-
-    public void unbind(final int id) throws RemotingException {
-        map.remove(Integer.valueOf(id));
-    }
-
-    public void clear() {
-        Iterator<Handle<RequestHandlerSource>> it = map.values().iterator();
-        while (it.hasNext()) {
-            IoUtils.safeClose(it.next());
-            it.remove();
-        }
-    }
-
-    public Handle<RequestHandlerSource> lookup(final int id) throws IOException {
-        final Handle<RequestHandlerSource> handle = map.get(Integer.valueOf(id));
-        return handle != null || parent == null ? handle.getResource().getHandle() : parent.lookup(id);
-    }
-
-    protected void finalize() throws Throwable {
-        clear();
-        super.finalize();
-    }
-}

Modified: remoting3/trunk/protocol/basic/src/test/java/org/jboss/cx/remoting/protocol/basic/ConnectionTestCase.java
===================================================================
--- remoting3/trunk/protocol/basic/src/test/java/org/jboss/cx/remoting/protocol/basic/ConnectionTestCase.java	2008-08-13 06:02:45 UTC (rev 4514)
+++ remoting3/trunk/protocol/basic/src/test/java/org/jboss/cx/remoting/protocol/basic/ConnectionTestCase.java	2008-08-27 15:30:54 UTC (rev 4515)
@@ -39,6 +39,7 @@
 import org.jboss.cx.remoting.ClientSource;
 import org.jboss.cx.remoting.Client;
 import org.jboss.cx.remoting.AbstractRequestListener;
+import org.jboss.cx.remoting.SimpleCloseable;
 import org.jboss.cx.remoting.spi.remote.RequestHandlerSource;
 import org.jboss.cx.remoting.spi.remote.Handle;
 import org.jboss.xnio.BufferAllocator;
@@ -83,7 +84,6 @@
                 endpoint.setExecutor(closeableExecutor);
                 endpoint.start();
                 try {
-                    final ServiceRegistry serviceRegistry = new ServiceRegistryImpl();
                     try {
                         final Handle<RequestHandlerSource> requestHandlerSourceHandle = endpoint.createRequestHandlerSource(new AbstractRequestListener<Object, Object>() {
                             public void handleRequest(final RequestContext<Object> context, final Object request) throws RemoteExecutionException {
@@ -96,7 +96,7 @@
                         }, null, null);
                         try {
                             serviceRegistry.bind(requestHandlerSourceHandle.getResource(), 13);
-                            final IoHandlerFactory<AllocatedMessageChannel> handlerFactory = BasicProtocol.createServer(closeableExecutor, allocator, serviceRegistry);
+                            final IoHandlerFactory<AllocatedMessageChannel> handlerFactory = BasicProtocol.createServer(closeableExecutor, allocator);
                             final IoHandlerFactory<StreamChannel> newHandlerFactory = Channels.convertStreamToAllocatedMessage(handlerFactory, 32768, 32768);
                             final Closeable tcpServerCloseable = xnio.createTcpServer(newHandlerFactory, new InetSocketAddress(12345)).create();
                             try {
@@ -104,8 +104,8 @@
                                 try {
                                     final TcpClient tcpClient = connector.createChannelSource(new InetSocketAddress("localhost", 12345));
                                     final ChannelSource<AllocatedMessageChannel> channelSource = Channels.convertStreamToAllocatedMessage(tcpClient, 32768, 32768);
-                                    final IoFuture<Connection> futureCloseable = BasicProtocol.connect(closeableExecutor, channelSource, allocator, serviceRegistry);
-                                    final Connection connection = futureCloseable.get();
+                                    final IoFuture<SimpleCloseable> futureCloseable = BasicProtocol.connect(closeableExecutor, channelSource, allocator);
+                                    final SimpleCloseable connection = futureCloseable.get();
                                     try {
                                         final Handle<RequestHandlerSource> handleThirteen = connection.getServiceForId(13);
                                         try {

Modified: remoting3/trunk/standalone/src/main/java/org/jboss/cx/remoting/Remoting.java
===================================================================
--- remoting3/trunk/standalone/src/main/java/org/jboss/cx/remoting/Remoting.java	2008-08-13 06:02:45 UTC (rev 4514)
+++ remoting3/trunk/standalone/src/main/java/org/jboss/cx/remoting/Remoting.java	2008-08-27 15:30:54 UTC (rev 4515)
@@ -41,8 +41,8 @@
         }
     }
 
-    public static <I, O> ClientSource<I, O> createLocalClientSource(Endpoint endpoint, RequestListener<I, O> requestListener) throws IOException {
-        final Handle<RequestHandlerSource> handle = endpoint.createRequestHandlerSource(requestListener, INIT_ME, INIT_ME);
+    public static <I, O> ClientSource<I, O> createLocalClientSource(Endpoint endpoint, RequestListener<I, O> requestListener, final String serviceType, final String groupName) throws IOException {
+        final Handle<RequestHandlerSource> handle = endpoint.createRequestHandlerSource(requestListener, serviceType, groupName);
         try {
             return endpoint.createClientSource(handle.getResource());
         } finally {




More information about the jboss-remoting-commits mailing list