[jboss-remoting-commits] JBoss Remoting SVN: r4604 - in remoting3/trunk: protocol and 10 other directories.

jboss-remoting-commits at lists.jboss.org jboss-remoting-commits at lists.jboss.org
Tue Oct 21 00:56:10 EDT 2008


Author: david.lloyd at jboss.com
Date: 2008-10-21 00:56:10 -0400 (Tue, 21 Oct 2008)
New Revision: 4604

Added:
   remoting3/trunk/protocol/basic/
   remoting3/trunk/protocol/basic/src/
   remoting3/trunk/protocol/basic/src/main/
   remoting3/trunk/protocol/basic/src/main/java/
   remoting3/trunk/protocol/basic/src/main/java/org/
   remoting3/trunk/protocol/basic/src/main/java/org/jboss/
   remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/
   remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/
   remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/
   remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/BasicConfiguration.java
   remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/BasicHandlerReplyConsumer.java
   remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/BasicProtocol.java
   remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/BasicRequestHandler.java
   remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/BasicServerReplyTransmitter.java
   remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/BasicServerRequestConsumer.java
   remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/FutureBasicReply.java
   remoting3/trunk/protocol/basic/src/test/
   remoting3/trunk/protocol/basic/src/test/java/
Modified:
   remoting3/trunk/build.xml
Log:
Cleanup; add *really* basic protocol

Modified: remoting3/trunk/build.xml
===================================================================
--- remoting3/trunk/build.xml	2008-10-21 02:44:31 UTC (rev 4603)
+++ remoting3/trunk/build.xml	2008-10-21 04:56:10 UTC (rev 4604)
@@ -761,7 +761,6 @@
             <compilerarg value="-Xlint:unchecked"/>
             <classpath>
                 <path refid="api.classpath"/>
-                <!-- TODO: marshallers should be moved to their own module -->
                 <path refid="core.classpath"/>
                 <path refid="util.classpath"/>
                 <pathelement location="${lib.marshalling-api.local}"/>
@@ -866,7 +865,135 @@
         </path>
     </target>
 
+    <!-- protocol.basic module -->
 
