Author: david.lloyd(a)jboss.com
Date: 2009-01-29 15:33:56 -0500 (Thu, 29 Jan 2009)
New Revision: 4844
Added:
remoting3/trunk/samples/src/main/java/org/jboss/remoting/samples/protocol/
remoting3/trunk/samples/src/main/java/org/jboss/remoting/samples/protocol/basic/
remoting3/trunk/samples/src/main/java/org/jboss/remoting/samples/protocol/basic/BasicConfiguration.java
remoting3/trunk/samples/src/main/java/org/jboss/remoting/samples/protocol/basic/BasicHandlerReplyConsumer.java
remoting3/trunk/samples/src/main/java/org/jboss/remoting/samples/protocol/basic/BasicProtocol.java
remoting3/trunk/samples/src/main/java/org/jboss/remoting/samples/protocol/basic/BasicRequestHandler.java
remoting3/trunk/samples/src/main/java/org/jboss/remoting/samples/protocol/basic/BasicServerReplyTransmitter.java
remoting3/trunk/samples/src/main/java/org/jboss/remoting/samples/protocol/basic/BasicServerRequestConsumer.java
remoting3/trunk/samples/src/main/java/org/jboss/remoting/samples/protocol/basic/FutureBasicReply.java
remoting3/trunk/samples/src/test/
remoting3/trunk/samples/src/test/java/
remoting3/trunk/samples/src/test/java/org/
remoting3/trunk/samples/src/test/java/org/jboss/
remoting3/trunk/samples/src/test/java/org/jboss/remoting/
remoting3/trunk/samples/src/test/java/org/jboss/remoting/samples/
remoting3/trunk/samples/src/test/java/org/jboss/remoting/samples/protocol/
remoting3/trunk/samples/src/test/java/org/jboss/remoting/samples/protocol/basic/
remoting3/trunk/samples/src/test/java/org/jboss/remoting/samples/protocol/basic/BasicTestCase.java
Removed:
remoting3/trunk/protocol/basic/
Modified:
remoting3/trunk/build.xml
Log:
Move basic protocol to samples
Modified: remoting3/trunk/build.xml
===================================================================
--- remoting3/trunk/build.xml 2009-01-29 19:19:16 UTC (rev 4843)
+++ remoting3/trunk/build.xml 2009-01-29 20:33:56 UTC (rev 4844)
@@ -437,126 +437,6 @@
</path>
</target>
- <!-- protocol.basic module -->
-
- <target name="protocol.basic.compile.depcheck">
- <mkdir dir="protocol/basic/target/main"/>
- <uptodate property="protocol/basic.compile.uptodate"
targetfile="protocol/basic/target/main/.lastcompile">
- <srcfiles dir="protocol/basic/src/main/java">
- <include name="**/"/>
- <include name="**/*.java"/>
- <exclude name="**/.*"/>
- </srcfiles>
- </uptodate>
- </target>
-
- <target name="protocol.basic.compile"
depends="protocol.basic.compile.depcheck"
unless="protocol.basic.compile.uptodate">
- <mkdir dir="protocol/basic/target/main/classes"/>
- <javac
- source="${javac.source}"
- target="${javac.target}"
- srcdir="protocol/basic/src/main/java"
- destdir="protocol/basic/target/main/classes"
- debug="true">
- <compilerarg value="-Xlint:unchecked"/>
- <classpath>
- <path refid="api.classpath"/>
- <path refid="core.classpath"/>
- </classpath>
- </javac>
- <touch file="protocol/basic/target/main/.lastcompile"
verbose="false"/>
- </target>
-
- <target name="protocol.basic.test.compile.depcheck">
- <mkdir dir="protocol/basic/target/test"/>
- <uptodate property="protocol.basic.compile.uptodate"
targetfile="protocol/basic/target/test/.lastcompile">
- <srcfiles dir="protocol/basic/src/test/java">
- <include name="**/"/>
- <include name="**/*.java"/>
- <exclude name="**/.*"/>
- </srcfiles>
- </uptodate>
- </target>
-
- <target name="protocol.basic.test.compile"
depends="lib.junit,protocol.basic.compile,protocol.basic.test.compile.depcheck"
unless="protocol.basic.test.compile.uptodate">
- <mkdir dir="protocol/basic/target/test/classes"/>
- <javac
- source="${javac.source}"
- target="${javac.target}"
- srcdir="protocol/basic/src/test/java"
- destdir="protocol/basic/target/test/classes"
- debug="true">
- <compilerarg value="-Xlint:unchecked"/>
- <classpath>
- <path refid="api.classpath"/>
- <path refid="core.classpath"/>
- <path refid="protocol.basic.classpath"/>
- <path refid="testing-support.classpath"/>
- <path refid="lib.junit.classpath"/>
- <path refid="lib.river.classpath"/>
- <path refid="lib.xnio-nio.classpath"/>
- </classpath>
- </javac>
- <touch file="protocol/basic/target/test/.lastcompile"
verbose="false"/>
- </target>
-
- <target name="protocol.basic.test.pseudotarget">
- <echo message="============================================="/>
- <echo message="${message}"/>
- <echo message="============================================="/>
- <mkdir dir="protocol/basic/target/test-results"/>
- <junit printsummary="true" fork="yes"
includeantruntime="true">
- <sysproperty key="build.home" value="${basedir}"/>
- <sysproperty key="ant.library.dir"
value="${ant.home}/lib"/>
- <sysproperty key="lib.junit.local"
value="${lib.junit.local}"/>
- <sysproperty key="lib.marshalling-api.local"
value="${lib.marshalling-api.local}"/>
- <sysproperty key="lib.river.local"
value="${lib.river.local}"/>
- <sysproperty key="lib.xnio-api.local"
value="${lib.xnio-api.local}"/>
- <sysproperty key="lib.xnio-nio.local"
value="${lib.xnio-nio.local}"/>
- <jvmarg line="${test.jvmargs}"/>
- <formatter type="plain" extension="${extension}"/>
- <classpath>
- <path refid="api.classpath"/>
- <path refid="core.classpath"/>
- <path refid="protocol.basic.classpath"/>
- <path refid="testing-support.classpath"/>
- <pathelement
location="protocol/basic/target/test/classes"/>
- <path refid="lib.junit.classpath"/>
- <path refid="lib.river.classpath"/>
- <path refid="lib.xnio-nio.classpath"/>
- </classpath>
- <batchtest fork="yes"
todir="protocol/basic/target/test-results"
- haltonfailure="no">
- <fileset dir="protocol/basic/target/test/classes">
- <include name="**/*TestCase.class"/>
- </fileset>
- </batchtest>
- </junit>
- </target>
-
- <target name="protocol.basic.test"
depends="lib.river,lib.xnio-nio,api,core,protocol.basic,testing-support,protocol.basic.test.compile">
- <antcall inheritall="true" inheritrefs="true"
target="protocol.basic.test.pseudotarget">
- <param name="extension" value=".txt"/>
- <param name="message" value="Running with no security
manager"/>
- <param name="test.jvmargs" value="-Ddummy=dummy"/>
- </antcall>
- <antcall inheritall="true" inheritrefs="true"
target="protocol.basic.test.pseudotarget">
- <param name="extension" value="-security.txt"/>
- <param name="message" value="Running with security
manager"/>
- <param name="test.jvmargs"
value="-Djava.security.manager=org.jboss.remoting.test.support.LoggingSecurityManager
-Djava.security.policy=${basedir}/testing-support/src/main/resources/testing.policy
-Dsecurity.debug=policy"/>
- </antcall>
- </target>
-
- <target name="protocol.basic.clean">
- <delete dir="protocol/basic/target"/>
- </target>
-
- <target name="protocol.basic" description="Build the protocol.basic
module" depends="lib.xnio-api,api,core,protocol.basic.compile">
- <path id="protocol.basic.classpath">
- <pathelement location="protocol/basic/target/main/classes"/>
- </path>
- </target>
-
<!-- samples module -->
<target name="samples.compile.depcheck">
@@ -781,7 +661,7 @@
<!-- core -->
- <target name="all-core" description="Build all core targets"
depends="api,core,protocol.basic,protocol.multiplex,samples,testing-support"/>
+ <target name="all-core" description="Build all core targets"
depends="api,core,protocol.multiplex,samples,testing-support"/>
<!-- JARs: These should be the second-to-last targets in the file -->
@@ -791,8 +671,8 @@
<target name="all" description="Build everything"
depends="all-core,all-jars,api-javadoc"/>
- <target name="clean" description="Clean out all build files"
depends="api.clean,core.clean,protocol.basic.clean,protocol.multiplex.clean,samples.clean,testing-support.clean"/>
+ <target name="clean" description="Clean out all build files"
depends="api.clean,core.clean,protocol.multiplex.clean,samples.clean,testing-support.clean"/>
- <target name="test" description="Run all tests"
depends="api.test,core.test,protocol.basic.test,protocol.multiplex.test"/>
+ <target name="test" description="Run all tests"
depends="api.test,core.test,protocol.multiplex.test"/>
</project>
Added:
remoting3/trunk/samples/src/main/java/org/jboss/remoting/samples/protocol/basic/BasicConfiguration.java
===================================================================
---
remoting3/trunk/samples/src/main/java/org/jboss/remoting/samples/protocol/basic/BasicConfiguration.java
(rev 0)
+++
remoting3/trunk/samples/src/main/java/org/jboss/remoting/samples/protocol/basic/BasicConfiguration.java 2009-01-29
20:33:56 UTC (rev 4844)
@@ -0,0 +1,80 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.remoting.samples.protocol.basic;
+
+import org.jboss.marshalling.MarshallerFactory;
+import org.jboss.marshalling.MarshallingConfiguration;
+import org.jboss.xnio.BufferAllocator;
+import java.util.concurrent.Executor;
+import java.nio.ByteBuffer;
+
+/**
+ * Mutable bean holding the basic protocol settings: marshaller factory, marshalling configuration, link metric, executor and buffer allocator.
+ */
+public final class BasicConfiguration {
+ private MarshallerFactory marshallerFactory;
+ private MarshallingConfiguration marshallingConfiguration;
+ private int linkMetric;
+ private Executor executor;
+ private BufferAllocator<ByteBuffer> allocator;
+
+ public MarshallerFactory getMarshallerFactory() {
+ return marshallerFactory;
+ }
+
+ public void setMarshallerFactory(final MarshallerFactory marshallerFactory) {
+ this.marshallerFactory = marshallerFactory;
+ }
+
+ public MarshallingConfiguration getMarshallingConfiguration() {
+ return marshallingConfiguration;
+ }
+
+ public void setMarshallingConfiguration(final MarshallingConfiguration
marshallingConfiguration) {
+ this.marshallingConfiguration = marshallingConfiguration;
+ }
+
+ public int getLinkMetric() {
+ return linkMetric;
+ }
+
+ public void setLinkMetric(final int linkMetric) {
+ this.linkMetric = linkMetric;
+ }
+
+ public Executor getExecutor() {
+ return executor;
+ }
+
+ public void setExecutor(final Executor executor) {
+ this.executor = executor;
+ }
+
+ public BufferAllocator<ByteBuffer> getAllocator() {
+ return allocator;
+ }
+
+ public void setAllocator(final BufferAllocator<ByteBuffer> allocator) {
+ this.allocator = allocator;
+ }
+}
Added:
remoting3/trunk/samples/src/main/java/org/jboss/remoting/samples/protocol/basic/BasicHandlerReplyConsumer.java
===================================================================
---
remoting3/trunk/samples/src/main/java/org/jboss/remoting/samples/protocol/basic/BasicHandlerReplyConsumer.java
(rev 0)
+++
remoting3/trunk/samples/src/main/java/org/jboss/remoting/samples/protocol/basic/BasicHandlerReplyConsumer.java 2009-01-29
20:33:56 UTC (rev 4844)
@@ -0,0 +1,144 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.remoting.samples.protocol.basic;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.Queue;
+import java.io.IOException;
+import org.jboss.marshalling.Unmarshaller;
+import org.jboss.xnio.channels.StreamChannel;
+import org.jboss.xnio.IoUtils;
+import org.jboss.xnio.log.Logger;
+import org.jboss.remoting.spi.ReplyHandler;
+import org.jboss.remoting.spi.SpiUtils;
+import org.jboss.remoting.RemoteExecutionException;
+import org.jboss.remoting.ReplyException;
+import org.jboss.remoting.IndeterminateOutcomeException;
+
+/**
+ * Client-side task that reads reply messages (1 = result, 2 = cancelled, 3 = exception) and hands each to the next queued ReplyHandler; on exit it closes the channel and fails any handlers still queued.
+ */
+final class BasicHandlerReplyConsumer implements Runnable {
+
+ private static final Logger log = Logger.getLogger(BasicHandlerReplyConsumer.class);
+
+ private final AtomicInteger replySequence;
+ private final Unmarshaller unmarshaller;
+ private final StreamChannel streamChannel;
+ private final Lock reqLock;
+ private final Queue<ReplyHandler> replyQueue;
+
+ public BasicHandlerReplyConsumer(final Unmarshaller unmarshaller, final StreamChannel
streamChannel, final Lock reqLock, final Queue<ReplyHandler> replyQueue) {
+ this.unmarshaller = unmarshaller;
+ this.streamChannel = streamChannel;
+ this.reqLock = reqLock;
+ this.replyQueue = replyQueue;
+ replySequence = new AtomicInteger();
+ }
+
+ public void run() {
+ try {
+ for (;;) {
+ final int type = unmarshaller.read();
+ switch (type) {
+ case -1: {
+ // done.
+ return;
+ }
+ case 1: {
+ // reply - success
+ reqLock.lock();
+ try {
+ replySequence.getAndIncrement();
+ final ReplyHandler replyHandler = replyQueue.remove();
+ final Object reply;
+ try {
+ reply = unmarshaller.readObject();
+ } catch (Exception e) {
+ SpiUtils.safeHandleException(replyHandler, new
ReplyException("Failed to read reply from server", e));
+ return;
+ }
+ SpiUtils.safeHandleReply(replyHandler, reply);
+ break;
+ } finally {
+ reqLock.unlock();
+ }
+ }
+ case 2: {
+ // reply - cancelled
+ reqLock.lock();
+ try {
+ final int id = unmarshaller.readInt();
+ if (id != replySequence.getAndIncrement()) {
+ replySequence.decrementAndGet();
+ break;
+ }
+ final ReplyHandler replyHandler = replyQueue.remove();
+ SpiUtils.safeHandleCancellation(replyHandler);
+ break;
+ } finally {
+ reqLock.unlock();
+ }
+ }
+ case 3: {
+ // reply - exception
+ reqLock.lock();
+ try {
+ replySequence.getAndIncrement();
+ final ReplyHandler replyHandler = replyQueue.remove();
+ final Throwable e;
+ try {
+ e = (Throwable) unmarshaller.readObject();
+ } catch (Exception e2) {
+ SpiUtils.safeHandleException(replyHandler, new
RemoteExecutionException("Failed to read exception from server", e2));
+ return;
+ }
+ SpiUtils.safeHandleException(replyHandler, new
RemoteExecutionException("Remote execution failed", e));
+ break;
+ } finally {
+ reqLock.unlock();
+ }
+ }
+ default: {
+ // invalid byte
+ throw new IOException("Read an invalid byte from the
server");
+ }
+ }
+ }
+ } catch (Exception e) {
+ log.error(e, "Error receiving reply");
+ } finally {
+ IoUtils.safeClose(streamChannel);
+ reqLock.lock();
+ try {
+ while (replyQueue.size() > 0) {
+ ReplyHandler replyHandler = replyQueue.remove();
+ SpiUtils.safeHandleException(replyHandler, new
IndeterminateOutcomeException("Connection terminated; operation outcome
unknown"));
+ }
+ } finally {
+ reqLock.unlock();
+ }
+ }
+ }
+}
Added:
remoting3/trunk/samples/src/main/java/org/jboss/remoting/samples/protocol/basic/BasicProtocol.java
===================================================================
---
remoting3/trunk/samples/src/main/java/org/jboss/remoting/samples/protocol/basic/BasicProtocol.java
(rev 0)
+++
remoting3/trunk/samples/src/main/java/org/jboss/remoting/samples/protocol/basic/BasicProtocol.java 2009-01-29
20:33:56 UTC (rev 4844)
@@ -0,0 +1,83 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.remoting.samples.protocol.basic;
+
+import org.jboss.remoting.spi.RequestHandler;
+import org.jboss.remoting.spi.ReplyHandler;
+import org.jboss.remoting.spi.Handle;
+import org.jboss.xnio.channels.StreamChannel;
+import org.jboss.xnio.channels.ChannelOutputStream;
+import org.jboss.xnio.channels.ChannelInputStream;
+import org.jboss.marshalling.MarshallingConfiguration;
+import org.jboss.marshalling.MarshallerFactory;
+import org.jboss.marshalling.Unmarshaller;
+import org.jboss.marshalling.Marshaller;
+import org.jboss.marshalling.Marshalling;
+import java.io.IOException;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.Executor;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.Queue;
+import java.util.LinkedList;
+
+/**
+ * A very basic example protocol: static factory methods that attach server-side or client-side request processing to a stream channel.
+ */
+public final class BasicProtocol {
+
+ private BasicProtocol() {
+ }
+
+ public static final void createServer(final Handle<RequestHandler>
requestHandlerHandle, final StreamChannel streamChannel, final BasicConfiguration
configuration) throws IOException {
+ final RequestHandler requestHandler = requestHandlerHandle.getResource();
+ final MarshallingConfiguration marshallerConfiguration =
configuration.getMarshallingConfiguration();
+ final MarshallerFactory marshallerFactory =
configuration.getMarshallerFactory();
+ final Marshaller marshaller =
marshallerFactory.createMarshaller(marshallerConfiguration);
+ final Unmarshaller unmarshaller =
marshallerFactory.createUnmarshaller(marshallerConfiguration);
+ final Executor executor = configuration.getExecutor();
+ marshaller.start(Marshalling.createByteOutput(new
ChannelOutputStream(streamChannel)));
+ unmarshaller.start(Marshalling.createByteInput(new
ChannelInputStream(streamChannel)));
+ final BlockingQueue<FutureBasicReply> replyQueue = new
LinkedBlockingQueue<FutureBasicReply>();
+ // todo - handle rejected execution...
+ executor.execute(new BasicServerReplyTransmitter(replyQueue, marshaller,
streamChannel, requestHandlerHandle));
+ // todo - handle rejected execution...
+ executor.execute(new BasicServerRequestConsumer(unmarshaller, requestHandler,
replyQueue, streamChannel, requestHandlerHandle));
+ }
+
+ public static final Handle<RequestHandler> createClient(final StreamChannel
streamChannel, final BasicConfiguration configuration) throws IOException {
+ final MarshallingConfiguration marshallerConfiguration =
configuration.getMarshallingConfiguration();
+ final MarshallerFactory marshallerFactory =
configuration.getMarshallerFactory();
+ final Marshaller marshaller =
marshallerFactory.createMarshaller(marshallerConfiguration);
+ final Unmarshaller unmarshaller =
marshallerFactory.createUnmarshaller(marshallerConfiguration);
+ final Executor executor = configuration.getExecutor();
+ marshaller.start(Marshalling.createByteOutput(new
ChannelOutputStream(streamChannel)));
+ unmarshaller.start(Marshalling.createByteInput(new
ChannelInputStream(streamChannel)));
+ final Lock reqLock = new ReentrantLock();
+ final Queue<ReplyHandler> replyQueue = new
LinkedList<ReplyHandler>();
+ // todo - handle rejected execution...
+ executor.execute(new BasicHandlerReplyConsumer(unmarshaller, streamChannel,
reqLock, replyQueue));
+ return new BasicRequestHandler(reqLock, marshaller, replyQueue, streamChannel,
executor).getHandle();
+ }
+}
Added:
remoting3/trunk/samples/src/main/java/org/jboss/remoting/samples/protocol/basic/BasicRequestHandler.java
===================================================================
---
remoting3/trunk/samples/src/main/java/org/jboss/remoting/samples/protocol/basic/BasicRequestHandler.java
(rev 0)
+++
remoting3/trunk/samples/src/main/java/org/jboss/remoting/samples/protocol/basic/BasicRequestHandler.java 2009-01-29
20:33:56 UTC (rev 4844)
@@ -0,0 +1,101 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.remoting.samples.protocol.basic;
+
+import org.jboss.remoting.spi.RequestHandler;
+import org.jboss.remoting.spi.ReplyHandler;
+import org.jboss.remoting.spi.RemoteRequestContext;
+import org.jboss.remoting.spi.SpiUtils;
+import org.jboss.remoting.spi.AbstractAutoCloseable;
+import org.jboss.marshalling.Marshaller;
+import org.jboss.xnio.channels.StreamChannel;
+import org.jboss.xnio.IoUtils;
+import org.jboss.xnio.log.Logger;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.Executor;
+import java.util.Queue;
+import java.io.IOException;
+
+/**
+ * Client-side request handler: under the request lock it writes each request (type byte 2 plus the object) and queues its reply handler; cancellation writes type byte 3 plus the request's sequence id.
+ */
+final class BasicRequestHandler extends AbstractAutoCloseable<RequestHandler>
implements RequestHandler {
+
+ private static final Logger log =
Logger.getLogger("org.jboss.remoting.basic");
+
+ private final AtomicInteger requestSequence;
+ private final Lock reqLock;
+ private final Marshaller marshaller;
+ private final Queue<ReplyHandler> replyQueue;
+ private final StreamChannel streamChannel;
+
+ public BasicRequestHandler(final Lock reqLock, final Marshaller marshaller, final
Queue<ReplyHandler> replyQueue, final StreamChannel streamChannel, final Executor
executor) {
+ super(executor);
+ this.reqLock = reqLock;
+ this.marshaller = marshaller;
+ this.replyQueue = replyQueue;
+ this.streamChannel = streamChannel;
+ requestSequence = new AtomicInteger();
+ }
+
+ public RemoteRequestContext receiveRequest(final Object request, final ReplyHandler
replyHandler) {
+ reqLock.lock();
+ try {
+ marshaller.write(2);
+ marshaller.writeObject(request);
+ marshaller.flush();
+ final int id = requestSequence.getAndIncrement();
+ replyQueue.add(replyHandler);
+ return new RemoteRequestContext() {
+ public void cancel() {
+ reqLock.lock();
+ try {
+ marshaller.write(3);
+ marshaller.writeInt(id);
+ marshaller.flush();
+ } catch (IOException e) {
+ log.error(e, "Error writing cancel request");
+ IoUtils.safeClose(BasicRequestHandler.this);
+ } finally {
+ reqLock.unlock();
+ }
+ }
+ };
+ } catch (IOException e) {
+ SpiUtils.safeHandleException(replyHandler, e);
+ IoUtils.safeClose(this);
+ return SpiUtils.getBlankRemoteRequestContext();
+ } finally {
+ reqLock.unlock();
+ }
+ }
+
+ protected void closeAction() throws IOException {
+ streamChannel.close();
+ }
+
+ public String toString() {
+ return "basic protocol handler <" + Integer.toHexString(hashCode())
+ ">";
+ }
+}
Added:
remoting3/trunk/samples/src/main/java/org/jboss/remoting/samples/protocol/basic/BasicServerReplyTransmitter.java
===================================================================
---
remoting3/trunk/samples/src/main/java/org/jboss/remoting/samples/protocol/basic/BasicServerReplyTransmitter.java
(rev 0)
+++
remoting3/trunk/samples/src/main/java/org/jboss/remoting/samples/protocol/basic/BasicServerReplyTransmitter.java 2009-01-29
20:33:56 UTC (rev 4844)
@@ -0,0 +1,91 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.remoting.samples.protocol.basic;
+
+import java.util.concurrent.BlockingQueue;
+import org.jboss.marshalling.Marshaller;
+import org.jboss.xnio.channels.StreamChannel;
+import org.jboss.xnio.IoUtils;
+import org.jboss.xnio.log.Logger;
+import org.jboss.remoting.spi.RequestHandler;
+import org.jboss.remoting.spi.Handle;
+
+/**
+ * Server-side task that takes pending replies from the queue in order, waits for each to finish, and writes its outcome (1 = result, 2 = cancelled plus id, 3 = exception); closes the channel and the handler handle on exit.
+ */
+final class BasicServerReplyTransmitter implements Runnable {
+
+ private static final Logger log =
Logger.getLogger(BasicServerReplyTransmitter.class);
+
+ private final BlockingQueue<FutureBasicReply> replyQueue;
+ private final Marshaller marshaller;
+ private final StreamChannel streamChannel;
+ private final Handle<RequestHandler> requestHandlerHandle;
+
+ public BasicServerReplyTransmitter(final BlockingQueue<FutureBasicReply>
replyQueue, final Marshaller marshaller, final StreamChannel streamChannel, final
Handle<RequestHandler> requestHandlerHandle) {
+ this.replyQueue = replyQueue;
+ this.marshaller = marshaller;
+ this.streamChannel = streamChannel;
+ this.requestHandlerHandle = requestHandlerHandle;
+ }
+
+ public void run() {
+ try {
+ for (;;) {
+ final FutureBasicReply futureBasicReply = replyQueue.take();
+ OUT: for (;;) switch (futureBasicReply.awaitInterruptibly()) {
+ case DONE: {
+ marshaller.write(1);
+ marshaller.writeObject(futureBasicReply.get());
+ marshaller.flush();
+ break OUT;
+ }
+ case CANCELLED: {
+ marshaller.write(2);
+ marshaller.writeInt(futureBasicReply.id);
+ marshaller.flush();
+ break OUT;
+ }
+ case FAILED: {
+ marshaller.write(3);
+ marshaller.writeObject(futureBasicReply.getException());
+ marshaller.flush();
+ break OUT;
+ }
+ case WAITING: {
+ // spurious wakeup, try again
+ continue;
+ }
+ }
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ log.trace(e, "Interrupted");
+ } catch (Exception e) {
+ log.error(e, "Error in reply transmitter");
+ } finally {
+ IoUtils.safeClose(streamChannel);
+ IoUtils.safeClose(requestHandlerHandle);
+ }
+ }
+}
Added:
remoting3/trunk/samples/src/main/java/org/jboss/remoting/samples/protocol/basic/BasicServerRequestConsumer.java
===================================================================
---
remoting3/trunk/samples/src/main/java/org/jboss/remoting/samples/protocol/basic/BasicServerRequestConsumer.java
(rev 0)
+++
remoting3/trunk/samples/src/main/java/org/jboss/remoting/samples/protocol/basic/BasicServerRequestConsumer.java 2009-01-29
20:33:56 UTC (rev 4844)
@@ -0,0 +1,118 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.remoting.samples.protocol.basic;
+
+import org.jboss.marshalling.Unmarshaller;
+import org.jboss.remoting.spi.RequestHandler;
+import org.jboss.remoting.spi.Handle;
+import org.jboss.remoting.spi.RemoteRequestContext;
+import org.jboss.remoting.spi.ReplyHandler;
+import org.jboss.xnio.channels.StreamChannel;
+import org.jboss.xnio.IoUtils;
+import org.jboss.xnio.log.Logger;
+import java.util.concurrent.BlockingQueue;
+import java.io.IOException;
+
+/**
+ * Server-side task that reads client messages: type 2 queues a reply future and passes the request to the request handler, type 3 cancels the queued request with the given id; closes the channel and the handler handle on exit.
+ */
+final class BasicServerRequestConsumer implements Runnable {
+
+ private static final Logger log =
Logger.getLogger(BasicServerRequestConsumer.class);
+
+ private final Unmarshaller unmarshaller;
+ private final RequestHandler requestHandler;
+ private final BlockingQueue<FutureBasicReply> replyQueue;
+ private final StreamChannel streamChannel;
+ private final Handle<RequestHandler> requestHandlerHandle;
+
+ public BasicServerRequestConsumer(final Unmarshaller unmarshaller, final
RequestHandler requestHandler, final BlockingQueue<FutureBasicReply> replyQueue,
final StreamChannel streamChannel, final Handle<RequestHandler>
requestHandlerHandle) {
+ this.unmarshaller = unmarshaller;
+ this.requestHandler = requestHandler;
+ this.replyQueue = replyQueue;
+ this.streamChannel = streamChannel;
+ this.requestHandlerHandle = requestHandlerHandle;
+ }
+
+ public void run() {
+ try {
+ int requestSequence = 0;
+ for (;;) {
+ final int id = unmarshaller.read();
+ switch (id) {
+ case -1: {
+ // done.
+ return;
+ }
+ case 2: {
+ // two-way request
+ final int requestId = requestSequence++;
+ final Object request = unmarshaller.readObject();
+ final FutureBasicReply future = new FutureBasicReply(requestId);
+ replyQueue.add(future);
+ final RemoteRequestContext requestContext =
requestHandler.receiveRequest(request, new ReplyHandler() {
+
+ public void handleReply(final Object reply) {
+ future.setResult(reply);
+ }
+
+ public void handleException(final IOException exception) {
+ future.setException(exception);
+ }
+
+ public void handleCancellation() {
+ future.finishCancel();
+ }
+ });
+ future.requestContext = requestContext;
+ break;
+ }
+ case 3: {
+ // cancel request
+ final int requestId = unmarshaller.readInt();
+ // simply iterate over the outstanding requests until we match or
are past it...
+ for (FutureBasicReply future : replyQueue) {
+ final int queuedId = future.id;
+ if (queuedId == requestId) {
+ future.cancel();
+ break;
+ } else if (queuedId > requestId) {
+ break;
+ }
+ }
+ break;
+ }
+ default: {
+ // invalid byte
+ throw new IOException("Read an invalid byte from the
client");
+ }
+ }
+ }
+ } catch (Exception e) {
+ log.error(e, "Connection failed");
+ } finally {
+ IoUtils.safeClose(streamChannel);
+ IoUtils.safeClose(requestHandlerHandle);
+ }
+ }
+}
Added:
remoting3/trunk/samples/src/main/java/org/jboss/remoting/samples/protocol/basic/FutureBasicReply.java
===================================================================
---
remoting3/trunk/samples/src/main/java/org/jboss/remoting/samples/protocol/basic/FutureBasicReply.java
(rev 0)
+++
remoting3/trunk/samples/src/main/java/org/jboss/remoting/samples/protocol/basic/FutureBasicReply.java 2009-01-29
20:33:56 UTC (rev 4844)
@@ -0,0 +1,58 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.remoting.samples.protocol.basic;
+
+import org.jboss.xnio.AbstractIoFuture;
+import org.jboss.xnio.IoFuture;
+import org.jboss.remoting.spi.RemoteRequestContext;
+import java.io.IOException;
+
+/**
+ * Future for the reply to a single request, tagged with that request's numeric ID.
+ * <p>
+ * The completion methods inherited from {@link AbstractIoFuture} are re-declared here without
+ * changing their behavior. Because a protected member is also accessible within the package
+ * of the class that declares it, this lets other classes in this package complete the future.
+ */
+final class FutureBasicReply extends AbstractIoFuture<Object> {
+
+    /**
+     * Context of the request this future belongs to; {@link #cancel()} delegates to it.
+     * It is not set by the constructor, so it must be assigned before {@link #cancel()} is
+     * called, otherwise that call fails with a {@code NullPointerException}.
+     */
+    RemoteRequestContext requestContext;
+
+    /** The ID of the request this future belongs to. */
+    final int id;
+
+    /**
+     * Creates a future for the request with the given ID.
+     *
+     * @param id the request ID
+     */
+    public FutureBasicReply(final int id) {
+        this.id = id;
+    }
+
+    /**
+     * Asks the request context to cancel the request.
+     *
+     * @return this future
+     */
+    public IoFuture<Object> cancel() {
+        requestContext.cancel();
+        return this;
+    }
+
+    /**
+     * Delegates to the superclass implementation.
+     *
+     * @return whatever the superclass implementation returns
+     */
+    @Override
+    protected boolean finishCancel() {
+        return super.finishCancel();
+    }
+
+    /**
+     * Delegates to the superclass implementation.
+     *
+     * @param value the reply object
+     * @return whatever the superclass implementation returns
+     */
+    @Override
+    protected boolean setResult(final Object value) {
+        return super.setResult(value);
+    }
+
+    /**
+     * Delegates to the superclass implementation.
+     *
+     * @param failure the failure cause
+     * @return whatever the superclass implementation returns
+     */
+    @Override
+    protected boolean setException(final IOException failure) {
+        return super.setException(failure);
+    }
+}
Added: remoting3/trunk/samples/src/test/java/org/jboss/remoting/samples/protocol/basic/BasicTestCase.java
===================================================================
--- remoting3/trunk/samples/src/test/java/org/jboss/remoting/samples/protocol/basic/BasicTestCase.java (rev 0)
+++ remoting3/trunk/samples/src/test/java/org/jboss/remoting/samples/protocol/basic/BasicTestCase.java 2009-01-29 20:33:56 UTC (rev 4844)
@@ -0,0 +1,139 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.remoting.samples.protocol.basic;
+
+import junit.framework.TestCase;
+import org.jboss.xnio.Xnio;
+import org.jboss.xnio.IoUtils;
+import org.jboss.xnio.IoHandler;
+import org.jboss.xnio.ChannelSource;
+import org.jboss.xnio.IoFuture;
+import org.jboss.xnio.nio.NioXnio;
+import org.jboss.xnio.channels.StreamChannel;
+import org.jboss.remoting.Endpoint;
+import org.jboss.remoting.Remoting;
+import org.jboss.remoting.AbstractRequestListener;
+import org.jboss.remoting.RequestContext;
+import org.jboss.remoting.RemoteExecutionException;
+import org.jboss.remoting.Client;
+import org.jboss.remoting.test.support.LoggingHelper;
+import org.jboss.remoting.spi.RequestHandler;
+import org.jboss.remoting.spi.Handle;
+import org.jboss.marshalling.MarshallingConfiguration;
+import org.jboss.marshalling.river.RiverMarshallerFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.io.IOException;
+
+/**
+ * Round-trip test for the basic sample protocol: a server and a client are attached to the
+ * two ends of a channel obtained from an Xnio pipe server, one request is sent, and the
+ * reply is checked.
+ */
+public final class BasicTestCase extends TestCase {
+    static {
+        LoggingHelper.init();
+    }
+
+    /**
+     * Sends the request {@code "GORBA!"} through a client created by
+     * {@link BasicProtocol#createClient} and verifies that the reply produced by the request
+     * listener, {@code "GOOMBA"}, arrives within 500 ms.
+     * <p>
+     * Every resource (executor, Xnio instance, endpoint, handles, channel, client) is
+     * released in a {@code finally} block, in the reverse order of its creation.
+     *
+     * @throws Throwable if any step of the setup or the exchange fails
+     */
+    public static void testConnect() throws Throwable {
+        ExecutorService executor = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.SECONDS,
+                new LinkedBlockingQueue<Runnable>());
+        try {
+            Xnio xnio = NioXnio.create(executor, 2, 2, 2);
+            try {
+                final BasicConfiguration configuration = new BasicConfiguration();
+                configuration.setExecutor(executor);
+                configuration.setMarshallerFactory(new RiverMarshallerFactory());
+                final MarshallingConfiguration marshallingConfiguration = new MarshallingConfiguration();
+                configuration.setMarshallingConfiguration(marshallingConfiguration);
+                final Endpoint endpoint = Remoting.createEndpoint(executor, "test");
+                try {
+                    // server-side listener: answers every request with "GOOMBA"
+                    final Handle<RequestHandler> requestHandlerHandle =
+                            endpoint.createRequestHandler(new AbstractRequestListener<Object, Object>() {
+                        public void handleRequest(final RequestContext<Object> context, final Object request)
+                                throws RemoteExecutionException {
+                            System.out.println("Got a request! " + request.toString());
+                            try {
+                                context.sendReply("GOOMBA");
+                            } catch (IOException e) {
+                                try {
+                                    context.sendFailure("Failed", e);
+                                } catch (IOException e1) {
+                                    // deliberately ignored: neither the reply nor the failure
+                                    // notice could be sent; the client-side await/assert below
+                                    // is what fails the test in that case
+                                }
+                            }
+                        }
+                    }, Object.class, Object.class);
+                    try {
+                        // each channel opened on the server side gets a protocol server attached
+                        final ChannelSource<StreamChannel> channelSource = xnio.createPipeServer(executor,
+                                IoUtils.singletonHandlerFactory(new IoHandler<StreamChannel>() {
+                            public void handleOpened(final StreamChannel channel) {
+                                try {
+                                    System.out.println("Opening channel");
+                                    BasicProtocol.createServer(requestHandlerHandle, channel, configuration);
+                                } catch (IOException e) {
+                                    e.printStackTrace();
+                                    IoUtils.safeClose(channel);
+                                }
+                            }
+
+                            public void handleReadable(final StreamChannel channel) {
+                            }
+
+                            public void handleWritable(final StreamChannel channel) {
+                            }
+
+                            public void handleClosed(final StreamChannel channel) {
+                                System.out.println("Closing channel");
+                            }
+                        }));
+                        final IoFuture<StreamChannel> futureChannel = channelSource.open(IoUtils.nullHandler());
+                        assertEquals(IoFuture.Status.DONE, futureChannel.await(1L, TimeUnit.SECONDS));
+                        final StreamChannel channel = futureChannel.get();
+                        try {
+                            final Handle<RequestHandler> clientHandlerHandle =
+                                    BasicProtocol.createClient(channel, configuration);
+                            try {
+                                final Client<Object,Object> client = endpoint.createClient(
+                                        clientHandlerHandle.getResource(), Object.class, Object.class);
+                                try {
+                                    final IoFuture<? extends Object> futureReply = client.send("GORBA!");
+                                    assertEquals(IoFuture.Status.DONE,
+                                            futureReply.await(500L, TimeUnit.MILLISECONDS));
+                                    final Object reply = futureReply.get();
+                                    System.out.println("Reply is:" + reply);
+                                    // verify the payload, not just that some reply arrived
+                                    assertEquals("GOOMBA", reply);
+                                } finally {
+                                    IoUtils.safeClose(client);
+                                }
+                            } finally {
+                                IoUtils.safeClose(clientHandlerHandle);
+                            }
+                        } finally {
+                            IoUtils.safeClose(channel);
+                        }
+                    } finally {
+                        IoUtils.safeClose(requestHandlerHandle);
+                    }
+                } finally {
+                    IoUtils.safeClose(endpoint);
+                }
+            } finally {
+                IoUtils.safeClose(xnio);
+            }
+        } finally {
+            executor.shutdownNow();
+        }
+    }
+}