Author: david.lloyd(a)jboss.com
Date: 2008-10-21 00:56:10 -0400 (Tue, 21 Oct 2008)
New Revision: 4604
Added:
remoting3/trunk/protocol/basic/
remoting3/trunk/protocol/basic/src/
remoting3/trunk/protocol/basic/src/main/
remoting3/trunk/protocol/basic/src/main/java/
remoting3/trunk/protocol/basic/src/main/java/org/
remoting3/trunk/protocol/basic/src/main/java/org/jboss/
remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/
remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/
remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/
remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/BasicConfiguration.java
remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/BasicHandlerReplyConsumer.java
remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/BasicProtocol.java
remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/BasicRequestHandler.java
remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/BasicServerReplyTransmitter.java
remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/BasicServerRequestConsumer.java
remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/FutureBasicReply.java
remoting3/trunk/protocol/basic/src/test/
remoting3/trunk/protocol/basic/src/test/java/
Modified:
remoting3/trunk/build.xml
Log:
Cleanup; add *really* basic protocol
Modified: remoting3/trunk/build.xml
===================================================================
--- remoting3/trunk/build.xml 2008-10-21 02:44:31 UTC (rev 4603)
+++ remoting3/trunk/build.xml 2008-10-21 04:56:10 UTC (rev 4604)
@@ -761,7 +761,6 @@
<compilerarg value="-Xlint:unchecked"/>
<classpath>
<path refid="api.classpath"/>
- <!-- TODO: marshallers should be moved to their own module -->
<path refid="core.classpath"/>
<path refid="util.classpath"/>
<pathelement location="${lib.marshalling-api.local}"/>
@@ -866,7 +865,135 @@
</path>
</target>
+ <!-- protocol.basic module -->
+ <target name="protocol.basic.compile.depcheck">
+ <mkdir dir="protocol/basic/target/main"/>
+ <uptodate property="protocol/basic.compile.uptodate"
targetfile="protocol/basic/target/main/.lastcompile">
+ <srcfiles dir="protocol/basic/src/main/java">
+ <include name="**/"/>
+ <include name="**/*.java"/>
+ <exclude name="**/.*"/>
+ </srcfiles>
+ </uptodate>
+ </target>
+
+ <target name="protocol.basic.compile"
depends="protocol.basic.compile.depcheck"
unless="protocol.basic.compile.uptodate">
+ <mkdir dir="protocol/basic/target/main/classes"/>
+ <javac
+ source="${javac.source}"
+ target="${javac.target}"
+ srcdir="protocol/basic/src/main/java"
+ destdir="protocol/basic/target/main/classes"
+ debug="true">
+ <compilerarg value="-Xlint:unchecked"/>
+ <classpath>
+ <path refid="api.classpath"/>
+ <path refid="core.classpath"/>
+ <path refid="util.classpath"/>
+ <pathelement location="${lib.marshalling-api.local}"/>
+ <pathelement location="${lib.xnio-api.local}"/>
+ </classpath>
+ </javac>
+ <touch file="protocol/basic/target/main/.lastcompile"
verbose="false"/>
+ </target>
+
+ <target name="protocol.basic.test.compile.depcheck">
+ <mkdir dir="protocol/basic/target/test"/>
+ <uptodate property="protocol.basic.compile.uptodate"
targetfile="protocol/basic/target/test/.lastcompile">
+ <srcfiles dir="protocol/basic/src/test/java">
+ <include name="**/"/>
+ <include name="**/*.java"/>
+ <exclude name="**/.*"/>
+ </srcfiles>
+ </uptodate>
+ </target>
+
+ <target name="protocol.basic.test.compile"
depends="lib.junit,protocol.basic.compile,protocol.basic.test.compile.depcheck"
unless="protocol.basic.test.compile.uptodate">
+ <mkdir dir="protocol/basic/target/test/classes"/>
+ <javac
+ source="${javac.source}"
+ target="${javac.target}"
+ srcdir="protocol/basic/src/test/java"
+ destdir="protocol/basic/target/test/classes"
+ debug="true">
+ <compilerarg value="-Xlint:unchecked"/>
+ <classpath>
+ <path refid="api.classpath"/>
+ <path refid="core.classpath"/>
+ <path refid="protocol.basic.classpath"/>
+ <path refid="util.classpath"/>
+ <path refid="testing-support.classpath"/>
+ <pathelement location="${lib.junit.local}"/>
+ <pathelement location="${lib.marshalling-api.local}"/>
+ <pathelement location="${lib.river.local}"/>
+ <pathelement location="${lib.xnio-api.local}"/>
+ <pathelement location="${lib.xnio-nio.local}"/>
+ </classpath>
+ </javac>
+ <touch file="protocol/basic/target/test/.lastcompile"
verbose="false"/>
+ </target>
+
+ <target name="protocol.basic.test.pseudotarget">
+ <echo message="============================================="/>
+ <echo message="${message}"/>
+ <echo message="============================================="/>
+ <mkdir dir="protocol/basic/target/test-results"/>
+ <junit printsummary="true" fork="yes"
includeantruntime="true">
+ <sysproperty key="build.home" value="${basedir}"/>
+ <sysproperty key="ant.library.dir"
value="${ant.home}/lib"/>
+ <sysproperty key="lib.junit.local"
value="${lib.junit.local}"/>
+ <sysproperty key="lib.marshalling-api.local"
value="${lib.marshalling-api.local}"/>
+ <sysproperty key="lib.xnio-api.local"
value="${lib.xnio-api.local}"/>
+ <sysproperty key="lib.xnio-nio.local"
value="${lib.xnio-nio.local}"/>
+ <jvmarg line="${test.jvmargs}"/>
+ <formatter type="plain" extension="${extension}"/>
+ <classpath>
+ <path refid="api.classpath"/>
+ <path refid="core.classpath"/>
+ <path refid="protocol.basic.classpath"/>
+ <path refid="testing-support.classpath"/>
+ <path refid="util.classpath"/>
+ <pathelement
location="protocol/basic/target/test/classes"/>
+ <pathelement location="${lib.junit.local}"/>
+ <pathelement location="${lib.marshalling-api.local}"/>
+ <pathelement location="${lib.river.local}"/>
+ <pathelement location="${lib.xnio-api.local}"/>
+ <pathelement location="${lib.xnio-nio.local}"/>
+ </classpath>
+ <batchtest fork="yes"
todir="protocol/basic/target/test-results"
+ haltonfailure="no">
+ <fileset dir="protocol/basic/target/test/classes">
+ <include name="**/*TestCase.class"/>
+ </fileset>
+ </batchtest>
+ </junit>
+ </target>
+
+ <target name="protocol.basic.test"
depends="lib.xnio-nio,api,core,protocol.basic,testing-support,util,protocol.basic.test.compile">
+ <antcall inheritall="true" inheritrefs="true"
target="protocol.basic.test.pseudotarget">
+ <param name="extension" value=".txt"/>
+ <param name="message" value="Running with no security
manager"/>
+ <param name="test.jvmargs" value="-Ddummy=dummy"/>
+ </antcall>
+ <antcall inheritall="true" inheritrefs="true"
target="protocol.basic.test.pseudotarget">
+ <param name="extension" value="-security.txt"/>
+ <param name="message" value="Running with security
manager"/>
+ <param name="test.jvmargs"
value="-Djava.security.manager=org.jboss.remoting.test.support.LoggingSecurityManager
-Djava.security.policy=${basedir}/testing-support/src/main/resources/testing.policy
-Dsecurity.debug=policy"/>
+ </antcall>
+ </target>
+
+ <target name="protocol.basic.clean">
+ <delete dir="protocol/basic/target"/>
+ </target>
+
+ <target name="protocol.basic" description="Build the protocol.basic
module" depends="lib.xnio-api,api,core,util,protocol.basic.compile">
+ <path id="protocol.basic.classpath">
+ <pathelement location="protocol/basic/target/main/classes"/>
+ </path>
+ </target>
+
+
<!-- samples module -->
<target name="samples.compile.depcheck">
@@ -1173,6 +1300,7 @@
debug="true">
<compilerarg value="-Xlint:unchecked"/>
<classpath>
+ <pathelement location="${lib.xnio-api.local}"/>
</classpath>
</javac>
<touch file="util/target/main/.lastcompile"
verbose="false"/>
@@ -1182,7 +1310,7 @@
<delete dir="util/target"/>
</target>
- <target name="util" description="Build the utilities module"
depends="util.compile">
+ <target name="util" description="Build the utilities module"
depends="lib.xnio-api,util.compile">
<path id="util.classpath">
<pathelement location="util/target/main/classes"/>
</path>
@@ -1222,7 +1350,7 @@
<path id="version.classpath">
<pathelement location="version/target/main/classes"/>
</path>
- <java classpathref="version.classpath"
classname="org.jboss.cx.remoting.version.Version"
outputproperty="version"/>
+ <java classpathref="version.classpath"
classname="org.jboss.remoting.version.Version"
outputproperty="version"/>
<property name="version" value="UNKNOWN"/>
</target>
@@ -1341,9 +1469,9 @@
<!-- core -->
- <target name="all-core" description="Build all core targets"
depends="api,compat,core,mc-deployers,protocol.multiplex,samples,standalone,testing-support,tools,util"/>
+ <target name="all-core" description="Build all core targets"
depends="api,compat,core,mc-deployers,protocol.basic,protocol.multiplex,samples,standalone,testing-support,tools,util"/>
- <target name="clean-core" description="Clean all core targets"
depends="api.clean,compat.clean,core.clean,mc-deployers.clean,protocol.multiplex.clean,samples.clean,standalone.clean,testing-support.clean,tools.clean,util.clean"/>
+ <target name="clean-core" description="Clean all core targets"
depends="api.clean,compat.clean,core.clean,mc-deployers.clean,protocol.basic.clean,protocol.multiplex.clean,samples.clean,standalone.clean,testing-support.clean,tools.clean,util.clean"/>
<!-- http -->
@@ -1365,6 +1493,6 @@
<target name="clean" description="Clean out all build files"
depends="clean-core,clean-http,version.clean,srp.clean"/>
- <target name="test" description="Run all tests"
depends="api.test,core.test,protocol.multiplex.test"/>
+ <target name="test" description="Run all tests"
depends="api.test,core.test,protocol.basic.test,protocol.multiplex.test"/>
</project>
Property changes on: remoting3/trunk/protocol/basic
___________________________________________________________________
Name: svn:ignore
+ target
Added:
remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/BasicConfiguration.java
===================================================================
---
remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/BasicConfiguration.java
(rev 0)
+++
remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/BasicConfiguration.java 2008-10-21
04:56:10 UTC (rev 4604)
@@ -0,0 +1,90 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.remoting.protocol.basic;
+
+import org.jboss.marshalling.MarshallerFactory;
+import org.jboss.marshalling.Configuration;
+import org.jboss.xnio.BufferAllocator;
+import org.jboss.remoting.spi.stream.StreamDetector;
+import java.util.concurrent.Executor;
+import java.nio.ByteBuffer;
+
+/**
+ *
+ */
+public final class BasicConfiguration {
+ private MarshallerFactory marshallerFactory;
+ private Configuration marshallingConfiguration;
+ private int linkMetric;
+ private Executor executor;
+ private BufferAllocator<ByteBuffer> allocator;
+ private StreamDetector streamDetector;
+
+ public MarshallerFactory getMarshallerFactory() {
+ return marshallerFactory;
+ }
+
+ public void setMarshallerFactory(final MarshallerFactory marshallerFactory) {
+ this.marshallerFactory = marshallerFactory;
+ }
+
+ public Configuration getMarshallingConfiguration() {
+ return marshallingConfiguration;
+ }
+
+ public void setMarshallingConfiguration(final Configuration marshallingConfiguration)
{
+ this.marshallingConfiguration = marshallingConfiguration;
+ }
+
+ public int getLinkMetric() {
+ return linkMetric;
+ }
+
+ public void setLinkMetric(final int linkMetric) {
+ this.linkMetric = linkMetric;
+ }
+
+ public Executor getExecutor() {
+ return executor;
+ }
+
+ public void setExecutor(final Executor executor) {
+ this.executor = executor;
+ }
+
+ public BufferAllocator<ByteBuffer> getAllocator() {
+ return allocator;
+ }
+
+ public void setAllocator(final BufferAllocator<ByteBuffer> allocator) {
+ this.allocator = allocator;
+ }
+
+ public StreamDetector getStreamDetector() {
+ return streamDetector;
+ }
+
+ public void setStreamDetector(final StreamDetector streamDetector) {
+ this.streamDetector = streamDetector;
+ }
+}
Added:
remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/BasicHandlerReplyConsumer.java
===================================================================
---
remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/BasicHandlerReplyConsumer.java
(rev 0)
+++
remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/BasicHandlerReplyConsumer.java 2008-10-21
04:56:10 UTC (rev 4604)
@@ -0,0 +1,141 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.remoting.protocol.basic;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.Queue;
+import java.io.IOException;
+import org.jboss.marshalling.Unmarshaller;
+import org.jboss.xnio.channels.StreamChannel;
+import org.jboss.xnio.IoUtils;
+import org.jboss.remoting.spi.remote.ReplyHandler;
+import org.jboss.remoting.spi.SpiUtils;
+import org.jboss.remoting.RemoteExecutionException;
+import org.jboss.remoting.ReplyException;
+import org.jboss.remoting.IndeterminateOutcomeException;
+
+/**
+ *
+ */
+final class BasicHandlerReplyConsumer implements Runnable {
+
+ private final AtomicInteger replySequence;
+ private final Unmarshaller unmarshaller;
+ private final StreamChannel streamChannel;
+ private final Lock reqLock;
+ private final Queue<ReplyHandler> replyQueue;
+
+ public BasicHandlerReplyConsumer(final Unmarshaller unmarshaller, final StreamChannel
streamChannel, final Lock reqLock, final Queue<ReplyHandler> replyQueue) {
+ this.unmarshaller = unmarshaller;
+ this.streamChannel = streamChannel;
+ this.reqLock = reqLock;
+ this.replyQueue = replyQueue;
+ replySequence = new AtomicInteger();
+ }
+
+ public void run() {
+ try {
+ for (;;) {
+ final int type = unmarshaller.read();
+ switch (type) {
+ case -1: {
+ // done.
+ return;
+ }
+ case 1: {
+ // reply - success
+ reqLock.lock();
+ try {
+ replySequence.getAndIncrement();
+ final ReplyHandler replyHandler = replyQueue.remove();
+ final Object reply;
+ try {
+ reply = unmarshaller.readObject();
+ } catch (Exception e) {
+ SpiUtils.safeHandleException(replyHandler, new
ReplyException("Failed to read reply from server", e));
+ return;
+ }
+ SpiUtils.safeHandleReply(replyHandler, reply);
+ break;
+ } finally {
+ reqLock.unlock();
+ }
+ }
+ case 2: {
+ // reply - cancelled
+ reqLock.lock();
+ try {
+ final int id = unmarshaller.readInt();
+ if (id != replySequence.getAndIncrement()) {
+ replySequence.decrementAndGet();
+ break;
+ }
+ final ReplyHandler replyHandler = replyQueue.remove();
+ SpiUtils.safeHandleCancellation(replyHandler);
+ break;
+ } finally {
+ reqLock.unlock();
+ }
+ }
+ case 3: {
+ // reply - exception
+ reqLock.lock();
+ try {
+ replySequence.getAndIncrement();
+ final ReplyHandler replyHandler = replyQueue.remove();
+ final Throwable e;
+ try {
+ e = (Throwable) unmarshaller.readObject();
+ } catch (Exception e2) {
+ SpiUtils.safeHandleException(replyHandler, new
RemoteExecutionException("Failed to read exception from server", e2));
+ return;
+ }
+ SpiUtils.safeHandleException(replyHandler, new
RemoteExecutionException("Remote execution failed", e));
+ break;
+ } finally {
+ reqLock.unlock();
+ }
+ }
+ default: {
+ // invalid byte
+ throw new IOException("Read an invalid byte from the
server");
+ }
+ }
+ }
+ } catch (Exception e) {
+ // todo log it
+ } finally {
+ IoUtils.safeClose(streamChannel);
+ reqLock.lock();
+ try {
+ while (replyQueue.size() > 0) {
+ ReplyHandler replyHandler = replyQueue.remove();
+ SpiUtils.safeHandleException(replyHandler, new
IndeterminateOutcomeException("Connection terminated; operation outcome
unknown"));
+ }
+ } finally {
+ reqLock.unlock();
+ }
+ }
+ }
+}
Added:
remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/BasicProtocol.java
===================================================================
---
remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/BasicProtocol.java
(rev 0)
+++
remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/BasicProtocol.java 2008-10-21
04:56:10 UTC (rev 4604)
@@ -0,0 +1,80 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.remoting.protocol.basic;
+
+import org.jboss.remoting.spi.remote.RequestHandler;
+import org.jboss.remoting.spi.remote.ReplyHandler;
+import org.jboss.remoting.spi.remote.Handle;
+import org.jboss.xnio.channels.StreamChannel;
+import org.jboss.xnio.channels.ChannelOutputStream;
+import org.jboss.xnio.channels.ChannelInputStream;
+import org.jboss.marshalling.Configuration;
+import org.jboss.marshalling.MarshallerFactory;
+import org.jboss.marshalling.Unmarshaller;
+import org.jboss.marshalling.Marshaller;
+import org.jboss.marshalling.Marshalling;
+import java.io.IOException;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.Executor;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.Queue;
+import java.util.LinkedList;
+
+/**
+ * A very basic example protocol.
+ */
+public final class BasicProtocol {
+
+ private BasicProtocol() {
+ }
+
+ public static final void createServer(final Handle<RequestHandler>
requestHandlerHandle, final StreamChannel streamChannel, final BasicConfiguration
configuration) throws IOException {
+ final RequestHandler requestHandler = requestHandlerHandle.getResource();
+ final Configuration marshallerConfiguration =
configuration.getMarshallingConfiguration();
+ final MarshallerFactory marshallerFactory =
configuration.getMarshallerFactory();
+ final Marshaller marshaller =
marshallerFactory.createMarshaller(marshallerConfiguration);
+ final Unmarshaller unmarshaller =
marshallerFactory.createUnmarshaller(marshallerConfiguration);
+ final Executor executor = configuration.getExecutor();
+ marshaller.start(Marshalling.createByteOutput(new
ChannelOutputStream(streamChannel)));
+ unmarshaller.start(Marshalling.createByteInput(new
ChannelInputStream(streamChannel)));
+ final BlockingQueue<FutureBasicReply> replyQueue = new
LinkedBlockingQueue<FutureBasicReply>();
+ executor.execute(new BasicServerReplyTransmitter(replyQueue, marshaller,
streamChannel, requestHandlerHandle));
+ executor.execute(new BasicServerRequestConsumer(unmarshaller, requestHandler,
replyQueue, streamChannel, requestHandlerHandle));
+ }
+
+ public static final Handle<RequestHandler> createClient(final StreamChannel
streamChannel, final BasicConfiguration configuration) throws IOException {
+ final Configuration marshallerConfiguration =
configuration.getMarshallingConfiguration();
+ final MarshallerFactory marshallerFactory =
configuration.getMarshallerFactory();
+ final Marshaller marshaller =
marshallerFactory.createMarshaller(marshallerConfiguration);
+ final Unmarshaller unmarshaller =
marshallerFactory.createUnmarshaller(marshallerConfiguration);
+ final Executor executor = configuration.getExecutor();
+ marshaller.start(Marshalling.createByteOutput(new
ChannelOutputStream(streamChannel)));
+ unmarshaller.start(Marshalling.createByteInput(new
ChannelInputStream(streamChannel)));
+ final Lock reqLock = new ReentrantLock();
+ final Queue<ReplyHandler> replyQueue = new
LinkedList<ReplyHandler>();
+ executor.execute(new BasicHandlerReplyConsumer(unmarshaller, streamChannel,
reqLock, replyQueue));
+ return new BasicRequestHandler(reqLock, marshaller, replyQueue, streamChannel,
executor).getHandle();
+ }
+}
Added:
remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/BasicRequestHandler.java
===================================================================
---
remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/BasicRequestHandler.java
(rev 0)
+++
remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/BasicRequestHandler.java 2008-10-21
04:56:10 UTC (rev 4604)
@@ -0,0 +1,110 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.remoting.protocol.basic;
+
+import org.jboss.remoting.spi.remote.RequestHandler;
+import org.jboss.remoting.spi.remote.ReplyHandler;
+import org.jboss.remoting.spi.remote.RemoteRequestContext;
+import org.jboss.remoting.spi.SpiUtils;
+import org.jboss.remoting.spi.AbstractAutoCloseable;
+import org.jboss.marshalling.Marshaller;
+import org.jboss.xnio.channels.StreamChannel;
+import org.jboss.xnio.IoUtils;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.Executor;
+import java.util.Queue;
+import java.io.IOException;
+
+/**
+ *
+ */
+final class BasicRequestHandler extends AbstractAutoCloseable<RequestHandler>
implements RequestHandler {
+
+ private final AtomicInteger requestSequence;
+ private final Lock reqLock;
+ private final Marshaller marshaller;
+ private final Queue<ReplyHandler> replyQueue;
+ private final StreamChannel streamChannel;
+
+ public BasicRequestHandler(final Lock reqLock, final Marshaller marshaller, final
Queue<ReplyHandler> replyQueue, final StreamChannel streamChannel, final Executor
executor) {
+ super(executor);
+ this.reqLock = reqLock;
+ this.marshaller = marshaller;
+ this.replyQueue = replyQueue;
+ this.streamChannel = streamChannel;
+ requestSequence = new AtomicInteger();
+ }
+
+ public void receiveRequest(final Object request) {
+ reqLock.lock();
+ try {
+ marshaller.write(1);
+ marshaller.writeObject(request);
+ marshaller.flush();
+ } catch (IOException e) {
+ // todo log it
+ IoUtils.safeClose(this);
+ } finally {
+ reqLock.unlock();
+ }
+ }
+
+ public RemoteRequestContext receiveRequest(final Object request, final ReplyHandler
replyHandler) {
+ reqLock.lock();
+ try {
+ marshaller.write(2);
+ marshaller.writeObject(request);
+ marshaller.flush();
+ final int id = requestSequence.getAndIncrement();
+ replyQueue.add(replyHandler);
+ return new RemoteRequestContext() {
+ public void cancel() {
+ reqLock.lock();
+ try {
+ marshaller.write(3);
+ marshaller.writeInt(id);
+ marshaller.flush();
+ } catch (IOException e) {
+ // todo log it
+ IoUtils.safeClose(BasicRequestHandler.this);
+ }
+ }
+ };
+ } catch (IOException e) {
+ SpiUtils.safeHandleException(replyHandler, e);
+ IoUtils.safeClose(this);
+ return SpiUtils.getBlankRemoteRequestContext();
+ } finally {
+ reqLock.unlock();
+ }
+ }
+
+ protected void closeAction() throws IOException {
+ streamChannel.close();
+ }
+
+ public String toString() {
+ return "basic protocol handler <" + Integer.toString(hashCode(), 16)
+ ">";
+ }
+}
Added:
remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/BasicServerReplyTransmitter.java
===================================================================
---
remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/BasicServerReplyTransmitter.java
(rev 0)
+++
remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/BasicServerReplyTransmitter.java 2008-10-21
04:56:10 UTC (rev 4604)
@@ -0,0 +1,88 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.remoting.protocol.basic;
+
+import java.util.concurrent.BlockingQueue;
+import org.jboss.marshalling.Marshaller;
+import org.jboss.xnio.channels.StreamChannel;
+import org.jboss.xnio.IoUtils;
+import org.jboss.remoting.spi.remote.RequestHandler;
+import org.jboss.remoting.spi.remote.Handle;
+
+/**
+ *
+ */
+final class BasicServerReplyTransmitter implements Runnable {
+
+ private final BlockingQueue<FutureBasicReply> replyQueue;
+ private final Marshaller marshaller;
+ private final StreamChannel streamChannel;
+ private final Handle<RequestHandler> requestHandlerHandle;
+
+ public BasicServerReplyTransmitter(final BlockingQueue<FutureBasicReply>
replyQueue, final Marshaller marshaller, final StreamChannel streamChannel, final
Handle<RequestHandler> requestHandlerHandle) {
+ this.replyQueue = replyQueue;
+ this.marshaller = marshaller;
+ this.streamChannel = streamChannel;
+ this.requestHandlerHandle = requestHandlerHandle;
+ }
+
+ public void run() {
+ try {
+ for (;;) {
+ final FutureBasicReply futureBasicReply = replyQueue.remove();
+ OUT: for (;;) switch (futureBasicReply.awaitInterruptibly()) {
+ case DONE: {
+ marshaller.write(1);
+ marshaller.writeObject(futureBasicReply.get());
+ marshaller.flush();
+ break OUT;
+ }
+ case CANCELLED: {
+ marshaller.write(2);
+ marshaller.writeInt(futureBasicReply.id);
+ marshaller.flush();
+ break OUT;
+ }
+ case FAILED: {
+ marshaller.write(3);
+ marshaller.writeObject(futureBasicReply.getException());
+ marshaller.flush();
+ break OUT;
+ }
+ case WAITING: {
+ // spurious wakeup, try again
+ continue;
+ }
+ }
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ // todo log it
+ } catch (Exception e) {
+ // todo log it
+ } finally {
+ IoUtils.safeClose(streamChannel);
+ IoUtils.safeClose(requestHandlerHandle);
+ }
+ }
+}
Added:
remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/BasicServerRequestConsumer.java
===================================================================
---
remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/BasicServerRequestConsumer.java
(rev 0)
+++
remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/BasicServerRequestConsumer.java 2008-10-21
04:56:10 UTC (rev 4604)
@@ -0,0 +1,121 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.remoting.protocol.basic;
+
+import org.jboss.marshalling.Unmarshaller;
+import org.jboss.remoting.spi.remote.RequestHandler;
+import org.jboss.remoting.spi.remote.Handle;
+import org.jboss.remoting.spi.remote.RemoteRequestContext;
+import org.jboss.remoting.spi.remote.ReplyHandler;
+import org.jboss.xnio.channels.StreamChannel;
+import org.jboss.xnio.IoUtils;
+import java.util.concurrent.BlockingQueue;
+import java.io.IOException;
+
+/**
+ *
+ */
+final class BasicServerRequestConsumer implements Runnable {
+
+ private final Unmarshaller unmarshaller;
+ private final RequestHandler requestHandler;
+ private final BlockingQueue<FutureBasicReply> replyQueue;
+ private final StreamChannel streamChannel;
+ private final Handle<RequestHandler> requestHandlerHandle;
+
+ public BasicServerRequestConsumer(final Unmarshaller unmarshaller, final
RequestHandler requestHandler, final BlockingQueue<FutureBasicReply> replyQueue,
final StreamChannel streamChannel, final Handle<RequestHandler>
requestHandlerHandle) {
+ this.unmarshaller = unmarshaller;
+ this.requestHandler = requestHandler;
+ this.replyQueue = replyQueue;
+ this.streamChannel = streamChannel;
+ this.requestHandlerHandle = requestHandlerHandle;
+ }
+
+ public void run() {
+ try {
+ int requestSequence = 0;
+ for (;;) {
+ final int id = unmarshaller.read();
+ switch (id) {
+ case -1: {
+ // done.
+ return;
+ }
+ case 1: {
+ // one-way request
+ final Object request = unmarshaller.readObject();
+ requestHandler.receiveRequest(request);
+ break;
+ }
+ case 2: {
+ // two-way request
+ final int requestId = requestSequence++;
+ final Object request = unmarshaller.readObject();
+ final FutureBasicReply future = new FutureBasicReply(requestId);
+ replyQueue.add(future);
+ final RemoteRequestContext requestContext =
requestHandler.receiveRequest(request, new ReplyHandler() {
+
+ public void handleReply(final Object reply) {
+ future.setResult(reply);
+ }
+
+ public void handleException(final IOException exception) {
+ future.setException(exception);
+ }
+
+ public void handleCancellation() {
+ future.finishCancel();
+ }
+ });
+ future.requestContext = requestContext;
+ break;
+ }
+ case 3: {
+ // cancel request
+ final int requestId = unmarshaller.readInt();
+ // simply iterate over the outstanding requests until we match or
are past it...
+ for (FutureBasicReply future : replyQueue) {
+ final int queuedId = future.id;
+ if (queuedId == requestId) {
+ future.cancel();
+ break;
+ } else if (queuedId > requestId) {
+ break;
+ }
+ }
+ break;
+ }
+ default: {
+ // invalid byte
+ throw new IOException("Read an invalid byte from the
client");
+ }
+ }
+ }
+ } catch (Exception e) {
+ // todo log it
+ } finally {
+ IoUtils.safeClose(streamChannel);
+ IoUtils.safeClose(requestHandlerHandle);
+ }
+ }
+}
Added:
remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/FutureBasicReply.java
===================================================================
---
remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/FutureBasicReply.java
(rev 0)
+++
remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/FutureBasicReply.java 2008-10-21
04:56:10 UTC (rev 4604)
@@ -0,0 +1,58 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.remoting.protocol.basic;
+
+import org.jboss.xnio.AbstractIoFuture;
+import org.jboss.xnio.IoFuture;
+import org.jboss.remoting.spi.remote.RemoteRequestContext;
+import java.io.IOException;
+
+/**
+ *
+ */
+final class FutureBasicReply extends AbstractIoFuture<Object> {
+
+ final int id;
+ RemoteRequestContext requestContext;
+
+ public FutureBasicReply(final int id) {
+ this.id = id;
+ }
+
+ protected boolean setException(final IOException exception) {
+ return super.setException(exception);
+ }
+
+ protected boolean setResult(final Object result) {
+ return super.setResult(result);
+ }
+
+ protected boolean finishCancel() {
+ return super.finishCancel();
+ }
+
+ public IoFuture<Object> cancel() {
+ requestContext.cancel();
+ return this;
+ }
+}