+    <target name="protocol.basic.compile.depcheck">
+        <mkdir dir="protocol/basic/target/main"/>
+        <uptodate property="protocol/basic.compile.uptodate" targetfile="protocol/basic/target/main/.lastcompile">
+            <srcfiles dir="protocol/basic/src/main/java">
+                <include name="**/"/>
+                <include name="**/*.java"/>
+                <exclude name="**/.*"/>
+            </srcfiles>
+        </uptodate>
+    </target>
+
+    <target name="protocol.basic.compile" depends="protocol.basic.compile.depcheck" unless="protocol.basic.compile.uptodate">
+        <mkdir dir="protocol/basic/target/main/classes"/>
+        <javac
+                source="${javac.source}"
+                target="${javac.target}"
+                srcdir="protocol/basic/src/main/java"
+                destdir="protocol/basic/target/main/classes"
+                debug="true">
+            <compilerarg value="-Xlint:unchecked"/>
+            <classpath>
+                <path refid="api.classpath"/>
+                <path refid="core.classpath"/>
+                <path refid="util.classpath"/>
+                <pathelement location="${lib.marshalling-api.local}"/>
+                <pathelement location="${lib.xnio-api.local}"/>
+            </classpath>
+        </javac>
+        <touch file="protocol/basic/target/main/.lastcompile" verbose="false"/>
+    </target>
+
+    <target name="protocol.basic.test.compile.depcheck">
+        <mkdir dir="protocol/basic/target/test"/>
+        <uptodate property="protocol.basic.compile.uptodate" targetfile="protocol/basic/target/test/.lastcompile">
+            <srcfiles dir="protocol/basic/src/test/java">
+                <include name="**/"/>
+                <include name="**/*.java"/>
+                <exclude name="**/.*"/>
+            </srcfiles>
+        </uptodate>
+    </target>
+
+    <target name="protocol.basic.test.compile" depends="lib.junit,protocol.basic.compile,protocol.basic.test.compile.depcheck" unless="protocol.basic.test.compile.uptodate">
+        <mkdir dir="protocol/basic/target/test/classes"/>
+        <javac
+                source="${javac.source}"
+                target="${javac.target}"
+                srcdir="protocol/basic/src/test/java"
+                destdir="protocol/basic/target/test/classes"
+                debug="true">
+            <compilerarg value="-Xlint:unchecked"/>
+            <classpath>
+                <path refid="api.classpath"/>
+                <path refid="core.classpath"/>
+                <path refid="protocol.basic.classpath"/>
+                <path refid="util.classpath"/>
+                <path refid="testing-support.classpath"/>
+                <pathelement location="${lib.junit.local}"/>
+                <pathelement location="${lib.marshalling-api.local}"/>
+                <pathelement location="${lib.river.local}"/>
+                <pathelement location="${lib.xnio-api.local}"/>
+                <pathelement location="${lib.xnio-nio.local}"/>
+            </classpath>
+        </javac>
+        <touch file="protocol/basic/target/test/.lastcompile" verbose="false"/>
+    </target>
+
+    <target name="protocol.basic.test.pseudotarget">
+        <echo message="============================================="/>
+        <echo message="${message}"/>
+        <echo message="============================================="/>
+        <mkdir dir="protocol/basic/target/test-results"/>
+        <junit printsummary="true" fork="yes" includeantruntime="true">
+            <sysproperty key="build.home" value="${basedir}"/>
+            <sysproperty key="ant.library.dir" value="${ant.home}/lib"/>
+            <sysproperty key="lib.junit.local" value="${lib.junit.local}"/>
+            <sysproperty key="lib.marshalling-api.local" value="${lib.marshalling-api.local}"/>
+            <sysproperty key="lib.xnio-api.local" value="${lib.xnio-api.local}"/>
+            <sysproperty key="lib.xnio-nio.local" value="${lib.xnio-nio.local}"/>
+            <jvmarg line="${test.jvmargs}"/>
+            <formatter type="plain" extension="${extension}"/>
+            <classpath>
+                <path refid="api.classpath"/>
+                <path refid="core.classpath"/>
+                <path refid="protocol.basic.classpath"/>
+                <path refid="testing-support.classpath"/>
+                <path refid="util.classpath"/>
+                <pathelement location="protocol/basic/target/test/classes"/>
+                <pathelement location="${lib.junit.local}"/>
+                <pathelement location="${lib.marshalling-api.local}"/>
+                <pathelement location="${lib.river.local}"/>
+                <pathelement location="${lib.xnio-api.local}"/>
+                <pathelement location="${lib.xnio-nio.local}"/>
+            </classpath>
+            <batchtest fork="yes" todir="protocol/basic/target/test-results"
+                       haltonfailure="no">
+               <fileset dir="protocol/basic/target/test/classes">
+                   <include name="**/*TestCase.class"/>
+               </fileset>
+            </batchtest>
+        </junit>
+    </target>
+
+    <target name="protocol.basic.test" depends="lib.xnio-nio,api,core,protocol.basic,testing-support,util,protocol.basic.test.compile">
+        <antcall inheritall="true" inheritrefs="true" target="protocol.basic.test.pseudotarget">
+            <param name="extension" value=".txt"/>
+            <param name="message" value="Running with no security manager"/>
+            <param name="test.jvmargs" value="-Ddummy=dummy"/>
+        </antcall>
+        <antcall inheritall="true" inheritrefs="true" target="protocol.basic.test.pseudotarget">
+            <param name="extension" value="-security.txt"/>
+            <param name="message" value="Running with security manager"/>
+            <param name="test.jvmargs" value="-Djava.security.manager=org.jboss.remoting.test.support.LoggingSecurityManager -Djava.security.policy=${basedir}/testing-support/src/main/resources/testing.policy -Dsecurity.debug=policy"/>
+        </antcall>
+    </target>
+
+    <target name="protocol.basic.clean">
+        <delete dir="protocol/basic/target"/>
+    </target>
+
+    <target name="protocol.basic" description="Build the protocol.basic module" depends="lib.xnio-api,api,core,util,protocol.basic.compile">
+        <path id="protocol.basic.classpath">
+            <pathelement location="protocol/basic/target/main/classes"/>
+        </path>
+    </target>
+
+
     <!-- samples module -->
 
     <target name="samples.compile.depcheck">
