Author: david.lloyd(a)jboss.com
Date: 2008-10-20 22:39:24 -0400 (Mon, 20 Oct 2008)
New Revision: 4601
Added:
remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/stream/StreamProvider.java
remoting3/trunk/protocol/multiplex/
remoting3/trunk/protocol/multiplex/src/
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/AbstractConnection.java
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/ConfigValue.java
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/ConnectionListener.java
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/IdentityIntMap.java
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MessageType.java
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexHandler.java
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexProtocol.java
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/RemotingChannelConfiguration.java
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/SimpleWriteHandler.java
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/StreamContextImpl.java
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/WriteHandler.java
remoting3/trunk/protocol/multiplex/src/test/java/org/jboss/remoting/protocol/multiplex/
remoting3/trunk/protocol/multiplex/src/test/java/org/jboss/remoting/protocol/multiplex/ConnectionTestCase.java
Removed:
remoting3/trunk/protocol/basic/
remoting3/trunk/protocol/multiplex/src/
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/basic/
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/AbstractConnection.java
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/BasicHandler.java
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/BasicProtocol.java
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/ConfigValue.java
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/ConnectionListener.java
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MessageType.java
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/RemotingChannelConfiguration.java
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/SimpleWriteHandler.java
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/WriteHandler.java
remoting3/trunk/protocol/multiplex/src/test/java/org/jboss/remoting/protocol/basic/
remoting3/trunk/protocol/multiplex/src/test/java/org/jboss/remoting/protocol/multiplex/ConnectionTestCase.java
Modified:
remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/SpiUtils.java
remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/remote/ReplyHandler.java
remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/remote/RequestHandler.java
remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/stream/StreamContext.java
remoting3/trunk/build.properties
remoting3/trunk/build.xml
remoting3/trunk/core/src/main/java/org/jboss/remoting/core/FutureReplyImpl.java
remoting3/trunk/core/src/main/java/org/jboss/remoting/core/LocalRequestHandler.java
remoting3/trunk/core/src/main/java/org/jboss/remoting/core/RequestContextImpl.java
remoting3/trunk/testing-support/src/main/resources/testing.policy
remoting3/trunk/util/src/main/java/org/jboss/remoting/util/OrderedExecutor.java
Log:
The basic protocol is really a multiplex protocol. Make room for a *real* basic
protocol.
Modified: remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/SpiUtils.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/SpiUtils.java 2008-10-20
13:51:33 UTC (rev 4600)
+++ remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/SpiUtils.java 2008-10-21
02:39:24 UTC (rev 4601)
@@ -28,6 +28,7 @@
import org.jboss.remoting.RequestContext;
import org.jboss.remoting.CloseHandler;
import org.jboss.xnio.log.Logger;
+import java.io.IOException;
/**
* Utility methods for Remoting service providers.
@@ -41,12 +42,11 @@
* Safely notify a reply handler of an exception.
*
* @param replyHandler the reply handler
- * @param msg the message
- * @param cause the cause
+ * @param exception the exception to pass to the reply handler
*/
- public static void safeHandleException(final ReplyHandler replyHandler, final String
msg, final Throwable cause) {
+ public static void safeHandleException(final ReplyHandler replyHandler, final
IOException exception) {
try {
- replyHandler.handleException(msg, cause);
+ replyHandler.handleException(exception);
} catch (Throwable t) {
log.error(t, "Failed to properly handle exception");
}
Modified:
remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/remote/ReplyHandler.java
===================================================================
---
remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/remote/ReplyHandler.java 2008-10-20
13:51:33 UTC (rev 4600)
+++
remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/remote/ReplyHandler.java 2008-10-21
02:39:24 UTC (rev 4601)
@@ -22,6 +22,8 @@
package org.jboss.remoting.spi.remote;
+import java.io.IOException;
+
/**
* A handler for replies from a request. The handler should respect the first invocation
made on it, and ignore
* any subsequent invocations.
@@ -36,13 +38,11 @@
void handleReply(Object reply);
/**
- * Handle a remote exception.
+ * Handle an exception.
*
- * @param msg the message
- * @param cause the cause
+ * @param exception an exception which describes the problem
*/
- // TODO - change to accept a RemotingException instead?
- void handleException(final String msg, Throwable cause);
+ void handleException(IOException exception);
/**
* Handle a cancellation request.
Modified:
remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/remote/RequestHandler.java
===================================================================
---
remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/remote/RequestHandler.java 2008-10-20
13:51:33 UTC (rev 4600)
+++
remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/remote/RequestHandler.java 2008-10-21
02:39:24 UTC (rev 4601)
@@ -44,7 +44,7 @@
/**
* Receive a request from a remote system. This method is intended to be called by
protocol handlers. If the
* request cannot be accepted for some reason, the
- * {@link ReplyHandler#handleException(String, Throwable)}
+ * {@link ReplyHandler#handleException(java.io.IOException)}
* method is called immediately.
*
* @param request the request
Modified:
remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/stream/StreamContext.java
===================================================================
---
remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/stream/StreamContext.java 2008-10-20
13:51:33 UTC (rev 4600)
+++
remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/stream/StreamContext.java 2008-10-21
02:39:24 UTC (rev 4601)
@@ -23,7 +23,9 @@
package org.jboss.remoting.spi.stream;
import java.util.concurrent.Executor;
-import org.jboss.marshalling.MarshallerFactory;
+import java.io.IOException;
+import org.jboss.marshalling.Marshaller;
+import org.jboss.marshalling.Unmarshaller;
/**
* A context for stream serialization.
@@ -31,16 +33,25 @@
public interface StreamContext {
/**
- * Get an executor which may be used for various asynchronous tasks.
+ * Get an executor which may be used by a stream serializer for various asynchronous
tasks.
*
* @return an executor
*/
Executor getExecutor();
/**
- * Get a marshaller factory which is configured compatibly with the channel.
+ * Create a marshaller which is configured compatibly with the channel.
*
- * @return the marshaller factory
+ * @return a marshaller
*/
- MarshallerFactory getMarshallerFactory();
+ Marshaller createMarshaller() throws IOException;
+
+ /**
+ * Create an unmarshaller which is configured compatibly with the channel.
+ *
+ * @return an unmarshaller
+ */
+ Unmarshaller createUnmarshaller() throws IOException;
+
+ // todo - getter & setter for child ObjectTable, ClassTable, ExternalizerFactory,
etc. for marshaller and unmarshaller
}
Added:
remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/stream/StreamProvider.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/stream/StreamProvider.java
(rev 0)
+++
remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/stream/StreamProvider.java 2008-10-21
02:39:24 UTC (rev 4601)
@@ -0,0 +1,44 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.remoting.spi.stream;
+
+import org.jboss.xnio.Connector;
+import org.jboss.xnio.Acceptor;
+import org.jboss.xnio.channels.StreamChannel;
+import org.jboss.xnio.channels.AllocatedMessageChannel;
+import java.io.IOException;
+
+/**
+ * A provider for streams.
+ *
+ * @param <A> the address type
+ */
+public interface StreamProvider<A> {
+ Connector<A, StreamChannel> getStreamChannelConnector();
+
+ Connector<A, AllocatedMessageChannel> getMessageChannelConnector();
+
+ Acceptor<A, StreamChannel> getStreamChannelAcceptor();
+
+ Acceptor<A, AllocatedMessageChannel> getMessageChannelAcceptor();
+}
Modified: remoting3/trunk/build.properties
===================================================================
--- remoting3/trunk/build.properties 2008-10-20 13:51:33 UTC (rev 4600)
+++ remoting3/trunk/build.properties 2008-10-21 02:39:24 UTC (rev 4601)
@@ -115,7 +115,7 @@
lib.jboss-managed.local=${local.repository}/${lib.jboss-managed.local-path}
lib.jboss-managed.remote=${remote.repository}/${lib.jboss-managed.remote-path}
-lib.marshalling-api.version=1.0.0.Beta1
+lib.marshalling-api.version=1.0.0.Beta2
lib.marshalling-api.name=marshalling-api.jar
lib.marshalling-api.license=lgpl
lib.marshalling-api.dir=jboss/marshalling/${lib.marshalling-api.version}/lib
@@ -179,7 +179,7 @@
lib.trove.local=${local.repository}/${lib.trove.path}
lib.trove.remote=${remote.repository}/${lib.trove.path}
-lib.xnio.version=1.1.0.CR1
+lib.xnio.version=1.2.0.Alpha2008101601
lib.xnio-api.name=xnio-api-${lib.xnio.version}.jar
lib.xnio-api.license=lgpl
@@ -188,10 +188,10 @@
lib.xnio-api.local=${local.repository}/${lib.xnio-api.path}
lib.xnio-api.remote=${remote.repository}/${lib.xnio-api.path}
-lib.xnio-standalone.name=xnio-standalone-${lib.xnio.version}.jar
-lib.xnio-standalone.license=lgpl
-lib.xnio-standalone.dir=maven2/org/jboss/xnio/xnio-standalone/${lib.xnio.version}
-lib.xnio-standalone.path=${lib.xnio-standalone.dir}/${lib.xnio-standalone.name}
-lib.xnio-standalone.local=${local.repository}/${lib.xnio-standalone.path}
-lib.xnio-standalone.remote=${remote.repository}/${lib.xnio-standalone.path}
+lib.xnio-nio.name=xnio-nio-${lib.xnio.version}.jar
+lib.xnio-nio.license=lgpl
+lib.xnio-nio.dir=maven2/org/jboss/xnio/xnio-nio/${lib.xnio.version}
+lib.xnio-nio.path=${lib.xnio-nio.dir}/${lib.xnio-nio.name}
+lib.xnio-nio.local=${local.repository}/${lib.xnio-nio.path}
+lib.xnio-nio.remote=${remote.repository}/${lib.xnio-nio.path}
Modified: remoting3/trunk/build.xml
===================================================================
--- remoting3/trunk/build.xml 2008-10-20 13:51:33 UTC (rev 4600)
+++ remoting3/trunk/build.xml 2008-10-21 02:39:24 UTC (rev 4601)
@@ -211,16 +211,16 @@
<get src="${remote.license.dir}/${lib.xnio-api.license}.txt"
dest="${lib.xnio-api.local}.license.txt" usetimestamp="true"
ignoreerrors="false"/>
</target>
- <!-- External library: XNIO standalone -->
+ <!-- External library: XNIO nio -->
- <target name="lib.xnio-standalone-check">
- <available property="lib.xnio-standalone.exists"
file="${lib.xnio-standalone.local}"/>
+ <target name="lib.xnio-nio-check">
+ <available property="lib.xnio-nio.exists"
file="${lib.xnio-nio.local}"/>
</target>
- <target name="lib.xnio-standalone"
depends="lib.xnio-standalone-check"
unless="lib.xnio-standalone.exists">
- <mkdir dir="${local.repository}/${lib.xnio-standalone.dir}"/>
- <get src="${lib.xnio-standalone.remote}"
dest="${lib.xnio-standalone.local}" usetimestamp="true"
ignoreerrors="false"/>
- <get src="${remote.license.dir}/${lib.xnio-standalone.license}.txt"
dest="${lib.xnio-standalone.local}.license.txt" usetimestamp="true"
ignoreerrors="false"/>
+ <target name="lib.xnio-nio" depends="lib.xnio-nio-check"
unless="lib.xnio-nio.exists">
+ <mkdir dir="${local.repository}/${lib.xnio-nio.dir}"/>
+ <get src="${lib.xnio-nio.remote}"
dest="${lib.xnio-nio.local}" usetimestamp="true"
ignoreerrors="false"/>
+ <get src="${remote.license.dir}/${lib.xnio-nio.license}.txt"
dest="${lib.xnio-nio.local}.license.txt" usetimestamp="true"
ignoreerrors="false"/>
</target>
<!-- ============================================== -->
@@ -800,7 +800,8 @@
<pathelement location="${lib.junit.local}"/>
<pathelement location="${lib.marshalling-api.local}"/>
<pathelement location="${lib.river.local}"/>
- <pathelement location="${lib.xnio-standalone.local}"/>
+ <pathelement location="${lib.xnio-api.local}"/>
+ <pathelement location="${lib.xnio-nio.local}"/>
</classpath>
</javac>
<touch file="protocol/basic/target/test/.lastcompile"
verbose="false"/>
@@ -816,7 +817,8 @@
<sysproperty key="ant.library.dir"
value="${ant.home}/lib"/>
<sysproperty key="lib.junit.local"
value="${lib.junit.local}"/>
<sysproperty key="lib.marshalling-api.local"
value="${lib.marshalling-api.local}"/>
- <sysproperty key="lib.xnio-standalone.local"
value="${lib.xnio-standalone.local}"/>
+ <sysproperty key="lib.xnio-api.local"
value="${lib.xnio-api.local}"/>
+ <sysproperty key="lib.xnio-nio.local"
value="${lib.xnio-nio.local}"/>
<jvmarg line="${test.jvmargs}"/>
<formatter type="plain" extension="${extension}"/>
<classpath>
@@ -829,7 +831,8 @@
<pathelement location="${lib.junit.local}"/>
<pathelement location="${lib.marshalling-api.local}"/>
<pathelement location="${lib.river.local}"/>
- <pathelement location="${lib.xnio-standalone.local}"/>
+ <pathelement location="${lib.xnio-api.local}"/>
+ <pathelement location="${lib.xnio-nio.local}"/>
</classpath>
<batchtest fork="yes"
todir="protocol/basic/target/test-results"
haltonfailure="no">
@@ -840,7 +843,7 @@
</junit>
</target>
- <target name="protocol.basic.test"
depends="lib.xnio-standalone,api,core,protocol.basic,testing-support,util,protocol.basic.test.compile">
+ <target name="protocol.basic.test"
depends="lib.xnio-nio,api,core,protocol.basic,testing-support,util,protocol.basic.test.compile">
<antcall inheritall="true" inheritrefs="true"
target="protocol.basic.test.pseudotarget">
<param name="extension" value=".txt"/>
<param name="message" value="Running with no security
manager"/>
Modified: remoting3/trunk/core/src/main/java/org/jboss/remoting/core/FutureReplyImpl.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/remoting/core/FutureReplyImpl.java 2008-10-20
13:51:33 UTC (rev 4600)
+++
remoting3/trunk/core/src/main/java/org/jboss/remoting/core/FutureReplyImpl.java 2008-10-21
02:39:24 UTC (rev 4601)
@@ -22,12 +22,12 @@
package org.jboss.remoting.core;
-import org.jboss.remoting.RemotingException;
import org.jboss.remoting.spi.remote.ReplyHandler;
import org.jboss.remoting.spi.remote.RemoteRequestContext;
import org.jboss.xnio.AbstractIoFuture;
import org.jboss.xnio.IoFuture;
import java.util.concurrent.Executor;
+import java.io.IOException;
/**
*
@@ -67,8 +67,8 @@
setResult((O) reply);
}
- public void handleException(final String exMsg, final Throwable exCause) {
- setException(new RemotingException(exMsg, exCause));
+ public void handleException(final IOException exception) {
+ setException(exception);
}
public void handleCancellation() {
Modified:
remoting3/trunk/core/src/main/java/org/jboss/remoting/core/LocalRequestHandler.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/remoting/core/LocalRequestHandler.java 2008-10-20
13:51:33 UTC (rev 4600)
+++
remoting3/trunk/core/src/main/java/org/jboss/remoting/core/LocalRequestHandler.java 2008-10-21
02:39:24 UTC (rev 4601)
@@ -82,9 +82,9 @@
try {
requestListener.handleRequest(context, (I) request);
} catch (RemoteExecutionException e) {
- SpiUtils.safeHandleException(replyHandler, e.getMessage(),
e.getCause());
+ SpiUtils.safeHandleException(replyHandler, e);
} catch (Throwable t) {
- SpiUtils.safeHandleException(replyHandler, "Unexpected exception
in request listener", t);
+ SpiUtils.safeHandleException(replyHandler, new
RemoteExecutionException("Request handler threw an exception", t));
}
}
});
Modified:
remoting3/trunk/core/src/main/java/org/jboss/remoting/core/RequestContextImpl.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/remoting/core/RequestContextImpl.java 2008-10-20
13:51:33 UTC (rev 4600)
+++
remoting3/trunk/core/src/main/java/org/jboss/remoting/core/RequestContextImpl.java 2008-10-21
02:39:24 UTC (rev 4601)
@@ -26,6 +26,7 @@
import org.jboss.remoting.ClientContext;
import org.jboss.remoting.RemotingException;
import org.jboss.remoting.RequestCancelHandler;
+import org.jboss.remoting.RemoteExecutionException;
import org.jboss.remoting.core.util.TaggingExecutor;
import org.jboss.remoting.spi.remote.ReplyHandler;
import org.jboss.remoting.spi.SpiUtils;
@@ -81,7 +82,7 @@
public void sendFailure(final String msg, final Throwable cause) throws
RemotingException, IllegalStateException {
if (! closed.getAndSet(true)) {
if (replyHandler != null) {
- replyHandler.handleException(msg, cause);
+ replyHandler.handleException(new RemoteExecutionException(msg, cause));
}
} else {
throw new IllegalStateException("Reply already sent");
Copied: remoting3/trunk/protocol/multiplex (from rev 4573,
remoting3/trunk/protocol/basic)
Copied: remoting3/trunk/protocol/multiplex/src (from rev 4600,
remoting3/trunk/protocol/basic/src)
Copied:
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex
(from rev 4573,
remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic)
Deleted:
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/AbstractConnection.java
===================================================================
---
remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/AbstractConnection.java 2008-09-18
03:06:39 UTC (rev 4573)
+++
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/AbstractConnection.java 2008-10-21
02:39:24 UTC (rev 4601)
@@ -1,44 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2008, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
- */
-
-package org.jboss.remoting.protocol.basic;
-
-import org.jboss.remoting.spi.AbstractSimpleCloseable;
-import java.util.concurrent.Executor;
-
-/**
- *
- */
-public abstract class AbstractConnection extends AbstractSimpleCloseable {
- /**
- * Basic constructor.
- *
- * @param executor the executor used to execute the close notification handlers
- */
- protected AbstractConnection(final Executor executor) {
- super(executor);
- }
-
- public String toString() {
- return "connection <" + Integer.toString(hashCode()) +
">";
- }
-}
Copied:
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/AbstractConnection.java
(from rev 4600,
remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/AbstractConnection.java)
===================================================================
---
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/AbstractConnection.java
(rev 0)
+++
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/AbstractConnection.java 2008-10-21
02:39:24 UTC (rev 4601)
@@ -0,0 +1,44 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.remoting.protocol.multiplex;
+
+import org.jboss.remoting.spi.AbstractSimpleCloseable;
+import java.util.concurrent.Executor;
+
+/**
+ * An abstract closeable connection whose string form is {@code connection <hashCode>}.
+ */
+public abstract class AbstractConnection extends AbstractSimpleCloseable {
+ /**
+ * Basic constructor.
+ *
+ * @param executor the executor used to execute the close notification handlers
+ */
+ protected AbstractConnection(final Executor executor) {
+ super(executor);
+ }
+
+ public String toString() {
+ return "connection <" + Integer.toString(hashCode()) +
">";
+ }
+}
Deleted:
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/BasicHandler.java
===================================================================
---
remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/BasicHandler.java 2008-09-18
03:06:39 UTC (rev 4573)
+++
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/BasicHandler.java 2008-10-21
02:39:24 UTC (rev 4601)
@@ -1,915 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2008, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
- */
-
-package org.jboss.remoting.protocol.basic;
-
-import org.jboss.xnio.channels.AllocatedMessageChannel;
-import org.jboss.xnio.IoHandler;
-import org.jboss.xnio.BufferAllocator;
-import org.jboss.xnio.IoUtils;
-import org.jboss.xnio.log.Logger;
-import org.jboss.remoting.spi.remote.RequestHandler;
-import org.jboss.remoting.spi.remote.RequestHandlerSource;
-import org.jboss.remoting.spi.remote.ReplyHandler;
-import org.jboss.remoting.spi.remote.RemoteRequestContext;
-import org.jboss.remoting.spi.remote.Handle;
-import org.jboss.remoting.spi.SpiUtils;
-import org.jboss.remoting.spi.AbstractAutoCloseable;
-import static org.jboss.remoting.util.CollectionUtil.concurrentIntegerMap;
-import org.jboss.remoting.util.CollectionUtil;
-import org.jboss.remoting.util.ConcurrentIntegerMap;
-import org.jboss.remoting.CloseHandler;
-import org.jboss.remoting.Endpoint;
-import org.jboss.remoting.SimpleCloseable;
-import org.jboss.marshalling.MarshallerFactory;
-import org.jboss.marshalling.ByteInput;
-import org.jboss.marshalling.Unmarshaller;
-import org.jboss.marshalling.ByteOutput;
-import org.jboss.marshalling.Marshaller;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.List;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.nio.ByteBuffer;
-import java.nio.BufferUnderflowException;
-import java.io.IOException;
-
-/**
- *
- */
-public final class BasicHandler implements IoHandler<AllocatedMessageChannel> {
-
- private static final Logger log = Logger.getLogger(BasicHandler.class);
-
- //--== Connection configuration items ==--
- private final MarshallerFactory marshallerFactory;
- private final int linkMetric;
- private final Executor executor;
- private final ClassLoader classLoader;
- // buffer allocator for outbound message assembly
- private final BufferAllocator<ByteBuffer> allocator;
-
- // running on remote node
- private final ConcurrentIntegerMap<ReplyHandler> remoteRequests =
concurrentIntegerMap();
- // running on local node
- private final ConcurrentIntegerMap<RemoteRequestContext> localRequests =
concurrentIntegerMap();
- // sequence for remote requests
- private final AtomicInteger requestSequence = new AtomicInteger();
-
- // clients whose requests get forwarded to the remote side
- // even #s were opened from services forwarded to us (our sequence)
- // odd #s were forwarded directly to us (remote sequence)
- private final ConcurrentIntegerMap<RequestHandler> remoteClients =
concurrentIntegerMap();
- // forwarded to remote side (handled on this side)
- private final ConcurrentIntegerMap<Handle<RequestHandler>>
forwardedClients = concurrentIntegerMap();
- // sequence for forwarded clients (unsigned; shift left one bit, add one)
- private final AtomicInteger forwardedClientSequence = new AtomicInteger();
- // sequence for clients created from services forwarded to us (unsigned; shift left
one bit)
- private final AtomicInteger remoteClientSequence = new AtomicInteger();
-
- // services forwarded to us
- private final ConcurrentIntegerMap<RequestHandlerSource> remoteServices =
concurrentIntegerMap();
- // forwarded to remote side (handled on this side)
- private final ConcurrentIntegerMap<Handle<RequestHandlerSource>>
forwardedServices = concurrentIntegerMap();
- // sequence for forwarded services
- private final AtomicInteger serviceSequence = new AtomicInteger();
-
- private volatile AllocatedMessageChannel channel;
-
- public BasicHandler(final RemotingChannelConfiguration configuration) {
- allocator = configuration.getAllocator();
- executor = configuration.getExecutor();
- classLoader = configuration.getClassLoader();
- marshallerFactory = configuration.getMarshallerFactory();
- linkMetric = configuration.getLinkMetric();
- }
-
- public void handleOpened(final AllocatedMessageChannel channel) {
- channel.resumeReads();
- }
-
- public void handleReadable(final AllocatedMessageChannel channel) {
- for (;;) try {
- final ByteBuffer buffer;
- try {
- buffer = channel.receive();
- } catch (IOException e) {
- log.error(e, "I/O error in protocol channel; closing
channel");
- IoUtils.safeClose(channel);
- return;
- }
- if (buffer == null) {
- // todo release all handles...
- // todo what if the write queue is not empty?
- IoUtils.safeClose(channel);
- return;
- }
- if (! buffer.hasRemaining()) {
- // would block
- channel.resumeReads();
- return;
- }
- final MessageType msgType;
- try {
- msgType = MessageType.getMessageType(buffer.get() & 0xff);
- } catch (IllegalArgumentException ex) {
- log.trace("Received invalid message type");
- return;
- }
- log.trace("Received message %s, type %s", buffer, msgType);
- switch (msgType) {
- case REQUEST_ONEWAY: {
- final int clientId = buffer.getInt();
- final Handle<RequestHandler> handle =
forwardedClients.get(clientId);
- if (handle == null) {
- log.trace("Request on invalid client ID %d",
Integer.valueOf(clientId));
- return;
- }
- final Object payload;
- try {
- final Unmarshaller unmarshaller =
marshallerFactory.createUnmarshaller();
- try {
- unmarshaller.start(createByteInput(buffer, true));
- try {
- payload = unmarshaller.readObject();
- } catch (ClassNotFoundException e) {
- log.trace("Class not found in one-way request for
client ID %d", Integer.valueOf(clientId));
- break;
- }
- } finally {
- IoUtils.safeClose(unmarshaller);
- }
- } catch (IOException ex) {
- log.error(ex, "Failed to unmarshal a one-way
request");
- break;
- }
- final RequestHandler requestHandler = handle.getResource();
- try {
- requestHandler.receiveRequest(payload);
- } catch (Throwable t) {
- log.error(t, "One-way request handler unexpectedly threw an
exception");
- }
- break;
- }
- case REQUEST: {
- final int clientId = buffer.getInt();
- final Handle<RequestHandler> handle =
forwardedClients.get(clientId);
- if (handle == null) {
- log.trace("Request on invalid client ID %d",
Integer.valueOf(clientId));
- break;
- }
- final int requestId = buffer.getInt();
- final Object payload;
- try {
- final Unmarshaller unmarshaller =
marshallerFactory.createUnmarshaller();
- try {
- unmarshaller.start(createByteInput(buffer, true));
- try {
- payload = unmarshaller.readObject();
- } catch (ClassNotFoundException e) {
- log.trace("Class not found in request ID %d for
client ID %d", Integer.valueOf(requestId), Integer.valueOf(clientId));
- // todo - send request receive failed message
- break;
- }
- } finally {
- IoUtils.safeClose(unmarshaller);
- }
- } catch (IOException ex) {
- log.trace("Failed to unmarshal a request (%s), sending
%s", ex, MessageType.REQUEST_RECEIVE_FAILED);
- // todo send a request failure message
- break;
- }
- final RequestHandler requestHandler = handle.getResource();
- requestHandler.receiveRequest(payload, new ReplyHandlerImpl(channel,
requestId, allocator));
- break;
- }
- case REPLY: {
- final int requestId = buffer.getInt();
- final ReplyHandler replyHandler = remoteRequests.remove(requestId);
- if (replyHandler == null) {
- log.trace("Got reply to unknown request %d",
Integer.valueOf(requestId));
- break;
- }
- final Object payload;
- try {
- final Unmarshaller unmarshaller =
marshallerFactory.createUnmarshaller();
- try {
- unmarshaller.start(createByteInput(buffer, true));
- try {
- payload = unmarshaller.readObject();
- } catch (ClassNotFoundException e) {
- replyHandler.handleException("Reply unmarshalling
failed", e);
- log.trace("Class not found in reply to request ID
%d", Integer.valueOf(requestId));
- break;
- }
- } finally {
- IoUtils.safeClose(unmarshaller);
- }
- } catch (IOException ex) {
- log.trace("Failed to unmarshal a reply (%s), sending a
ReplyException");
- // todo
- SpiUtils.safeHandleException(replyHandler, null, null);
- break;
- }
- SpiUtils.safeHandleReply(replyHandler, payload);
- break;
- }
- case CANCEL_REQUEST: {
- final int requestId = buffer.getInt();
- final RemoteRequestContext context = localRequests.get(requestId);
- if (context != null) {
- context.cancel();
- }
- break;
- }
- case CANCEL_ACK: {
- final int requestId = buffer.getInt();
- final ReplyHandler replyHandler = remoteRequests.get(requestId);
- if (replyHandler != null) {
- replyHandler.handleCancellation();
- }
- break;
- }
- case REQUEST_RECEIVE_FAILED: {
- final int requestId = buffer.getInt();
- final ReplyHandler replyHandler = remoteRequests.remove(requestId);
- if (replyHandler == null) {
- log.trace("Got reply to unknown request %d",
Integer.valueOf(requestId));
- break;
- }
- final String reason = readUTFZ(buffer);
- // todo - throw a new ReplyException
- break;
- }
- case REQUEST_FAILED: {
- final int requestId = buffer.getInt();
- final ReplyHandler replyHandler = remoteRequests.remove(requestId);
- if (replyHandler == null) {
- log.trace("Got reply to unknown request %d",
Integer.valueOf(requestId));
- break;
- }
- final Throwable cause;
- try {
- final Unmarshaller unmarshaller =
marshallerFactory.createUnmarshaller();
- try {
- unmarshaller.start(createByteInput(buffer, true));
- try {
- cause = (Throwable) unmarshaller.readObject();
- } catch (ClassNotFoundException e) {
- replyHandler.handleException("Exception reply
unmarshalling failed", e);
- log.trace("Class not found in exception reply to
request ID %d", Integer.valueOf(requestId));
- break;
- } catch (ClassCastException e) {
- // todo - report a generic exception
- SpiUtils.safeHandleException(replyHandler, null, null);
- break;
- }
- } finally {
- IoUtils.safeClose(unmarshaller);
- }
- } catch (IOException ex) {
- log.trace("Failed to unmarshal an exception reply (%s),
sending a generic execution exception");
- // todo
- SpiUtils.safeHandleException(replyHandler, null, null);
- break;
- }
- // todo - wrap with REE
- SpiUtils.safeHandleException(replyHandler, null, cause);
- break;
- }
- case REQUEST_OUTCOME_UNKNOWN: {
- final int requestId = buffer.getInt();
- final ReplyHandler replyHandler = remoteRequests.remove(requestId);
- if (replyHandler == null) {
- log.trace("Got reply to unknown request %d",
Integer.valueOf(requestId));
- break;
- }
- final String reason = readUTFZ(buffer);
- // todo - throw a new IndetermOutcomeEx
- break;
- }
- case CLIENT_CLOSE: {
- final int clientId = buffer.getInt();
- final Handle<RequestHandler> handle =
forwardedClients.remove(clientId);
- if (handle == null) {
- log.warn("Got client close message for unknown client
%d", Integer.valueOf(clientId));
- break;
- }
- IoUtils.safeClose(handle);
- break;
- }
- case CLIENT_OPEN: {
- final int serviceId = buffer.getInt();
- final int clientId = buffer.getInt();
- final Handle<RequestHandlerSource> handle =
forwardedServices.get(serviceId);
- if (handle == null) {
- log.warn("Received client open message for unknown service
%d", Integer.valueOf(serviceId));
- break;
- }
- try {
- final RequestHandlerSource requestHandlerSource =
handle.getResource();
- final Handle<RequestHandler> clientHandle =
requestHandlerSource.createRequestHandler();
- // todo check for duplicate
- // todo validate the client ID
- log.trace("Opening client %d from service %d",
Integer.valueOf(clientId), Integer.valueOf(serviceId));
- forwardedClients.put(clientId, clientHandle);
- } catch (IOException ex) {
- log.error(ex, "Failed to create a request handler for client
ID %d", Integer.valueOf(clientId));
- break;
- } finally {
- IoUtils.safeClose(handle);
- }
- break;
- }
- case SERVICE_CLOSE: {
- final Handle<RequestHandlerSource> handle =
forwardedServices.remove(buffer.getInt());
- if (handle == null) {
- break;
- }
- IoUtils.safeClose(handle);
- break;
- }
- case SERVICE_ADVERTISE: {
- final int serviceId = buffer.getInt();
- final String serviceType = readUTFZ(buffer);
- final String groupName = readUTFZ(buffer);
- final String endpointName = readUTFZ(buffer);
- final int baseMetric = buffer.getInt();
- Endpoint endpoint = null;
- int id = -1;
- final RequestHandlerSource handlerSource = new
RequestHandlerSourceImpl(allocator, id);
- final int calcMetric = baseMetric + linkMetric;
- if (calcMetric > 0) {
- try {
- final SimpleCloseable closeable =
endpoint.registerRemoteService(serviceType, groupName, endpointName, handlerSource,
calcMetric);
- // todo - something with that closeable
- } catch (IOException e) {
- log.error(e, "Unable to register remote service");
- }
- }
- break;
- }
- case SERVICE_UNADVERTISE: {
- final int serviceId = buffer.getInt();
- IoUtils.safeClose(remoteServices.get(serviceId));
- break;
- }
- default: {
- log.trace("Received invalid message type %s", msgType);
- }
- }
- } catch (BufferUnderflowException e) {
- log.error(e, "Malformed packet");
- }
- }
-
-    /**
-     * Run queued write handlers against the channel until the queue is empty or a
-     * handler reports that it could not finish.
-     * <p>
-     * A handler that returns {@code true} from {@code handleWrite} is removed from the
-     * queue and the pending count is decremented.  A handler that returns {@code false}
-     * stays at the head of the queue and writes are resumed on the channel.
-     *
-     * @param channel the channel that became writable
-     */
-    public void handleWritable(final AllocatedMessageChannel channel) {
-        for (;;) {
-            final WriteHandler handler = outputQueue.peek();
-            if (handler == null) {
-                return;
-            }
-            try {
-                if (handler.handleWrite(channel)) {
-                    log.trace("Handled write with handler %s", handler);
-                    pending.decrementAndGet();
-                    outputQueue.remove();
-                } else {
-                    channel.resumeWrites();
-                    return;
-                }
-            } catch (Throwable t) {
-                // The failing handler is still discarded so the queue keeps moving, but the
-                // failure is now recorded instead of being silently dropped.  Bookkeeping
-                // comes first so that a problem while logging cannot leave the handler queued.
-                pending.decrementAndGet();
-                outputQueue.remove();
-                log.error(t, "Write handler %s failed and was discarded", handler);
-            }
-        }
-    }
-
- /**
- * Channel-closed callback.  The body is empty: nothing is done with the channel,
- * the output queue or any registered request state when this is invoked.
- *
- * @param channel the channel that was closed (unused)
- */
- public void handleClosed(final AllocatedMessageChannel channel) {
- }
-
-    /**
-     * Build a new forwarding request handler source for the given service ID, using
-     * this handler's buffer allocator.  A fresh instance is created on every call.
-     *
-     * @param id the service ID passed to the new handler source
-     * @return the newly created handler source
-     */
-    RequestHandlerSource getRemoteService(final int id) {
-        final RequestHandlerSourceImpl remoteService = new RequestHandlerSourceImpl(allocator, id);
-        return remoteService;
-    }
-
-    /**
-     * Reply handler that sends the outcome of one inbound request back over the channel.
-     * <p>
-     * Replies and failures are sent as: one message-type byte, the four-byte request ID,
-     * then the marshalled payload.  This is the layout the read side consumes
-     * ({@code buffer.getInt()} followed by unmarshalling the rest of the buffer).
-     */
-    private final class ReplyHandlerImpl implements ReplyHandler {
-
-        private final AllocatedMessageChannel channel;
-        private final int requestId;
-        private final BufferAllocator<ByteBuffer> allocator;
-
-        /**
-         * @param channel the channel to reply on; must not be {@code null}
-         * @param requestId the ID of the request being answered
-         * @param allocator the allocator for outbound buffers; must not be {@code null}
-         * @throws NullPointerException if {@code channel} or {@code allocator} is {@code null}
-         */
-        private ReplyHandlerImpl(final AllocatedMessageChannel channel, final int requestId, final BufferAllocator<ByteBuffer> allocator) {
-            if (channel == null) {
-                throw new NullPointerException("channel is null");
-            }
-            if (allocator == null) {
-                throw new NullPointerException("allocator is null");
-            }
-            this.channel = channel;
-            this.requestId = requestId;
-            this.allocator = allocator;
-        }
-
-        /**
-         * Send a {@code REPLY} message carrying the marshalled reply object.
-         *
-         * @param reply the reply payload (may be {@code null})
-         */
-        public void handleReply(final Object reply) {
-            sendMarshalled(MessageType.REPLY, reply,
-                    "Failed to send a reply to the remote side",
-                    "Reply handler thread interrupted before a reply could be sent");
-        }
-
-        /**
-         * Send a {@code REQUEST_FAILED} message carrying the marshalled cause.
-         *
-         * @param msg a description of the failure; it is not transmitted
-         * @param cause the failure to marshal and send
-         */
-        public void handleException(final String msg, final Throwable cause) {
-            sendMarshalled(MessageType.REQUEST_FAILED, cause,
-                    "Failed to send an exception to the remote side",
-                    "Reply handler thread interrupted before an exception could be sent");
-        }
-
-        /**
-         * Send a {@code CANCEL_ACK} message for this request.
-         */
-        public void handleCancellation() {
-            final ByteBuffer buffer = allocator.allocate();
-            buffer.put((byte) MessageType.CANCEL_ACK.getId());
-            buffer.putInt(requestId);
-            buffer.flip();
-            try {
-                registerWriter(channel, new SimpleWriteHandler(allocator, buffer));
-            } catch (InterruptedException e) {
-                log.error(e, "Reply handler thread interrupted before a cancellation acknowledgement could be sent");
-                Thread.currentThread().interrupt();
-            }
-        }
-
-        /**
-         * Build and queue one message consisting of the header followed by the marshalled body.
-         * <p>
-         * The header used to be written into a separately allocated buffer that was never
-         * flipped, queued or sent, so only the bare payload went out.  It is now written
-         * through the same byte output as the payload, so it travels in the queued buffers.
-         *
-         * @param type the message type to put in the header
-         * @param body the object to marshal after the header
-         * @param ioFailure the message logged if marshalling fails with an I/O error
-         * @param interrupted the message logged if queueing the write is interrupted
-         */
-        private void sendMarshalled(final MessageType type, final Object body, final String ioFailure, final String interrupted) {
-            try {
-                final org.jboss.marshalling.Marshaller marshaller = marshallerFactory.createMarshaller();
-                try {
-                    final List<ByteBuffer> bufferList = new ArrayList<ByteBuffer>();
-                    final ByteOutput output = createByteOutput(allocator, bufferList);
-                    try {
-                        output.write(type.getId());
-                        // Most significant byte first; assumes the reader's getInt() uses the
-                        // default big-endian ByteBuffer order - TODO confirm nothing reorders it.
-                        output.write(requestId >>> 24);
-                        output.write(requestId >>> 16);
-                        output.write(requestId >>> 8);
-                        output.write(requestId);
-                        marshaller.start(output);
-                        marshaller.writeObject(body);
-                        marshaller.close();
-                        output.close();
-                        registerWriter(channel, new SimpleWriteHandler(allocator, bufferList));
-                    } finally {
-                        IoUtils.safeClose(output);
-                    }
-                } finally {
-                    IoUtils.safeClose(marshaller);
-                }
-            } catch (IOException e) {
-                log.error(e, ioFailure);
-            } catch (InterruptedException e) {
-                log.error(e, interrupted);
-                Thread.currentThread().interrupt();
-            }
-        }
-    }
-
- // Writer members
-
- // Write handlers waiting to be run by handleWritable; created with an argument of 64
- // (presumably the queue capacity - confirm against CollectionUtil.blockingQueue).
- private final BlockingQueue<WriteHandler> outputQueue =
CollectionUtil.blockingQueue(64);
- // Number of registered write handlers not yet completed or discarded; registerWriter
- // resumes writes on the channel when this goes from zero to one.
- private final AtomicInteger pending = new AtomicInteger();
-
-    /**
-     * Queue a write handler and, if no other handler was pending, ask the channel to
-     * resume write notifications.
-     *
-     * @param channel the channel whose writes are resumed when the queue was idle
-     * @param writeHandler the handler to enqueue
-     * @throws InterruptedException if the thread is interrupted while waiting for queue space
-     */
-    private void registerWriter(final AllocatedMessageChannel channel, final WriteHandler writeHandler) throws InterruptedException {
-        outputQueue.put(writeHandler);
-        final int alreadyPending = pending.getAndIncrement();
-        if (alreadyPending != 0) {
-            // writes were already requested on behalf of an earlier handler
-            return;
-        }
-        channel.resumeWrites();
-    }
-
-    /**
-     * Write as much of a character sequence as fits into the buffer, as a
-     * zero-terminated string with one to three bytes per {@code char}.
-     * Chars in the range 1..0x7f take one byte, other chars below 0x800 (including
-     * char 0) take two bytes, and everything else takes three.
-     *
-     * @param buffer the buffer to write to
-     * @param s the characters to encode
-     * @return {@code -1} if the whole string and its terminating zero byte were written;
-     *         otherwise the index of the first char that did not fit, or {@code s.length()}
-     *         if only the terminator did not fit
-     */
-    private int writeUTFZ(ByteBuffer buffer, CharSequence s) {
-        final int length = s.length();
-        int index = 0;
-        while (index < length) {
-            final char ch = s.charAt(index);
-            final int width;
-            if (ch >= 1 && ch < 0x80) {
-                width = 1;
-            } else if (ch < 0x0800) {
-                width = 2;
-            } else {
-                width = 3;
-            }
-            if (buffer.remaining() < width) {
-                // not enough room for this char; report where to resume
-                return index;
-            }
-            switch (width) {
-                case 1:
-                    buffer.put((byte) ch);
-                    break;
-                case 2:
-                    buffer.put((byte) (0xc0 | (ch >> 6)));
-                    buffer.put((byte) (0x80 | (ch & 0x3f)));
-                    break;
-                default:
-                    buffer.put((byte) (0xe0 | (ch >> 12)));
-                    buffer.put((byte) (0x80 | ((ch >> 6) & 0x3f)));
-                    buffer.put((byte) (0x80 | (ch & 0x3f)));
-                    break;
-            }
-            index++;
-        }
-        if (! buffer.hasRemaining()) {
-            return length;
-        }
-        buffer.put((byte) 0);
-        return -1;
-    }
-
- // Reader utils
-
-    /**
-     * Read a zero-terminated string of one- to three-byte encoded chars from the buffer.
-     * Reading stops at the first zero byte, or at the end of the buffer if no terminator
-     * is found.  Bytes that are not valid at their position are replaced by {@code '?'}.
-     * <p>
-     * The marker bits of a lead byte are masked off before its payload bits are shifted
-     * into place.  Without the mask every two-byte sequence decoded to the wrong char,
-     * including the {@code C0 80} form that {@code writeUTFZ} emits for char 0.
-     * Three-byte lead bytes are accepted over the whole {@code E0..EF} range so that
-     * chars at or above U+8000 can be read back.
-     *
-     * @param buffer the buffer to read from; its position is advanced past the consumed bytes
-     * @return the decoded string (never {@code null})
-     */
-    private String readUTFZ(ByteBuffer buffer) {
-        final StringBuilder builder = new StringBuilder();
-        // state = number of continuation bytes still expected; a = char being assembled
-        int state = 0, a = 0;
-        while (buffer.hasRemaining()) {
-            final int v = buffer.get() & 0xff;
-            switch (state) {
-                case 0: {
-                    if (v == 0) {
-                        return builder.toString();
-                    } else if (v < 128) {
-                        builder.append((char) v);
-                    } else if (192 <= v && v < 224) {
-                        // 110xxxxx: five payload bits
-                        a = (v & 0x1f) << 6;
-                        state = 1;
-                    } else if (224 <= v && v < 240) {
-                        // 1110xxxx: four payload bits
-                        a = (v & 0x0f) << 12;
-                        state = 2;
-                    } else {
-                        builder.append('?');
-                    }
-                    break;
-                }
-                case 1: {
-                    if (v == 0) {
-                        builder.append('?');
-                        return builder.toString();
-                    } else if (128 <= v && v < 192) {
-                        a |= v & 0x3f;
-                        builder.append((char) a);
-                    } else {
-                        builder.append('?');
-                    }
-                    state = 0;
-                    break;
-                }
-                case 2: {
-                    if (v == 0) {
-                        builder.append('?');
-                        return builder.toString();
-                    } else if (128 <= v && v < 192) {
-                        a |= (v & 0x3f) << 6;
-                        state = 1;
-                    } else {
-                        builder.append('?');
-                        state = 0;
-                    }
-                    break;
-                }
-                default:
-                    throw new IllegalStateException("wrong state");
-            }
-        }
-        return builder.toString();
-    }
-
- // client endpoint
-
-    /**
-     * Client-side request handler that forwards requests to the remote side under a
-     * fixed client identifier.  Closing it unregisters it from {@code remoteClients}
-     * and queues a {@code CLIENT_CLOSE} message.
-     */
-    private final class RequestHandlerImpl extends AbstractAutoCloseable<RequestHandler> implements RequestHandler {
-
-        private final int identifier;
-        private final BufferAllocator<ByteBuffer> allocator;
-
-        /**
-         * @param identifier the client ID written into every outbound message
-         * @param allocator the allocator for outbound buffers; must not be {@code null}
-         * @throws NullPointerException if {@code allocator} is {@code null}
-         */
-        public RequestHandlerImpl(final int identifier, final BufferAllocator<ByteBuffer> allocator) {
-            super(executor);
-            if (allocator == null) {
-                throw new NullPointerException("allocator is null");
-            }
-            this.identifier = identifier;
-            this.allocator = allocator;
-            addCloseHandler(new CloseHandler<RequestHandler>() {
-                public void handleClose(final RequestHandler closed) {
-                    // Compare against the request handler itself.  A bare "this" here is the
-                    // anonymous CloseHandler, which can never be the value stored in the map.
-                    remoteClients.remove(identifier, RequestHandlerImpl.this);
-                    ByteBuffer buffer = allocator.allocate();
-                    buffer.put((byte) MessageType.CLIENT_CLOSE.getId());
-                    buffer.putInt(identifier);
-                    buffer.flip();
-                    try {
-                        registerWriter(channel, new SimpleWriteHandler(allocator, buffer));
-                    } catch (InterruptedException e) {
-                        log.warn("Client close notification was interrupted before it could be sent");
-                        // keep the interrupt visible to whoever owns this thread
-                        Thread.currentThread().interrupt();
-                    }
-                }
-            });
-        }
-
-        /**
-         * Marshal and queue a one-way request.  Any failure is logged at trace level and
-         * otherwise ignored; nothing is reported to the caller.
-         *
-         * @param request the request payload (may be {@code null})
-         */
-        public void receiveRequest(final Object request) {
-            log.trace("Sending outbound one-way request of type %s", request == null ? "null" : request.getClass());
-            try {
-                final List<ByteBuffer> bufferList;
-                final Marshaller marshaller = marshallerFactory.createMarshaller();
-                try {
-                    bufferList = new ArrayList<ByteBuffer>();
-                    final ByteOutput output = createByteOutput(allocator, bufferList);
-                    try {
-                        // NOTE(review): the marshaller is never started on "output" in this
-                        // method, so it is unclear how the marshalled bytes reach bufferList - confirm.
-                        marshaller.write(MessageType.REQUEST_ONEWAY.getId());
-                        marshaller.writeInt(identifier);
-                        marshaller.writeObject(request);
-                        marshaller.close();
-                        output.close();
-                    } finally {
-                        IoUtils.safeClose(output);
-                    }
-                } finally {
-                    IoUtils.safeClose(marshaller);
-                }
-                try {
-                    registerWriter(channel, new SimpleWriteHandler(allocator, bufferList));
-                } catch (InterruptedException e) {
-                    log.trace(e, "receiveRequest was interrupted");
-                    Thread.currentThread().interrupt();
-                    return;
-                }
-            } catch (Throwable t) {
-                // ignore
-                log.trace(t, "receiveRequest failed with an exception");
-                return;
-            }
-        }
-
-        /**
-         * Marshal and queue a request that expects a reply.  The reply handler is
-         * registered under a newly allocated request ID before the message is queued.
-         *
-         * @param request the request payload (may be {@code null})
-         * @param handler the handler to notify of the reply, failure or cancellation
-         * @return a context for the queued request, or the blank context from
-         *         {@code SpiUtils} if the request could not be built or queued
-         */
-        public RemoteRequestContext receiveRequest(final Object request, final ReplyHandler handler) {
-            log.trace("Sending outbound request of type %s", request == null ? "null" : request.getClass());
-            try {
-                final List<ByteBuffer> bufferList;
-                final Marshaller marshaller = marshallerFactory.createMarshaller();
-                try {
-                    bufferList = new ArrayList<ByteBuffer>();
-                    final ByteOutput output = createByteOutput(allocator, bufferList);
-                    try {
-                        // NOTE(review): as in the one-way variant, the marshaller is never
-                        // started on "output" here - confirm.
-                        marshaller.write(MessageType.REQUEST.getId());
-                        marshaller.writeInt(identifier);
-
-                        // claim a request ID that is not already in use
-                        int id;
-                        do {
-                            id = requestSequence.getAndIncrement();
-                        } while (remoteRequests.putIfAbsent(id, handler) != null);
-                        marshaller.writeInt(id);
-                        marshaller.writeObject(request);
-                        marshaller.close();
-                        output.close();
-                        try {
-                            registerWriter(channel, new SimpleWriteHandler(allocator, bufferList));
-                        } catch (InterruptedException e) {
-                            Thread.currentThread().interrupt();
-                            // The request was never queued, so no reply can arrive for it;
-                            // drop the registration instead of leaving it behind.
-                            remoteRequests.remove(id);
-                            executor.execute(new Runnable() {
-                                public void run() {
-                                    SpiUtils.safeHandleCancellation(handler);
-                                }
-                            });
-                            return SpiUtils.getBlankRemoteRequestContext();
-                        }
-                        log.trace("Sent request %s", request);
-                        return new RemoteRequestContextImpl(id, allocator, channel);
-                    } finally {
-                        IoUtils.safeClose(output);
-                    }
-                } finally {
-                    IoUtils.safeClose(marshaller);
-                }
-            } catch (final IOException t) {
-                log.trace(t, "receiveRequest failed with an exception");
-                executor.execute(new Runnable() {
-                    public void run() {
-                        SpiUtils.safeHandleException(handler, "Failed to build request", t);
-                    }
-                });
-                return SpiUtils.getBlankRemoteRequestContext();
-            }
-        }
-
-        public String toString() {
-            return "forwarding request handler <" + Integer.toString(hashCode(), 16) + "> (id = " + identifier + ")";
-        }
-    }
-
-    /**
-     * Context for one outbound request; its only operation is to ask the remote side
-     * to cancel that request.
-     */
-    public final class RemoteRequestContextImpl implements RemoteRequestContext {
-
-        private final BufferAllocator<ByteBuffer> allocator;
-        private final int id;
-        private final AllocatedMessageChannel channel;
-
-        /**
-         * @param id the request ID placed in the cancel message
-         * @param allocator the allocator for the cancel message buffer
-         * @param channel the channel the cancel message is queued on
-         */
-        public RemoteRequestContextImpl(final int id, final BufferAllocator<ByteBuffer> allocator, final AllocatedMessageChannel channel) {
-            this.id = id;
-            this.allocator = allocator;
-            this.channel = channel;
-        }
-
-        /**
-         * Queue a {@code CANCEL_REQUEST} message for this request.  Failures are logged
-         * and never propagated to the caller.
-         */
-        public void cancel() {
-            try {
-                final ByteBuffer buffer = allocator.allocate();
-                buffer.put((byte) MessageType.CANCEL_REQUEST.getId());
-                buffer.putInt(id);
-                buffer.flip();
-                registerWriter(channel, new SimpleWriteHandler(allocator, buffer));
-            } catch (InterruptedException e) {
-                log.trace(e, "Cancel of request %d was interrupted before it could be sent", Integer.valueOf(id));
-                Thread.currentThread().interrupt();
-            } catch (Throwable t) {
-                // cancellation stays best-effort, but the failure is no longer invisible
-                log.error(t, "Failed to send a cancel message for request %d", Integer.valueOf(id));
-            }
-        }
-    }
-
-    /**
-     * Handler source that opens clients on a remote service identified by a service ID.
-     * Closing it queues a {@code SERVICE_CLOSE} message.
-     */
-    public final class RequestHandlerSourceImpl extends AbstractAutoCloseable<RequestHandlerSource> implements RequestHandlerSource {
-
-        private final BufferAllocator<ByteBuffer> allocator;
-        private final int identifier;
-
-        /**
-         * @param allocator the allocator for outbound message buffers
-         * @param identifier the service ID written into outbound messages
-         */
-        protected RequestHandlerSourceImpl(final BufferAllocator<ByteBuffer> allocator, final int identifier) {
-            super(executor);
-            this.allocator = allocator;
-            this.identifier = identifier;
-            addCloseHandler(new CloseHandler<RequestHandlerSource>() {
-                public void handleClose(final RequestHandlerSource closed) {
-                    ByteBuffer buffer = allocator.allocate();
-                    buffer.put((byte) MessageType.SERVICE_CLOSE.getId());
-                    buffer.putInt(identifier);
-                    buffer.flip();
-                    try {
-                        registerWriter(channel, new SimpleWriteHandler(allocator, buffer));
-                    } catch (InterruptedException e) {
-                        log.warn("Service close notification was interrupted before it could be sent");
-                        // keep the interrupt visible to whoever owns this thread
-                        Thread.currentThread().interrupt();
-                    }
-                }
-            });
-        }
-
-        /**
-         * Allocate a client ID, register a request handler under it, queue a
-         * {@code CLIENT_OPEN} message and return a handle to that same handler.
-         * <p>
-         * Previously one handler instance was stored in {@code remoteClients} while the
-         * handle of a second, unregistered instance was returned to the caller.
-         * <p>
-         * If the thread is interrupted while queueing the message, queueing is retried
-         * and the interrupt flag is restored before returning.
-         *
-         * @return a handle to the newly registered request handler
-         * @throws IOException if the handle cannot be obtained
-         */
-        public Handle<RequestHandler> createRequestHandler() throws IOException {
-            int id;
-            RequestHandlerImpl requestHandler;
-            do {
-                id = remoteClientSequence.getAndIncrement() << 1;
-                requestHandler = new RequestHandlerImpl(id, BasicHandler.this.allocator);
-            } while (remoteClients.putIfAbsent(id, requestHandler) != null);
-            final int clientId = id;
-            final ByteBuffer buffer = allocator.allocate();
-            buffer.put((byte) MessageType.CLIENT_OPEN.getId());
-            buffer.putInt(identifier);
-            buffer.putInt(clientId);
-            buffer.flip();
-            // todo - probably should bail out if we're interrupted?
-            boolean intr = false;
-            for (;;) {
-                try {
-                    registerWriter(channel, new SimpleWriteHandler(allocator, buffer));
-                    try {
-                        return requestHandler.getHandle();
-                    } finally {
-                        if (intr) {
-                            Thread.currentThread().interrupt();
-                        }
-                    }
-                } catch (InterruptedException e) {
-                    intr = true;
-                }
-            }
-        }
-
-        public String toString() {
-            return "forwarding request handler source <" + Integer.toString(hashCode(), 16) + "> (id = " + identifier + ")";
-        }
-    }
-
-    /**
-     * Wrap a byte buffer as a {@code ByteInput} that consumes the buffer's remaining bytes.
-     *
-     * @param buffer the buffer to read from; reads and skips advance its position
-     * @param eof what to report once the buffer is exhausted: {@code true} makes the
-     *            read methods return {@code -1}, {@code false} makes them return {@code 0}
-     * @return the byte input view of the buffer
-     */
-    public static ByteInput createByteInput(final ByteBuffer buffer, final boolean eof) {
-        return new ByteInput() {
-            public int read() throws IOException {
-                if (buffer.hasRemaining()) {
-                    return buffer.get() & 0xff;
-                } else {
-                    return eof ? -1 : 0;
-                }
-            }
-
-            public int read(final byte[] b) throws IOException {
-                return read(b, 0, b.length);
-            }
-
-            public int read(final byte[] b, final int off, final int len) throws IOException {
-                int r = Math.min(buffer.remaining(), len);
-                if (r > 0) {
-                    buffer.get(b, off, r);
-                    return r;
-                } else {
-                    return eof ? -1 : 0;
-                }
-            }
-
-            public int available() throws IOException {
-                return buffer.remaining();
-            }
-
-            /**
-             * Skip up to {@code n} bytes, limited to what remains in the buffer.
-             * The position is now actually advanced; it used to stay put although a
-             * non-zero count was returned.  A non-positive {@code n} skips nothing.
-             */
-            public long skip(final long n) throws IOException {
-                if (n <= 0L) {
-                    return 0L;
-                }
-                // bounded by remaining(), so the cast cannot overflow
-                final int count = (int) Math.min(n, (long) buffer.remaining());
-                buffer.position(buffer.position() + count);
-                return count;
-            }
-
-            public void close() {
-            }
-        };
-    }
-
-    /**
-     * Create a {@code ByteOutput} that collects written bytes into allocator-provided
-     * buffers.  Each buffer is flipped and appended to {@code target} as soon as it is
-     * full; a partially filled buffer is appended on {@code flush()} or {@code close()}.
-     *
-     * @param allocator the source of buffers to fill
-     * @param target the collection that receives the flipped buffers, in write order
-     * @return the byte output
-     */
-    public static ByteOutput createByteOutput(final BufferAllocator<ByteBuffer> allocator, final Collection<ByteBuffer> target) {
-        return new ByteOutput() {
-            // buffer currently being filled; null until the next byte needs one
-            private ByteBuffer current;
-
-            private ByteBuffer getCurrent() {
-                if (current == null) {
-                    current = allocator.allocate();
-                }
-                return current;
-            }
-
-            // hand a buffer over to the target and forget it
-            private void publish(final ByteBuffer filled) {
-                filled.flip();
-                target.add(filled);
-                current = null;
-            }
-
-            public void write(final int i) throws IOException {
-                final ByteBuffer sink = getCurrent();
-                sink.put((byte) i);
-                if (! sink.hasRemaining()) {
-                    publish(sink);
-                }
-            }
-
-            public void write(final byte[] bytes) throws IOException {
-                write(bytes, 0, bytes.length);
-            }
-
-            public void write(final byte[] bytes, int offs, int len) throws IOException {
-                while (len > 0) {
-                    final ByteBuffer sink = getCurrent();
-                    final int chunk = Math.min(len, sink.remaining());
-                    sink.put(bytes, offs, chunk);
-                    offs += chunk;
-                    len -= chunk;
-                    if (! sink.hasRemaining()) {
-                        publish(sink);
-                    }
-                }
-            }
-
-            public void close() throws IOException {
-                flush();
-            }
-
-            public void flush() throws IOException {
-                final ByteBuffer partial = current;
-                if (partial == null) {
-                    return;
-                }
-                publish(partial);
-            }
-        };
-    }
-}
Deleted:
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/BasicProtocol.java
===================================================================
---
remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/BasicProtocol.java 2008-09-18
03:06:39 UTC (rev 4573)
+++
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/BasicProtocol.java 2008-10-21
02:39:24 UTC (rev 4601)
@@ -1,96 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2008, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
- */
-
-package org.jboss.remoting.protocol.basic;
-
-import org.jboss.remoting.RemotingException;
-import org.jboss.remoting.SimpleCloseable;
-import org.jboss.remoting.spi.remote.RequestHandlerSource;
-import org.jboss.remoting.spi.remote.Handle;
-import org.jboss.xnio.IoHandlerFactory;
-import org.jboss.xnio.ChannelSource;
-import org.jboss.xnio.IoFuture;
-import org.jboss.xnio.AbstractConvertingIoFuture;
-import org.jboss.xnio.IoHandler;
-import org.jboss.xnio.BufferAllocator;
-import org.jboss.xnio.log.Logger;
-import org.jboss.xnio.channels.AllocatedMessageChannel;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.concurrent.Executor;
-
-/**
- * Static entry points for the basic protocol: a handler factory for the server side
- * and a connect operation for the client side.  Not instantiable.
- */
-public final class BasicProtocol {
-
-    private static final Logger log = Logger.getLogger(BasicProtocol.class);
-
-    private BasicProtocol() {
-    }
-
-    /**
-     * Build a channel configuration carrying the given allocator and executor.
-     */
-    private static RemotingChannelConfiguration newConfiguration(final Executor executor, final BufferAllocator<ByteBuffer> allocator) {
-        final RemotingChannelConfiguration configuration = new RemotingChannelConfiguration();
-        configuration.setAllocator(allocator);
-        configuration.setExecutor(executor);
-        // todo marshaller factory... etc
-        return configuration;
-    }
-
-    /**
-     * Create the server side of the basic protocol.  Every handler produced by the
-     * returned factory gets its own, freshly built configuration.
-     *
-     * @param executor the executor to use for invocations
-     * @param allocator the buffer allocator to use
-     * @return a handler factory for passing to an XNIO server
-     */
-    public static IoHandlerFactory<AllocatedMessageChannel> createServer(final Executor executor, final BufferAllocator<ByteBuffer> allocator) {
-        return new IoHandlerFactory<AllocatedMessageChannel>() {
-            public IoHandler<? super AllocatedMessageChannel> createHandler() {
-                return new BasicHandler(newConfiguration(executor, allocator));
-            }
-        };
-    }
-
-    /**
-     * Create the client side of the basic protocol by opening a channel from the
-     * given channel source with a new handler attached.
-     *
-     * @param executor the executor to use for invocations
-     * @param channelSource the XNIO channel source to use to establish the connection
-     * @param allocator the buffer allocator to use
-     * @return a future for the handle which may be used to close the connection
-     * @throws IOException if an error occurs
-     */
-    public static IoFuture<SimpleCloseable> connect(final Executor executor, final ChannelSource<AllocatedMessageChannel> channelSource, final BufferAllocator<ByteBuffer> allocator) throws IOException {
-        final BasicHandler basicHandler = new BasicHandler(newConfiguration(executor, allocator));
-        final IoFuture<AllocatedMessageChannel> futureChannel = channelSource.open(basicHandler);
-        return new AbstractConvertingIoFuture<SimpleCloseable, AllocatedMessageChannel>(futureChannel) {
-            protected SimpleCloseable convert(final AllocatedMessageChannel channel) throws RemotingException {
-                // the connection looks services up through the handler attached to the channel
-                return new AbstractConnection(executor) {
-                    public Handle<RequestHandlerSource> getServiceForId(final int id) throws IOException {
-                        return basicHandler.getRemoteService(id).getHandle();
-                    }
-                };
-            }
-        };
-    }
-}
Deleted:
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/ConfigValue.java
===================================================================
---
remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/ConfigValue.java 2008-09-18
03:06:39 UTC (rev 4573)
+++
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/ConfigValue.java 2008-10-21
02:39:24 UTC (rev 4601)
@@ -1,67 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2008, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
- */
-
-package org.jboss.remoting.protocol.basic;
-
-/**
- * The configuration values known to this protocol, each paired with a fixed integer ID.
- */
-public enum ConfigValue {
-
-    /**
-     * Selects the protocol version.  The associated value is an {@code int}.
-     */
-    PROTOCOL_VERSION(0),
-    /**
-     * Names the marshaller to use.  The associated value is a {@code String}.
-     */
-    MARSHALLER_NAME(1),
-    ;
-    private final int id;
-
-    private ConfigValue(final int id) {
-        this.id = id;
-    }
-
-    /**
-     * Return the integer ID assigned to this config value.
-     *
-     * @return the integer ID
-     */
-    public int getId() {
-        return id;
-    }
-
-    /**
-     * Look up the config value that carries the given integer ID.
-     *
-     * @param id the integer ID
-     * @return the matching config value
-     * @throws IllegalArgumentException if no config value has that ID
-     */
-    public static ConfigValue getConfigValue(final int id) {
-        for (final ConfigValue candidate : values()) {
-            if (candidate.id == id) {
-                return candidate;
-            }
-        }
-        throw new IllegalArgumentException("Invalid config value ID");
-    }
-}
Copied:
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/ConfigValue.java
(from rev 4600,
remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/ConfigValue.java)
===================================================================
---
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/ConfigValue.java
(rev 0)
+++
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/ConfigValue.java 2008-10-21
02:39:24 UTC (rev 4601)
@@ -0,0 +1,67 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.remoting.protocol.multiplex;
+
+/**
+ * The configuration values known to this protocol, each paired with a fixed integer ID.
+ */
+public enum ConfigValue {
+
+    /**
+     * Selects the protocol version.  The associated value is an {@code int}.
+     */
+    PROTOCOL_VERSION(0),
+    /**
+     * Names the marshaller to use.  The associated value is a {@code String}.
+     */
+    MARSHALLER_NAME(1),
+    ;
+    private final int id;
+
+    private ConfigValue(final int id) {
+        this.id = id;
+    }
+
+    /**
+     * Return the integer ID assigned to this config value.
+     *
+     * @return the integer ID
+     */
+    public int getId() {
+        return id;
+    }
+
+    /**
+     * Look up the config value that carries the given integer ID.
+     *
+     * @param id the integer ID
+     * @return the matching config value
+     * @throws IllegalArgumentException if no config value has that ID
+     */
+    public static ConfigValue getConfigValue(final int id) {
+        for (final ConfigValue candidate : values()) {
+            if (candidate.id == id) {
+                return candidate;
+            }
+        }
+        throw new IllegalArgumentException("Invalid config value ID");
+    }
+}
Deleted:
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/ConnectionListener.java
===================================================================
---
remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/ConnectionListener.java 2008-09-18
03:06:39 UTC (rev 4573)
+++
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/ConnectionListener.java 2008-10-21
02:39:24 UTC (rev 4601)
@@ -1,32 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2008, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
- */
-
-package org.jboss.remoting.protocol.basic;
-
-import org.jboss.remoting.SimpleCloseable;
-
-/**
- * Callback interface declaring a single notification method,
- * {@link #handleOpened(SimpleCloseable)}.
- */
-public interface ConnectionListener {
- /**
- * Receive a connection, passed as a {@link SimpleCloseable}.
- * NOTE(review): presumably called once for each newly opened connection (inferred
- * from the method name; no caller is visible here) - confirm.
- *
- * @param connection the connection handle
- */
- void handleOpened(SimpleCloseable connection);
-}
Copied:
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/ConnectionListener.java
(from rev 4600,
remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/ConnectionListener.java)
===================================================================
---
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/ConnectionListener.java
(rev 0)
+++
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/ConnectionListener.java 2008-10-21
02:39:24 UTC (rev 4601)
@@ -0,0 +1,32 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.remoting.protocol.multiplex;
+
+import org.jboss.remoting.SimpleCloseable;
+
+/**
+ * Callback interface with a single method, {@link #handleOpened(SimpleCloseable)},
+ * which is handed a {@link SimpleCloseable} representing a connection.
+ *
+ * NOTE(review): presumably invoked once a protocol connection has been
+ * established - confirm against the code that calls this listener.
+ */
+public interface ConnectionListener {
+ /**
+ * Handle the opening of a connection.
+ *
+ * @param connection the opened connection (presumably closing it tears the connection down - TODO confirm)
+ */
+ void handleOpened(SimpleCloseable connection);
+}
Added:
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/IdentityIntMap.java
===================================================================
---
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/IdentityIntMap.java
(rev 0)
+++
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/IdentityIntMap.java 2008-10-21
02:39:24 UTC (rev 4601)
@@ -0,0 +1,154 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.remoting.protocol.multiplex;
+
+import java.util.Arrays;
+
+/**
+ * A hash map from keys to primitive {@code int} values in which keys are compared by
+ * reference identity ({@code ==}) and hashed with {@link System#identityHashCode(Object)}.
+ * <p>
+ * The table uses open addressing with linear probing; its length is always a power of two
+ * so that a bit mask can replace the modulo operation.  There is no per-key removal, only
+ * {@link #clear()}.  The class performs no synchronization of its own.
+ *
+ * @param <T> the key type
+ */
+public final class IdentityIntMap<T> {
+
+    /** The largest table length that is still a positive power of two. */
+    private static final int MAX_CAPACITY = 0x40000000;
+
+    private int[] values;
+    private Object[] keys;
+    /** Number of keys currently stored. */
+    private int count;
+    /** The table is doubled as soon as {@code count} exceeds this threshold. */
+    private int resizeCount;
+
+    /**
+     * Construct a new map.
+     *
+     * @param initialCapacity the requested capacity; raised to at least 16 and rounded up to a power of two
+     * @param loadFactor the fill ratio at which the table grows; must be in the open interval (0, 1)
+     * @throws IllegalArgumentException if either argument is out of range
+     */
+    public IdentityIntMap(int initialCapacity, final float loadFactor) {
+        if (initialCapacity < 1) {
+            throw new IllegalArgumentException("initialCapacity must be > 0");
+        }
+        if (initialCapacity > MAX_CAPACITY) {
+            // rounding up would overflow into a negative array length
+            throw new IllegalArgumentException("initialCapacity must be <= " + MAX_CAPACITY);
+        }
+        if (loadFactor <= 0.0f || loadFactor >= 1.0f) {
+            throw new IllegalArgumentException("loadFactor must be > 0.0 and < 1.0");
+        }
+        if (initialCapacity < 16) {
+            initialCapacity = 16;
+        } else {
+            // round up to the next power of two (exact powers of two are kept)
+            final int c = Integer.highestOneBit(initialCapacity) - 1;
+            initialCapacity = Integer.highestOneBit(initialCapacity + c);
+        }
+        keys = new Object[initialCapacity];
+        values = new int[initialCapacity];
+        // Never let the threshold be zero: doubling zero stays zero, which used to flip the
+        // threshold to Integer.MAX_VALUE and let the table fill up completely (after which
+        // every probe loop spins forever).  Because loadFactor < 1 the threshold is also
+        // always below the table length, so at least one empty slot remains after a put.
+        resizeCount = Math.max(1, (int) ((double) initialCapacity * (double) loadFactor));
+    }
+
+    /**
+     * Construct a new map with an initial capacity of 64.
+     *
+     * @param loadFactor the fill ratio at which the table grows
+     */
+    public IdentityIntMap(final float loadFactor) {
+        this(64, loadFactor);
+    }
+
+    /**
+     * Construct a new map with a load factor of 0.5.
+     *
+     * @param initialCapacity the requested capacity
+     */
+    public IdentityIntMap(final int initialCapacity) {
+        this(initialCapacity, 0.5f);
+    }
+
+    /**
+     * Construct a new map with an initial capacity of 64 and a load factor of 0.5.
+     */
+    public IdentityIntMap() {
+        this(0.5f);
+    }
+
+    /**
+     * Look up the value stored for a key.
+     *
+     * @param key the key (compared by identity); must not be {@code null}
+     * @param defVal the value to return if the key is not present
+     * @return the stored value, or {@code defVal} if there is none
+     * @throws NullPointerException if {@code key} is {@code null}
+     */
+    public int get(T key, int defVal) {
+        if (key == null) {
+            throw new NullPointerException("key is null");
+        }
+        final Object[] keys = this.keys;
+        final int[] values = this.values;
+        final int mask = keys.length - 1;
+        int idx = System.identityHashCode(key) & mask;
+        for (;;) {
+            final Object v = keys[idx];
+            if (v == key) {
+                return values[idx];
+            }
+            if (v == null) {
+                // an empty slot terminates the probe chain: not found
+                return defVal;
+            }
+            idx = (idx + 1) & mask;
+        }
+    }
+
+    /**
+     * Store a value for a key, replacing any value previously stored for the same key.
+     *
+     * @param key the key (compared by identity); must not be {@code null}
+     * @param value the value to store
+     * @throws NullPointerException if {@code key} is {@code null}
+     * @throws IllegalStateException if the table would have to grow beyond its maximum size
+     */
+    public void put(T key, int value) {
+        if (key == null) {
+            throw new NullPointerException("key is null");
+        }
+        final Object[] keys = this.keys;
+        final int[] values = this.values;
+        final int mask = keys.length - 1;
+        int idx = System.identityHashCode(key) & mask;
+        // Probe one slot at a time, exactly like get() and resize() do.  The previous loop
+        // advanced its cursor twice per collision, so it could store a key beyond an empty
+        // slot where get() would never find it.
+        for (;;) {
+            final Object v = keys[idx];
+            if (v == null) {
+                keys[idx] = key;
+                values[idx] = value;
+                if (++count > resizeCount) {
+                    resize();
+                }
+                return;
+            }
+            if (v == key) {
+                values[idx] = value;
+                return;
+            }
+            idx = (idx + 1) & mask;
+        }
+    }
+
+    /**
+     * Double the table length and re-insert every key at its new position.
+     *
+     * @throws IllegalStateException if the table already has the maximum length
+     */
+    private void resize() {
+        final Object[] oldKeys = keys;
+        final int oldsize = oldKeys.length;
+        final int[] oldValues = values;
+        if (oldsize >= MAX_CAPACITY) {
+            throw new IllegalStateException("Table full");
+        }
+        final int newsize = oldsize << 1;
+        final int mask = newsize - 1;
+        final Object[] newKeys = new Object[newsize];
+        final int[] newValues = new int[newsize];
+        keys = newKeys;
+        values = newValues;
+        // the threshold is >= 1 and < oldsize, so doubling it can neither yield zero nor overflow
+        resizeCount <<= 1;
+        for (int oi = 0; oi < oldsize; oi++) {
+            final Object key = oldKeys[oi];
+            if (key != null) {
+                int ni = System.identityHashCode(key) & mask;
+                // the new table is at most half full, so an empty slot always exists
+                while (newKeys[ni] != null) {
+                    ni = (ni + 1) & mask;
+                }
+                newKeys[ni] = key;
+                newValues[ni] = oldValues[oi];
+            }
+        }
+    }
+
+    /**
+     * Remove all keys.  The table keeps its current length; stale entries of the value
+     * array are unreachable once their keys are gone and are therefore not reset.
+     */
+    public void clear() {
+        Arrays.fill(keys, null);
+        count = 0;
+    }
+}
Deleted:
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MessageType.java
===================================================================
---
remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/MessageType.java 2008-09-18
03:06:39 UTC (rev 4573)
+++
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MessageType.java 2008-10-21
02:39:24 UTC (rev 4601)
@@ -1,91 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2008, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
- */
-
-package org.jboss.remoting.protocol.basic;
-
-/**
- * The type of a protocol message.
- */
-public enum MessageType {
-
- // One-way request, no return value may be sent
- REQUEST_ONEWAY(1),
- // Two-way request, return value is expected
- REQUEST(2),
- // Reply
- REPLY(3),
- // Attempt to cancel a request
- CANCEL_REQUEST(4),
- // Acknowledge that a request was cancelled
- CANCEL_ACK(5),
- // Request failed due to protocol or unmarshalling problem
- REQUEST_RECEIVE_FAILED(6),
- // Request failed due to exception
- REQUEST_FAILED(7),
- // Request completed but no reply or exception was sent
- REQUEST_OUTCOME_UNKNOWN(8),
- // Remote side called .close() on a forwarded RequestHandler
- CLIENT_CLOSE(9),
- // Remote side pulled a new RequestHandler off of a forwarded RequestHandlerSource
- CLIENT_OPEN(10),
- // Remote side called .close() on a forwarded RequestHandlerSource
- SERVICE_CLOSE(11),
- // Remote side brought a new service online
- SERVICE_ADVERTISE(12),
- // Remote side's service is no longer available
- SERVICE_UNADVERTISE(13),
- ;
- private final int id;
-
- private MessageType(int id) {
- this.id = id;
- }
-
- public int getId() {
- return id;
- }
-
- /**
- * Get the message type for an integer ID.
- *
- * @param id the integer ID
- * @return the message type instance
- */
- public static MessageType getMessageType(final int id) {
- switch (id) {
- case 1: return REQUEST_ONEWAY;
- case 2: return REQUEST;
- case 3: return REPLY;
- case 4: return CANCEL_REQUEST;
- case 5: return CANCEL_ACK;
- case 6: return REQUEST_RECEIVE_FAILED;
- case 7: return REQUEST_FAILED;
- case 8: return REQUEST_OUTCOME_UNKNOWN;
- case 9: return CLIENT_CLOSE;
- case 10: return CLIENT_OPEN;
- case 11: return SERVICE_CLOSE;
- case 12: return SERVICE_ADVERTISE;
- case 13: return SERVICE_UNADVERTISE;
- default: throw new IllegalArgumentException("Invalid message type
ID");
- }
- }
-}
Copied:
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MessageType.java
(from rev 4600,
remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/MessageType.java)
===================================================================
---
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MessageType.java
(rev 0)
+++
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MessageType.java 2008-10-21
02:39:24 UTC (rev 4601)
@@ -0,0 +1,91 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.remoting.protocol.multiplex;
+
+/**
+ * The type of a protocol message.  Every constant carries the fixed integer ID returned
+ * by {@link #getId()}; {@link #getMessageType(int)} maps an ID back to its constant.
+ */
+public enum MessageType {
+
+    /** One-way request, no return value may be sent. */
+    REQUEST_ONEWAY(1),
+    /** Two-way request, return value is expected. */
+    REQUEST(2),
+    /** Reply. */
+    REPLY(3),
+    /** Attempt to cancel a request. */
+    CANCEL_REQUEST(4),
+    /** Acknowledge that a request was cancelled. */
+    CANCEL_ACK(5),
+    /** Request failed due to protocol or unmarshalling problem. */
+    REQUEST_RECEIVE_FAILED(6),
+    /** Request failed due to exception. */
+    REQUEST_FAILED(7),
+    /** Request completed but no reply or exception was sent. */
+    REQUEST_OUTCOME_UNKNOWN(8),
+    /** Remote side called .close() on a forwarded RequestHandler. */
+    CLIENT_CLOSE(9),
+    /** Remote side pulled a new RequestHandler off of a forwarded RequestHandlerSource. */
+    CLIENT_OPEN(10),
+    /** Remote side called .close() on a forwarded RequestHandlerSource. */
+    SERVICE_CLOSE(11),
+    /** Remote side brought a new service online. */
+    SERVICE_ADVERTISE(12),
+    /** Remote side's service is no longer available. */
+    SERVICE_UNADVERTISE(13),
+    ;
+
+    /** The integer ID of this message type. */
+    private final int id;
+
+    private MessageType(int id) {
+        this.id = id;
+    }
+
+    /**
+     * Get the integer ID of this message type.
+     *
+     * @return the integer ID
+     */
+    public int getId() {
+        return id;
+    }
+
+    /**
+     * Get the message type for an integer ID.
+     *
+     * @param id the integer ID
+     * @return the message type instance
+     * @throws IllegalArgumentException if no message type has the given ID
+     */
+    public static MessageType getMessageType(final int id) {
+        // a scan over the constants replaces the hand-maintained switch table
+        for (MessageType candidate : values()) {
+            if (candidate.id == id) {
+                return candidate;
+            }
+        }
+        throw new IllegalArgumentException("Invalid message type ID");
+    }
+}
Added:
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexHandler.java
===================================================================
---
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexHandler.java
(rev 0)
+++
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexHandler.java 2008-10-21
02:39:24 UTC (rev 4601)
@@ -0,0 +1,907 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.remoting.protocol.multiplex;
+
+import org.jboss.xnio.channels.AllocatedMessageChannel;
+import org.jboss.xnio.channels.StreamChannel;
+import org.jboss.xnio.IoHandler;
+import org.jboss.xnio.BufferAllocator;
+import org.jboss.xnio.IoUtils;
+import org.jboss.xnio.IoFuture;
+import org.jboss.xnio.Connector;
+import org.jboss.xnio.Acceptor;
+import org.jboss.xnio.log.Logger;
+import org.jboss.remoting.spi.remote.RequestHandler;
+import org.jboss.remoting.spi.remote.RequestHandlerSource;
+import org.jboss.remoting.spi.remote.ReplyHandler;
+import org.jboss.remoting.spi.remote.RemoteRequestContext;
+import org.jboss.remoting.spi.remote.Handle;
+import org.jboss.remoting.spi.SpiUtils;
+import org.jboss.remoting.spi.AbstractAutoCloseable;
+import org.jboss.remoting.spi.stream.StreamDetector;
+import org.jboss.remoting.spi.stream.StreamSerializerFactory;
+import org.jboss.remoting.spi.stream.StreamProvider;
+import static org.jboss.remoting.util.CollectionUtil.concurrentIntegerMap;
+import org.jboss.remoting.util.CollectionUtil;
+import org.jboss.remoting.util.ConcurrentIntegerMap;
+import org.jboss.remoting.CloseHandler;
+import org.jboss.remoting.Endpoint;
+import org.jboss.remoting.SimpleCloseable;
+import org.jboss.remoting.RemoteExecutionException;
+import org.jboss.remoting.IndeterminateOutcomeException;
+import org.jboss.marshalling.MarshallerFactory;
+import org.jboss.marshalling.Unmarshaller;
+import org.jboss.marshalling.ByteOutput;
+import org.jboss.marshalling.Marshaller;
+import org.jboss.marshalling.Configuration;
+import org.jboss.marshalling.ObjectTable;
+import org.jboss.marshalling.Marshalling;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.nio.ByteBuffer;
+import java.nio.BufferUnderflowException;
+import java.io.IOException;
+import java.io.InvalidClassException;
+
+/**
+ * Protocol handler for the basic message-oriented Remoting protocol.
+ *
+ * @param <A> stream channel address type (Void if streams are not supported)
+ */
+public final class MultiplexHandler<A> implements
IoHandler<AllocatedMessageChannel> {
+
+ private static final Logger log = Logger.getLogger(MultiplexHandler.class);
+
+ //--== Connection configuration items ==--
+ private final MarshallerFactory marshallerFactory;
+ private final Configuration marshallingConfiguration;
+ // added to the base metric of every service advertised by the remote side (see SERVICE_ADVERTISE in handleReadable)
+ private final int linkMetric;
+ private final Executor executor;
+ // buffer allocator for outbound message assembly
+ private final BufferAllocator<ByteBuffer> allocator;
+ private final StreamDetector streamDetector;
+ private final Connector<A, AllocatedMessageChannel> messageConnector;
+ private final Acceptor<A, AllocatedMessageChannel> messageAcceptor;
+ private final Connector<A, StreamChannel> streamConnector;
+ private final Acceptor<A, StreamChannel> streamAcceptor;
+
+ // running on remote node
+ // (keyed by request ID; handleReadable removes the entry when a reply or failure message arrives)
+ private final ConcurrentIntegerMap<ReplyHandler> remoteRequests =
concurrentIntegerMap();
+ // running on local node
+ // (keyed by request ID; handleReadable looks the context up to cancel it on CANCEL_REQUEST)
+ private final ConcurrentIntegerMap<RemoteRequestContext> localRequests =
concurrentIntegerMap();
+ // sequence for remote requests
+ private final AtomicInteger requestSequence = new AtomicInteger();
+
+ // clients whose requests get forwarded to the remote side
+ // even #s were opened from services forwarded to us (our sequence)
+ // odd #s were forwarded directly to us (remote sequence)
+ private final ConcurrentIntegerMap<RequestHandler> remoteClients =
concurrentIntegerMap();
+ // forwarded to remote side (handled on this side)
+ // (keyed by client ID; incoming REQUEST / REQUEST_ONEWAY messages are dispatched through this map)
+ private final ConcurrentIntegerMap<Handle<RequestHandler>>
forwardedClients = concurrentIntegerMap();
+ // sequence for forwarded clients (unsigned; shift left one bit, add one)
+ private final AtomicInteger forwardedClientSequence = new AtomicInteger();
+ // sequence for clients created from services forwarded to us (unsigned; shift left
one bit)
+ private final AtomicInteger remoteClientSequence = new AtomicInteger();
+
+ // services forwarded to us
+ private final ConcurrentIntegerMap<RequestHandlerSource> remoteServices =
concurrentIntegerMap();
+ // forwarded to remote side (handled on this side)
+ // (keyed by service ID; consulted when the remote side sends CLIENT_OPEN or SERVICE_CLOSE)
+ private final ConcurrentIntegerMap<Handle<RequestHandlerSource>>
forwardedServices = concurrentIntegerMap();
+ // sequence for forwarded services
+ private final AtomicInteger serviceSequence = new AtomicInteger();
+
+ // the endpoint with which services advertised by the remote side are registered
+ private final Endpoint endpoint;
+
+ // NOTE(review): not assigned in the constructor or in any of the IoHandler callbacks below - confirm where (or whether) it is set
+ private volatile AllocatedMessageChannel channel;
+
+ /**
+  * Create a handler for one multiplexed connection.
+  *
+  * @param endpoint the endpoint this connection belongs to
+  * @param configuration supplies the buffer allocator, executor, marshalling settings, link metric and stream detector
+  * @param streamProvider supplies the connectors and acceptors for message and stream channels
+  */
+ public MultiplexHandler(final Endpoint endpoint, final RemotingChannelConfiguration configuration, final StreamProvider<A> streamProvider) {
+     this.endpoint = endpoint;
+
+     // channel factories obtained from the stream provider
+     this.messageConnector = streamProvider.getMessageChannelConnector();
+     this.messageAcceptor = streamProvider.getMessageChannelAcceptor();
+     this.streamConnector = streamProvider.getStreamChannelConnector();
+     this.streamAcceptor = streamProvider.getStreamChannelAcceptor();
+
+     // per-connection settings copied out of the configuration object
+     this.allocator = configuration.getAllocator();
+     this.executor = configuration.getExecutor();
+     this.marshallerFactory = configuration.getMarshallerFactory();
+     this.marshallingConfiguration = configuration.getMarshallingConfiguration();
+     this.linkMetric = configuration.getLinkMetric();
+     this.streamDetector = configuration.getStreamDetector();
+ }
+
+ /**
+ * Channel-opened callback: resumes reads on the new channel (presumably so that
+ * {@code handleReadable} starts being notified of incoming messages - TODO confirm
+ * against the XNIO IoHandler contract).
+ *
+ * @param channel the channel that was opened
+ */
+ public void handleOpened(final AllocatedMessageChannel channel) {
+ channel.resumeReads();
+ }
+
+ /**
+  * Receive and dispatch messages from the channel until it would block, reaches
+  * end-of-stream, or fails.
+  * <p>
+  * Every message starts with a one-byte {@link MessageType} ID followed by a body that
+  * depends on the type.  A message that cannot be processed (unknown type, unknown client
+  * or request ID, unmarshalling failure, truncated body) is logged and skipped; the loop
+  * then continues with the next message.  The method only returns after it has either
+  * resumed reads (nothing more to read right now) or closed the channel (I/O error or
+  * end-of-stream), because returning in any other state would leave the channel stalled.
+  *
+  * @param channel the channel to read from
+  */
+ public void handleReadable(final AllocatedMessageChannel channel) {
+     for (;;) try {
+         final ByteBuffer buffer;
+         try {
+             buffer = channel.receive();
+         } catch (IOException e) {
+             log.error(e, "I/O error in protocol channel; closing channel");
+             IoUtils.safeClose(channel);
+             return;
+         }
+         if (buffer == null) {
+             // end of stream
+             // todo release all handles...
+             // todo what if the write queue is not empty?
+             IoUtils.safeClose(channel);
+             return;
+         }
+         if (! buffer.hasRemaining()) {
+             // would block
+             channel.resumeReads();
+             return;
+         }
+         final MessageType msgType;
+         try {
+             msgType = MessageType.getMessageType(buffer.get() & 0xff);
+         } catch (IllegalArgumentException ex) {
+             log.trace("Received invalid message type");
+             // skip the message; returning here would leave reads suspended forever
+             continue;
+         }
+         log.trace("Received message %s, type %s", buffer, msgType);
+         switch (msgType) {
+             case REQUEST_ONEWAY: {
+                 final int clientId = buffer.getInt();
+                 final Handle<RequestHandler> handle = forwardedClients.get(clientId);
+                 if (handle == null) {
+                     log.trace("Request on invalid client ID %d", Integer.valueOf(clientId));
+                     // same treatment as REQUEST: drop the message, keep reading
+                     break;
+                 }
+                 final Object payload;
+                 try {
+                     final Unmarshaller unmarshaller = marshallerFactory.createUnmarshaller(marshallingConfiguration);
+                     try {
+                         unmarshaller.start(Marshalling.createByteInput(buffer));
+                         try {
+                             payload = unmarshaller.readObject();
+                             unmarshaller.finish();
+                         } catch (ClassNotFoundException e) {
+                             log.trace("Class not found in one-way request for client ID %d", Integer.valueOf(clientId));
+                             break;
+                         }
+                     } finally {
+                         IoUtils.safeClose(unmarshaller);
+                     }
+                 } catch (IOException ex) {
+                     log.error(ex, "Failed to unmarshal a one-way request");
+                     break;
+                 }
+                 final RequestHandler requestHandler = handle.getResource();
+                 try {
+                     requestHandler.receiveRequest(payload);
+                 } catch (Throwable t) {
+                     log.error(t, "One-way request handler unexpectedly threw an exception");
+                 }
+                 break;
+             }
+             case REQUEST: {
+                 final int clientId = buffer.getInt();
+                 final Handle<RequestHandler> handle = forwardedClients.get(clientId);
+                 if (handle == null) {
+                     log.trace("Request on invalid client ID %d", Integer.valueOf(clientId));
+                     break;
+                 }
+                 final int requestId = buffer.getInt();
+                 final Object payload;
+                 try {
+                     final Unmarshaller unmarshaller = marshallerFactory.createUnmarshaller(marshallingConfiguration);
+                     try {
+                         unmarshaller.start(Marshalling.createByteInput(buffer));
+                         try {
+                             payload = unmarshaller.readObject();
+                             unmarshaller.finish();
+                         } catch (ClassNotFoundException e) {
+                             log.trace("Class not found in request ID %d for client ID %d", Integer.valueOf(requestId), Integer.valueOf(clientId));
+                             // todo - send request receive failed message
+                             break;
+                         }
+                     } finally {
+                         IoUtils.safeClose(unmarshaller);
+                     }
+                 } catch (IOException ex) {
+                     log.trace("Failed to unmarshal a request (%s), sending %s", ex, MessageType.REQUEST_RECEIVE_FAILED);
+                     // todo send a request failure message
+                     break;
+                 }
+                 final RequestHandler requestHandler = handle.getResource();
+                 requestHandler.receiveRequest(payload, new ReplyHandlerImpl(channel, requestId, allocator));
+                 break;
+             }
+             case REPLY: {
+                 final int requestId = buffer.getInt();
+                 final ReplyHandler replyHandler = remoteRequests.remove(requestId);
+                 if (replyHandler == null) {
+                     log.trace("Got reply to unknown request %d", Integer.valueOf(requestId));
+                     break;
+                 }
+                 final Object payload;
+                 try {
+                     final Unmarshaller unmarshaller = marshallerFactory.createUnmarshaller(marshallingConfiguration);
+                     try {
+                         unmarshaller.start(Marshalling.createByteInput(buffer));
+                         try {
+                             payload = unmarshaller.readObject();
+                             unmarshaller.finish();
+                         } catch (ClassNotFoundException e) {
+                             // use the "safe" variant like every other notification in this method,
+                             // so that a misbehaving reply handler cannot abort the read loop
+                             SpiUtils.safeHandleException(replyHandler, new InvalidClassException("Class not found: " + e.toString()));
+                             log.trace("Class not found in reply to request ID %d", Integer.valueOf(requestId));
+                             break;
+                         }
+                     } finally {
+                         IoUtils.safeClose(unmarshaller);
+                     }
+                 } catch (IOException ex) {
+                     log.trace("Failed to unmarshal a reply (%s), sending a ReplyException", ex);
+                     // todo
+                     SpiUtils.safeHandleException(replyHandler, ex);
+                     break;
+                 }
+                 SpiUtils.safeHandleReply(replyHandler, payload);
+                 break;
+             }
+             case CANCEL_REQUEST: {
+                 final int requestId = buffer.getInt();
+                 final RemoteRequestContext context = localRequests.get(requestId);
+                 if (context != null) {
+                     context.cancel();
+                 }
+                 break;
+             }
+             case CANCEL_ACK: {
+                 final int requestId = buffer.getInt();
+                 // NOTE(review): unlike the other terminal outcomes this uses get() rather than
+                 // remove(), so the entry stays in remoteRequests - confirm whether that is intended
+                 final ReplyHandler replyHandler = remoteRequests.get(requestId);
+                 if (replyHandler != null) {
+                     replyHandler.handleCancellation();
+                 }
+                 break;
+             }
+             case REQUEST_RECEIVE_FAILED: {
+                 final int requestId = buffer.getInt();
+                 final ReplyHandler replyHandler = remoteRequests.remove(requestId);
+                 if (replyHandler == null) {
+                     log.trace("Got reply to unknown request %d", Integer.valueOf(requestId));
+                     break;
+                 }
+                 final String reason = readUTFZ(buffer);
+                 // The handler has just been removed from the map, so this is the last chance to
+                 // tell the requester that no reply will ever arrive.
+                 // todo - use a dedicated ReplyException once one exists
+                 SpiUtils.safeHandleException(replyHandler, new IOException("Remote side failed to receive the request: " + reason));
+                 break;
+             }
+             case REQUEST_FAILED: {
+                 final int requestId = buffer.getInt();
+                 final ReplyHandler replyHandler = remoteRequests.remove(requestId);
+                 if (replyHandler == null) {
+                     log.trace("Got reply to unknown request %d", Integer.valueOf(requestId));
+                     break;
+                 }
+                 final Throwable cause;
+                 try {
+                     final Unmarshaller unmarshaller = marshallerFactory.createUnmarshaller(marshallingConfiguration);
+                     try {
+                         unmarshaller.start(Marshalling.createByteInput(buffer));
+                         try {
+                             cause = (Throwable) unmarshaller.readObject();
+                         } catch (ClassNotFoundException e) {
+                             SpiUtils.safeHandleException(replyHandler, new InvalidClassException("Class not found: " + e.toString()));
+                             log.trace("Class not found in exception reply to request ID %d", Integer.valueOf(requestId));
+                             break;
+                         } catch (ClassCastException e) {
+                             // the unmarshalled object was not a Throwable
+                             SpiUtils.safeHandleException(replyHandler, new RemoteExecutionException("Remote request failed (and an unexpected I/O error occurred when attempting to unmarshal the cause)"));
+                             break;
+                         }
+                     } finally {
+                         IoUtils.safeClose(unmarshaller);
+                     }
+                 } catch (IOException ex) {
+                     log.trace("Failed to unmarshal an exception reply (%s), sending a generic execution exception", ex);
+                     SpiUtils.safeHandleException(replyHandler, new RemoteExecutionException("Remote request failed (and an unexpected I/O error occurred when attempting to read the cause)"));
+                     break;
+                 }
+                 SpiUtils.safeHandleException(replyHandler, new RemoteExecutionException("Remote execution failed", cause));
+                 break;
+             }
+             case REQUEST_OUTCOME_UNKNOWN: {
+                 final int requestId = buffer.getInt();
+                 final ReplyHandler replyHandler = remoteRequests.remove(requestId);
+                 if (replyHandler == null) {
+                     log.trace("Got reply to unknown request %d", Integer.valueOf(requestId));
+                     break;
+                 }
+                 final String reason = readUTFZ(buffer);
+                 SpiUtils.safeHandleException(replyHandler, new IndeterminateOutcomeException(reason));
+                 break;
+             }
+             case CLIENT_CLOSE: {
+                 final int clientId = buffer.getInt();
+                 final Handle<RequestHandler> handle = forwardedClients.remove(clientId);
+                 if (handle == null) {
+                     log.warn("Got client close message for unknown client %d", Integer.valueOf(clientId));
+                     break;
+                 }
+                 IoUtils.safeClose(handle);
+                 break;
+             }
+             case CLIENT_OPEN: {
+                 final int serviceId = buffer.getInt();
+                 final int clientId = buffer.getInt();
+                 final Handle<RequestHandlerSource> handle = forwardedServices.get(serviceId);
+                 if (handle == null) {
+                     log.warn("Received client open message for unknown service %d", Integer.valueOf(serviceId));
+                     break;
+                 }
+                 try {
+                     final RequestHandlerSource requestHandlerSource = handle.getResource();
+                     final Handle<RequestHandler> clientHandle = requestHandlerSource.createRequestHandler();
+                     // todo check for duplicate
+                     // todo validate the client ID
+                     log.trace("Opening client %d from service %d", Integer.valueOf(clientId), Integer.valueOf(serviceId));
+                     forwardedClients.put(clientId, clientHandle);
+                 } catch (IOException ex) {
+                     log.error(ex, "Failed to create a request handler for client ID %d", Integer.valueOf(clientId));
+                     break;
+                 } finally {
+                     // NOTE(review): this closes the service handle that is still registered in
+                     // forwardedServices after every client open - confirm this is intended
+                     IoUtils.safeClose(handle);
+                 }
+                 break;
+             }
+             case SERVICE_CLOSE: {
+                 final Handle<RequestHandlerSource> handle = forwardedServices.remove(buffer.getInt());
+                 if (handle == null) {
+                     break;
+                 }
+                 IoUtils.safeClose(handle);
+                 break;
+             }
+             case SERVICE_ADVERTISE: {
+                 final int serviceId = buffer.getInt();
+                 final String serviceType = readUTFZ(buffer);
+                 final String groupName = readUTFZ(buffer);
+                 final String endpointName = readUTFZ(buffer);
+                 final int baseMetric = buffer.getInt();
+                 // bind the handler source to the advertised service ID (the ID used to be
+                 // hard-coded to -1, leaving the received serviceId unused)
+                 final RequestHandlerSource handlerSource = new RequestHandlerSourceImpl(allocator, serviceId);
+                 final int calcMetric = baseMetric + linkMetric;
+                 if (calcMetric > 0) {
+                     try {
+                         final SimpleCloseable closeable = endpoint.registerRemoteService(serviceType, groupName, endpointName, handlerSource, calcMetric);
+                         // todo - something with that closeable
+                     } catch (IOException e) {
+                         log.error(e, "Unable to register remote service");
+                     }
+                 }
+                 break;
+             }
+             case SERVICE_UNADVERTISE: {
+                 final int serviceId = buffer.getInt();
+                 // NOTE(review): nothing visible here ever adds to remoteServices, and the entry
+                 // is not removed - confirm against the rest of the class
+                 IoUtils.safeClose(remoteServices.get(serviceId));
+                 break;
+             }
+             default: {
+                 log.error("Malformed packet received (invalid message type %s)", msgType);
+             }
+         }
+     } catch (BufferUnderflowException e) {
+         // the message body was shorter than its type requires; skip it and keep reading
+         log.error("Malformed packet received (buffer underflow)");
+     }
+ }
+
+ /**
+  * Run queued write handlers, in queue order, until the queue is empty or a handler
+  * reports that it could not finish.
+  * <p>
+  * A handler that returns {@code true} is done and is removed from the queue.  A handler
+  * that returns {@code false} stays at the head of the queue and writes are resumed so
+  * that this method is called again.  A handler that throws is logged and discarded so
+  * that it cannot block the handlers queued behind it.
+  *
+  * @param channel the channel to write to
+  */
+ public void handleWritable(final AllocatedMessageChannel channel) {
+     for (;;) {
+         final WriteHandler handler = outputQueue.peek();
+         if (handler == null) {
+             return;
+         }
+         try {
+             if (handler.handleWrite(channel)) {
+                 log.trace("Handled write with handler %s", handler);
+                 pending.decrementAndGet();
+                 outputQueue.remove();
+             } else {
+                 channel.resumeWrites();
+                 return;
+             }
+         } catch (Throwable t) {
+             // previously the failure was dropped without a trace, making lost messages undiagnosable
+             log.error(t, "Write handler %s failed; discarding it", handler);
+             pending.decrementAndGet();
+             outputQueue.remove();
+         }
+     }
+ }
+
+ /**
+ * Channel-closed callback.  The body is empty, so no handles, pending requests or
+ * queued writers are released here.
+ *
+ * @param channel the channel that was closed
+ */
+ public void handleClosed(final AllocatedMessageChannel channel) {
+ }
+
+ /**
+ * Create a new {@code RequestHandlerSourceImpl} for the given ID, using this handler's
+ * buffer allocator.  A fresh instance is created on every call; the
+ * {@code remoteServices} map is neither consulted nor updated.
+ *
+ * @param id the identifier handed to the new handler source (presumably the remote service ID - TODO confirm)
+ * @return the new request handler source
+ */
+ RequestHandlerSource getRemoteService(final int id) {
+ return new RequestHandlerSourceImpl(allocator, id);
+ }
+
+ /**
+  * The reply handler passed to a local request handler for a two-way request (see the
+  * {@code REQUEST} case of {@code handleReadable}).  Each callback turns the outcome into
+  * a protocol message for the request's ID and queues it for writing on the channel.
+  */
+ private final class ReplyHandlerImpl implements ReplyHandler {
+
+     private final AllocatedMessageChannel channel;
+     private final int requestId;
+     private final BufferAllocator<ByteBuffer> allocator;
+
+     /**
+      * @param channel the channel on which the outcome is sent; must not be {@code null}
+      * @param requestId the ID of the request being answered
+      * @param allocator the allocator for outbound buffers; must not be {@code null}
+      */
+     private ReplyHandlerImpl(final AllocatedMessageChannel channel, final int requestId, final BufferAllocator<ByteBuffer> allocator) {
+         if (channel == null) {
+             throw new NullPointerException("channel is null");
+         }
+         if (allocator == null) {
+             throw new NullPointerException("allocator is null");
+         }
+         this.channel = channel;
+         this.requestId = requestId;
+         this.allocator = allocator;
+     }
+
+     /**
+      * Send a {@code REPLY} message carrying the marshalled reply object.
+      *
+      * @param reply the reply to marshal
+      */
+     public void handleReply(final Object reply) {
+         sendMarshalled(MessageType.REPLY, reply,
+                 "Failed to send a reply to the remote side",
+                 "Reply handler thread interrupted before a reply could be sent");
+     }
+
+     /**
+      * Send a {@code REQUEST_FAILED} message carrying the marshalled exception.
+      *
+      * @param exception the exception to marshal
+      */
+     public void handleException(final IOException exception) {
+         sendMarshalled(MessageType.REQUEST_FAILED, exception,
+                 "Failed to send an exception to the remote side",
+                 "Reply handler thread interrupted before an exception could be sent");
+     }
+
+     /**
+      * Send a {@code CANCEL_ACK} message for this request.
+      */
+     public void handleCancellation() {
+         final ByteBuffer buffer = allocator.allocate();
+         buffer.put((byte) MessageType.CANCEL_ACK.getId());
+         buffer.putInt(requestId);
+         buffer.flip();
+         try {
+             registerWriter(channel, new SimpleWriteHandler(allocator, buffer));
+         } catch (InterruptedException e) {
+             // log like the sibling callbacks do instead of dropping the acknowledgement silently
+             log.error(e, "Reply handler thread interrupted before a cancellation acknowledgement could be sent");
+             Thread.currentThread().interrupt();
+         }
+     }
+
+     /**
+      * Marshal {@code body} into freshly allocated buffers and queue them for writing.
+      * Shared implementation of {@link #handleReply} and {@link #handleException}.
+      *
+      * @param type the message type to send
+      * @param body the object to marshal
+      * @param ioFailureMessage the message to log if marshalling fails
+      * @param interruptedMessage the message to log if the thread is interrupted while queueing
+      */
+     private void sendMarshalled(final MessageType type, final Object body, final String ioFailureMessage, final String interruptedMessage) {
+         // NOTE(review): this header buffer (message type + request ID) is filled in but never
+         // flipped, added to bufferList or released, so from what is visible here the header
+         // is not part of what gets queued - confirm how createByteOutput / SimpleWriteHandler
+         // are meant to pick it up
+         final ByteBuffer buffer = allocator.allocate();
+         buffer.put((byte) type.getId());
+         buffer.putInt(requestId);
+         try {
+             final Marshaller marshaller = marshallerFactory.createMarshaller(marshallingConfiguration);
+             try {
+                 final List<ByteBuffer> bufferList = new ArrayList<ByteBuffer>();
+                 final ByteOutput output = createByteOutput(allocator, bufferList);
+                 try {
+                     marshaller.start(output);
+                     marshaller.writeObject(body);
+                     marshaller.close();
+                     output.close();
+                     registerWriter(channel, new SimpleWriteHandler(allocator, bufferList));
+                 } finally {
+                     IoUtils.safeClose(output);
+                 }
+             } finally {
+                 IoUtils.safeClose(marshaller);
+             }
+         } catch (IOException e) {
+             log.error(e, ioFailureMessage);
+         } catch (InterruptedException e) {
+             log.error(e, interruptedMessage);
+             Thread.currentThread().interrupt();
+         }
+     }
+ }
+
+ // Writer members
+
+ /** Write handlers waiting to be sent; obtained from {@code CollectionUtil.blockingQueue(64)}. */
+ private final BlockingQueue<WriteHandler> outputQueue = CollectionUtil.blockingQueue(64);
+ /** Incremented once for every handler registered through {@code registerWriter}. */
+ private final AtomicInteger pending = new AtomicInteger();
+
+ /**
+  * Queue a write handler and, if the pending counter was zero beforehand, resume writes on the
+  * channel.
+  *
+  * @param channel the channel whose writes are resumed
+  * @param writeHandler the handler to queue
+  * @throws InterruptedException if the thread is interrupted while waiting to queue the handler
+  */
+ private void registerWriter(final AllocatedMessageChannel channel, final WriteHandler writeHandler) throws InterruptedException {
+     outputQueue.put(writeHandler);
+     final int alreadyPending = pending.getAndIncrement();
+     if (alreadyPending != 0) {
+         return;
+     }
+     channel.resumeWrites();
+ }
+
+ // Reader utils
+
+ /**
+  * Read a zero-terminated UTF-8 string made of one-, two- and three-byte sequences.
+  * <p>
+  * Reading stops at the first zero byte or when the buffer is exhausted.  A byte that cannot
+  * start a sequence, or a sequence that is broken off by an unexpected byte, yields {@code '?'}.
+  *
+  * @param buffer the buffer to read from; its position is advanced past the bytes consumed
+  * @return the decoded string (without the terminator)
+  */
+ private String readUTFZ(ByteBuffer buffer) {
+     final StringBuilder builder = new StringBuilder();
+     // state = number of continuation bytes still expected; a = code unit being assembled
+     int state = 0, a = 0;
+     while (buffer.hasRemaining()) {
+         final int v = buffer.get() & 0xff;
+         switch (state) {
+             case 0: {
+                 if (v == 0) {
+                     return builder.toString();
+                 } else if (v < 0x80) {
+                     builder.append((char) v);
+                 } else if (0xc0 <= v && v < 0xe0) {
+                     // 110xxxxx: keep only the five payload bits, not the marker bits
+                     a = (v & 0x1f) << 6;
+                     state = 1;
+                 } else if (0xe0 <= v && v < 0xf0) {
+                     // 1110xxxx: every three-byte lead (0xE0..0xEF), four payload bits
+                     a = (v & 0x0f) << 12;
+                     state = 2;
+                 } else {
+                     builder.append('?');
+                 }
+                 break;
+             }
+             case 1: {
+                 if (v == 0) {
+                     builder.append('?');
+                     return builder.toString();
+                 } else if (0x80 <= v && v < 0xc0) {
+                     a |= v & 0x3f;
+                     builder.append((char) a);
+                 } else {
+                     builder.append('?');
+                 }
+                 state = 0;
+                 break;
+             }
+             case 2: {
+                 if (v == 0) {
+                     builder.append('?');
+                     return builder.toString();
+                 } else if (0x80 <= v && v < 0xc0) {
+                     a |= (v & 0x3f) << 6;
+                     state = 1;
+                 } else {
+                     builder.append('?');
+                     state = 0;
+                 }
+                 break;
+             }
+             default:
+                 throw new IllegalStateException("wrong state");
+         }
+     }
+     return builder.toString();
+ }
+
+ // client endpoint
+
+ /**
+  * Request handler which forwards requests for one remote client, identified by
+  * {@code identifier}, over the multiplexed channel.  When closed it removes itself from
+  * {@code remoteClients} and queues a {@code CLIENT_CLOSE} message.
+  */
+ private final class RequestHandlerImpl extends AbstractAutoCloseable<RequestHandler> implements RequestHandler {
+
+     private final int identifier;
+     private final BufferAllocator<ByteBuffer> allocator;
+
+     /**
+      * @param identifier the client ID written into every outgoing message
+      * @param allocator the allocator for outgoing buffers; must not be {@code null}
+      * @throws NullPointerException if {@code allocator} is {@code null}
+      */
+     public RequestHandlerImpl(final int identifier, final BufferAllocator<ByteBuffer> allocator) {
+         super(executor);
+         if (allocator == null) {
+             throw new NullPointerException("allocator is null");
+         }
+         this.identifier = identifier;
+         this.allocator = allocator;
+         addCloseHandler(new CloseHandler<RequestHandler>() {
+             public void handleClose(final RequestHandler closed) {
+                 // a bare "this" would be the close handler, which is never a value in remoteClients
+                 remoteClients.remove(identifier, RequestHandlerImpl.this);
+                 ByteBuffer buffer = allocator.allocate();
+                 buffer.put((byte) MessageType.CLIENT_CLOSE.getId());
+                 buffer.putInt(identifier);
+                 buffer.flip();
+                 try {
+                     registerWriter(channel, new SimpleWriteHandler(allocator, buffer));
+                 } catch (InterruptedException e) {
+                     log.warn("Client close notification was interrupted before it could be sent");
+                     // the handler was never queued: release the buffer and keep the interrupt visible
+                     allocator.free(buffer);
+                     Thread.currentThread().interrupt();
+                 }
+             }
+         });
+     }
+
+     /**
+      * Send a one-way request.  Failures are logged at trace level and otherwise ignored.
+      *
+      * @param request the request object to marshal and send
+      */
+     public void receiveRequest(final Object request) {
+         log.trace("Sending outbound one-way request of type %s", request == null ? "null" : request.getClass());
+         try {
+             final List<ByteBuffer> bufferList = new ArrayList<ByteBuffer>();
+             final Marshaller marshaller = marshallerFactory.createMarshaller(marshallingConfiguration);
+             try {
+                 final ByteOutput output = createByteOutput(allocator, bufferList);
+                 try {
+                     // attach the marshaller to the output before anything is written through it
+                     marshaller.start(output);
+                     marshaller.write(MessageType.REQUEST_ONEWAY.getId());
+                     marshaller.writeInt(identifier);
+                     marshaller.writeObject(request);
+                     marshaller.close();
+                     output.close();
+                 } finally {
+                     IoUtils.safeClose(output);
+                 }
+             } finally {
+                 IoUtils.safeClose(marshaller);
+             }
+             try {
+                 registerWriter(channel, new SimpleWriteHandler(allocator, bufferList));
+             } catch (InterruptedException e) {
+                 log.trace(e, "receiveRequest was interrupted");
+                 // the message was never queued, so its buffers go back to the allocator
+                 for (ByteBuffer buffer : bufferList) {
+                     allocator.free(buffer);
+                 }
+                 Thread.currentThread().interrupt();
+                 return;
+             }
+         } catch (Throwable t) {
+             // deliberate best effort: a one-way request has nobody to report the failure to
+             log.trace(t, "receiveRequest failed with an exception");
+             return;
+         }
+     }
+
+     /**
+      * Send a request whose outcome is reported to {@code handler}.
+      * <p>
+      * A request ID is reserved in {@code remoteRequests} and written after the client ID.  If the
+      * request cannot be sent, the handler is notified through the executor and a blank context
+      * is returned.
+      *
+      * @param request the request object to marshal and send
+      * @param handler the handler registered for the reply
+      * @return a context which can be used to cancel the request
+      */
+     public RemoteRequestContext receiveRequest(final Object request, final ReplyHandler handler) {
+         log.trace("Sending outbound request of type %s", request == null ? "null" : request.getClass());
+         try {
+             final List<ByteBuffer> bufferList = new ArrayList<ByteBuffer>();
+             final Marshaller marshaller = marshallerFactory.createMarshaller(marshallingConfiguration);
+             try {
+                 final ByteOutput output = createByteOutput(allocator, bufferList);
+                 try {
+                     // attach the marshaller to the output before anything is written through it
+                     marshaller.start(output);
+                     marshaller.write(MessageType.REQUEST.getId());
+                     marshaller.writeInt(identifier);
+
+                     int id;
+                     do {
+                         id = requestSequence.getAndIncrement();
+                     } while (remoteRequests.putIfAbsent(id, handler) != null);
+                     // NOTE(review): the remoteRequests entry is not removed on the failure paths below - confirm whether it should be
+                     marshaller.writeInt(id);
+                     marshaller.writeObject(request);
+                     marshaller.close();
+                     output.close();
+                     try {
+                         registerWriter(channel, new SimpleWriteHandler(allocator, bufferList));
+                     } catch (InterruptedException e) {
+                         // the message was never queued, so its buffers go back to the allocator
+                         for (ByteBuffer buffer : bufferList) {
+                             allocator.free(buffer);
+                         }
+                         Thread.currentThread().interrupt();
+                         executor.execute(new Runnable() {
+                             public void run() {
+                                 SpiUtils.safeHandleCancellation(handler);
+                             }
+                         });
+                         return SpiUtils.getBlankRemoteRequestContext();
+                     }
+                     log.trace("Sent request %s", request);
+                     return new RemoteRequestContextImpl(id, allocator, channel);
+                 } finally {
+                     IoUtils.safeClose(output);
+                 }
+             } finally {
+                 IoUtils.safeClose(marshaller);
+             }
+         } catch (final IOException t) {
+             log.trace(t, "receiveRequest failed with an exception");
+             executor.execute(new Runnable() {
+                 public void run() {
+                     SpiUtils.safeHandleException(handler, t);
+                 }
+             });
+             return SpiUtils.getBlankRemoteRequestContext();
+         }
+     }
+
+     public String toString() {
+         return "forwarding request handler <" + Integer.toString(hashCode(), 16) + "> (id = " + identifier + ")";
+     }
+ }
+
+ /**
+  * Context for one outstanding request; allows the request with the given ID to be cancelled
+  * by queueing a {@code CANCEL_REQUEST} message on the channel.
+  */
+ public final class RemoteRequestContextImpl implements RemoteRequestContext {
+
+     private final BufferAllocator<ByteBuffer> allocator;
+     private final int id;
+     private final AllocatedMessageChannel channel;
+
+     /**
+      * @param id the request ID written into the cancel message
+      * @param allocator the allocator for the cancel message buffer
+      * @param channel the channel on which the cancel message is queued
+      */
+     public RemoteRequestContextImpl(final int id, final BufferAllocator<ByteBuffer> allocator, final AllocatedMessageChannel channel) {
+         this.id = id;
+         this.allocator = allocator;
+         this.channel = channel;
+     }
+
+     /**
+      * Ask the remote side to cancel the request.  This is best effort: a failure is logged at
+      * trace level and never propagated to the caller.
+      */
+     public void cancel() {
+         try {
+             final ByteBuffer buffer = allocator.allocate();
+             buffer.put((byte) MessageType.CANCEL_REQUEST.getId());
+             buffer.putInt(id);
+             buffer.flip();
+             registerWriter(channel, new SimpleWriteHandler(allocator, buffer));
+         } catch (InterruptedException e) {
+             log.trace(e, "Cancel request was interrupted before it could be sent");
+             Thread.currentThread().interrupt();
+         } catch (Throwable t) {
+             log.trace(t, "Cancel request failed with an exception");
+         }
+     }
+ }
+
+ /**
+  * Request handler source for one remote service, identified by {@code identifier}.  It opens
+  * new clients on that service and, when closed, queues a {@code SERVICE_CLOSE} message.
+  */
+ public final class RequestHandlerSourceImpl extends AbstractAutoCloseable<RequestHandlerSource> implements RequestHandlerSource {
+
+     private final BufferAllocator<ByteBuffer> allocator;
+     private final int identifier;
+
+     /**
+      * @param allocator the allocator for outgoing buffers
+      * @param identifier the service ID written into every outgoing message
+      */
+     protected RequestHandlerSourceImpl(final BufferAllocator<ByteBuffer> allocator, final int identifier) {
+         super(executor);
+         this.allocator = allocator;
+         this.identifier = identifier;
+         addCloseHandler(new CloseHandler<RequestHandlerSource>() {
+             public void handleClose(final RequestHandlerSource closed) {
+                 ByteBuffer buffer = allocator.allocate();
+                 buffer.put((byte) MessageType.SERVICE_CLOSE.getId());
+                 buffer.putInt(identifier);
+                 buffer.flip();
+                 try {
+                     registerWriter(channel, new SimpleWriteHandler(allocator, buffer));
+                 } catch (InterruptedException e) {
+                     log.warn("Service close notification was interrupted before it could be sent");
+                     // the handler was never queued: release the buffer and keep the interrupt visible
+                     allocator.free(buffer);
+                     Thread.currentThread().interrupt();
+                 }
+             }
+         });
+     }
+
+     /**
+      * Open a new client on the remote service.
+      * <p>
+      * A fresh client ID is reserved in {@code remoteClients}, a {@code CLIENT_OPEN} message with
+      * the service ID and client ID is queued, and a handle to the registered handler is returned.
+      * Interruptions while queueing are retried; the interrupt status is restored before returning.
+      *
+      * @return a handle to the new request handler
+      * @throws IOException if the handle cannot be obtained
+      */
+     public Handle<RequestHandler> createRequestHandler() throws IOException {
+         // The handler put into remoteClients must be the one handed out, otherwise closing the
+         // returned handler can never remove the registered entry.
+         int id;
+         RequestHandlerImpl requestHandler;
+         do {
+             id = remoteClientSequence.getAndIncrement() << 1;
+             requestHandler = new RequestHandlerImpl(id, allocator);
+         } while (remoteClients.putIfAbsent(id, requestHandler) != null);
+         final ByteBuffer buffer = allocator.allocate();
+         buffer.put((byte) MessageType.CLIENT_OPEN.getId());
+         buffer.putInt(identifier);
+         buffer.putInt(id);
+         buffer.flip();
+         // todo - probably should bail out if we're interrupted?
+         boolean intr = false;
+         try {
+             for (;;) {
+                 try {
+                     registerWriter(channel, new SimpleWriteHandler(allocator, buffer));
+                     return requestHandler.getHandle();
+                 } catch (InterruptedException e) {
+                     intr = true;
+                 }
+             }
+         } finally {
+             if (intr) {
+                 Thread.currentThread().interrupt();
+             }
+         }
+     }
+
+     public String toString() {
+         return "forwarding request handler source <" + Integer.toString(hashCode(), 16) + "> (id = " + identifier + ")";
+     }
+ }
+
+ /**
+  * Create a byte output which collects everything written to it in buffers taken from
+  * {@code allocator}.  Each buffer is flipped and appended to {@code target} as soon as it is
+  * full, and the partially filled buffer (if any) is appended on {@code flush()} or
+  * {@code close()}.
+  *
+  * @param allocator the allocator from which buffers are obtained
+  * @param target the collection which receives the flipped buffers, in order
+  * @return the byte output
+  */
+ public static ByteOutput createByteOutput(final BufferAllocator<ByteBuffer> allocator, final Collection<ByteBuffer> target) {
+     return new ByteOutput() {
+         /** The buffer currently being filled, or null if the next write must allocate one. */
+         private ByteBuffer current;
+
+         private ByteBuffer getCurrent() {
+             if (current == null) {
+                 current = allocator.allocate();
+             }
+             return current;
+         }
+
+         /** Flip the given buffer, hand it to the target and forget it. */
+         private void emit(final ByteBuffer filled) {
+             filled.flip();
+             target.add(filled);
+             current = null;
+         }
+
+         public void write(final int i) {
+             final ByteBuffer active = getCurrent();
+             active.put((byte) i);
+             if (! active.hasRemaining()) {
+                 emit(active);
+             }
+         }
+
+         public void write(final byte[] bytes) {
+             write(bytes, 0, bytes.length);
+         }
+
+         public void write(final byte[] bytes, int offs, int len) {
+             int position = offs;
+             int outstanding = len;
+             while (outstanding > 0) {
+                 final ByteBuffer active = getCurrent();
+                 final int chunk = Math.min(outstanding, active.remaining());
+                 active.put(bytes, position, chunk);
+                 position += chunk;
+                 outstanding -= chunk;
+                 if (! active.hasRemaining()) {
+                     emit(active);
+                 }
+             }
+         }
+
+         public void close() {
+             flush();
+         }
+
+         public void flush() {
+             final ByteBuffer partial = current;
+             if (partial == null) {
+                 return;
+             }
+             emit(partial);
+         }
+     };
+ }
+
+ /**
+  * Object table writer for protocol objects.  Not implemented yet: nothing is written.
+  */
+ public class ProtocolObjectTableWriter implements ObjectTable.Writer {
+
+     /**
+      * Currently a no-op.
+      *
+      * @param marshaller the marshaller to write to (unused)
+      * @param o the object to write (unused)
+      * @throws IOException never thrown by this implementation
+      */
+     public void writeObject(final Marshaller marshaller, final Object o) throws IOException {
+         // intentionally empty
+     }
+ }
+
+ /**
+  * Object table for protocol objects (request handlers, request handler sources and streams).
+  * Unfinished: no writer is ever supplied and {@code readObject} always returns {@code null}.
+  */
+ public class ProtocolObjectTable implements ObjectTable {
+
+     /**
+      * Look up a writer for the given object.  For an object recognized by the stream detector a
+      * local stream handler is created and a connection attempt is started, but the result is not
+      * used yet.
+      *
+      * @param o the object about to be marshalled
+      * @return always {@code null} for now
+      * @throws IOException declared by the interface
+      */
+     public Writer getObjectWriter(final Object o) throws IOException /* fixed in beta2 */ {
+         if (o instanceof RequestHandler || o instanceof RequestHandlerSource) {
+             // todo - request handlers and request handler sources are not handled yet
+             return null;
+         }
+         final StreamSerializerFactory ssf = streamDetector.detectStream(o);
+         if (ssf == null) {
+             return null;
+         }
+         final IoHandler<? super AllocatedMessageChannel> streamHandler = ssf.getLocalSide(o, new StreamContextImpl(executor, marshallerFactory, marshallingConfiguration));
+         // todo - this should really be the "server" side
+         final IoFuture<AllocatedMessageChannel> futureChannel = messageConnector.connectTo(null, streamHandler);
+         return null;
+     }
+
+     /**
+      * Read a protocol object.  One tag byte is consumed, plus an int when the tag is 1; nothing
+      * is constructed yet.
+      *
+      * @param unmarshaller the unmarshaller to read from
+      * @return always {@code null} for now
+      */
+     public Object readObject(final Unmarshaller unmarshaller) throws IOException, ClassNotFoundException {
+         switch (unmarshaller.readByte()) {
+             case 1: {
+                 // remote client
+                 final int id = unmarshaller.readInt();
+                 break;
+             }
+             case 2: {
+                 // remote client source
+                 break;
+             }
+             case 3: {
+                 // stream
+                 break;
+             }
+             default: {
+                 // invalid
+                 break;
+             }
+         }
+         return null;
+     }
+ }
+}
Added:
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexProtocol.java
===================================================================
---
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexProtocol.java
(rev 0)
+++
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexProtocol.java 2008-10-21
02:39:24 UTC (rev 4601)
@@ -0,0 +1,99 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.remoting.protocol.multiplex;
+
+import org.jboss.remoting.RemotingException;
+import org.jboss.remoting.SimpleCloseable;
+import org.jboss.remoting.Endpoint;
+import org.jboss.remoting.spi.remote.RequestHandlerSource;
+import org.jboss.remoting.spi.remote.Handle;
+import org.jboss.remoting.spi.stream.StreamProvider;
+import org.jboss.xnio.IoHandlerFactory;
+import org.jboss.xnio.ChannelSource;
+import org.jboss.xnio.IoFuture;
+import org.jboss.xnio.AbstractConvertingIoFuture;
+import org.jboss.xnio.IoHandler;
+import org.jboss.xnio.BufferAllocator;
+import org.jboss.xnio.log.Logger;
+import org.jboss.xnio.channels.AllocatedMessageChannel;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.Executor;
+
+/**
+ * Static factory methods which set up the multiplex protocol on XNIO message channels: a
+ * handler factory for the server side and a connect operation for the client side.
+ */
+public final class MultiplexProtocol {
+
+    private static final Logger log = Logger.getLogger(MultiplexProtocol.class);
+
+    private MultiplexProtocol() {
+    }
+
+    /**
+     * Create a request server for the multiplex protocol.
+     *
+     * @param endpoint the endpoint passed to every handler created by the factory
+     * @param executor the executor to use for invocations
+     * @param allocator the buffer allocator to use
+     * @param streamProvider the stream provider passed to every handler created by the factory
+     * @param <A> stream channel address type
+     * @return a handler factory for passing to an XNIO server
+     */
+    public static <A> IoHandlerFactory<AllocatedMessageChannel> createServer(final Endpoint endpoint, final Executor executor, final BufferAllocator<ByteBuffer> allocator, final StreamProvider<A> streamProvider) {
+        return new IoHandlerFactory<AllocatedMessageChannel>() {
+            public IoHandler<? super AllocatedMessageChannel> createHandler() {
+                final RemotingChannelConfiguration configuration = new RemotingChannelConfiguration();
+                configuration.setAllocator(allocator);
+                configuration.setExecutor(executor);
+                // todo marshaller factory... etc
+                return new MultiplexHandler<A>(endpoint, configuration, streamProvider);
+            }
+        };
+    }
+
+    /**
+     * Create a request client for the multiplex protocol.
+     *
+     * @param endpoint the endpoint passed to the connection's handler
+     * @param executor the executor to use for invocations
+     * @param channelSource the XNIO channel source to use to establish the connection
+     * @param allocator the buffer allocator to use
+     * @param streamProvider the stream provider passed to the connection's handler
+     * @param <A> stream channel address type
+     * @return a future for the handle which may be used to close the connection
+     * @throws IOException if an error occurs
+     */
+    public static <A> IoFuture<SimpleCloseable> connect(final Endpoint endpoint, final Executor executor, final ChannelSource<AllocatedMessageChannel> channelSource, final BufferAllocator<ByteBuffer> allocator, final StreamProvider<A> streamProvider) throws IOException {
+        final RemotingChannelConfiguration configuration = new RemotingChannelConfiguration();
+        configuration.setAllocator(allocator);
+        configuration.setExecutor(executor);
+        // todo marshaller factory... etc
+        final MultiplexHandler<A> multiplexHandler = new MultiplexHandler<A>(endpoint, configuration, streamProvider);
+        final IoFuture<AllocatedMessageChannel> futureChannel = channelSource.open(multiplexHandler);
+        return new AbstractConvertingIoFuture<SimpleCloseable, AllocatedMessageChannel>(futureChannel) {
+            protected SimpleCloseable convert(final AllocatedMessageChannel channel) throws RemotingException {
+                return new AbstractConnection(executor) {
+                    public Handle<RequestHandlerSource> getServiceForId(final int id) throws IOException {
+                        return multiplexHandler.getRemoteService(id).getHandle();
+                    }
+                };
+            }
+        };
+    }
+}
Deleted:
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/RemotingChannelConfiguration.java
===================================================================
---
remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/RemotingChannelConfiguration.java 2008-09-18
03:06:39 UTC (rev 4573)
+++
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/RemotingChannelConfiguration.java 2008-10-21
02:39:24 UTC (rev 4601)
@@ -1,82 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2008, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
- */
-
-package org.jboss.remoting.protocol.basic;
-
-import java.util.concurrent.Executor;
-import java.nio.ByteBuffer;
-import org.jboss.xnio.BufferAllocator;
-import org.jboss.marshalling.MarshallerFactory;
-
-/**
- *
- */
-public final class RemotingChannelConfiguration {
- private MarshallerFactory marshallerFactory;
- private int linkMetric;
- private Executor executor;
- private ClassLoader classLoader;
- private BufferAllocator<ByteBuffer> allocator;
-
- public RemotingChannelConfiguration() {
- }
-
- public MarshallerFactory getMarshallerFactory() {
- return marshallerFactory;
- }
-
- public void setMarshallerFactory(final MarshallerFactory marshallerFactory) {
- this.marshallerFactory = marshallerFactory;
- }
-
- public int getLinkMetric() {
- return linkMetric;
- }
-
- public void setLinkMetric(final int linkMetric) {
- this.linkMetric = linkMetric;
- }
-
- public Executor getExecutor() {
- return executor;
- }
-
- public void setExecutor(final Executor executor) {
- this.executor = executor;
- }
-
- public ClassLoader getClassLoader() {
- return classLoader;
- }
-
- public void setClassLoader(final ClassLoader classLoader) {
- this.classLoader = classLoader;
- }
-
- public BufferAllocator<ByteBuffer> getAllocator() {
- return allocator;
- }
-
- public void setAllocator(final BufferAllocator<ByteBuffer> allocator) {
- this.allocator = allocator;
- }
-}
Copied:
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/RemotingChannelConfiguration.java
(from rev 4600,
remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/RemotingChannelConfiguration.java)
===================================================================
---
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/RemotingChannelConfiguration.java
(rev 0)
+++
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/RemotingChannelConfiguration.java 2008-10-21
02:39:24 UTC (rev 4601)
@@ -0,0 +1,93 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.remoting.protocol.multiplex;
+
+import java.util.concurrent.Executor;
+import java.nio.ByteBuffer;
+import org.jboss.xnio.BufferAllocator;
+import org.jboss.marshalling.MarshallerFactory;
+import org.jboss.marshalling.Configuration;
+import org.jboss.remoting.spi.stream.StreamDetector;
+
+/**
+ * Mutable holder for the settings of a remoting channel.  Every property is a plain field with
+ * a getter and a setter; nothing is validated and all properties start out unset.
+ */
+public final class RemotingChannelConfiguration {
+    private MarshallerFactory marshallerFactory;
+    private Configuration marshallingConfiguration;
+    private int linkMetric;
+    private Executor executor;
+    private BufferAllocator<ByteBuffer> allocator;
+    private StreamDetector streamDetector;
+
+    /** Create a configuration with no properties set. */
+    public RemotingChannelConfiguration() {
+    }
+
+    /** @return the marshaller factory, or {@code null} if none was set */
+    public MarshallerFactory getMarshallerFactory() {
+        return this.marshallerFactory;
+    }
+
+    /** @param marshallerFactory the marshaller factory */
+    public void setMarshallerFactory(final MarshallerFactory marshallerFactory) {
+        this.marshallerFactory = marshallerFactory;
+    }
+
+    /** @return the marshalling configuration, or {@code null} if none was set */
+    public Configuration getMarshallingConfiguration() {
+        return this.marshallingConfiguration;
+    }
+
+    /** @param marshallingConfiguration the marshalling configuration */
+    public void setMarshallingConfiguration(final Configuration marshallingConfiguration) {
+        this.marshallingConfiguration = marshallingConfiguration;
+    }
+
+    /** @return the link metric ({@code 0} unless set) */
+    public int getLinkMetric() {
+        return this.linkMetric;
+    }
+
+    /** @param linkMetric the link metric */
+    public void setLinkMetric(final int linkMetric) {
+        this.linkMetric = linkMetric;
+    }
+
+    /** @return the executor, or {@code null} if none was set */
+    public Executor getExecutor() {
+        return this.executor;
+    }
+
+    /** @param executor the executor */
+    public void setExecutor(final Executor executor) {
+        this.executor = executor;
+    }
+
+    /** @return the buffer allocator, or {@code null} if none was set */
+    public BufferAllocator<ByteBuffer> getAllocator() {
+        return this.allocator;
+    }
+
+    /** @param allocator the buffer allocator */
+    public void setAllocator(final BufferAllocator<ByteBuffer> allocator) {
+        this.allocator = allocator;
+    }
+
+    /** @return the stream detector, or {@code null} if none was set */
+    public StreamDetector getStreamDetector() {
+        return this.streamDetector;
+    }
+
+    /** @param streamDetector the stream detector */
+    public void setStreamDetector(final StreamDetector streamDetector) {
+        this.streamDetector = streamDetector;
+    }
+}
Deleted:
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/SimpleWriteHandler.java
===================================================================
---
remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/SimpleWriteHandler.java 2008-09-18
03:06:39 UTC (rev 4573)
+++
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/SimpleWriteHandler.java 2008-10-21
02:39:24 UTC (rev 4601)
@@ -1,84 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2008, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
- */
-
-package org.jboss.remoting.protocol.basic;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.List;
-import org.jboss.xnio.BufferAllocator;
-import org.jboss.xnio.log.Logger;
-import org.jboss.xnio.channels.WritableMessageChannel;
-
-/**
- *
- */
-public final class SimpleWriteHandler implements WriteHandler {
- private static final Logger log = Logger.getLogger(SimpleWriteHandler.class);
-
- private final BufferAllocator<ByteBuffer> allocator;
- private final ByteBuffer[] buffers;
-
- public SimpleWriteHandler(final BufferAllocator<ByteBuffer> allocator, final
List<ByteBuffer> buffers) {
- this.allocator = allocator;
- this.buffers = buffers.toArray(new ByteBuffer[buffers.size()]);
- logBufferSize();
- }
-
- public SimpleWriteHandler(final BufferAllocator<ByteBuffer> allocator, final
ByteBuffer[] buffers) {
- this.allocator = allocator;
- this.buffers = buffers;
- logBufferSize();
- }
-
- public SimpleWriteHandler(final BufferAllocator<ByteBuffer> allocator, final
ByteBuffer buffer) {
- this.allocator = allocator;
- buffers = new ByteBuffer[] { buffer };
- logBufferSize();
- }
-
- private void logBufferSize() {
- if (log.isTrace()) {
- long t = 0L;
- for (ByteBuffer buf : buffers) {
- t += (long)buf.remaining();
- }
- log.trace("Writing a message of size %d", Long.valueOf(t));
- }
- }
-
- public boolean handleWrite(final WritableMessageChannel channel) {
- boolean done = true;
- try {
- return (done = channel.send(buffers));
- } catch (IOException e) {
- log.trace(e, "Write failed");
- return true;
- } finally {
- if (done) {
- for (ByteBuffer buffer : buffers) {
- allocator.free(buffer);
- }
- }
- }
- }
-}
Copied:
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/SimpleWriteHandler.java
(from rev 4600,
remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/SimpleWriteHandler.java)
===================================================================
---
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/SimpleWriteHandler.java
(rev 0)
+++
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/SimpleWriteHandler.java 2008-10-21
02:39:24 UTC (rev 4601)
@@ -0,0 +1,84 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.remoting.protocol.multiplex;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import org.jboss.xnio.BufferAllocator;
+import org.jboss.xnio.log.Logger;
+import org.jboss.xnio.channels.WritableMessageChannel;
+
+/**
+ * Write handler which sends a fixed set of buffers as one message and returns the buffers to
+ * the allocator once the write is finished (or has failed).
+ */
+public final class SimpleWriteHandler implements WriteHandler {
+    private static final Logger log = Logger.getLogger(SimpleWriteHandler.class);
+
+    private final BufferAllocator<ByteBuffer> allocator;
+    private final ByteBuffer[] buffers;
+
+    /**
+     * @param allocator the allocator to which the buffers are returned
+     * @param buffers the buffers to send, in order; the list is copied into an array
+     */
+    public SimpleWriteHandler(final BufferAllocator<ByteBuffer> allocator, final List<ByteBuffer> buffers) {
+        this(allocator, buffers.toArray(new ByteBuffer[buffers.size()]));
+    }
+
+    /**
+     * @param allocator the allocator to which the buffers are returned
+     * @param buffers the buffers to send, in order; the array is used as is
+     */
+    public SimpleWriteHandler(final BufferAllocator<ByteBuffer> allocator, final ByteBuffer[] buffers) {
+        this.allocator = allocator;
+        this.buffers = buffers;
+        logBufferSize();
+    }
+
+    /**
+     * @param allocator the allocator to which the buffer is returned
+     * @param buffer the single buffer to send
+     */
+    public SimpleWriteHandler(final BufferAllocator<ByteBuffer> allocator, final ByteBuffer buffer) {
+        this(allocator, new ByteBuffer[] { buffer });
+    }
+
+    /** Trace the total number of bytes remaining in all buffers. */
+    private void logBufferSize() {
+        if (! log.isTrace()) {
+            return;
+        }
+        long total = 0L;
+        for (int i = 0; i < buffers.length; i++) {
+            total += (long) buffers[i].remaining();
+        }
+        log.trace("Writing a message of size %d", Long.valueOf(total));
+    }
+
+    /**
+     * Try to send the buffers on the given channel.
+     *
+     * @param channel the channel to send on
+     * @return the result of {@code channel.send()}, or {@code true} if the write failed with an I/O error
+     */
+    public boolean handleWrite(final WritableMessageChannel channel) {
+        // stays true unless send() reports that the message was not sent
+        boolean release = true;
+        try {
+            final boolean sent = channel.send(buffers);
+            release = sent;
+            return sent;
+        } catch (IOException e) {
+            log.trace(e, "Write failed");
+            return true;
+        } finally {
+            if (release) {
+                for (int i = 0; i < buffers.length; i++) {
+                    allocator.free(buffers[i]);
+                }
+            }
+        }
+    }
+}
Added:
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/StreamContextImpl.java
===================================================================
---
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/StreamContextImpl.java
(rev 0)
+++
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/StreamContextImpl.java 2008-10-21
02:39:24 UTC (rev 4601)
@@ -0,0 +1,59 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.remoting.protocol.multiplex;
+
+import org.jboss.remoting.spi.stream.StreamContext;
+import org.jboss.marshalling.Marshaller;
+import org.jboss.marshalling.Unmarshaller;
+import org.jboss.marshalling.Configuration;
+import org.jboss.marshalling.MarshallerFactory;
+import java.util.concurrent.Executor;
+import java.io.IOException;
+
+/**
+ * Stream context backed by an executor and by a marshaller factory with a fixed marshalling
+ * configuration.
+ */
+public final class StreamContextImpl implements StreamContext {
+
+    private final Executor executor;
+    private final MarshallerFactory marshallerFactory;
+    private final Configuration marshallerConfiguration;
+
+    /**
+     * @param executor the executor returned by {@link #getExecutor()}
+     * @param marshallerFactory the factory used to create marshallers and unmarshallers
+     * @param marshallerConfiguration the configuration passed to the factory on every creation
+     */
+    StreamContextImpl(final Executor executor, final MarshallerFactory marshallerFactory, final Configuration marshallerConfiguration) {
+        this.marshallerConfiguration = marshallerConfiguration;
+        this.marshallerFactory = marshallerFactory;
+        this.executor = executor;
+    }
+
+    /** @return the executor given at construction */
+    public Executor getExecutor() {
+        return this.executor;
+    }
+
+    /**
+     * @return a new marshaller created with the configured settings
+     * @throws IOException if the factory fails to create the marshaller
+     */
+    public Marshaller createMarshaller() throws IOException {
+        final Marshaller created = marshallerFactory.createMarshaller(marshallerConfiguration);
+        return created;
+    }
+
+    /**
+     * @return a new unmarshaller created with the configured settings
+     * @throws IOException if the factory fails to create the unmarshaller
+     */
+    public Unmarshaller createUnmarshaller() throws IOException {
+        final Unmarshaller created = marshallerFactory.createUnmarshaller(marshallerConfiguration);
+        return created;
+    }
+}
Deleted:
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/WriteHandler.java
===================================================================
---
remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/WriteHandler.java 2008-09-18
03:06:39 UTC (rev 4573)
+++
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/WriteHandler.java 2008-10-21
02:39:24 UTC (rev 4601)
@@ -1,32 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2008, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.remoting.protocol.basic;
-
-import org.jboss.xnio.channels.WritableMessageChannel;
-
-/**
- *
- */
-public interface WriteHandler {
- boolean handleWrite(WritableMessageChannel channel);
-}
Copied:
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/WriteHandler.java
(from rev 4600,
remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/WriteHandler.java)
===================================================================
--- remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/WriteHandler.java (rev 0)
+++ remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/WriteHandler.java 2008-10-21 02:39:24 UTC (rev 4601)
@@ -0,0 +1,32 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.remoting.protocol.multiplex;
+
+import org.jboss.xnio.channels.WritableMessageChannel;
+
+/**
+ * Callback interface with a single operation, {@link #handleWrite(WritableMessageChannel)}, which takes a
+ * {@link WritableMessageChannel} and returns a {@code boolean}.
+ * <p>
+ * NOTE(review): the meaning of the returned flag is not documented here - presumably it tells the caller whether
+ * the handler has finished writing; confirm against the implementations and callers in this package.
+ */
+public interface WriteHandler {
+ boolean handleWrite(WritableMessageChannel channel);
+}
Copied:
remoting3/trunk/protocol/multiplex/src/test/java/org/jboss/remoting/protocol/multiplex
(from rev 4573,
remoting3/trunk/protocol/basic/src/test/java/org/jboss/remoting/protocol/basic)
Deleted:
remoting3/trunk/protocol/multiplex/src/test/java/org/jboss/remoting/protocol/multiplex/ConnectionTestCase.java
===================================================================
--- remoting3/trunk/protocol/basic/src/test/java/org/jboss/remoting/protocol/basic/ConnectionTestCase.java 2008-09-18 03:06:39 UTC (rev 4573)
+++ remoting3/trunk/protocol/multiplex/src/test/java/org/jboss/remoting/protocol/multiplex/ConnectionTestCase.java 2008-10-21 02:39:24 UTC (rev 4601)
@@ -1,81 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2008, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.remoting.protocol.basic;
-
-import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import junit.framework.TestCase;
-import org.jboss.remoting.core.EndpointImpl;
-import org.jboss.remoting.test.support.LoggingHelper;
-import org.jboss.xnio.BufferAllocator;
-import org.jboss.xnio.IoUtils;
-import org.jboss.xnio.Xnio;
-import org.jboss.xnio.CloseableExecutor;
-import org.jboss.xnio.nio.NioXnio;
-
-/**
- *
- */
-public final class ConnectionTestCase extends TestCase {
- static {
- LoggingHelper.init();
- }
-
- public void testConnection() throws Throwable {
- final String REQUEST = "request";
- final String REPLY = "reply";
- final List<Throwable> problems = Collections.synchronizedList(new
LinkedList<Throwable>());
- final CloseableExecutor closeableExecutor =
IoUtils.closeableExecutor(Executors.newCachedThreadPool(), 500L, TimeUnit.MILLISECONDS);
- try {
- final BufferAllocator<ByteBuffer> allocator = new
BufferAllocator<ByteBuffer>() {
- public ByteBuffer allocate() {
- return ByteBuffer.allocate(1024);
- }
-
- public void free(final ByteBuffer buffer) {
- }
- };
- final Xnio xnio = NioXnio.create();
- try {
- final EndpointImpl endpoint = new EndpointImpl();
- endpoint.setExecutor(closeableExecutor);
- endpoint.start();
- try {
- } finally {
- endpoint.stop();
- }
- } finally {
- IoUtils.safeClose(xnio);
- }
- } finally {
- IoUtils.safeClose(closeableExecutor);
- }
- for (Throwable t : problems) {
- throw t;
- }
- }
-}
Copied:
remoting3/trunk/protocol/multiplex/src/test/java/org/jboss/remoting/protocol/multiplex/ConnectionTestCase.java
(from rev 4600,
remoting3/trunk/protocol/basic/src/test/java/org/jboss/remoting/protocol/basic/ConnectionTestCase.java)
===================================================================
--- remoting3/trunk/protocol/multiplex/src/test/java/org/jboss/remoting/protocol/multiplex/ConnectionTestCase.java (rev 0)
+++ remoting3/trunk/protocol/multiplex/src/test/java/org/jboss/remoting/protocol/multiplex/ConnectionTestCase.java 2008-10-21 02:39:24 UTC (rev 4601)
@@ -0,0 +1,81 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.remoting.protocol.multiplex;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import junit.framework.TestCase;
+import org.jboss.remoting.core.EndpointImpl;
+import org.jboss.remoting.test.support.LoggingHelper;
+import org.jboss.xnio.BufferAllocator;
+import org.jboss.xnio.IoUtils;
+import org.jboss.xnio.Xnio;
+import org.jboss.xnio.CloseableExecutor;
+import org.jboss.xnio.nio.NioXnio;
+
+/**
+ * JUnit 3 style test case (it extends {@link TestCase}) living in the multiplex protocol package.
+ * <p>
+ * In this revision {@link #testConnection()} is only scaffolding: it wraps a cached thread pool in a
+ * {@link CloseableExecutor}, creates an {@link Xnio} instance via {@code NioXnio.create()}, starts an
+ * {@link EndpointImpl} that uses the executor, and then releases everything again in reverse order through
+ * nested {@code finally} blocks. The innermost {@code try} body is empty, so no connection is opened and the
+ * {@code REQUEST}, {@code REPLY} and {@code allocator} locals are not used. Nothing is ever added to the
+ * {@code problems} list, so the rethrow loop at the end of the method has nothing to throw.
+ * <p>
+ * The static initializer calls {@code LoggingHelper.init()} when the class is loaded.
+ */
+public final class ConnectionTestCase extends TestCase {
+ static {
+ LoggingHelper.init();
+ }
+
+ public void testConnection() throws Throwable {
+ final String REQUEST = "request";
+ final String REPLY = "reply";
+ final List<Throwable> problems = Collections.synchronizedList(new
LinkedList<Throwable>());
+ final CloseableExecutor closeableExecutor =
IoUtils.closeableExecutor(Executors.newCachedThreadPool(), 500L, TimeUnit.MILLISECONDS);
+ try {
+ final BufferAllocator<ByteBuffer> allocator = new
BufferAllocator<ByteBuffer>() {
+ public ByteBuffer allocate() {
+ return ByteBuffer.allocate(1024);
+ }
+
+ public void free(final ByteBuffer buffer) {
+ }
+ };
+ final Xnio xnio = NioXnio.create();
+ try {
+ final EndpointImpl endpoint = new EndpointImpl();
+ endpoint.setExecutor(closeableExecutor);
+ endpoint.start();
+ try {
+ } finally {
+ endpoint.stop();
+ }
+ } finally {
+ IoUtils.safeClose(xnio);
+ }
+ } finally {
+ IoUtils.safeClose(closeableExecutor);
+ }
+ for (Throwable t : problems) {
+ throw t;
+ }
+ }
+}
Modified: remoting3/trunk/testing-support/src/main/resources/testing.policy
===================================================================
--- remoting3/trunk/testing-support/src/main/resources/testing.policy 2008-10-20 13:51:33 UTC (rev 4600)
+++ remoting3/trunk/testing-support/src/main/resources/testing.policy 2008-10-21 02:39:24 UTC (rev 4601)
@@ -15,7 +15,6 @@
{
permission java.lang.RuntimePermission "modifyThread"; // for executor control
permission java.net.SocketPermission "*:*", "accept, connect, resolve";
- permission java.util.PropertyPermission "xnio.provider", "read"; // todo - fixed in XNIO trunk...
};
// Permissions for Remoting itself
@@ -27,14 +26,11 @@
grant codeBase "file:${build.home}/core/target/main/classes/-"
{
- // TODO: this is for the marshallers, which ought to be in their own module/module set
- permission java.io.SerializablePermission "enableSubstitution";
permission java.util.PropertyPermission "jboss.remoting.*", "read";
};
grant codeBase "file:${build.home}/protocol/basic/target/main/classes/-"
{
- permission java.net.SocketPermission "*:*", "accept, connect, resolve"; // todo - need a better solution
permission java.util.PropertyPermission "jboss.remoting.*", "read";
};
@@ -66,7 +62,7 @@
permission java.security.AllPermission;
};
-grant codeBase "file:${lib.xnio-standalone.local}"
+grant codeBase "file:${lib.xnio-nio.local}"
{
permission java.security.AllPermission;
};
Modified: remoting3/trunk/util/src/main/java/org/jboss/remoting/util/OrderedExecutor.java
===================================================================
--- remoting3/trunk/util/src/main/java/org/jboss/remoting/util/OrderedExecutor.java 2008-10-20 13:51:33 UTC (rev 4600)
+++ remoting3/trunk/util/src/main/java/org/jboss/remoting/util/OrderedExecutor.java 2008-10-21 02:39:24 UTC (rev 4601)
@@ -24,6 +24,7 @@
import java.util.concurrent.Executor;
import java.util.LinkedList;
+import org.jboss.xnio.log.Logger;
/**
* An executor that always runs all tasks in order, using a delegate executor to run the tasks.
@@ -32,6 +33,8 @@
* same method, will result in B's task running after A's.
*/
public final class OrderedExecutor implements Executor {
+ private static final Logger log = Logger.getLogger(OrderedExecutor.class);
+
// @protectedby tasks
private final LinkedList<Runnable> tasks = new LinkedList<Runnable>();
// @protectedby tasks
@@ -60,7 +63,7 @@
try {
task.run();
} catch (Throwable t) {
- // eat it!
+ log.error(t, "Runnable task %s failed", task);
}
}
}
@@ -77,7 +80,15 @@
tasks.add(command);
if (! running) {
running = true;
- parent.execute(runner);
+ boolean ok = false;
+ try {
+ parent.execute(runner);
+ ok = true;
+ } finally {
+ if (! ok) {
+ running = false;
+ }
+ }
}
}
}