Author: david.lloyd(a)jboss.com
Date: 2008-11-04 12:50:07 -0500 (Tue, 04 Nov 2008)
New Revision: 4646
Modified:
remoting3/trunk/protocol/multiplex/src/test/java/org/jboss/remoting/protocol/multiplex/ConnectionTestCase.java
Log:
Make this into a real test (even if it fails)
Modified:
remoting3/trunk/protocol/multiplex/src/test/java/org/jboss/remoting/protocol/multiplex/ConnectionTestCase.java
===================================================================
--- remoting3/trunk/protocol/multiplex/src/test/java/org/jboss/remoting/protocol/multiplex/ConnectionTestCase.java 2008-11-04 09:15:17 UTC (rev 4645)
+++ remoting3/trunk/protocol/multiplex/src/test/java/org/jboss/remoting/protocol/multiplex/ConnectionTestCase.java 2008-11-04 17:50:07 UTC (rev 4646)
@@ -28,14 +28,36 @@
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.CountDownLatch;
+import java.net.URI;
+import java.io.IOException;
import junit.framework.TestCase;
import org.jboss.remoting.core.EndpointImpl;
import org.jboss.remoting.test.support.LoggingHelper;
+import org.jboss.remoting.SimpleCloseable;
+import org.jboss.remoting.LocalServiceConfiguration;
+import org.jboss.remoting.RequestListener;
+import org.jboss.remoting.ClientContext;
+import org.jboss.remoting.ServiceContext;
+import org.jboss.remoting.RequestContext;
+import org.jboss.remoting.RemoteExecutionException;
+import org.jboss.remoting.ClientSource;
+import org.jboss.remoting.Client;
import org.jboss.xnio.BufferAllocator;
import org.jboss.xnio.IoUtils;
import org.jboss.xnio.Xnio;
import org.jboss.xnio.CloseableExecutor;
+import org.jboss.xnio.ChannelSource;
+import org.jboss.xnio.IoHandler;
+import org.jboss.xnio.IoHandlerFactory;
+import org.jboss.xnio.IoFuture;
+import org.jboss.xnio.Buffers;
+import org.jboss.xnio.log.Logger;
+import org.jboss.xnio.channels.AllocatedMessageChannel;
+import org.jboss.xnio.channels.Channels;
import org.jboss.xnio.nio.NioXnio;
+import org.jboss.river.RiverMarshallerFactory;
+import org.jboss.marshalling.MarshallingConfiguration;
/**
*
@@ -45,28 +67,79 @@
LoggingHelper.init();
}
+ public static final Logger log = Logger.getLogger(ConnectionTestCase.class);
+
public void testConnection() throws Throwable {
final String REQUEST = "request";
final String REPLY = "reply";
final List<Throwable> problems = Collections.synchronizedList(new
LinkedList<Throwable>());
final CloseableExecutor closeableExecutor =
IoUtils.closeableExecutor(Executors.newCachedThreadPool(), 500L, TimeUnit.MILLISECONDS);
try {
- final BufferAllocator<ByteBuffer> allocator = new
BufferAllocator<ByteBuffer>() {
- public ByteBuffer allocate() {
- return ByteBuffer.allocate(1024);
- }
-
- public void free(final ByteBuffer buffer) {
- }
- };
+ final BufferAllocator<ByteBuffer> allocator =
Buffers.createHeapByteBufferAllocator(1024);
final Xnio xnio = NioXnio.create();
try {
- final EndpointImpl endpoint = new EndpointImpl();
- endpoint.setExecutor(closeableExecutor);
- endpoint.start();
+ final EndpointImpl remoteEndpoint = new EndpointImpl();
+ remoteEndpoint.setExecutor(closeableExecutor);
+ remoteEndpoint.start();
try {
+ final EndpointImpl endpoint = new EndpointImpl();
+ endpoint.setExecutor(closeableExecutor);
+ endpoint.start();
+ try {
+ final CountDownLatch latch = new CountDownLatch(1);
+ final MultiplexConfiguration configuration = new
MultiplexConfiguration();
+ configuration.setAllocator(allocator);
+ configuration.setExecutor(closeableExecutor);
+ configuration.setLinkMetric(10);
+ configuration.setMarshallerFactory(new
RiverMarshallerFactory());
+ final MarshallingConfiguration marshallingConfiguration = new
MarshallingConfiguration();
+
configuration.setMarshallingConfiguration(marshallingConfiguration);
+ final IoHandlerFactory<AllocatedMessageChannel>
handlerFactory = MultiplexProtocol.createServer(remoteEndpoint, configuration);
+ final ChannelSource<AllocatedMessageChannel> channelSource
=
Channels.convertStreamToAllocatedMessage(xnio.createPipeServer(Channels.convertStreamToAllocatedMessage(handlerFactory,
16384, 16384)), 16384, 16384);
+ final IoFuture<SimpleCloseable> future =
MultiplexProtocol.connect(endpoint, configuration, channelSource);
+ future.get();
+ final LocalServiceConfiguration<Object, Object>
localServiceConfiguration = new LocalServiceConfiguration<Object, Object>(new
RequestListener<Object, Object>() {
+ public void handleClientOpen(final ClientContext context) {
+ log.debug("Client open");
+ }
+
+ public void handleServiceOpen(final ServiceContext context)
{
+ }
+
+ public void handleRequest(final RequestContext<Object>
context, final Object request) throws RemoteExecutionException {
+ try {
+ context.sendReply(REPLY);
+ } catch (IOException e) {
+ log.error(e, "Failed to send reply");
+ problems.add(e);
+ }
+ }
+
+ public void handleServiceClose(final ServiceContext context)
{
+ }
+
+ public void handleClientClose(final ClientContext context) {
+ log.debug("Client closed");
+ latch.countDown();
+ }
+ }, Object.class, Object.class);
+
localServiceConfiguration.setServiceType("connection.test");
+ localServiceConfiguration.setGroupName("testgroup");
+ localServiceConfiguration.setMetric(10);
+ remoteEndpoint.registerService(localServiceConfiguration);
+ final IoFuture<ClientSource<Object,Object>>
futureClientSource = endpoint.locateService(new URI("jrs:connection.test::"),
Object.class, Object.class);
+ assertEquals(IoFuture.Status.DONE, futureClientSource.await(1L,
TimeUnit.SECONDS));
+ final ClientSource<Object, Object> clientSource =
futureClientSource.get();
+ final Client<Object,Object> client =
clientSource.createClient();
+ final IoFuture<Object> futureReply = client.send(REQUEST);
+ assertEquals(IoFuture.Status.DONE, futureReply.await(1L,
TimeUnit.SECONDS));
+ assertEquals(REPLY, futureReply.get());
+ assertTrue(latch.await(1L, TimeUnit.SECONDS));
+ } finally {
+ endpoint.stop();
+ }
} finally {
- endpoint.stop();
+ remoteEndpoint.stop();
}
} finally {
IoUtils.safeClose(xnio);
Show replies by date