@@ -1173,6 +1300,7 @@
                 debug="true">
             <compilerarg value="-Xlint:unchecked"/>
             <classpath>
+                <pathelement location="${lib.xnio-api.local}"/>
             </classpath>
         </javac>
         <touch file="util/target/main/.lastcompile" verbose="false"/>
@@ -1182,7 +1310,7 @@
         <delete dir="util/target"/>
     </target>
 
-    <target name="util" description="Build the utilities module" depends="util.compile">
+    <target name="util" description="Build the utilities module" depends="lib.xnio-api,util.compile">
         <path id="util.classpath">
             <pathelement location="util/target/main/classes"/>
         </path>
@@ -1222,7 +1350,7 @@
         <path id="version.classpath">
             <pathelement location="version/target/main/classes"/>
         </path>
-        <java classpathref="version.classpath" classname="org.jboss.cx.remoting.version.Version" outputproperty="version"/>
+        <java classpathref="version.classpath" classname="org.jboss.remoting.version.Version" outputproperty="version"/>
         <property name="version" value="UNKNOWN"/>
     </target>
 
@@ -1341,9 +1469,9 @@
 
     <!-- core -->
 
-    <target name="all-core" description="Build all core targets" depends="api,compat,core,mc-deployers,protocol.multiplex,samples,standalone,testing-support,tools,util"/>
+    <target name="all-core" description="Build all core targets" depends="api,compat,core,mc-deployers,protocol.basic,protocol.multiplex,samples,standalone,testing-support,tools,util"/>
 
-    <target name="clean-core" description="Clean all core targets" depends="api.clean,compat.clean,core.clean,mc-deployers.clean,protocol.multiplex.clean,samples.clean,standalone.clean,testing-support.clean,tools.clean,util.clean"/>
+    <target name="clean-core" description="Clean all core targets" depends="api.clean,compat.clean,core.clean,mc-deployers.clean,protocol.basic.clean,protocol.multiplex.clean,samples.clean,standalone.clean,testing-support.clean,tools.clean,util.clean"/>
 
     <!-- http -->
 
@@ -1365,6 +1493,6 @@
 
     <target name="clean" description="Clean out all build files" depends="clean-core,clean-http,version.clean,srp.clean"/>
 
-    <target name="test" description="Run all tests" depends="api.test,core.test,protocol.multiplex.test"/>
+    <target name="test" description="Run all tests" depends="api.test,core.test,protocol.basic.test,protocol.multiplex.test"/>
 
 </project>


Property changes on: remoting3/trunk/protocol/basic
___________________________________________________________________
Name: svn:ignore
   + target


