[jboss-remoting-commits] JBoss Remoting SVN: r4646 - remoting3/trunk/protocol/multiplex/src/test/java/org/jboss/remoting/protocol/multiplex.

jboss-remoting-commits at lists.jboss.org jboss-remoting-commits at lists.jboss.org
Tue Nov 4 12:50:08 EST 2008


Author: david.lloyd at jboss.com
Date: 2008-11-04 12:50:07 -0500 (Tue, 04 Nov 2008)
New Revision: 4646

Modified:
   remoting3/trunk/protocol/multiplex/src/test/java/org/jboss/remoting/protocol/multiplex/ConnectionTestCase.java
Log:
Make this into a real test (even if it fails)

Modified: remoting3/trunk/protocol/multiplex/src/test/java/org/jboss/remoting/protocol/multiplex/ConnectionTestCase.java
===================================================================
--- remoting3/trunk/protocol/multiplex/src/test/java/org/jboss/remoting/protocol/multiplex/ConnectionTestCase.java	2008-11-04 09:15:17 UTC (rev 4645)
+++ remoting3/trunk/protocol/multiplex/src/test/java/org/jboss/remoting/protocol/multiplex/ConnectionTestCase.java	2008-11-04 17:50:07 UTC (rev 4646)
@@ -28,14 +28,36 @@
 import java.util.List;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.CountDownLatch;
+import java.net.URI;
+import java.io.IOException;
 import junit.framework.TestCase;
 import org.jboss.remoting.core.EndpointImpl;
 import org.jboss.remoting.test.support.LoggingHelper;
+import org.jboss.remoting.SimpleCloseable;
+import org.jboss.remoting.LocalServiceConfiguration;
+import org.jboss.remoting.RequestListener;
+import org.jboss.remoting.ClientContext;
+import org.jboss.remoting.ServiceContext;
+import org.jboss.remoting.RequestContext;
+import org.jboss.remoting.RemoteExecutionException;
+import org.jboss.remoting.ClientSource;
+import org.jboss.remoting.Client;
 import org.jboss.xnio.BufferAllocator;
 import org.jboss.xnio.IoUtils;
 import org.jboss.xnio.Xnio;
 import org.jboss.xnio.CloseableExecutor;
+import org.jboss.xnio.ChannelSource;
+import org.jboss.xnio.IoHandler;
+import org.jboss.xnio.IoHandlerFactory;
+import org.jboss.xnio.IoFuture;
+import org.jboss.xnio.Buffers;
+import org.jboss.xnio.log.Logger;
+import org.jboss.xnio.channels.AllocatedMessageChannel;
+import org.jboss.xnio.channels.Channels;
 import org.jboss.xnio.nio.NioXnio;
+import org.jboss.river.RiverMarshallerFactory;
+import org.jboss.marshalling.MarshallingConfiguration;
 
 /**
  *
@@ -45,28 +67,79 @@
         LoggingHelper.init();
     }
 
+    public static final Logger log = Logger.getLogger(ConnectionTestCase.class);
+
     public void testConnection() throws Throwable {
         final String REQUEST = "request";
         final String REPLY = "reply";
         final List<Throwable> problems = Collections.synchronizedList(new LinkedList<Throwable>());
         final CloseableExecutor closeableExecutor = IoUtils.closeableExecutor(Executors.newCachedThreadPool(), 500L, TimeUnit.MILLISECONDS);
         try {
-            final BufferAllocator<ByteBuffer> allocator = new BufferAllocator<ByteBuffer>() {
-                public ByteBuffer allocate() {
-                    return ByteBuffer.allocate(1024);
-                }
-
-                public void free(final ByteBuffer buffer) {
-                }
-            };
+            final BufferAllocator<ByteBuffer> allocator = Buffers.createHeapByteBufferAllocator(1024);
             final Xnio xnio = NioXnio.create();
             try {
-                final EndpointImpl endpoint = new EndpointImpl();
-                endpoint.setExecutor(closeableExecutor);
-                endpoint.start();
+                final EndpointImpl remoteEndpoint = new EndpointImpl();
+                remoteEndpoint.setExecutor(closeableExecutor);
+                remoteEndpoint.start();
                 try {
+                    final EndpointImpl endpoint = new EndpointImpl();
+                    endpoint.setExecutor(closeableExecutor);
+                    endpoint.start();
+                    try {
+                        final CountDownLatch latch = new CountDownLatch(1);
+                        final MultiplexConfiguration configuration = new MultiplexConfiguration();
+                        configuration.setAllocator(allocator);
+                        configuration.setExecutor(closeableExecutor);
+                        configuration.setLinkMetric(10);
+                        configuration.setMarshallerFactory(new RiverMarshallerFactory());
+                        final MarshallingConfiguration marshallingConfiguration = new MarshallingConfiguration();
+                        configuration.setMarshallingConfiguration(marshallingConfiguration);
+                        final IoHandlerFactory<AllocatedMessageChannel> handlerFactory = MultiplexProtocol.createServer(remoteEndpoint, configuration);
+                        final ChannelSource<AllocatedMessageChannel> channelSource = Channels.convertStreamToAllocatedMessage(xnio.createPipeServer(Channels.convertStreamToAllocatedMessage(handlerFactory, 16384, 16384)), 16384, 16384);
+                        final IoFuture<SimpleCloseable> future = MultiplexProtocol.connect(endpoint, configuration, channelSource);
+                        future.get();
+                        final LocalServiceConfiguration<Object, Object> localServiceConfiguration = new LocalServiceConfiguration<Object, Object>(new RequestListener<Object, Object>() {
+                            public void handleClientOpen(final ClientContext context) {
+                                log.debug("Client open");
+                            }
+
+                            public void handleServiceOpen(final ServiceContext context) {
+                            }
+
+                            public void handleRequest(final RequestContext<Object> context, final Object request) throws RemoteExecutionException {
+                                try {
+                                    context.sendReply(REPLY);
+                                } catch (IOException e) {
+                                    log.error(e, "Failed to send reply");
+                                    problems.add(e);
+                                }
+                            }
+
+                            public void handleServiceClose(final ServiceContext context) {
+                            }
+
+                            public void handleClientClose(final ClientContext context) {
+                                log.debug("Client closed");
+                                latch.countDown();
+                            }
+                        }, Object.class, Object.class);
+                        localServiceConfiguration.setServiceType("connection.test");
+                        localServiceConfiguration.setGroupName("testgroup");
+                        localServiceConfiguration.setMetric(10);
+                        remoteEndpoint.registerService(localServiceConfiguration);
+                        final IoFuture<ClientSource<Object,Object>> futureClientSource = endpoint.locateService(new URI("jrs:connection.test::"), Object.class, Object.class);
+                        assertEquals(IoFuture.Status.DONE, futureClientSource.await(1L, TimeUnit.SECONDS));
+                        final ClientSource<Object, Object> clientSource = futureClientSource.get();
+                        final Client<Object,Object> client = clientSource.createClient();
+                        final IoFuture<Object> futureReply = client.send(REQUEST);
+                        assertEquals(IoFuture.Status.DONE, futureReply.await(1L, TimeUnit.SECONDS));
+                        assertEquals(REPLY, futureReply.get());
+                        assertTrue(latch.await(1L, TimeUnit.SECONDS));
+                    } finally {
+                        endpoint.stop();
+                    }
                 } finally {
-                    endpoint.stop();
+                    remoteEndpoint.stop();
                 }
             } finally {
                 IoUtils.safeClose(xnio);




More information about the jboss-remoting-commits mailing list