Added: remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/BasicConfiguration.java
===================================================================
--- remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/BasicConfiguration.java	                        (rev 0)
+++ remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/BasicConfiguration.java	2008-10-21 04:56:10 UTC (rev 4604)
@@ -0,0 +1,90 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.remoting.protocol.basic;
+
+import org.jboss.marshalling.MarshallerFactory;
+import org.jboss.marshalling.Configuration;
+import org.jboss.xnio.BufferAllocator;
+import org.jboss.remoting.spi.stream.StreamDetector;
+import java.util.concurrent.Executor;
+import java.nio.ByteBuffer;
+
+/**
+ *
+ */
+public final class BasicConfiguration {
+    private MarshallerFactory marshallerFactory;
+    private Configuration marshallingConfiguration;
+    private int linkMetric;
+    private Executor executor;
+    private BufferAllocator<ByteBuffer> allocator;
+    private StreamDetector streamDetector;
+
+    public MarshallerFactory getMarshallerFactory() {
+        return marshallerFactory;
+    }
+
+    public void setMarshallerFactory(final MarshallerFactory marshallerFactory) {
+        this.marshallerFactory = marshallerFactory;
+    }
+
+    public Configuration getMarshallingConfiguration() {
+        return marshallingConfiguration;
+    }
+
+    public void setMarshallingConfiguration(final Configuration marshallingConfiguration) {
+        this.marshallingConfiguration = marshallingConfiguration;
+    }
+
+    public int getLinkMetric() {
+        return linkMetric;
+    }
+
+    public void setLinkMetric(final int linkMetric) {
+        this.linkMetric = linkMetric;
+    }
+
+    public Executor getExecutor() {
+        return executor;
+    }
+
+    public void setExecutor(final Executor executor) {
+        this.executor = executor;
+    }
+
+    public BufferAllocator<ByteBuffer> getAllocator() {
+        return allocator;
+    }
+
+    public void setAllocator(final BufferAllocator<ByteBuffer> allocator) {
+        this.allocator = allocator;
+    }
+
+    public StreamDetector getStreamDetector() {
+        return streamDetector;
+    }
+
+    public void setStreamDetector(final StreamDetector streamDetector) {
+        this.streamDetector = streamDetector;
+    }
+}

Added: remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/BasicHandlerReplyConsumer.java
===================================================================
--- remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/BasicHandlerReplyConsumer.java	                        (rev 0)
+++ remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/BasicHandlerReplyConsumer.java	2008-10-21 04:56:10 UTC (rev 4604)
@@ -0,0 +1,141 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.remoting.protocol.basic;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.Queue;
+import java.io.IOException;
+import org.jboss.marshalling.Unmarshaller;
+import org.jboss.xnio.channels.StreamChannel;
+import org.jboss.xnio.IoUtils;
+import org.jboss.remoting.spi.remote.ReplyHandler;
+import org.jboss.remoting.spi.SpiUtils;
+import org.jboss.remoting.RemoteExecutionException;
+import org.jboss.remoting.ReplyException;
+import org.jboss.remoting.IndeterminateOutcomeException;
+
+/**
+ *
+ */
+final class BasicHandlerReplyConsumer implements Runnable {
+
+    private final AtomicInteger replySequence;
+    private final Unmarshaller unmarshaller;
+    private final StreamChannel streamChannel;
+    private final Lock reqLock;
+    private final Queue<ReplyHandler> replyQueue;
+
+    public BasicHandlerReplyConsumer(final Unmarshaller unmarshaller, final StreamChannel streamChannel, final Lock reqLock, final Queue<ReplyHandler> replyQueue) {
+        this.unmarshaller = unmarshaller;
+        this.streamChannel = streamChannel;
+        this.reqLock = reqLock;
+        this.replyQueue = replyQueue;
+        replySequence = new AtomicInteger();
+    }
+
+    public void run() {
+        try {
+            for (;;) {
+                final int type = unmarshaller.read();
+                switch (type) {
+                    case -1: {
+                        // done.
+                        return;
+                    }
+                    case 1: {
+                        // reply - success
+                        reqLock.lock();
+                        try {
+                            replySequence.getAndIncrement();
+                            final ReplyHandler replyHandler = replyQueue.remove();
+                            final Object reply;
+                            try {
+                                reply = unmarshaller.readObject();
+                            } catch (Exception e) {
+                                SpiUtils.safeHandleException(replyHandler, new ReplyException("Failed to read reply from server", e));
+                                return;
+                            }
+                            SpiUtils.safeHandleReply(replyHandler, reply);
+                            break;
+                        } finally {
+                            reqLock.unlock();
+                        }
+                    }
+                    case 2: {
+                        // reply - cancelled
+                        reqLock.lock();
+                        try {
+                            final int id = unmarshaller.readInt();
+                            if (id != replySequence.getAndIncrement()) {
+                                replySequence.decrementAndGet();
+                                break;
+                            }
+                            final ReplyHandler replyHandler = replyQueue.remove();
+                            SpiUtils.safeHandleCancellation(replyHandler);
+                            break;
+                        } finally {
+                            reqLock.unlock();
+                        }
+                    }
+                    case 3: {
+                        // reply - exception
+                        reqLock.lock();
+                        try {
+                            replySequence.getAndIncrement();
+                            final ReplyHandler replyHandler = replyQueue.remove();
+                            final Throwable e;
+                            try {
+                                e = (Throwable) unmarshaller.readObject();
+                            } catch (Exception e2) {
+                                SpiUtils.safeHandleException(replyHandler, new RemoteExecutionException("Failed to read exception from server", e2));
+                                return;
+                            }
+                            SpiUtils.safeHandleException(replyHandler, new RemoteExecutionException("Remote execution failed", e));
+                            break;
+                        } finally {
+                            reqLock.unlock();
+                        }
+                    }
+                    default: {
+                        // invalid byte
+                        throw new IOException("Read an invalid byte from the server");
+                    }
+                }
+            }
+        } catch (Exception e) {
+            // todo log it
+        } finally {
+            IoUtils.safeClose(streamChannel);
+            reqLock.lock();
+            try {
+                while (replyQueue.size() > 0) {
+                    ReplyHandler replyHandler = replyQueue.remove();
+                    SpiUtils.safeHandleException(replyHandler, new IndeterminateOutcomeException("Connection terminated; operation outcome unknown"));
+                }
+            } finally {
+                reqLock.unlock();
+            }
+        }
+    }
+}

Added: remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/BasicProtocol.java
===================================================================
--- remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/BasicProtocol.java	                        (rev 0)
+++ remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/BasicProtocol.java	2008-10-21 04:56:10 UTC (rev 4604)
@@ -0,0 +1,80 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.remoting.protocol.basic;
+
+import org.jboss.remoting.spi.remote.RequestHandler;
+import org.jboss.remoting.spi.remote.ReplyHandler;
+import org.jboss.remoting.spi.remote.Handle;
+import org.jboss.xnio.channels.StreamChannel;
+import org.jboss.xnio.channels.ChannelOutputStream;
+import org.jboss.xnio.channels.ChannelInputStream;
+import org.jboss.marshalling.Configuration;
+import org.jboss.marshalling.MarshallerFactory;
+import org.jboss.marshalling.Unmarshaller;
+import org.jboss.marshalling.Marshaller;
+import org.jboss.marshalling.Marshalling;
+import java.io.IOException;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.Executor;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.Queue;
+import java.util.LinkedList;
+
+/**
+ * A very basic example protocol.
+ */
+public final class BasicProtocol {
+
+    private BasicProtocol() {
+    }
+
+    public static final void createServer(final Handle<RequestHandler> requestHandlerHandle, final StreamChannel streamChannel, final BasicConfiguration configuration) throws IOException {
+        final RequestHandler requestHandler = requestHandlerHandle.getResource();
+        final Configuration marshallerConfiguration = configuration.getMarshallingConfiguration();
+        final MarshallerFactory marshallerFactory = configuration.getMarshallerFactory();
+        final Marshaller marshaller = marshallerFactory.createMarshaller(marshallerConfiguration);
+        final Unmarshaller unmarshaller = marshallerFactory.createUnmarshaller(marshallerConfiguration);
+        final Executor executor = configuration.getExecutor();
+        marshaller.start(Marshalling.createByteOutput(new ChannelOutputStream(streamChannel)));
+        unmarshaller.start(Marshalling.createByteInput(new ChannelInputStream(streamChannel)));
+        final BlockingQueue<FutureBasicReply> replyQueue = new LinkedBlockingQueue<FutureBasicReply>();
+        executor.execute(new BasicServerReplyTransmitter(replyQueue, marshaller, streamChannel, requestHandlerHandle));
+        executor.execute(new BasicServerRequestConsumer(unmarshaller, requestHandler, replyQueue, streamChannel, requestHandlerHandle));
+    }
+
+    public static final Handle<RequestHandler> createClient(final StreamChannel streamChannel, final BasicConfiguration configuration) throws IOException {
+        final Configuration marshallerConfiguration = configuration.getMarshallingConfiguration();
+        final MarshallerFactory marshallerFactory = configuration.getMarshallerFactory();
+        final Marshaller marshaller = marshallerFactory.createMarshaller(marshallerConfiguration);
+        final Unmarshaller unmarshaller = marshallerFactory.createUnmarshaller(marshallerConfiguration);
+        final Executor executor = configuration.getExecutor();
+        marshaller.start(Marshalling.createByteOutput(new ChannelOutputStream(streamChannel)));
+        unmarshaller.start(Marshalling.createByteInput(new ChannelInputStream(streamChannel)));
+        final Lock reqLock = new ReentrantLock();
+        final Queue<ReplyHandler> replyQueue = new LinkedList<ReplyHandler>();
+        executor.execute(new BasicHandlerReplyConsumer(unmarshaller, streamChannel, reqLock, replyQueue));
+        return new BasicRequestHandler(reqLock, marshaller, replyQueue, streamChannel, executor).getHandle();
+    }
+}

Added: remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/BasicRequestHandler.java
===================================================================
--- remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/BasicRequestHandler.java	                        (rev 0)
+++ remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/BasicRequestHandler.java	2008-10-21 04:56:10 UTC (rev 4604)
@@ -0,0 +1,110 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.remoting.protocol.basic;
+
+import org.jboss.remoting.spi.remote.RequestHandler;
+import org.jboss.remoting.spi.remote.ReplyHandler;
+import org.jboss.remoting.spi.remote.RemoteRequestContext;
+import org.jboss.remoting.spi.SpiUtils;
+import org.jboss.remoting.spi.AbstractAutoCloseable;
+import org.jboss.marshalling.Marshaller;
+import org.jboss.xnio.channels.StreamChannel;
+import org.jboss.xnio.IoUtils;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.Executor;
+import java.util.Queue;
+import java.io.IOException;
+
+/**
+ *
+ */
+final class BasicRequestHandler extends AbstractAutoCloseable<RequestHandler> implements RequestHandler {
+
+    private final AtomicInteger requestSequence;
+    private final Lock reqLock;
+    private final Marshaller marshaller;
+    private final Queue<ReplyHandler> replyQueue;
+    private final StreamChannel streamChannel;
+
+    public BasicRequestHandler(final Lock reqLock, final Marshaller marshaller, final Queue<ReplyHandler> replyQueue, final StreamChannel streamChannel, final Executor executor) {
+        super(executor);
+        this.reqLock = reqLock;
+        this.marshaller = marshaller;
+        this.replyQueue = replyQueue;
+        this.streamChannel = streamChannel;
+        requestSequence = new AtomicInteger();
+    }
+
+    public void receiveRequest(final Object request) {
+        reqLock.lock();
+        try {
+            marshaller.write(1);
+            marshaller.writeObject(request);
+            marshaller.flush();
+        } catch (IOException e) {
+            // todo log it
+            IoUtils.safeClose(this);
+        } finally {
+            reqLock.unlock();
+        }
+    }
+
+    public RemoteRequestContext receiveRequest(final Object request, final ReplyHandler replyHandler) {
+        reqLock.lock();
+        try {
+            marshaller.write(2);
+            marshaller.writeObject(request);
+            marshaller.flush();
+            final int id = requestSequence.getAndIncrement();
+            replyQueue.add(replyHandler);
+            return new RemoteRequestContext() {
+                public void cancel() {
+                    reqLock.lock();
+                    try {
+                        marshaller.write(3);
+                        marshaller.writeInt(id);
+                        marshaller.flush();
+                    } catch (IOException e) {
+                        // todo log it
+                        IoUtils.safeClose(BasicRequestHandler.this);
+                    }
+                }
+            };
+        } catch (IOException e) {
+            SpiUtils.safeHandleException(replyHandler, e);
+            IoUtils.safeClose(this);
+            return SpiUtils.getBlankRemoteRequestContext();
+        } finally {
+            reqLock.unlock();
+        }
+    }
+
+    protected void closeAction() throws IOException {
+        streamChannel.close();
+    }
+
+    public String toString() {
+        return "basic protocol handler <" + Integer.toString(hashCode(), 16) + ">";
+    }
+}

Added: remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/BasicServerReplyTransmitter.java
===================================================================
--- remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/BasicServerReplyTransmitter.java	                        (rev 0)
+++ remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/BasicServerReplyTransmitter.java	2008-10-21 04:56:10 UTC (rev 4604)
@@ -0,0 +1,88 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.remoting.protocol.basic;
+
+import java.util.concurrent.BlockingQueue;
+import org.jboss.marshalling.Marshaller;
+import org.jboss.xnio.channels.StreamChannel;
+import org.jboss.xnio.IoUtils;
+import org.jboss.remoting.spi.remote.RequestHandler;
+import org.jboss.remoting.spi.remote.Handle;
+
+/**
+ *
+ */
+final class BasicServerReplyTransmitter implements Runnable {
+
+    private final BlockingQueue<FutureBasicReply> replyQueue;
+    private final Marshaller marshaller;
+    private final StreamChannel streamChannel;
+    private final Handle<RequestHandler> requestHandlerHandle;
+
+    public BasicServerReplyTransmitter(final BlockingQueue<FutureBasicReply> replyQueue, final Marshaller marshaller, final StreamChannel streamChannel, final Handle<RequestHandler> requestHandlerHandle) {
+        this.replyQueue = replyQueue;
+        this.marshaller = marshaller;
+        this.streamChannel = streamChannel;
+        this.requestHandlerHandle = requestHandlerHandle;
+    }
+
+    public void run() {
+        try {
+            for (;;) {
+                final FutureBasicReply futureBasicReply = replyQueue.remove();
+                OUT: for (;;) switch (futureBasicReply.awaitInterruptibly()) {
+                    case DONE: {
+                        marshaller.write(1);
+                        marshaller.writeObject(futureBasicReply.get());
+                        marshaller.flush();
+                        break OUT;
+                    }
+                    case CANCELLED: {
+                        marshaller.write(2);
+                        marshaller.writeInt(futureBasicReply.id);
+                        marshaller.flush();
+                        break OUT;
+                    }
+                    case FAILED: {
+                        marshaller.write(3);
+                        marshaller.writeObject(futureBasicReply.getException());
+                        marshaller.flush();
+                        break OUT;
+                    }
+                    case WAITING: {
+                        // spurious wakeup, try again
+                        continue;
+                    }
+                }
+            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            // todo log it
+        } catch (Exception e) {
+            // todo log it
+        } finally {
+            IoUtils.safeClose(streamChannel);
+            IoUtils.safeClose(requestHandlerHandle);
+        }
+    }
+}

Added: remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/BasicServerRequestConsumer.java
===================================================================
--- remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/BasicServerRequestConsumer.java	                        (rev 0)
+++ remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/BasicServerRequestConsumer.java	2008-10-21 04:56:10 UTC (rev 4604)
@@ -0,0 +1,121 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.remoting.protocol.basic;
+
+import org.jboss.marshalling.Unmarshaller;
+import org.jboss.remoting.spi.remote.RequestHandler;
+import org.jboss.remoting.spi.remote.Handle;
+import org.jboss.remoting.spi.remote.RemoteRequestContext;
+import org.jboss.remoting.spi.remote.ReplyHandler;
+import org.jboss.xnio.channels.StreamChannel;
+import org.jboss.xnio.IoUtils;
+import java.util.concurrent.BlockingQueue;
+import java.io.IOException;
+
+/**
+ *
+ */
+final class BasicServerRequestConsumer implements Runnable {
+
+    private final Unmarshaller unmarshaller;
+    private final RequestHandler requestHandler;
+    private final BlockingQueue<FutureBasicReply> replyQueue;
+    private final StreamChannel streamChannel;
+    private final Handle<RequestHandler> requestHandlerHandle;
+
+    public BasicServerRequestConsumer(final Unmarshaller unmarshaller, final RequestHandler requestHandler, final BlockingQueue<FutureBasicReply> replyQueue, final StreamChannel streamChannel, final Handle<RequestHandler> requestHandlerHandle) {
+        this.unmarshaller = unmarshaller;
+        this.requestHandler = requestHandler;
+        this.replyQueue = replyQueue;
+        this.streamChannel = streamChannel;
+        this.requestHandlerHandle = requestHandlerHandle;
+    }
+
+    public void run() {
+        try {
+            int requestSequence = 0;
+            for (;;) {
+                final int id = unmarshaller.read();
+                switch (id) {
+                    case -1: {
+                        // done.
+                        return;
+                    }
+                    case 1: {
+                        // one-way request
+                        final Object request = unmarshaller.readObject();
+                        requestHandler.receiveRequest(request);
+                        break;
+                    }
+                    case 2: {
+                        // two-way request
+                        final int requestId = requestSequence++;
+                        final Object request = unmarshaller.readObject();
+                        final FutureBasicReply future = new FutureBasicReply(requestId);
+                        replyQueue.add(future);
+                        final RemoteRequestContext requestContext = requestHandler.receiveRequest(request, new ReplyHandler() {
+
+                            public void handleReply(final Object reply) {
+                                future.setResult(reply);
+                            }
+
+                            public void handleException(final IOException exception) {
+                                future.setException(exception);
+                            }
+
+                            public void handleCancellation() {
+                                future.finishCancel();
+                            }
+                        });
+                        future.requestContext = requestContext;
+                        break;
+                    }
+                    case 3: {
+                        // cancel request
+                        final int requestId = unmarshaller.readInt();
+                        // simply iterate over the outstanding requests until we match or are past it...
+                        for (FutureBasicReply future : replyQueue) {
+                            final int queuedId = future.id;
+                            if (queuedId == requestId) {
+                                future.cancel();
+                                break;
+                            } else if (queuedId > requestId) {
+                                break;
+                            }
+                        }
+                        break;
+                    }
+                    default: {
+                        // invalid byte
+                        throw new IOException("Read an invalid byte from the client");
+                    }
+                }
+            }
+        } catch (Exception e) {
+            // todo log it
+        } finally {
+            IoUtils.safeClose(streamChannel);
+            IoUtils.safeClose(requestHandlerHandle);
+        }
+    }
+}

Added: remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/FutureBasicReply.java
===================================================================
--- remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/FutureBasicReply.java	                        (rev 0)
+++ remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/FutureBasicReply.java	2008-10-21 04:56:10 UTC (rev 4604)
@@ -0,0 +1,58 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.remoting.protocol.basic;
+
+import org.jboss.xnio.AbstractIoFuture;
+import org.jboss.xnio.IoFuture;
+import org.jboss.remoting.spi.remote.RemoteRequestContext;
+import java.io.IOException;
+
+/**
+ *
+ */
+final class FutureBasicReply extends AbstractIoFuture<Object> {
+
+    final int id;
+    RemoteRequestContext requestContext;
+
+    public FutureBasicReply(final int id) {
+        this.id = id;
+    }
+
+    protected boolean setException(final IOException exception) {
+        return super.setException(exception);
+    }
+
+    protected boolean setResult(final Object result) {
+        return super.setResult(result);
+    }
+
+    protected boolean finishCancel() {
+        return super.finishCancel();
+    }
+
+    public IoFuture<Object> cancel() {
+        requestContext.cancel();
+        return this;
+    }
+}




More information about the jboss-remoting-commits mailing list