JBoss Remoting SVN: r4601 - in remoting3/trunk: api/src/main/java/org/jboss/remoting/spi and 11 other directories.
by jboss-remoting-commits@lists.jboss.org
Author: david.lloyd(a)jboss.com
Date: 2008-10-20 22:39:24 -0400 (Mon, 20 Oct 2008)
New Revision: 4601
Added:
remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/stream/StreamProvider.java
remoting3/trunk/protocol/multiplex/
remoting3/trunk/protocol/multiplex/src/
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/AbstractConnection.java
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/ConfigValue.java
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/ConnectionListener.java
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/IdentityIntMap.java
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MessageType.java
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexHandler.java
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexProtocol.java
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/RemotingChannelConfiguration.java
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/SimpleWriteHandler.java
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/StreamContextImpl.java
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/WriteHandler.java
remoting3/trunk/protocol/multiplex/src/test/java/org/jboss/remoting/protocol/multiplex/
remoting3/trunk/protocol/multiplex/src/test/java/org/jboss/remoting/protocol/multiplex/ConnectionTestCase.java
Removed:
remoting3/trunk/protocol/basic/
remoting3/trunk/protocol/multiplex/src/
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/basic/
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/AbstractConnection.java
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/BasicHandler.java
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/BasicProtocol.java
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/ConfigValue.java
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/ConnectionListener.java
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MessageType.java
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/RemotingChannelConfiguration.java
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/SimpleWriteHandler.java
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/WriteHandler.java
remoting3/trunk/protocol/multiplex/src/test/java/org/jboss/remoting/protocol/basic/
remoting3/trunk/protocol/multiplex/src/test/java/org/jboss/remoting/protocol/multiplex/ConnectionTestCase.java
Modified:
remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/SpiUtils.java
remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/remote/ReplyHandler.java
remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/remote/RequestHandler.java
remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/stream/StreamContext.java
remoting3/trunk/build.properties
remoting3/trunk/build.xml
remoting3/trunk/core/src/main/java/org/jboss/remoting/core/FutureReplyImpl.java
remoting3/trunk/core/src/main/java/org/jboss/remoting/core/LocalRequestHandler.java
remoting3/trunk/core/src/main/java/org/jboss/remoting/core/RequestContextImpl.java
remoting3/trunk/testing-support/src/main/resources/testing.policy
remoting3/trunk/util/src/main/java/org/jboss/remoting/util/OrderedExecutor.java
Log:
The basic protocol is really a multiplex protocol. Make room for a *real* basic protocol.
Modified: remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/SpiUtils.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/SpiUtils.java 2008-10-20 13:51:33 UTC (rev 4600)
+++ remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/SpiUtils.java 2008-10-21 02:39:24 UTC (rev 4601)
@@ -28,6 +28,7 @@
import org.jboss.remoting.RequestContext;
import org.jboss.remoting.CloseHandler;
import org.jboss.xnio.log.Logger;
+import java.io.IOException;
/**
* Utility methods for Remoting service providers.
@@ -41,12 +42,11 @@
* Safely notify a reply handler of an exception.
*
* @param replyHandler the reply handler
- * @param msg the message
- * @param cause the cause
+ * @param exception
*/
- public static void safeHandleException(final ReplyHandler replyHandler, final String msg, final Throwable cause) {
+ public static void safeHandleException(final ReplyHandler replyHandler, final IOException exception) {
try {
- replyHandler.handleException(msg, cause);
+ replyHandler.handleException(exception);
} catch (Throwable t) {
log.error(t, "Failed to properly handle exception");
}
Modified: remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/remote/ReplyHandler.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/remote/ReplyHandler.java 2008-10-20 13:51:33 UTC (rev 4600)
+++ remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/remote/ReplyHandler.java 2008-10-21 02:39:24 UTC (rev 4601)
@@ -22,6 +22,8 @@
package org.jboss.remoting.spi.remote;
+import java.io.IOException;
+
/**
* A handler for replies from a request. The handler should respect the first invocation made on it, and ignore
* any subsequent invocations.
@@ -36,13 +38,11 @@
void handleReply(Object reply);
/**
- * Handle a remote exception.
+ * Handle an exception.
*
- * @param msg the message
- * @param cause the cause
+ * @param exception an exception which describes the problem
*/
- // TODO - change to accept a RemotingException instead?
- void handleException(final String msg, Throwable cause);
+ void handleException(IOException exception);
/**
* Handle a cancellation request.
Modified: remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/remote/RequestHandler.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/remote/RequestHandler.java 2008-10-20 13:51:33 UTC (rev 4600)
+++ remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/remote/RequestHandler.java 2008-10-21 02:39:24 UTC (rev 4601)
@@ -44,7 +44,7 @@
/**
* Receive a request from a remote system. This method is intended to be called by protocol handlers. If the
* request cannot be accepted for some reason, the
- * {@link ReplyHandler#handleException(String, Throwable)}
+ * {@link ReplyHandler#handleException(java.io.IOException)}
* method is called immediately.
*
* @param request the request
Modified: remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/stream/StreamContext.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/stream/StreamContext.java 2008-10-20 13:51:33 UTC (rev 4600)
+++ remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/stream/StreamContext.java 2008-10-21 02:39:24 UTC (rev 4601)
@@ -23,7 +23,9 @@
package org.jboss.remoting.spi.stream;
import java.util.concurrent.Executor;
-import org.jboss.marshalling.MarshallerFactory;
+import java.io.IOException;
+import org.jboss.marshalling.Marshaller;
+import org.jboss.marshalling.Unmarshaller;
/**
* A context for stream serialization.
@@ -31,16 +33,25 @@
public interface StreamContext {
/**
- * Get an executor which may be used for various asynchronous tasks.
+ * Get an executor which may be used by a stream serializer for various asynchronous tasks.
*
* @return an executor
*/
Executor getExecutor();
/**
- * Get a marshaller factory which is configured compatibly with the channel.
+ * Create a marshaller which is configured compatibly with the channel.
*
- * @return the marshaller factory
+ * @return a marshaller
*/
- MarshallerFactory getMarshallerFactory();
+ Marshaller createMarshaller() throws IOException;
+
+ /**
+ * Create an unmarshaller which is configured compatibly with the channel.
+ *
+ * @return an unmarshaller
+ */
+ Unmarshaller createUnmarshaller() throws IOException;
+
+ // todo - getter & setter for child ObjectTable, ClassTable, ExternalizerFactory, etc. for marshaller and unmarshaller
}
Added: remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/stream/StreamProvider.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/stream/StreamProvider.java (rev 0)
+++ remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/stream/StreamProvider.java 2008-10-21 02:39:24 UTC (rev 4601)
@@ -0,0 +1,44 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.remoting.spi.stream;
+
+import org.jboss.xnio.Connector;
+import org.jboss.xnio.Acceptor;
+import org.jboss.xnio.channels.StreamChannel;
+import org.jboss.xnio.channels.AllocatedMessageChannel;
+import java.io.IOException;
+
+/**
+ * A provider for streams.
+ *
+ * @param <A> the address type
+ */
+public interface StreamProvider<A> {
+ Connector<A, StreamChannel> getStreamChannelConnector();
+
+ Connector<A, AllocatedMessageChannel> getMessageChannelConnector();
+
+ Acceptor<A, StreamChannel> getStreamChannelAcceptor();
+
+ Acceptor<A, AllocatedMessageChannel> getMessageChannelAcceptor();
+}
Modified: remoting3/trunk/build.properties
===================================================================
--- remoting3/trunk/build.properties 2008-10-20 13:51:33 UTC (rev 4600)
+++ remoting3/trunk/build.properties 2008-10-21 02:39:24 UTC (rev 4601)
@@ -115,7 +115,7 @@
lib.jboss-managed.local=${local.repository}/${lib.jboss-managed.local-path}
lib.jboss-managed.remote=${remote.repository}/${lib.jboss-managed.remote-path}
-lib.marshalling-api.version=1.0.0.Beta1
+lib.marshalling-api.version=1.0.0.Beta2
lib.marshalling-api.name=marshalling-api.jar
lib.marshalling-api.license=lgpl
lib.marshalling-api.dir=jboss/marshalling/${lib.marshalling-api.version}/lib
@@ -179,7 +179,7 @@
lib.trove.local=${local.repository}/${lib.trove.path}
lib.trove.remote=${remote.repository}/${lib.trove.path}
-lib.xnio.version=1.1.0.CR1
+lib.xnio.version=1.2.0.Alpha2008101601
lib.xnio-api.name=xnio-api-${lib.xnio.version}.jar
lib.xnio-api.license=lgpl
@@ -188,10 +188,10 @@
lib.xnio-api.local=${local.repository}/${lib.xnio-api.path}
lib.xnio-api.remote=${remote.repository}/${lib.xnio-api.path}
-lib.xnio-standalone.name=xnio-standalone-${lib.xnio.version}.jar
-lib.xnio-standalone.license=lgpl
-lib.xnio-standalone.dir=maven2/org/jboss/xnio/xnio-standalone/${lib.xnio.version}
-lib.xnio-standalone.path=${lib.xnio-standalone.dir}/${lib.xnio-standalone.name}
-lib.xnio-standalone.local=${local.repository}/${lib.xnio-standalone.path}
-lib.xnio-standalone.remote=${remote.repository}/${lib.xnio-standalone.path}
+lib.xnio-nio.name=xnio-nio-${lib.xnio.version}.jar
+lib.xnio-nio.license=lgpl
+lib.xnio-nio.dir=maven2/org/jboss/xnio/xnio-nio/${lib.xnio.version}
+lib.xnio-nio.path=${lib.xnio-nio.dir}/${lib.xnio-nio.name}
+lib.xnio-nio.local=${local.repository}/${lib.xnio-nio.path}
+lib.xnio-nio.remote=${remote.repository}/${lib.xnio-nio.path}
Modified: remoting3/trunk/build.xml
===================================================================
--- remoting3/trunk/build.xml 2008-10-20 13:51:33 UTC (rev 4600)
+++ remoting3/trunk/build.xml 2008-10-21 02:39:24 UTC (rev 4601)
@@ -211,16 +211,16 @@
<get src="${remote.license.dir}/${lib.xnio-api.license}.txt" dest="${lib.xnio-api.local}.license.txt" usetimestamp="true" ignoreerrors="false"/>
</target>
- <!-- External library: XNIO standalone -->
+ <!-- External library: XNIO nio -->
- <target name="lib.xnio-standalone-check">
- <available property="lib.xnio-standalone.exists" file="${lib.xnio-standalone.local}"/>
+ <target name="lib.xnio-nio-check">
+ <available property="lib.xnio-nio.exists" file="${lib.xnio-nio.local}"/>
</target>
- <target name="lib.xnio-standalone" depends="lib.xnio-standalone-check" unless="lib.xnio-standalone.exists">
- <mkdir dir="${local.repository}/${lib.xnio-standalone.dir}"/>
- <get src="${lib.xnio-standalone.remote}" dest="${lib.xnio-standalone.local}" usetimestamp="true" ignoreerrors="false"/>
- <get src="${remote.license.dir}/${lib.xnio-standalone.license}.txt" dest="${lib.xnio-standalone.local}.license.txt" usetimestamp="true" ignoreerrors="false"/>
+ <target name="lib.xnio-nio" depends="lib.xnio-nio-check" unless="lib.xnio-nio.exists">
+ <mkdir dir="${local.repository}/${lib.xnio-nio.dir}"/>
+ <get src="${lib.xnio-nio.remote}" dest="${lib.xnio-nio.local}" usetimestamp="true" ignoreerrors="false"/>
+ <get src="${remote.license.dir}/${lib.xnio-nio.license}.txt" dest="${lib.xnio-nio.local}.license.txt" usetimestamp="true" ignoreerrors="false"/>
</target>
<!-- ============================================== -->
@@ -800,7 +800,8 @@
<pathelement location="${lib.junit.local}"/>
<pathelement location="${lib.marshalling-api.local}"/>
<pathelement location="${lib.river.local}"/>
- <pathelement location="${lib.xnio-standalone.local}"/>
+ <pathelement location="${lib.xnio-api.local}"/>
+ <pathelement location="${lib.xnio-nio.local}"/>
</classpath>
</javac>
<touch file="protocol/basic/target/test/.lastcompile" verbose="false"/>
@@ -816,7 +817,8 @@
<sysproperty key="ant.library.dir" value="${ant.home}/lib"/>
<sysproperty key="lib.junit.local" value="${lib.junit.local}"/>
<sysproperty key="lib.marshalling-api.local" value="${lib.marshalling-api.local}"/>
- <sysproperty key="lib.xnio-standalone.local" value="${lib.xnio-standalone.local}"/>
+ <sysproperty key="lib.xnio-api.local" value="${lib.xnio-api.local}"/>
+ <sysproperty key="lib.xnio-nio.local" value="${lib.xnio-nio.local}"/>
<jvmarg line="${test.jvmargs}"/>
<formatter type="plain" extension="${extension}"/>
<classpath>
@@ -829,7 +831,8 @@
<pathelement location="${lib.junit.local}"/>
<pathelement location="${lib.marshalling-api.local}"/>
<pathelement location="${lib.river.local}"/>
- <pathelement location="${lib.xnio-standalone.local}"/>
+ <pathelement location="${lib.xnio-api.local}"/>
+ <pathelement location="${lib.xnio-nio.local}"/>
</classpath>
<batchtest fork="yes" todir="protocol/basic/target/test-results"
haltonfailure="no">
@@ -840,7 +843,7 @@
</junit>
</target>
- <target name="protocol.basic.test" depends="lib.xnio-standalone,api,core,protocol.basic,testing-support,util,protocol.basic.test.compile">
+ <target name="protocol.basic.test" depends="lib.xnio-nio,api,core,protocol.basic,testing-support,util,protocol.basic.test.compile">
<antcall inheritall="true" inheritrefs="true" target="protocol.basic.test.pseudotarget">
<param name="extension" value=".txt"/>
<param name="message" value="Running with no security manager"/>
Modified: remoting3/trunk/core/src/main/java/org/jboss/remoting/core/FutureReplyImpl.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/remoting/core/FutureReplyImpl.java 2008-10-20 13:51:33 UTC (rev 4600)
+++ remoting3/trunk/core/src/main/java/org/jboss/remoting/core/FutureReplyImpl.java 2008-10-21 02:39:24 UTC (rev 4601)
@@ -22,12 +22,12 @@
package org.jboss.remoting.core;
-import org.jboss.remoting.RemotingException;
import org.jboss.remoting.spi.remote.ReplyHandler;
import org.jboss.remoting.spi.remote.RemoteRequestContext;
import org.jboss.xnio.AbstractIoFuture;
import org.jboss.xnio.IoFuture;
import java.util.concurrent.Executor;
+import java.io.IOException;
/**
*
@@ -67,8 +67,8 @@
setResult((O) reply);
}
- public void handleException(final String exMsg, final Throwable exCause) {
- setException(new RemotingException(exMsg, exCause));
+ public void handleException(final IOException exception) {
+ setException(exception);
}
public void handleCancellation() {
Modified: remoting3/trunk/core/src/main/java/org/jboss/remoting/core/LocalRequestHandler.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/remoting/core/LocalRequestHandler.java 2008-10-20 13:51:33 UTC (rev 4600)
+++ remoting3/trunk/core/src/main/java/org/jboss/remoting/core/LocalRequestHandler.java 2008-10-21 02:39:24 UTC (rev 4601)
@@ -82,9 +82,9 @@
try {
requestListener.handleRequest(context, (I) request);
} catch (RemoteExecutionException e) {
- SpiUtils.safeHandleException(replyHandler, e.getMessage(), e.getCause());
+ SpiUtils.safeHandleException(replyHandler, e);
} catch (Throwable t) {
- SpiUtils.safeHandleException(replyHandler, "Unexpected exception in request listener", t);
+ SpiUtils.safeHandleException(replyHandler, new RemoteExecutionException("Request handler threw an exception", t));
}
}
});
Modified: remoting3/trunk/core/src/main/java/org/jboss/remoting/core/RequestContextImpl.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/remoting/core/RequestContextImpl.java 2008-10-20 13:51:33 UTC (rev 4600)
+++ remoting3/trunk/core/src/main/java/org/jboss/remoting/core/RequestContextImpl.java 2008-10-21 02:39:24 UTC (rev 4601)
@@ -26,6 +26,7 @@
import org.jboss.remoting.ClientContext;
import org.jboss.remoting.RemotingException;
import org.jboss.remoting.RequestCancelHandler;
+import org.jboss.remoting.RemoteExecutionException;
import org.jboss.remoting.core.util.TaggingExecutor;
import org.jboss.remoting.spi.remote.ReplyHandler;
import org.jboss.remoting.spi.SpiUtils;
@@ -81,7 +82,7 @@
public void sendFailure(final String msg, final Throwable cause) throws RemotingException, IllegalStateException {
if (! closed.getAndSet(true)) {
if (replyHandler != null) {
- replyHandler.handleException(msg, cause);
+ replyHandler.handleException(new RemoteExecutionException(msg, cause));
}
} else {
throw new IllegalStateException("Reply already sent");
Copied: remoting3/trunk/protocol/multiplex (from rev 4573, remoting3/trunk/protocol/basic)
Copied: remoting3/trunk/protocol/multiplex/src (from rev 4600, remoting3/trunk/protocol/basic/src)
Copied: remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex (from rev 4573, remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic)
Deleted: remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/AbstractConnection.java
===================================================================
--- remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/AbstractConnection.java 2008-09-18 03:06:39 UTC (rev 4573)
+++ remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/AbstractConnection.java 2008-10-21 02:39:24 UTC (rev 4601)
@@ -1,44 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2008, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.remoting.protocol.basic;
-
-import org.jboss.remoting.spi.AbstractSimpleCloseable;
-import java.util.concurrent.Executor;
-
-/**
- *
- */
-public abstract class AbstractConnection extends AbstractSimpleCloseable {
- /**
- * Basic constructor.
- *
- * @param executor the executor used to execute the close notification handlers
- */
- protected AbstractConnection(final Executor executor) {
- super(executor);
- }
-
- public String toString() {
- return "connection <" + Integer.toString(hashCode()) + ">";
- }
-}
Copied: remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/AbstractConnection.java (from rev 4600, remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/AbstractConnection.java)
===================================================================
--- remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/AbstractConnection.java (rev 0)
+++ remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/AbstractConnection.java 2008-10-21 02:39:24 UTC (rev 4601)
@@ -0,0 +1,44 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.remoting.protocol.multiplex;
+
+import org.jboss.remoting.spi.AbstractSimpleCloseable;
+import java.util.concurrent.Executor;
+
+/**
+ *
+ */
+public abstract class AbstractConnection extends AbstractSimpleCloseable {
+ /**
+ * Basic constructor.
+ *
+ * @param executor the executor used to execute the close notification handlers
+ */
+ protected AbstractConnection(final Executor executor) {
+ super(executor);
+ }
+
+ public String toString() {
+ return "connection <" + Integer.toString(hashCode()) + ">";
+ }
+}
Deleted: remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/BasicHandler.java
===================================================================
--- remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/BasicHandler.java 2008-09-18 03:06:39 UTC (rev 4573)
+++ remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/BasicHandler.java 2008-10-21 02:39:24 UTC (rev 4601)
@@ -1,915 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2008, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.remoting.protocol.basic;
-
-import org.jboss.xnio.channels.AllocatedMessageChannel;
-import org.jboss.xnio.IoHandler;
-import org.jboss.xnio.BufferAllocator;
-import org.jboss.xnio.IoUtils;
-import org.jboss.xnio.log.Logger;
-import org.jboss.remoting.spi.remote.RequestHandler;
-import org.jboss.remoting.spi.remote.RequestHandlerSource;
-import org.jboss.remoting.spi.remote.ReplyHandler;
-import org.jboss.remoting.spi.remote.RemoteRequestContext;
-import org.jboss.remoting.spi.remote.Handle;
-import org.jboss.remoting.spi.SpiUtils;
-import org.jboss.remoting.spi.AbstractAutoCloseable;
-import static org.jboss.remoting.util.CollectionUtil.concurrentIntegerMap;
-import org.jboss.remoting.util.CollectionUtil;
-import org.jboss.remoting.util.ConcurrentIntegerMap;
-import org.jboss.remoting.CloseHandler;
-import org.jboss.remoting.Endpoint;
-import org.jboss.remoting.SimpleCloseable;
-import org.jboss.marshalling.MarshallerFactory;
-import org.jboss.marshalling.ByteInput;
-import org.jboss.marshalling.Unmarshaller;
-import org.jboss.marshalling.ByteOutput;
-import org.jboss.marshalling.Marshaller;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.List;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.nio.ByteBuffer;
-import java.nio.BufferUnderflowException;
-import java.io.IOException;
-
-/**
- *
- */
-public final class BasicHandler implements IoHandler<AllocatedMessageChannel> {
-
- private static final Logger log = Logger.getLogger(BasicHandler.class);
-
- //--== Connection configuration items ==--
- private final MarshallerFactory marshallerFactory;
- private final int linkMetric;
- private final Executor executor;
- private final ClassLoader classLoader;
- // buffer allocator for outbound message assembly
- private final BufferAllocator<ByteBuffer> allocator;
-
- // running on remote node
- private final ConcurrentIntegerMap<ReplyHandler> remoteRequests = concurrentIntegerMap();
- // running on local node
- private final ConcurrentIntegerMap<RemoteRequestContext> localRequests = concurrentIntegerMap();
- // sequence for remote requests
- private final AtomicInteger requestSequence = new AtomicInteger();
-
- // clients whose requests get forwarded to the remote side
- // even #s were opened from services forwarded to us (our sequence)
- // odd #s were forwarded directly to us (remote sequence)
- private final ConcurrentIntegerMap<RequestHandler> remoteClients = concurrentIntegerMap();
- // forwarded to remote side (handled on this side)
- private final ConcurrentIntegerMap<Handle<RequestHandler>> forwardedClients = concurrentIntegerMap();
- // sequence for forwarded clients (unsigned; shift left one bit, add one)
- private final AtomicInteger forwardedClientSequence = new AtomicInteger();
- // sequence for clients created from services forwarded to us (unsigned; shift left one bit)
- private final AtomicInteger remoteClientSequence = new AtomicInteger();
-
- // services forwarded to us
- private final ConcurrentIntegerMap<RequestHandlerSource> remoteServices = concurrentIntegerMap();
- // forwarded to remote side (handled on this side)
- private final ConcurrentIntegerMap<Handle<RequestHandlerSource>> forwardedServices = concurrentIntegerMap();
- // sequence for forwarded services
- private final AtomicInteger serviceSequence = new AtomicInteger();
-
- private volatile AllocatedMessageChannel channel;
-
- public BasicHandler(final RemotingChannelConfiguration configuration) {
- allocator = configuration.getAllocator();
- executor = configuration.getExecutor();
- classLoader = configuration.getClassLoader();
- marshallerFactory = configuration.getMarshallerFactory();
- linkMetric = configuration.getLinkMetric();
- }
-
- public void handleOpened(final AllocatedMessageChannel channel) {
- channel.resumeReads();
- }
-
- public void handleReadable(final AllocatedMessageChannel channel) {
- for (;;) try {
- final ByteBuffer buffer;
- try {
- buffer = channel.receive();
- } catch (IOException e) {
- log.error(e, "I/O error in protocol channel; closing channel");
- IoUtils.safeClose(channel);
- return;
- }
- if (buffer == null) {
- // todo release all handles...
- // todo what if the write queue is not empty?
- IoUtils.safeClose(channel);
- return;
- }
- if (! buffer.hasRemaining()) {
- // would block
- channel.resumeReads();
- return;
- }
- final MessageType msgType;
- try {
- msgType = MessageType.getMessageType(buffer.get() & 0xff);
- } catch (IllegalArgumentException ex) {
- log.trace("Received invalid message type");
- return;
- }
- log.trace("Received message %s, type %s", buffer, msgType);
- switch (msgType) {
- case REQUEST_ONEWAY: {
- final int clientId = buffer.getInt();
- final Handle<RequestHandler> handle = forwardedClients.get(clientId);
- if (handle == null) {
- log.trace("Request on invalid client ID %d", Integer.valueOf(clientId));
- return;
- }
- final Object payload;
- try {
- final Unmarshaller unmarshaller = marshallerFactory.createUnmarshaller();
- try {
- unmarshaller.start(createByteInput(buffer, true));
- try {
- payload = unmarshaller.readObject();
- } catch (ClassNotFoundException e) {
- log.trace("Class not found in one-way request for client ID %d", Integer.valueOf(clientId));
- break;
- }
- } finally {
- IoUtils.safeClose(unmarshaller);
- }
- } catch (IOException ex) {
- log.error(ex, "Failed to unmarshal a one-way request");
- break;
- }
- final RequestHandler requestHandler = handle.getResource();
- try {
- requestHandler.receiveRequest(payload);
- } catch (Throwable t) {
- log.error(t, "One-way request handler unexpectedly threw an exception");
- }
- break;
- }
- case REQUEST: {
- final int clientId = buffer.getInt();
- final Handle<RequestHandler> handle = forwardedClients.get(clientId);
- if (handle == null) {
- log.trace("Request on invalid client ID %d", Integer.valueOf(clientId));
- break;
- }
- final int requestId = buffer.getInt();
- final Object payload;
- try {
- final Unmarshaller unmarshaller = marshallerFactory.createUnmarshaller();
- try {
- unmarshaller.start(createByteInput(buffer, true));
- try {
- payload = unmarshaller.readObject();
- } catch (ClassNotFoundException e) {
- log.trace("Class not found in request ID %d for client ID %d", Integer.valueOf(requestId), Integer.valueOf(clientId));
- // todo - send request receive failed message
- break;
- }
- } finally {
- IoUtils.safeClose(unmarshaller);
- }
- } catch (IOException ex) {
- log.trace("Failed to unmarshal a request (%s), sending %s", ex, MessageType.REQUEST_RECEIVE_FAILED);
- // todo send a request failure message
- break;
- }
- final RequestHandler requestHandler = handle.getResource();
- requestHandler.receiveRequest(payload, new ReplyHandlerImpl(channel, requestId, allocator));
- break;
- }
- case REPLY: {
- final int requestId = buffer.getInt();
- final ReplyHandler replyHandler = remoteRequests.remove(requestId);
- if (replyHandler == null) {
- log.trace("Got reply to unknown request %d", Integer.valueOf(requestId));
- break;
- }
- final Object payload;
- try {
- final Unmarshaller unmarshaller = marshallerFactory.createUnmarshaller();
- try {
- unmarshaller.start(createByteInput(buffer, true));
- try {
- payload = unmarshaller.readObject();
- } catch (ClassNotFoundException e) {
- replyHandler.handleException("Reply unmarshalling failed", e);
- log.trace("Class not found in reply to request ID %d", Integer.valueOf(requestId));
- break;
- }
- } finally {
- IoUtils.safeClose(unmarshaller);
- }
- } catch (IOException ex) {
- log.trace("Failed to unmarshal a reply (%s), sending a ReplyException");
- // todo
- SpiUtils.safeHandleException(replyHandler, null, null);
- break;
- }
- SpiUtils.safeHandleReply(replyHandler, payload);
- break;
- }
- case CANCEL_REQUEST: {
- final int requestId = buffer.getInt();
- final RemoteRequestContext context = localRequests.get(requestId);
- if (context != null) {
- context.cancel();
- }
- break;
- }
- case CANCEL_ACK: {
- final int requestId = buffer.getInt();
- final ReplyHandler replyHandler = remoteRequests.get(requestId);
- if (replyHandler != null) {
- replyHandler.handleCancellation();
- }
- break;
- }
- case REQUEST_RECEIVE_FAILED: {
- final int requestId = buffer.getInt();
- final ReplyHandler replyHandler = remoteRequests.remove(requestId);
- if (replyHandler == null) {
- log.trace("Got reply to unknown request %d", Integer.valueOf(requestId));
- break;
- }
- final String reason = readUTFZ(buffer);
- // todo - throw a new ReplyException
- break;
- }
- case REQUEST_FAILED: {
- final int requestId = buffer.getInt();
- final ReplyHandler replyHandler = remoteRequests.remove(requestId);
- if (replyHandler == null) {
- log.trace("Got reply to unknown request %d", Integer.valueOf(requestId));
- break;
- }
- final Throwable cause;
- try {
- final Unmarshaller unmarshaller = marshallerFactory.createUnmarshaller();
- try {
- unmarshaller.start(createByteInput(buffer, true));
- try {
- cause = (Throwable) unmarshaller.readObject();
- } catch (ClassNotFoundException e) {
- replyHandler.handleException("Exception reply unmarshalling failed", e);
- log.trace("Class not found in exception reply to request ID %d", Integer.valueOf(requestId));
- break;
- } catch (ClassCastException e) {
- // todo - report a generic exception
- SpiUtils.safeHandleException(replyHandler, null, null);
- break;
- }
- } finally {
- IoUtils.safeClose(unmarshaller);
- }
- } catch (IOException ex) {
- log.trace("Failed to unmarshal an exception reply (%s), sending a generic execution exception");
- // todo
- SpiUtils.safeHandleException(replyHandler, null, null);
- break;
- }
- // todo - wrap with REE
- SpiUtils.safeHandleException(replyHandler, null, cause);
- break;
- }
- case REQUEST_OUTCOME_UNKNOWN: {
- final int requestId = buffer.getInt();
- final ReplyHandler replyHandler = remoteRequests.remove(requestId);
- if (replyHandler == null) {
- log.trace("Got reply to unknown request %d", Integer.valueOf(requestId));
- break;
- }
- final String reason = readUTFZ(buffer);
- // todo - throw a new IndetermOutcomeEx
- break;
- }
- case CLIENT_CLOSE: {
- final int clientId = buffer.getInt();
- final Handle<RequestHandler> handle = forwardedClients.remove(clientId);
- if (handle == null) {
- log.warn("Got client close message for unknown client %d", Integer.valueOf(clientId));
- break;
- }
- IoUtils.safeClose(handle);
- break;
- }
- case CLIENT_OPEN: {
- final int serviceId = buffer.getInt();
- final int clientId = buffer.getInt();
- final Handle<RequestHandlerSource> handle = forwardedServices.get(serviceId);
- if (handle == null) {
- log.warn("Received client open message for unknown service %d", Integer.valueOf(serviceId));
- break;
- }
- try {
- final RequestHandlerSource requestHandlerSource = handle.getResource();
- final Handle<RequestHandler> clientHandle = requestHandlerSource.createRequestHandler();
- // todo check for duplicate
- // todo validate the client ID
- log.trace("Opening client %d from service %d", Integer.valueOf(clientId), Integer.valueOf(serviceId));
- forwardedClients.put(clientId, clientHandle);
- } catch (IOException ex) {
- log.error(ex, "Failed to create a request handler for client ID %d", Integer.valueOf(clientId));
- break;
- } finally {
- IoUtils.safeClose(handle);
- }
- break;
- }
- case SERVICE_CLOSE: {
- final Handle<RequestHandlerSource> handle = forwardedServices.remove(buffer.getInt());
- if (handle == null) {
- break;
- }
- IoUtils.safeClose(handle);
- break;
- }
- case SERVICE_ADVERTISE: {
- final int serviceId = buffer.getInt();
- final String serviceType = readUTFZ(buffer);
- final String groupName = readUTFZ(buffer);
- final String endpointName = readUTFZ(buffer);
- final int baseMetric = buffer.getInt();
- Endpoint endpoint = null;
- int id = -1;
- final RequestHandlerSource handlerSource = new RequestHandlerSourceImpl(allocator, id);
- final int calcMetric = baseMetric + linkMetric;
- if (calcMetric > 0) {
- try {
- final SimpleCloseable closeable = endpoint.registerRemoteService(serviceType, groupName, endpointName, handlerSource, calcMetric);
- // todo - something with that closeable
- } catch (IOException e) {
- log.error(e, "Unable to register remote service");
- }
- }
- break;
- }
- case SERVICE_UNADVERTISE: {
- final int serviceId = buffer.getInt();
- IoUtils.safeClose(remoteServices.get(serviceId));
- break;
- }
- default: {
- log.trace("Received invalid message type %s", msgType);
- }
- }
- } catch (BufferUnderflowException e) {
- log.error(e, "Malformed packet");
- }
- }
-
- public void handleWritable(final AllocatedMessageChannel channel) {
- for (;;) {
- final WriteHandler handler = outputQueue.peek();
- if (handler == null) {
- return;
- }
- try {
- if (handler.handleWrite(channel)) {
- log.trace("Handled write with handler %s", handler);
- pending.decrementAndGet();
- outputQueue.remove();
- } else {
- channel.resumeWrites();
- return;
- }
- } catch (Throwable t) {
- pending.decrementAndGet();
- outputQueue.remove();
- }
- }
- }
-
- public void handleClosed(final AllocatedMessageChannel channel) {
- }
-
- RequestHandlerSource getRemoteService(final int id) {
- return new RequestHandlerSourceImpl(allocator, id);
- }
-
- private final class ReplyHandlerImpl implements ReplyHandler {
-
- private final AllocatedMessageChannel channel;
- private final int requestId;
- private final BufferAllocator<ByteBuffer> allocator;
-
- private ReplyHandlerImpl(final AllocatedMessageChannel channel, final int requestId, final BufferAllocator<ByteBuffer> allocator) {
- if (channel == null) {
- throw new NullPointerException("channel is null");
- }
- if (allocator == null) {
- throw new NullPointerException("allocator is null");
- }
- this.channel = channel;
- this.requestId = requestId;
- this.allocator = allocator;
- }
-
- public void handleReply(final Object reply) {
- ByteBuffer buffer = allocator.allocate();
- buffer.put((byte) MessageType.REPLY.getId());
- buffer.putInt(requestId);
- try {
- final org.jboss.marshalling.Marshaller marshaller = marshallerFactory.createMarshaller();
- try {
- final List<ByteBuffer> bufferList = new ArrayList<ByteBuffer>();
- final ByteOutput output = createByteOutput(allocator, bufferList);
- try {
- marshaller.start(output);
- marshaller.writeObject(reply);
- marshaller.close();
- output.close();
- registerWriter(channel, new SimpleWriteHandler(allocator, bufferList));
- } finally {
- IoUtils.safeClose(output);
- }
- } finally {
- IoUtils.safeClose(marshaller);
- }
- } catch (IOException e) {
- log.error(e, "Failed to send a reply to the remote side");
- } catch (InterruptedException e) {
- log.error(e, "Reply handler thread interrupted before a reply could be sent");
- Thread.currentThread().interrupt();
- }
- }
-
- public void handleException(final String msg, final Throwable cause) {
- ByteBuffer buffer = allocator.allocate();
- buffer.put((byte) MessageType.REQUEST_FAILED.getId());
- buffer.putInt(requestId);
- try {
- final org.jboss.marshalling.Marshaller marshaller = marshallerFactory.createMarshaller();
- try {
- final List<ByteBuffer> bufferList = new ArrayList<ByteBuffer>();
- final ByteOutput output = createByteOutput(allocator, bufferList);
- try {
- marshaller.start(output);
- marshaller.writeObject(cause);
- marshaller.close();
- output.close();
- registerWriter(channel, new SimpleWriteHandler(allocator, bufferList));
- } finally {
- IoUtils.safeClose(output);
- }
- } finally {
- IoUtils.safeClose(marshaller);
- }
- } catch (IOException e) {
- log.error(e, "Failed to send an exception to the remote side");
- } catch (InterruptedException e) {
- log.error(e, "Reply handler thread interrupted before an exception could be sent");
- Thread.currentThread().interrupt();
- }
- }
-
- public void handleCancellation() {
- final ByteBuffer buffer = allocator.allocate();
- buffer.put((byte) MessageType.CANCEL_ACK.getId());
- buffer.putInt(requestId);
- buffer.flip();
- try {
- registerWriter(channel, new SimpleWriteHandler(allocator, buffer));
- } catch (InterruptedException e) {
- // todo log
- Thread.currentThread().interrupt();
- }
- }
- }
-
- // Writer members
-
- private final BlockingQueue<WriteHandler> outputQueue = CollectionUtil.blockingQueue(64);
- private final AtomicInteger pending = new AtomicInteger();
-
- private void registerWriter(final AllocatedMessageChannel channel, final WriteHandler writeHandler) throws InterruptedException {
- outputQueue.put(writeHandler);
- if (pending.getAndIncrement() == 0) {
- channel.resumeWrites();
- }
- }
-
- private int writeUTFZ(ByteBuffer buffer, CharSequence s) {
- final int len = s.length();
- for (int i = 0; i < len; i++) {
- char c = s.charAt(i);
- if (1 <= c && c < 0x80) {
- if (buffer.hasRemaining()) {
- buffer.put((byte) c);
- } else {
- return i;
- }
- } else if (c < 0x0800) {
- if (buffer.remaining() >= 2) {
- buffer.put((byte) (0xc0 | (c >> 6)));
- buffer.put((byte) (0x80 | (c & 0x3f)));
- } else {
- return i;
- }
- } else {
- if (buffer.remaining() >= 3) {
- buffer.put((byte) (0xe0 | (c >> 12)));
- buffer.put((byte) (0x80 | ((c >> 6) & 0x3f)));
- buffer.put((byte) (0x80 | (c & 0x3f)));
- } else {
- return i;
- }
- }
- }
- if (buffer.hasRemaining()) {
- buffer.put((byte) 0);
- return -1;
- } else {
- return len;
- }
- }
-
- // Reader utils
-
- private String readUTFZ(ByteBuffer buffer) {
- StringBuilder builder = new StringBuilder();
- int state = 0, a = 0;
- while (buffer.hasRemaining()) {
- final int v = buffer.get() & 0xff;
- switch (state) {
- case 0: {
- if (v == 0) {
- return builder.toString();
- } else if (v < 128) {
- builder.append((char) v);
- } else if (192 <= v && v < 224) {
- a = v << 6;
- state = 1;
- } else if (224 <= v && v < 232) {
- a = v << 12;
- state = 2;
- } else {
- builder.append('?');
- }
- break;
- }
- case 1: {
- if (v == 0) {
- builder.append('?');
- return builder.toString();
- } else if (128 <= v && v < 192) {
- a |= v & 0x3f;
- builder.append((char) a);
- } else {
- builder.append('?');
- }
- state = 0;
- break;
- }
- case 2: {
- if (v == 0) {
- builder.append('?');
- return builder.toString();
- } else if (128 <= v && v < 192) {
- a |= (v & 0x3f) << 6;
- state = 1;
- } else {
- builder.append('?');
- state = 0;
- }
- break;
- }
- default:
- throw new IllegalStateException("wrong state");
- }
- }
- return builder.toString();
- }
-
- // client endpoint
-
- private final class RequestHandlerImpl extends AbstractAutoCloseable<RequestHandler> implements RequestHandler {
-
- private final int identifier;
- private final BufferAllocator<ByteBuffer> allocator;
-
- public RequestHandlerImpl(final int identifier, final BufferAllocator<ByteBuffer> allocator) {
- super(executor);
- if (allocator == null) {
- throw new NullPointerException("allocator is null");
- }
- this.identifier = identifier;
- this.allocator = allocator;
- addCloseHandler(new CloseHandler<RequestHandler>() {
- public void handleClose(final RequestHandler closed) {
- remoteClients.remove(identifier, this);
- ByteBuffer buffer = allocator.allocate();
- buffer.put((byte) MessageType.CLIENT_CLOSE.getId());
- buffer.putInt(identifier);
- buffer.flip();
- try {
- registerWriter(channel, new SimpleWriteHandler(allocator, buffer));
- } catch (InterruptedException e) {
- log.warn("Client close notification was interrupted before it could be sent");
- }
- }
- });
- }
-
- public void receiveRequest(final Object request) {
- log.trace("Sending outbound one-way request of type %s", request == null ? "null" : request.getClass());
- try {
- final List<ByteBuffer> bufferList;
- final Marshaller marshaller = marshallerFactory.createMarshaller();
- try {
- bufferList = new ArrayList<ByteBuffer>();
- final ByteOutput output = createByteOutput(allocator, bufferList);
- try {
- marshaller.write(MessageType.REQUEST_ONEWAY.getId());
- marshaller.writeInt(identifier);
- marshaller.writeObject(request);
- marshaller.close();
- output.close();
- } finally {
- IoUtils.safeClose(output);
- }
- } finally {
- IoUtils.safeClose(marshaller);
- }
- try {
- registerWriter(channel, new SimpleWriteHandler(allocator, bufferList));
- } catch (InterruptedException e) {
- log.trace(e, "receiveRequest was interrupted");
- Thread.currentThread().interrupt();
- return;
- }
- } catch (Throwable t) {
- // ignore
- log.trace(t, "receiveRequest failed with an exception");
- return;
- }
- }
-
- public RemoteRequestContext receiveRequest(final Object request, final ReplyHandler handler) {
- log.trace("Sending outbound request of type %s", request == null ? "null" : request.getClass());
- try {
- final List<ByteBuffer> bufferList;
- final Marshaller marshaller = marshallerFactory.createMarshaller();
- try {
- bufferList = new ArrayList<ByteBuffer>();
- final ByteOutput output = createByteOutput(allocator, bufferList);
- try {
- marshaller.write(MessageType.REQUEST.getId());
- marshaller.writeInt(identifier);
-
- int id;
- do {
- id = requestSequence.getAndIncrement();
- } while (remoteRequests.putIfAbsent(id, handler) != null);
- marshaller.writeInt(id);
- marshaller.writeObject(request);
- marshaller.close();
- output.close();
- try {
- registerWriter(channel, new SimpleWriteHandler(allocator, bufferList));
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- executor.execute(new Runnable() {
- public void run() {
- SpiUtils.safeHandleCancellation(handler);
- }
- });
- return SpiUtils.getBlankRemoteRequestContext();
- }
- log.trace("Sent request %s", request);
- return new RemoteRequestContextImpl(id, allocator, channel);
- } finally {
- IoUtils.safeClose(output);
- }
- } finally {
- IoUtils.safeClose(marshaller);
- }
- } catch (final IOException t) {
- log.trace(t, "receiveRequest failed with an exception");
- executor.execute(new Runnable() {
- public void run() {
- SpiUtils.safeHandleException(handler, "Failed to build request", t);
- }
- });
- return SpiUtils.getBlankRemoteRequestContext();
- }
- }
-
- public String toString() {
- return "forwarding request handler <" + Integer.toString(hashCode(), 16) + "> (id = " + identifier + ")";
- }
- }
-
- public final class RemoteRequestContextImpl implements RemoteRequestContext {
-
- private final BufferAllocator<ByteBuffer> allocator;
- private final int id;
- private final AllocatedMessageChannel channel;
-
- public RemoteRequestContextImpl(final int id, final BufferAllocator<ByteBuffer> allocator, final AllocatedMessageChannel channel) {
- this.id = id;
- this.allocator = allocator;
- this.channel = channel;
- }
-
- public void cancel() {
- try {
- final ByteBuffer buffer = allocator.allocate();
- buffer.put((byte) MessageType.CANCEL_REQUEST.getId());
- buffer.putInt(id);
- buffer.flip();
- registerWriter(channel, new SimpleWriteHandler(allocator, buffer));
- } catch (InterruptedException e) {
- // todo log that cancel attempt failed
- Thread.currentThread().interrupt();
- } catch (Throwable t) {
- // todo log that cancel attempt failed
- }
- }
- }
-
- public final class RequestHandlerSourceImpl extends AbstractAutoCloseable<RequestHandlerSource> implements RequestHandlerSource {
-
- private final BufferAllocator<ByteBuffer> allocator;
- private final int identifier;
-
- protected RequestHandlerSourceImpl(final BufferAllocator<ByteBuffer> allocator, final int identifier) {
- super(executor);
- this.allocator = allocator;
- this.identifier = identifier;
- addCloseHandler(new CloseHandler<RequestHandlerSource>() {
- public void handleClose(final RequestHandlerSource closed) {
- ByteBuffer buffer = allocator.allocate();
- buffer.put((byte) MessageType.SERVICE_CLOSE.getId());
- buffer.putInt(identifier);
- buffer.flip();
- try {
- registerWriter(channel, new SimpleWriteHandler(allocator, buffer));
- } catch (InterruptedException e) {
- log.warn("Service close notification was interrupted before it could be sent");
- }
- }
- });
- }
-
- public Handle<RequestHandler> createRequestHandler() throws IOException {
- int id;
- do {
- id = remoteClientSequence.getAndIncrement() << 1;
- } while (remoteClients.putIfAbsent(id, new RequestHandlerImpl(id, BasicHandler.this.allocator)) != null);
- final int clientId = id;
- final ByteBuffer buffer = allocator.allocate();
- buffer.put((byte) MessageType.CLIENT_OPEN.getId());
- buffer.putInt(identifier);
- buffer.putInt(clientId);
- buffer.flip();
- // todo - probably should bail out if we're interrupted?
- boolean intr = false;
- for (;;) {
- try {
- registerWriter(channel, new SimpleWriteHandler(allocator, buffer));
- try {
- return new RequestHandlerImpl(clientId, allocator).getHandle();
- } finally {
- if (intr) {
- Thread.currentThread().interrupt();
- }
- }
- } catch (InterruptedException e) {
- intr = true;
- }
- }
- }
-
- public String toString() {
- return "forwarding request handler source <" + Integer.toString(hashCode(), 16) + "> (id = " + identifier + ")";
- }
- }
-
- public static ByteInput createByteInput(final ByteBuffer buffer, final boolean eof) {
- return new ByteInput() {
- public int read() throws IOException {
- if (buffer.hasRemaining()) {
- return buffer.get() & 0xff;
- } else {
- return eof ? -1 : 0;
- }
- }
-
- public int read(final byte[] b) throws IOException {
- return read(b, 0, b.length);
- }
-
- public int read(final byte[] b, final int off, final int len) throws IOException {
- int r = Math.min(buffer.remaining(), len);
- if (r > 0) {
- buffer.get(b, off, r);
- return r;
- } else {
- return eof ? -1 : 0;
- }
- }
-
- public int available() throws IOException {
- return buffer.remaining();
- }
-
- public long skip(final long n) throws IOException {
- final int cnt = n > (long) Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) n;
- int r = Math.min(buffer.remaining(), cnt);
- if (r > 0) {
- final int oldPos = buffer.position();
- final int newPos = oldPos + r;
- if (newPos < 0) {
- final int lim = buffer.limit();
- buffer.position(lim);
- return lim - oldPos;
- }
- }
- return r;
- }
-
- public void close() {
- }
- };
- }
-
- public static ByteOutput createByteOutput(final BufferAllocator<ByteBuffer> allocator, final Collection<ByteBuffer> target) {
- return new ByteOutput() {
- private ByteBuffer current;
-
- private ByteBuffer getCurrent() {
- final ByteBuffer buffer = current;
- return buffer == null ? (current = allocator.allocate()) : buffer;
- }
-
- public void write(final int i) throws IOException {
- final ByteBuffer buffer = getCurrent();
- buffer.put((byte) i);
- if (! buffer.hasRemaining()) {
- buffer.flip();
- target.add(buffer);
- current = null;
- }
- }
-
- public void write(final byte[] bytes) throws IOException {
- write(bytes, 0, bytes.length);
- }
-
- public void write(final byte[] bytes, int offs, int len) throws IOException {
- while (len > 0) {
- final ByteBuffer buffer = getCurrent();
- final int c = Math.min(len, buffer.remaining());
- buffer.put(bytes, offs, c);
- offs += c;
- len -= c;
- if (! buffer.hasRemaining()) {
- buffer.flip();
- target.add(buffer);
- current = null;
- }
- }
- }
-
- public void close() throws IOException {
- flush();
- }
-
- public void flush() throws IOException {
- final ByteBuffer buffer = current;
- if (buffer != null) {
- buffer.flip();
- target.add(buffer);
- current = null;
- }
- }
- };
- }
-}
Deleted: remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/BasicProtocol.java
===================================================================
--- remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/BasicProtocol.java 2008-09-18 03:06:39 UTC (rev 4573)
+++ remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/BasicProtocol.java 2008-10-21 02:39:24 UTC (rev 4601)
@@ -1,96 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2008, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.remoting.protocol.basic;
-
-import org.jboss.remoting.RemotingException;
-import org.jboss.remoting.SimpleCloseable;
-import org.jboss.remoting.spi.remote.RequestHandlerSource;
-import org.jboss.remoting.spi.remote.Handle;
-import org.jboss.xnio.IoHandlerFactory;
-import org.jboss.xnio.ChannelSource;
-import org.jboss.xnio.IoFuture;
-import org.jboss.xnio.AbstractConvertingIoFuture;
-import org.jboss.xnio.IoHandler;
-import org.jboss.xnio.BufferAllocator;
-import org.jboss.xnio.log.Logger;
-import org.jboss.xnio.channels.AllocatedMessageChannel;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.concurrent.Executor;
-
-/**
- *
- */
-public final class BasicProtocol {
-
- private static final Logger log = Logger.getLogger(BasicProtocol.class);
-
- private BasicProtocol() {
- }
-
- /**
- * Create a request server for the basic protocol.
- *
- * @param executor the executor to use for invocations
- * @param allocator the buffer allocator to use
- * @return a handler factory for passing to an XNIO server
- */
- public static IoHandlerFactory<AllocatedMessageChannel> createServer(final Executor executor, final BufferAllocator<ByteBuffer> allocator) {
- return new IoHandlerFactory<AllocatedMessageChannel>() {
- public IoHandler<? super AllocatedMessageChannel> createHandler() {
- final RemotingChannelConfiguration configuration = new RemotingChannelConfiguration();
- configuration.setAllocator(allocator);
- configuration.setExecutor(executor);
- // todo marshaller factory... etc
- return new BasicHandler(configuration);
- }
- };
- }
-
- /**
- * Create a request client for the basic protocol.
- *
- * @param executor the executor to use for invocations
- * @param channelSource the XNIO channel source to use to establish the connection
- * @param allocator the buffer allocator to use
- * @return a handle which may be used to close the connection
- * @throws IOException if an error occurs
- */
- public static IoFuture<SimpleCloseable> connect(final Executor executor, final ChannelSource<AllocatedMessageChannel> channelSource, final BufferAllocator<ByteBuffer> allocator) throws IOException {
- final RemotingChannelConfiguration configuration = new RemotingChannelConfiguration();
- configuration.setAllocator(allocator);
- configuration.setExecutor(executor);
- // todo marshaller factory... etc
- final BasicHandler basicHandler = new BasicHandler(configuration);
- final IoFuture<AllocatedMessageChannel> futureChannel = channelSource.open(basicHandler);
- return new AbstractConvertingIoFuture<SimpleCloseable, AllocatedMessageChannel>(futureChannel) {
- protected SimpleCloseable convert(final AllocatedMessageChannel channel) throws RemotingException {
- return new AbstractConnection(executor) {
- public Handle<RequestHandlerSource> getServiceForId(final int id) throws IOException {
- return basicHandler.getRemoteService(id).getHandle();
- }
- };
- }
- };
- }
-}
Deleted: remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/ConfigValue.java
===================================================================
--- remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/ConfigValue.java 2008-09-18 03:06:39 UTC (rev 4573)
+++ remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/ConfigValue.java 2008-10-21 02:39:24 UTC (rev 4601)
@@ -1,67 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2008, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.remoting.protocol.basic;
-
-/**
- *
- */
-public enum ConfigValue {
-
- /**
- * The protocol version to use. Value type is {@code int}.
- */
- PROTOCOL_VERSION(0),
- /**
- * The name of the marshaller to use. Value type is {@code String}.
- */
- MARSHALLER_NAME(1),
- ;
- private final int id;
-
- private ConfigValue(final int id) {
- this.id = id;
- }
-
- /**
- * Get the integer ID for this config value.
- *
- * @return the integer ID
- */
- public int getId() {
- return id;
- }
-
- /**
- * Get the config value for an integer ID.
- *
- * @param id the integer ID
- * @return the config value instance
- */
- public static ConfigValue getConfigValue(final int id) {
- switch (id) {
- case 0: return PROTOCOL_VERSION;
- case 1: return MARSHALLER_NAME;
- default: throw new IllegalArgumentException("Invalid config value ID");
- }
- }
-}
Copied: remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/ConfigValue.java (from rev 4600, remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/ConfigValue.java)
===================================================================
--- remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/ConfigValue.java (rev 0)
+++ remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/ConfigValue.java 2008-10-21 02:39:24 UTC (rev 4601)
@@ -0,0 +1,67 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.remoting.protocol.multiplex;
+
+/**
+ *
+ */
+public enum ConfigValue {
+
+ /**
+ * The protocol version to use. Value type is {@code int}.
+ */
+ PROTOCOL_VERSION(0),
+ /**
+ * The name of the marshaller to use. Value type is {@code String}.
+ */
+ MARSHALLER_NAME(1),
+ ;
+ private final int id;
+
+ private ConfigValue(final int id) {
+ this.id = id;
+ }
+
+ /**
+ * Get the integer ID for this config value.
+ *
+ * @return the integer ID
+ */
+ public int getId() {
+ return id;
+ }
+
+ /**
+ * Get the config value for an integer ID.
+ *
+ * @param id the integer ID
+ * @return the config value instance
+ */
+ public static ConfigValue getConfigValue(final int id) {
+ switch (id) {
+ case 0: return PROTOCOL_VERSION;
+ case 1: return MARSHALLER_NAME;
+ default: throw new IllegalArgumentException("Invalid config value ID");
+ }
+ }
+}
Deleted: remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/ConnectionListener.java
===================================================================
--- remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/ConnectionListener.java 2008-09-18 03:06:39 UTC (rev 4573)
+++ remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/ConnectionListener.java 2008-10-21 02:39:24 UTC (rev 4601)
@@ -1,32 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2008, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.remoting.protocol.basic;
-
-import org.jboss.remoting.SimpleCloseable;
-
-/**
- *
- */
-public interface ConnectionListener {
- void handleOpened(SimpleCloseable connection);
-}
Copied: remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/ConnectionListener.java (from rev 4600, remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/ConnectionListener.java)
===================================================================
--- remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/ConnectionListener.java (rev 0)
+++ remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/ConnectionListener.java 2008-10-21 02:39:24 UTC (rev 4601)
@@ -0,0 +1,32 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.remoting.protocol.multiplex;
+
+import org.jboss.remoting.SimpleCloseable;
+
+/**
+ *
+ */
+public interface ConnectionListener {
+ void handleOpened(SimpleCloseable connection);
+}
Added: remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/IdentityIntMap.java
===================================================================
--- remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/IdentityIntMap.java (rev 0)
+++ remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/IdentityIntMap.java 2008-10-21 02:39:24 UTC (rev 4601)
@@ -0,0 +1,154 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.remoting.protocol.multiplex;
+
+import java.util.Arrays;
+
+/**
+ *
+ */
+public final class IdentityIntMap<T> {
+
+ private int[] values;
+ private Object[] keys;
+ private int count;
+ private int resizeCount;
+
+ public IdentityIntMap(int initialCapacity, final float loadFactor) {
+ if (initialCapacity < 1) {
+ throw new IllegalArgumentException("initialCapacity must be > 0");
+ }
+ if (loadFactor <= 0.0f || loadFactor >= 1.0f) {
+ throw new IllegalArgumentException("loadFactor must be > 0.0 and < 1.0");
+ }
+ if (initialCapacity < 16) {
+ initialCapacity = 16;
+ } else {
+ // round up
+ final int c = Integer.highestOneBit(initialCapacity) - 1;
+ initialCapacity = Integer.highestOneBit(initialCapacity + c);
+ }
+ keys = new Object[initialCapacity];
+ values = new int[initialCapacity];
+ resizeCount = (int) ((double) initialCapacity * (double) loadFactor);
+ }
+
+ public IdentityIntMap(final float loadFactor) {
+ this(64, loadFactor);
+ }
+
+ public IdentityIntMap(final int initialCapacity) {
+ this(initialCapacity, 0.5f);
+ }
+
+ public IdentityIntMap() {
+ this(0.5f);
+ }
+
+ public int get(T key, int defVal) {
+ if (key == null) {
+ throw new NullPointerException("key is null");
+ }
+ final Object[] keys = this.keys;
+ final int mask = keys.length - 1;
+ int hc = System.identityHashCode(key) & mask;
+ Object v;
+ for (;;) {
+ v = keys[hc];
+ if (v == key) {
+ return values[hc];
+ }
+ if (v == null) {
+ // not found
+ return defVal;
+ }
+ hc = (hc + 1) & mask;
+ }
+ }
+
+ public void put(T key, int value) {
+ if (key == null) {
+ throw new NullPointerException("key is null");
+ }
+ final Object[] keys = this.keys;
+ final int mask = keys.length - 1;
+ final int[] values = this.values;
+ Object v;
+ int hc = System.identityHashCode(key) & mask;
+ for (int idx = hc;; idx = hc++ & mask) {
+ v = keys[idx];
+ if (v == null) {
+ keys[idx] = key;
+ values[idx] = value;
+ if (++count > resizeCount) {
+ resize();
+ }
+ return;
+ }
+ if (v == key) {
+ values[idx] = value;
+ return;
+ }
+ hc++;
+ }
+ }
+
+ private final void resize() {
+ final Object[] oldKeys = keys;
+ final int oldsize = oldKeys.length;
+ final int[] oldValues = values;
+ if (oldsize >= 0x40000000) {
+ throw new IllegalStateException("Table full");
+ }
+ final int newsize = oldsize << 1;
+ final int mask = newsize - 1;
+ final Object[] newKeys = new Object[newsize];
+ final int[] newValues = new int[newsize];
+ keys = newKeys;
+ values = newValues;
+ if ((resizeCount <<= 1) == 0) {
+ resizeCount = Integer.MAX_VALUE;
+ }
+ for (int oi = 0; oi < oldsize; oi ++) {
+ final Object key = oldKeys[oi];
+ if (key != null) {
+ int ni = System.identityHashCode(key) & mask;
+ for (;;) {
+ final Object v = newKeys[ni];
+ if (v == null) {
+ // found
+ newKeys[ni] = key;
+ newValues[ni] = oldValues[oi];
+ break;
+ }
+ ni = (ni + 1) & mask;
+ }
+ }
+ }
+ }
+
+ public void clear() {
+ Arrays.fill(keys, null);
+ count = 0;
+ }
+}
Deleted: remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MessageType.java
===================================================================
--- remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/MessageType.java 2008-09-18 03:06:39 UTC (rev 4573)
+++ remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MessageType.java 2008-10-21 02:39:24 UTC (rev 4601)
@@ -1,91 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2008, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.remoting.protocol.basic;
-
-/**
- * The type of a protocol message.
- */
-public enum MessageType {
-
- // One-way request, no return value may be sent
- REQUEST_ONEWAY(1),
- // Two-way request, return value is expected
- REQUEST(2),
- // Reply
- REPLY(3),
- // Attempt to cancel a request
- CANCEL_REQUEST(4),
- // Acknowledge that a request was cancelled
- CANCEL_ACK(5),
- // Request failed due to protocol or unmarshalling problem
- REQUEST_RECEIVE_FAILED(6),
- // Request failed due to exception
- REQUEST_FAILED(7),
- // Request completed but no reply or exception was sent
- REQUEST_OUTCOME_UNKNOWN(8),
- // Remote side called .close() on a forwarded RequestHandler
- CLIENT_CLOSE(9),
- // Remote side pulled a new RequestHandler off of a forwarded RequestHandlerSource
- CLIENT_OPEN(10),
- // Remote side called .close() on a forwarded RequestHandlerSource
- SERVICE_CLOSE(11),
- // Remote side brought a new service online
- SERVICE_ADVERTISE(12),
- // Remote side's service is no longer available
- SERVICE_UNADVERTISE(13),
- ;
- private final int id;
-
- private MessageType(int id) {
- this.id = id;
- }
-
- public int getId() {
- return id;
- }
-
- /**
- * Get the message type for an integer ID.
- *
- * @param id the integer ID
- * @return the message type instance
- */
- public static MessageType getMessageType(final int id) {
- switch (id) {
- case 1: return REQUEST_ONEWAY;
- case 2: return REQUEST;
- case 3: return REPLY;
- case 4: return CANCEL_REQUEST;
- case 5: return CANCEL_ACK;
- case 6: return REQUEST_RECEIVE_FAILED;
- case 7: return REQUEST_FAILED;
- case 8: return REQUEST_OUTCOME_UNKNOWN;
- case 9: return CLIENT_CLOSE;
- case 10: return CLIENT_OPEN;
- case 11: return SERVICE_CLOSE;
- case 12: return SERVICE_ADVERTISE;
- case 13: return SERVICE_UNADVERTISE;
- default: throw new IllegalArgumentException("Invalid message type ID");
- }
- }
-}
Copied: remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MessageType.java (from rev 4600, remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/MessageType.java)
===================================================================
--- remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MessageType.java (rev 0)
+++ remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MessageType.java 2008-10-21 02:39:24 UTC (rev 4601)
@@ -0,0 +1,91 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.remoting.protocol.multiplex;
+
+/**
+ * The type of a protocol message.
+ */
+public enum MessageType {
+
+ // One-way request, no return value may be sent
+ REQUEST_ONEWAY(1),
+ // Two-way request, return value is expected
+ REQUEST(2),
+ // Reply
+ REPLY(3),
+ // Attempt to cancel a request
+ CANCEL_REQUEST(4),
+ // Acknowledge that a request was cancelled
+ CANCEL_ACK(5),
+ // Request failed due to protocol or unmarshalling problem
+ REQUEST_RECEIVE_FAILED(6),
+ // Request failed due to exception
+ REQUEST_FAILED(7),
+ // Request completed but no reply or exception was sent
+ REQUEST_OUTCOME_UNKNOWN(8),
+ // Remote side called .close() on a forwarded RequestHandler
+ CLIENT_CLOSE(9),
+ // Remote side pulled a new RequestHandler off of a forwarded RequestHandlerSource
+ CLIENT_OPEN(10),
+ // Remote side called .close() on a forwarded RequestHandlerSource
+ SERVICE_CLOSE(11),
+ // Remote side brought a new service online
+ SERVICE_ADVERTISE(12),
+ // Remote side's service is no longer available
+ SERVICE_UNADVERTISE(13),
+ ;
+ private final int id;
+
+ private MessageType(int id) {
+ this.id = id;
+ }
+
+ public int getId() {
+ return id;
+ }
+
+ /**
+ * Get the message type for an integer ID.
+ *
+ * @param id the integer ID
+ * @return the message type instance
+ */
+ public static MessageType getMessageType(final int id) {
+ switch (id) {
+ case 1: return REQUEST_ONEWAY;
+ case 2: return REQUEST;
+ case 3: return REPLY;
+ case 4: return CANCEL_REQUEST;
+ case 5: return CANCEL_ACK;
+ case 6: return REQUEST_RECEIVE_FAILED;
+ case 7: return REQUEST_FAILED;
+ case 8: return REQUEST_OUTCOME_UNKNOWN;
+ case 9: return CLIENT_CLOSE;
+ case 10: return CLIENT_OPEN;
+ case 11: return SERVICE_CLOSE;
+ case 12: return SERVICE_ADVERTISE;
+ case 13: return SERVICE_UNADVERTISE;
+ default: throw new IllegalArgumentException("Invalid message type ID");
+ }
+ }
+}
Added: remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexHandler.java
===================================================================
--- remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexHandler.java (rev 0)
+++ remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexHandler.java 2008-10-21 02:39:24 UTC (rev 4601)
@@ -0,0 +1,907 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.remoting.protocol.multiplex;
+
+import org.jboss.xnio.channels.AllocatedMessageChannel;
+import org.jboss.xnio.channels.StreamChannel;
+import org.jboss.xnio.IoHandler;
+import org.jboss.xnio.BufferAllocator;
+import org.jboss.xnio.IoUtils;
+import org.jboss.xnio.IoFuture;
+import org.jboss.xnio.Connector;
+import org.jboss.xnio.Acceptor;
+import org.jboss.xnio.log.Logger;
+import org.jboss.remoting.spi.remote.RequestHandler;
+import org.jboss.remoting.spi.remote.RequestHandlerSource;
+import org.jboss.remoting.spi.remote.ReplyHandler;
+import org.jboss.remoting.spi.remote.RemoteRequestContext;
+import org.jboss.remoting.spi.remote.Handle;
+import org.jboss.remoting.spi.SpiUtils;
+import org.jboss.remoting.spi.AbstractAutoCloseable;
+import org.jboss.remoting.spi.stream.StreamDetector;
+import org.jboss.remoting.spi.stream.StreamSerializerFactory;
+import org.jboss.remoting.spi.stream.StreamProvider;
+import static org.jboss.remoting.util.CollectionUtil.concurrentIntegerMap;
+import org.jboss.remoting.util.CollectionUtil;
+import org.jboss.remoting.util.ConcurrentIntegerMap;
+import org.jboss.remoting.CloseHandler;
+import org.jboss.remoting.Endpoint;
+import org.jboss.remoting.SimpleCloseable;
+import org.jboss.remoting.RemoteExecutionException;
+import org.jboss.remoting.IndeterminateOutcomeException;
+import org.jboss.marshalling.MarshallerFactory;
+import org.jboss.marshalling.Unmarshaller;
+import org.jboss.marshalling.ByteOutput;
+import org.jboss.marshalling.Marshaller;
+import org.jboss.marshalling.Configuration;
+import org.jboss.marshalling.ObjectTable;
+import org.jboss.marshalling.Marshalling;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.nio.ByteBuffer;
+import java.nio.BufferUnderflowException;
+import java.io.IOException;
+import java.io.InvalidClassException;
+
+/**
+ * Protocol handler for the basic message-oriented Remoting protocol.
+ *
+ * @param <A> stream channel address type (Void if streams are not supported)
+ */
+public final class MultiplexHandler<A> implements IoHandler<AllocatedMessageChannel> {
+
+ private static final Logger log = Logger.getLogger(MultiplexHandler.class);
+
+ //--== Connection configuration items ==--
+ private final MarshallerFactory marshallerFactory;
+ private final Configuration marshallingConfiguration;
+ private final int linkMetric;
+ private final Executor executor;
+ // buffer allocator for outbound message assembly
+ private final BufferAllocator<ByteBuffer> allocator;
+ private final StreamDetector streamDetector;
+ private final Connector<A, AllocatedMessageChannel> messageConnector;
+ private final Acceptor<A, AllocatedMessageChannel> messageAcceptor;
+ private final Connector<A, StreamChannel> streamConnector;
+ private final Acceptor<A, StreamChannel> streamAcceptor;
+
+ // running on remote node
+ private final ConcurrentIntegerMap<ReplyHandler> remoteRequests = concurrentIntegerMap();
+ // running on local node
+ private final ConcurrentIntegerMap<RemoteRequestContext> localRequests = concurrentIntegerMap();
+ // sequence for remote requests
+ private final AtomicInteger requestSequence = new AtomicInteger();
+
+ // clients whose requests get forwarded to the remote side
+ // even #s were opened from services forwarded to us (our sequence)
+ // odd #s were forwarded directly to us (remote sequence)
+ private final ConcurrentIntegerMap<RequestHandler> remoteClients = concurrentIntegerMap();
+ // forwarded to remote side (handled on this side)
+ private final ConcurrentIntegerMap<Handle<RequestHandler>> forwardedClients = concurrentIntegerMap();
+ // sequence for forwarded clients (unsigned; shift left one bit, add one)
+ private final AtomicInteger forwardedClientSequence = new AtomicInteger();
+ // sequence for clients created from services forwarded to us (unsigned; shift left one bit)
+ private final AtomicInteger remoteClientSequence = new AtomicInteger();
+
+ // services forwarded to us
+ private final ConcurrentIntegerMap<RequestHandlerSource> remoteServices = concurrentIntegerMap();
+ // forwarded to remote side (handled on this side)
+ private final ConcurrentIntegerMap<Handle<RequestHandlerSource>> forwardedServices = concurrentIntegerMap();
+ // sequence for forwarded services
+ private final AtomicInteger serviceSequence = new AtomicInteger();
+
+ private final Endpoint endpoint;
+
+ private volatile AllocatedMessageChannel channel;
+
+ public MultiplexHandler(final Endpoint endpoint, final RemotingChannelConfiguration configuration, final StreamProvider<A> streamProvider) {
+ this.endpoint = endpoint;
+ messageConnector = streamProvider.getMessageChannelConnector();
+ messageAcceptor = streamProvider.getMessageChannelAcceptor();
+ streamConnector = streamProvider.getStreamChannelConnector();
+ streamAcceptor = streamProvider.getStreamChannelAcceptor();
+ allocator = configuration.getAllocator();
+ executor = configuration.getExecutor();
+ marshallerFactory = configuration.getMarshallerFactory();
+ marshallingConfiguration = configuration.getMarshallingConfiguration();
+ linkMetric = configuration.getLinkMetric();
+ streamDetector = configuration.getStreamDetector();
+ }
+
+ public void handleOpened(final AllocatedMessageChannel channel) {
+ channel.resumeReads();
+ }
+
+ public void handleReadable(final AllocatedMessageChannel channel) {
+ for (;;) try {
+ final ByteBuffer buffer;
+ try {
+ buffer = channel.receive();
+ } catch (IOException e) {
+ log.error(e, "I/O error in protocol channel; closing channel");
+ IoUtils.safeClose(channel);
+ return;
+ }
+ if (buffer == null) {
+ // todo release all handles...
+ // todo what if the write queue is not empty?
+ IoUtils.safeClose(channel);
+ return;
+ }
+ if (! buffer.hasRemaining()) {
+ // would block
+ channel.resumeReads();
+ return;
+ }
+ final MessageType msgType;
+ try {
+ msgType = MessageType.getMessageType(buffer.get() & 0xff);
+ } catch (IllegalArgumentException ex) {
+ log.trace("Received invalid message type");
+ return;
+ }
+ log.trace("Received message %s, type %s", buffer, msgType);
+ switch (msgType) {
+ case REQUEST_ONEWAY: {
+ final int clientId = buffer.getInt();
+ final Handle<RequestHandler> handle = forwardedClients.get(clientId);
+ if (handle == null) {
+ log.trace("Request on invalid client ID %d", Integer.valueOf(clientId));
+ return;
+ }
+ final Object payload;
+ try {
+ final Unmarshaller unmarshaller = marshallerFactory.createUnmarshaller(marshallingConfiguration);
+ try {
+ unmarshaller.start(Marshalling.createByteInput(buffer));
+ try {
+ payload = unmarshaller.readObject();
+ unmarshaller.finish();
+ } catch (ClassNotFoundException e) {
+ log.trace("Class not found in one-way request for client ID %d", Integer.valueOf(clientId));
+ break;
+ }
+ } finally {
+ IoUtils.safeClose(unmarshaller);
+ }
+ } catch (IOException ex) {
+ log.error(ex, "Failed to unmarshal a one-way request");
+ break;
+ }
+ final RequestHandler requestHandler = handle.getResource();
+ try {
+ requestHandler.receiveRequest(payload);
+ } catch (Throwable t) {
+ log.error(t, "One-way request handler unexpectedly threw an exception");
+ }
+ break;
+ }
+ case REQUEST: {
+ final int clientId = buffer.getInt();
+ final Handle<RequestHandler> handle = forwardedClients.get(clientId);
+ if (handle == null) {
+ log.trace("Request on invalid client ID %d", Integer.valueOf(clientId));
+ break;
+ }
+ final int requestId = buffer.getInt();
+ final Object payload;
+ try {
+ final Unmarshaller unmarshaller = marshallerFactory.createUnmarshaller(marshallingConfiguration);
+ try {
+ unmarshaller.start(Marshalling.createByteInput(buffer));
+ try {
+ payload = unmarshaller.readObject();
+ unmarshaller.finish();
+ } catch (ClassNotFoundException e) {
+ log.trace("Class not found in request ID %d for client ID %d", Integer.valueOf(requestId), Integer.valueOf(clientId));
+ // todo - send request receive failed message
+ break;
+ }
+ } finally {
+ IoUtils.safeClose(unmarshaller);
+ }
+ } catch (IOException ex) {
+ log.trace("Failed to unmarshal a request (%s), sending %s", ex, MessageType.REQUEST_RECEIVE_FAILED);
+ // todo send a request failure message
+ break;
+ }
+ final RequestHandler requestHandler = handle.getResource();
+ requestHandler.receiveRequest(payload, new ReplyHandlerImpl(channel, requestId, allocator));
+ break;
+ }
+ case REPLY: {
+ final int requestId = buffer.getInt();
+ final ReplyHandler replyHandler = remoteRequests.remove(requestId);
+ if (replyHandler == null) {
+ log.trace("Got reply to unknown request %d", Integer.valueOf(requestId));
+ break;
+ }
+ final Object payload;
+ try {
+ final Unmarshaller unmarshaller = marshallerFactory.createUnmarshaller(marshallingConfiguration);
+ try {
+ unmarshaller.start(Marshalling.createByteInput(buffer));
+ try {
+ payload = unmarshaller.readObject();
+ unmarshaller.finish();
+ } catch (ClassNotFoundException e) {
+ replyHandler.handleException(new InvalidClassException("Class not found: " + e.toString()));
+ log.trace("Class not found in reply to request ID %d", Integer.valueOf(requestId));
+ break;
+ }
+ } finally {
+ IoUtils.safeClose(unmarshaller);
+ }
+ } catch (IOException ex) {
+ log.trace("Failed to unmarshal a reply (%s), sending a ReplyException");
+ // todo
+ SpiUtils.safeHandleException(replyHandler, ex);
+ break;
+ }
+ SpiUtils.safeHandleReply(replyHandler, payload);
+ break;
+ }
+ case CANCEL_REQUEST: {
+ final int requestId = buffer.getInt();
+ final RemoteRequestContext context = localRequests.get(requestId);
+ if (context != null) {
+ context.cancel();
+ }
+ break;
+ }
+ case CANCEL_ACK: {
+ final int requestId = buffer.getInt();
+ final ReplyHandler replyHandler = remoteRequests.get(requestId);
+ if (replyHandler != null) {
+ replyHandler.handleCancellation();
+ }
+ break;
+ }
+ case REQUEST_RECEIVE_FAILED: {
+ final int requestId = buffer.getInt();
+ final ReplyHandler replyHandler = remoteRequests.remove(requestId);
+ if (replyHandler == null) {
+ log.trace("Got reply to unknown request %d", Integer.valueOf(requestId));
+ break;
+ }
+ final String reason = readUTFZ(buffer);
+
+ // todo - throw a new ReplyException
+ break;
+ }
+ case REQUEST_FAILED: {
+ final int requestId = buffer.getInt();
+ final ReplyHandler replyHandler = remoteRequests.remove(requestId);
+ if (replyHandler == null) {
+ log.trace("Got reply to unknown request %d", Integer.valueOf(requestId));
+ break;
+ }
+ final Throwable cause;
+ try {
+ final Unmarshaller unmarshaller = marshallerFactory.createUnmarshaller(marshallingConfiguration);
+ try {
+ unmarshaller.start(Marshalling.createByteInput(buffer));
+ try {
+ cause = (Throwable) unmarshaller.readObject();
+ } catch (ClassNotFoundException e) {
+ replyHandler.handleException(new InvalidClassException("Class not found: " + e.toString()));
+ log.trace("Class not found in exception reply to request ID %d", Integer.valueOf(requestId));
+ break;
+ } catch (ClassCastException e) {
+ SpiUtils.safeHandleException(replyHandler, new RemoteExecutionException("Remote request failed (and an unexpected I/O error occurred when attempting to unmarshal the cause)"));
+ break;
+ }
+ } finally {
+ IoUtils.safeClose(unmarshaller);
+ }
+ } catch (IOException ex) {
+ log.trace("Failed to unmarshal an exception reply (%s), sending a generic execution exception");
+ SpiUtils.safeHandleException(replyHandler, new RemoteExecutionException("Remote request failed (and an unexpected I/O error occurred when attempting to read the cause)"));
+ break;
+ }
+ SpiUtils.safeHandleException(replyHandler, new RemoteExecutionException("Remote execution failed", cause));
+ break;
+ }
+ case REQUEST_OUTCOME_UNKNOWN: {
+ final int requestId = buffer.getInt();
+ final ReplyHandler replyHandler = remoteRequests.remove(requestId);
+ if (replyHandler == null) {
+ log.trace("Got reply to unknown request %d", Integer.valueOf(requestId));
+ break;
+ }
+ final String reason = readUTFZ(buffer);
+ SpiUtils.safeHandleException(replyHandler, new IndeterminateOutcomeException(reason));
+ break;
+ }
+ case CLIENT_CLOSE: {
+ final int clientId = buffer.getInt();
+ final Handle<RequestHandler> handle = forwardedClients.remove(clientId);
+ if (handle == null) {
+ log.warn("Got client close message for unknown client %d", Integer.valueOf(clientId));
+ break;
+ }
+ IoUtils.safeClose(handle);
+ break;
+ }
+ case CLIENT_OPEN: {
+ final int serviceId = buffer.getInt();
+ final int clientId = buffer.getInt();
+ final Handle<RequestHandlerSource> handle = forwardedServices.get(serviceId);
+ if (handle == null) {
+ log.warn("Received client open message for unknown service %d", Integer.valueOf(serviceId));
+ break;
+ }
+ try {
+ final RequestHandlerSource requestHandlerSource = handle.getResource();
+ final Handle<RequestHandler> clientHandle = requestHandlerSource.createRequestHandler();
+ // todo check for duplicate
+ // todo validate the client ID
+ log.trace("Opening client %d from service %d", Integer.valueOf(clientId), Integer.valueOf(serviceId));
+ forwardedClients.put(clientId, clientHandle);
+ } catch (IOException ex) {
+ log.error(ex, "Failed to create a request handler for client ID %d", Integer.valueOf(clientId));
+ break;
+ } finally {
+ IoUtils.safeClose(handle);
+ }
+ break;
+ }
+ case SERVICE_CLOSE: {
+ final Handle<RequestHandlerSource> handle = forwardedServices.remove(buffer.getInt());
+ if (handle == null) {
+ break;
+ }
+ IoUtils.safeClose(handle);
+ break;
+ }
+ case SERVICE_ADVERTISE: {
+ final int serviceId = buffer.getInt();
+ final String serviceType = readUTFZ(buffer);
+ final String groupName = readUTFZ(buffer);
+ final String endpointName = readUTFZ(buffer);
+ final int baseMetric = buffer.getInt();
+ int id = -1;
+ final RequestHandlerSource handlerSource = new RequestHandlerSourceImpl(allocator, id);
+ final int calcMetric = baseMetric + linkMetric;
+ if (calcMetric > 0) {
+ try {
+ final SimpleCloseable closeable = endpoint.registerRemoteService(serviceType, groupName, endpointName, handlerSource, calcMetric);
+ // todo - something with that closeable
+ } catch (IOException e) {
+ log.error(e, "Unable to register remote service");
+ }
+ }
+ break;
+ }
+ case SERVICE_UNADVERTISE: {
+ final int serviceId = buffer.getInt();
+ IoUtils.safeClose(remoteServices.get(serviceId));
+ break;
+ }
+ default: {
+ log.error("Malformed packet received (invalid message type %s)", msgType);
+ }
+ }
+ } catch (BufferUnderflowException e) {
+ log.error("Malformed packet received (buffer underflow)");
+ }
+ }
+
+ public void handleWritable(final AllocatedMessageChannel channel) {
+ for (;;) {
+ final WriteHandler handler = outputQueue.peek();
+ if (handler == null) {
+ return;
+ }
+ try {
+ if (handler.handleWrite(channel)) {
+ log.trace("Handled write with handler %s", handler);
+ pending.decrementAndGet();
+ outputQueue.remove();
+ } else {
+ channel.resumeWrites();
+ return;
+ }
+ } catch (Throwable t) {
+ pending.decrementAndGet();
+ outputQueue.remove();
+ }
+ }
+ }
+
+ public void handleClosed(final AllocatedMessageChannel channel) {
+ }
+
+ RequestHandlerSource getRemoteService(final int id) {
+ return new RequestHandlerSourceImpl(allocator, id);
+ }
+
+ private final class ReplyHandlerImpl implements ReplyHandler {
+
+ private final AllocatedMessageChannel channel;
+ private final int requestId;
+ private final BufferAllocator<ByteBuffer> allocator;
+
+ private ReplyHandlerImpl(final AllocatedMessageChannel channel, final int requestId, final BufferAllocator<ByteBuffer> allocator) {
+ if (channel == null) {
+ throw new NullPointerException("channel is null");
+ }
+ if (allocator == null) {
+ throw new NullPointerException("allocator is null");
+ }
+ this.channel = channel;
+ this.requestId = requestId;
+ this.allocator = allocator;
+ }
+
+ public void handleReply(final Object reply) {
+ ByteBuffer buffer = allocator.allocate();
+ buffer.put((byte) MessageType.REPLY.getId());
+ buffer.putInt(requestId);
+ try {
+ final Marshaller marshaller = marshallerFactory.createMarshaller(marshallingConfiguration);
+ try {
+ final List<ByteBuffer> bufferList = new ArrayList<ByteBuffer>();
+ final ByteOutput output = createByteOutput(allocator, bufferList);
+ try {
+ marshaller.start(output);
+ marshaller.writeObject(reply);
+ marshaller.close();
+ output.close();
+ registerWriter(channel, new SimpleWriteHandler(allocator, bufferList));
+ } finally {
+ IoUtils.safeClose(output);
+ }
+ } finally {
+ IoUtils.safeClose(marshaller);
+ }
+ } catch (IOException e) {
+ log.error(e, "Failed to send a reply to the remote side");
+ } catch (InterruptedException e) {
+ log.error(e, "Reply handler thread interrupted before a reply could be sent");
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ public void handleException(final IOException exception) {
+ ByteBuffer buffer = allocator.allocate();
+ buffer.put((byte) MessageType.REQUEST_FAILED.getId());
+ buffer.putInt(requestId);
+ try {
+ final Marshaller marshaller = marshallerFactory.createMarshaller(marshallingConfiguration);
+ try {
+ final List<ByteBuffer> bufferList = new ArrayList<ByteBuffer>();
+ final ByteOutput output = createByteOutput(allocator, bufferList);
+ try {
+ marshaller.start(output);
+ marshaller.writeObject(exception);
+ marshaller.close();
+ output.close();
+ registerWriter(channel, new SimpleWriteHandler(allocator, bufferList));
+ } finally {
+ IoUtils.safeClose(output);
+ }
+ } finally {
+ IoUtils.safeClose(marshaller);
+ }
+ } catch (IOException e) {
+ log.error(e, "Failed to send an exception to the remote side");
+ } catch (InterruptedException e) {
+ log.error(e, "Reply handler thread interrupted before an exception could be sent");
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ public void handleCancellation() {
+ final ByteBuffer buffer = allocator.allocate();
+ buffer.put((byte) MessageType.CANCEL_ACK.getId());
+ buffer.putInt(requestId);
+ buffer.flip();
+ try {
+ registerWriter(channel, new SimpleWriteHandler(allocator, buffer));
+ } catch (InterruptedException e) {
+ // todo log
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ // Writer members
+
+ private final BlockingQueue<WriteHandler> outputQueue = CollectionUtil.blockingQueue(64);
+ private final AtomicInteger pending = new AtomicInteger();
+
+ private void registerWriter(final AllocatedMessageChannel channel, final WriteHandler writeHandler) throws InterruptedException {
+ outputQueue.put(writeHandler);
+ if (pending.getAndIncrement() == 0) {
+ channel.resumeWrites();
+ }
+ }
+
+ // Reader utils
+
+ private String readUTFZ(ByteBuffer buffer) {
+ StringBuilder builder = new StringBuilder();
+ int state = 0, a = 0;
+ while (buffer.hasRemaining()) {
+ final int v = buffer.get() & 0xff;
+ switch (state) {
+ case 0: {
+ if (v == 0) {
+ return builder.toString();
+ } else if (v < 128) {
+ builder.append((char) v);
+ } else if (192 <= v && v < 224) {
+ a = v << 6;
+ state = 1;
+ } else if (224 <= v && v < 232) {
+ a = v << 12;
+ state = 2;
+ } else {
+ builder.append('?');
+ }
+ break;
+ }
+ case 1: {
+ if (v == 0) {
+ builder.append('?');
+ return builder.toString();
+ } else if (128 <= v && v < 192) {
+ a |= v & 0x3f;
+ builder.append((char) a);
+ } else {
+ builder.append('?');
+ }
+ state = 0;
+ break;
+ }
+ case 2: {
+ if (v == 0) {
+ builder.append('?');
+ return builder.toString();
+ } else if (128 <= v && v < 192) {
+ a |= (v & 0x3f) << 6;
+ state = 1;
+ } else {
+ builder.append('?');
+ state = 0;
+ }
+ break;
+ }
+ default:
+ throw new IllegalStateException("wrong state");
+ }
+ }
+ return builder.toString();
+ }
+
+ // client endpoint
+
+ private final class RequestHandlerImpl extends AbstractAutoCloseable<RequestHandler> implements RequestHandler {
+
+ private final int identifier;
+ private final BufferAllocator<ByteBuffer> allocator;
+
+ public RequestHandlerImpl(final int identifier, final BufferAllocator<ByteBuffer> allocator) {
+ super(executor);
+ if (allocator == null) {
+ throw new NullPointerException("allocator is null");
+ }
+ this.identifier = identifier;
+ this.allocator = allocator;
+ addCloseHandler(new CloseHandler<RequestHandler>() {
+ public void handleClose(final RequestHandler closed) {
+ remoteClients.remove(identifier, this);
+ ByteBuffer buffer = allocator.allocate();
+ buffer.put((byte) MessageType.CLIENT_CLOSE.getId());
+ buffer.putInt(identifier);
+ buffer.flip();
+ try {
+ registerWriter(channel, new SimpleWriteHandler(allocator, buffer));
+ } catch (InterruptedException e) {
+ log.warn("Client close notification was interrupted before it could be sent");
+ }
+ }
+ });
+ }
+
+ public void receiveRequest(final Object request) {
+ log.trace("Sending outbound one-way request of type %s", request == null ? "null" : request.getClass());
+ try {
+ final List<ByteBuffer> bufferList;
+ final Marshaller marshaller = marshallerFactory.createMarshaller(marshallingConfiguration);
+ try {
+ bufferList = new ArrayList<ByteBuffer>();
+ final ByteOutput output = createByteOutput(allocator, bufferList);
+ try {
+ marshaller.write(MessageType.REQUEST_ONEWAY.getId());
+ marshaller.writeInt(identifier);
+ marshaller.writeObject(request);
+ marshaller.close();
+ output.close();
+ } finally {
+ IoUtils.safeClose(output);
+ }
+ } finally {
+ IoUtils.safeClose(marshaller);
+ }
+ try {
+ registerWriter(channel, new SimpleWriteHandler(allocator, bufferList));
+ } catch (InterruptedException e) {
+ log.trace(e, "receiveRequest was interrupted");
+ Thread.currentThread().interrupt();
+ return;
+ }
+ } catch (Throwable t) {
+ // ignore
+ log.trace(t, "receiveRequest failed with an exception");
+ return;
+ }
+ }
+
+ public RemoteRequestContext receiveRequest(final Object request, final ReplyHandler handler) {
+ log.trace("Sending outbound request of type %s", request == null ? "null" : request.getClass());
+ try {
+ final List<ByteBuffer> bufferList;
+ final Marshaller marshaller = marshallerFactory.createMarshaller(marshallingConfiguration);
+ try {
+ bufferList = new ArrayList<ByteBuffer>();
+ final ByteOutput output = createByteOutput(allocator, bufferList);
+ try {
+ marshaller.write(MessageType.REQUEST.getId());
+ marshaller.writeInt(identifier);
+
+ int id;
+ do {
+ id = requestSequence.getAndIncrement();
+ } while (remoteRequests.putIfAbsent(id, handler) != null);
+ marshaller.writeInt(id);
+ marshaller.writeObject(request);
+ marshaller.close();
+ output.close();
+ try {
+ registerWriter(channel, new SimpleWriteHandler(allocator, bufferList));
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ executor.execute(new Runnable() {
+ public void run() {
+ SpiUtils.safeHandleCancellation(handler);
+ }
+ });
+ return SpiUtils.getBlankRemoteRequestContext();
+ }
+ log.trace("Sent request %s", request);
+ return new RemoteRequestContextImpl(id, allocator, channel);
+ } finally {
+ IoUtils.safeClose(output);
+ }
+ } finally {
+ IoUtils.safeClose(marshaller);
+ }
+ } catch (final IOException t) {
+ log.trace(t, "receiveRequest failed with an exception");
+ executor.execute(new Runnable() {
+ public void run() {
+ SpiUtils.safeHandleException(handler, t);
+ }
+ });
+ return SpiUtils.getBlankRemoteRequestContext();
+ }
+ }
+
+ public String toString() {
+ return "forwarding request handler <" + Integer.toString(hashCode(), 16) + "> (id = " + identifier + ")";
+ }
+ }
+
+ public final class RemoteRequestContextImpl implements RemoteRequestContext {
+
+ private final BufferAllocator<ByteBuffer> allocator;
+ private final int id;
+ private final AllocatedMessageChannel channel;
+
+ public RemoteRequestContextImpl(final int id, final BufferAllocator<ByteBuffer> allocator, final AllocatedMessageChannel channel) {
+ this.id = id;
+ this.allocator = allocator;
+ this.channel = channel;
+ }
+
+ public void cancel() {
+ try {
+ final ByteBuffer buffer = allocator.allocate();
+ buffer.put((byte) MessageType.CANCEL_REQUEST.getId());
+ buffer.putInt(id);
+ buffer.flip();
+ registerWriter(channel, new SimpleWriteHandler(allocator, buffer));
+ } catch (InterruptedException e) {
+ // todo log that cancel attempt failed
+ Thread.currentThread().interrupt();
+ } catch (Throwable t) {
+ // todo log that cancel attempt failed
+ }
+ }
+ }
+
+ public final class RequestHandlerSourceImpl extends AbstractAutoCloseable<RequestHandlerSource> implements RequestHandlerSource {
+
+ private final BufferAllocator<ByteBuffer> allocator;
+ private final int identifier;
+
+ protected RequestHandlerSourceImpl(final BufferAllocator<ByteBuffer> allocator, final int identifier) {
+ super(executor);
+ this.allocator = allocator;
+ this.identifier = identifier;
+ addCloseHandler(new CloseHandler<RequestHandlerSource>() {
+ public void handleClose(final RequestHandlerSource closed) {
+ ByteBuffer buffer = allocator.allocate();
+ buffer.put((byte) MessageType.SERVICE_CLOSE.getId());
+ buffer.putInt(identifier);
+ buffer.flip();
+ try {
+ registerWriter(channel, new SimpleWriteHandler(allocator, buffer));
+ } catch (InterruptedException e) {
+ log.warn("Service close notification was interrupted before it could be sent");
+ }
+ }
+ });
+ }
+
+ public Handle<RequestHandler> createRequestHandler() throws IOException {
+ int id;
+ do {
+ id = remoteClientSequence.getAndIncrement() << 1;
+ } while (remoteClients.putIfAbsent(id, new RequestHandlerImpl(id, MultiplexHandler.this.allocator)) != null);
+ final int clientId = id;
+ final ByteBuffer buffer = allocator.allocate();
+ buffer.put((byte) MessageType.CLIENT_OPEN.getId());
+ buffer.putInt(identifier);
+ buffer.putInt(clientId);
+ buffer.flip();
+ // todo - probably should bail out if we're interrupted?
+ boolean intr = false;
+ for (;;) {
+ try {
+ registerWriter(channel, new SimpleWriteHandler(allocator, buffer));
+ try {
+ return new RequestHandlerImpl(clientId, allocator).getHandle();
+ } finally {
+ if (intr) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ } catch (InterruptedException e) {
+ intr = true;
+ }
+ }
+ }
+
+ public String toString() {
+ return "forwarding request handler source <" + Integer.toString(hashCode(), 16) + "> (id = " + identifier + ")";
+ }
+ }
+
+ public static ByteOutput createByteOutput(final BufferAllocator<ByteBuffer> allocator, final Collection<ByteBuffer> target) {
+ return new ByteOutput() {
+ private ByteBuffer current;
+
+ private ByteBuffer getCurrent() {
+ final ByteBuffer buffer = current;
+ return buffer == null ? (current = allocator.allocate()) : buffer;
+ }
+
+ public void write(final int i) {
+ final ByteBuffer buffer = getCurrent();
+ buffer.put((byte) i);
+ if (! buffer.hasRemaining()) {
+ buffer.flip();
+ target.add(buffer);
+ current = null;
+ }
+ }
+
+ public void write(final byte[] bytes) {
+ write(bytes, 0, bytes.length);
+ }
+
+ public void write(final byte[] bytes, int offs, int len) {
+ while (len > 0) {
+ final ByteBuffer buffer = getCurrent();
+ final int c = Math.min(len, buffer.remaining());
+ buffer.put(bytes, offs, c);
+ offs += c;
+ len -= c;
+ if (! buffer.hasRemaining()) {
+ buffer.flip();
+ target.add(buffer);
+ current = null;
+ }
+ }
+ }
+
+ public void close() {
+ flush();
+ }
+
+ public void flush() {
+ final ByteBuffer buffer = current;
+ if (buffer != null) {
+ buffer.flip();
+ target.add(buffer);
+ current = null;
+ }
+ }
+ };
+ }
+
+ public class ProtocolObjectTableWriter implements ObjectTable.Writer {
+
+ public void writeObject(final Marshaller marshaller, final Object o) throws IOException {
+
+ }
+ }
+
+ public class ProtocolObjectTable implements ObjectTable {
+
+ public Writer getObjectWriter(final Object o) throws IOException /* fixed in beta2 */ {
+ if (o instanceof RequestHandler) {
+ final RequestHandler requestHandler = (RequestHandler) o;
+
+ } else if (o instanceof RequestHandlerSource) {
+ final RequestHandlerSource requestHandlerSource = (RequestHandlerSource) o;
+
+ } else {
+ final StreamSerializerFactory ssf = streamDetector.detectStream(o);
+ if (ssf != null) {
+ final IoHandler<? super AllocatedMessageChannel> streamHandler = ssf.getLocalSide(o, new StreamContextImpl(executor, marshallerFactory, marshallingConfiguration));
+ // todo - this should really be the "server" side
+ final IoFuture<AllocatedMessageChannel> futureChannel = messageConnector.connectTo(null, streamHandler);
+
+ }
+ }
+ return null;
+ }
+
+ public Object readObject(final Unmarshaller unmarshaller) throws IOException, ClassNotFoundException {
+ switch (unmarshaller.readByte()) {
+ case 1: {
+ // remote client
+ final int id = unmarshaller.readInt();
+ }
+ case 2: {
+ // remote client source
+ }
+ case 3: {
+ // stream
+ }
+ default: {
+ // invalid
+ }
+ }
+ return null;
+ }
+ }
+}
Added: remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexProtocol.java
===================================================================
--- remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexProtocol.java (rev 0)
+++ remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexProtocol.java 2008-10-21 02:39:24 UTC (rev 4601)
@@ -0,0 +1,99 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.remoting.protocol.multiplex;
+
+import org.jboss.remoting.RemotingException;
+import org.jboss.remoting.SimpleCloseable;
+import org.jboss.remoting.Endpoint;
+import org.jboss.remoting.spi.remote.RequestHandlerSource;
+import org.jboss.remoting.spi.remote.Handle;
+import org.jboss.remoting.spi.stream.StreamProvider;
+import org.jboss.xnio.IoHandlerFactory;
+import org.jboss.xnio.ChannelSource;
+import org.jboss.xnio.IoFuture;
+import org.jboss.xnio.AbstractConvertingIoFuture;
+import org.jboss.xnio.IoHandler;
+import org.jboss.xnio.BufferAllocator;
+import org.jboss.xnio.log.Logger;
+import org.jboss.xnio.channels.AllocatedMessageChannel;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.Executor;
+
+/**
+ *
+ */
+public final class MultiplexProtocol {
+
+ private static final Logger log = Logger.getLogger(MultiplexProtocol.class);
+
+ private MultiplexProtocol() {
+ }
+
+ /**
+ * Create a request server for the multiplex protocol.
+ *
+ * @param executor the executor to use for invocations
+ * @param allocator the buffer allocator to use
+ * @return a handler factory for passing to an XNIO server
+ * @param <A> stream channel address type
+ */
+ public static <A> IoHandlerFactory<AllocatedMessageChannel> createServer(final Endpoint endpoint, final Executor executor, final BufferAllocator<ByteBuffer> allocator, final StreamProvider<A> streamProvider) {
+ return new IoHandlerFactory<AllocatedMessageChannel>() {
+ public IoHandler<? super AllocatedMessageChannel> createHandler() {
+ final RemotingChannelConfiguration configuration = new RemotingChannelConfiguration();
+ configuration.setAllocator(allocator);
+ configuration.setExecutor(executor);
+ // todo marshaller factory... etc
+ return new MultiplexHandler<A>(endpoint, configuration, streamProvider);
+ }
+ };
+ }
+
+ /**
+ * Create a request client for the multiplex protocol.
+ *
+ * @return a handle which may be used to close the connection
+ * @throws IOException if an error occurs @param executor the executor to use for invocations
+ * @param channelSource the XNIO channel source to use to establish the connection
+ * @param allocator the buffer allocator to use
+ * @param streamProvider
+ */
+ public static <A> IoFuture<SimpleCloseable> connect(final Endpoint endpoint, final Executor executor, final ChannelSource<AllocatedMessageChannel> channelSource, final BufferAllocator<ByteBuffer> allocator, final StreamProvider<A> streamProvider) throws IOException {
+ final RemotingChannelConfiguration configuration = new RemotingChannelConfiguration();
+ configuration.setAllocator(allocator);
+ configuration.setExecutor(executor);
+ // todo marshaller factory... etc
+ final MultiplexHandler multiplexHandler = new MultiplexHandler<A>(endpoint, configuration, streamProvider);
+ final IoFuture<AllocatedMessageChannel> futureChannel = channelSource.open(multiplexHandler);
+ return new AbstractConvertingIoFuture<SimpleCloseable, AllocatedMessageChannel>(futureChannel) {
+ protected SimpleCloseable convert(final AllocatedMessageChannel channel) throws RemotingException {
+ return new AbstractConnection(executor) {
+ public Handle<RequestHandlerSource> getServiceForId(final int id) throws IOException {
+ return multiplexHandler.getRemoteService(id).getHandle();
+ }
+ };
+ }
+ };
+ }
+}
Deleted: remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/RemotingChannelConfiguration.java
===================================================================
--- remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/RemotingChannelConfiguration.java 2008-09-18 03:06:39 UTC (rev 4573)
+++ remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/RemotingChannelConfiguration.java 2008-10-21 02:39:24 UTC (rev 4601)
@@ -1,82 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2008, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.remoting.protocol.basic;
-
-import java.util.concurrent.Executor;
-import java.nio.ByteBuffer;
-import org.jboss.xnio.BufferAllocator;
-import org.jboss.marshalling.MarshallerFactory;
-
-/**
- *
- */
-public final class RemotingChannelConfiguration {
- private MarshallerFactory marshallerFactory;
- private int linkMetric;
- private Executor executor;
- private ClassLoader classLoader;
- private BufferAllocator<ByteBuffer> allocator;
-
- public RemotingChannelConfiguration() {
- }
-
- public MarshallerFactory getMarshallerFactory() {
- return marshallerFactory;
- }
-
- public void setMarshallerFactory(final MarshallerFactory marshallerFactory) {
- this.marshallerFactory = marshallerFactory;
- }
-
- public int getLinkMetric() {
- return linkMetric;
- }
-
- public void setLinkMetric(final int linkMetric) {
- this.linkMetric = linkMetric;
- }
-
- public Executor getExecutor() {
- return executor;
- }
-
- public void setExecutor(final Executor executor) {
- this.executor = executor;
- }
-
- public ClassLoader getClassLoader() {
- return classLoader;
- }
-
- public void setClassLoader(final ClassLoader classLoader) {
- this.classLoader = classLoader;
- }
-
- public BufferAllocator<ByteBuffer> getAllocator() {
- return allocator;
- }
-
- public void setAllocator(final BufferAllocator<ByteBuffer> allocator) {
- this.allocator = allocator;
- }
-}
Copied: remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/RemotingChannelConfiguration.java (from rev 4600, remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/RemotingChannelConfiguration.java)
===================================================================
--- remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/RemotingChannelConfiguration.java (rev 0)
+++ remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/RemotingChannelConfiguration.java 2008-10-21 02:39:24 UTC (rev 4601)
@@ -0,0 +1,93 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.remoting.protocol.multiplex;
+
+import java.util.concurrent.Executor;
+import java.nio.ByteBuffer;
+import org.jboss.xnio.BufferAllocator;
+import org.jboss.marshalling.MarshallerFactory;
+import org.jboss.marshalling.Configuration;
+import org.jboss.remoting.spi.stream.StreamDetector;
+
+/**
+ *
+ */
+public final class RemotingChannelConfiguration {
+ private MarshallerFactory marshallerFactory;
+ private Configuration marshallingConfiguration;
+ private int linkMetric;
+ private Executor executor;
+ private BufferAllocator<ByteBuffer> allocator;
+ private StreamDetector streamDetector;
+
+ public RemotingChannelConfiguration() {
+ }
+
+ public MarshallerFactory getMarshallerFactory() {
+ return marshallerFactory;
+ }
+
+ public void setMarshallerFactory(final MarshallerFactory marshallerFactory) {
+ this.marshallerFactory = marshallerFactory;
+ }
+
+ public Configuration getMarshallingConfiguration() {
+ return marshallingConfiguration;
+ }
+
+ public void setMarshallingConfiguration(final Configuration marshallingConfiguration) {
+ this.marshallingConfiguration = marshallingConfiguration;
+ }
+
+ public int getLinkMetric() {
+ return linkMetric;
+ }
+
+ public void setLinkMetric(final int linkMetric) {
+ this.linkMetric = linkMetric;
+ }
+
+ public Executor getExecutor() {
+ return executor;
+ }
+
+ public void setExecutor(final Executor executor) {
+ this.executor = executor;
+ }
+
+ public BufferAllocator<ByteBuffer> getAllocator() {
+ return allocator;
+ }
+
+ public void setAllocator(final BufferAllocator<ByteBuffer> allocator) {
+ this.allocator = allocator;
+ }
+
+ public StreamDetector getStreamDetector() {
+ return streamDetector;
+ }
+
+ public void setStreamDetector(final StreamDetector streamDetector) {
+ this.streamDetector = streamDetector;
+ }
+}
Deleted: remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/SimpleWriteHandler.java
===================================================================
--- remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/SimpleWriteHandler.java 2008-09-18 03:06:39 UTC (rev 4573)
+++ remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/SimpleWriteHandler.java 2008-10-21 02:39:24 UTC (rev 4601)
@@ -1,84 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2008, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.remoting.protocol.basic;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.List;
-import org.jboss.xnio.BufferAllocator;
-import org.jboss.xnio.log.Logger;
-import org.jboss.xnio.channels.WritableMessageChannel;
-
-/**
- *
- */
-public final class SimpleWriteHandler implements WriteHandler {
- private static final Logger log = Logger.getLogger(SimpleWriteHandler.class);
-
- private final BufferAllocator<ByteBuffer> allocator;
- private final ByteBuffer[] buffers;
-
- public SimpleWriteHandler(final BufferAllocator<ByteBuffer> allocator, final List<ByteBuffer> buffers) {
- this.allocator = allocator;
- this.buffers = buffers.toArray(new ByteBuffer[buffers.size()]);
- logBufferSize();
- }
-
- public SimpleWriteHandler(final BufferAllocator<ByteBuffer> allocator, final ByteBuffer[] buffers) {
- this.allocator = allocator;
- this.buffers = buffers;
- logBufferSize();
- }
-
- public SimpleWriteHandler(final BufferAllocator<ByteBuffer> allocator, final ByteBuffer buffer) {
- this.allocator = allocator;
- buffers = new ByteBuffer[] { buffer };
- logBufferSize();
- }
-
- private void logBufferSize() {
- if (log.isTrace()) {
- long t = 0L;
- for (ByteBuffer buf : buffers) {
- t += (long)buf.remaining();
- }
- log.trace("Writing a message of size %d", Long.valueOf(t));
- }
- }
-
- public boolean handleWrite(final WritableMessageChannel channel) {
- boolean done = true;
- try {
- return (done = channel.send(buffers));
- } catch (IOException e) {
- log.trace(e, "Write failed");
- return true;
- } finally {
- if (done) {
- for (ByteBuffer buffer : buffers) {
- allocator.free(buffer);
- }
- }
- }
- }
-}
Copied: remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/SimpleWriteHandler.java (from rev 4600, remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/SimpleWriteHandler.java)
===================================================================
--- remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/SimpleWriteHandler.java (rev 0)
+++ remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/SimpleWriteHandler.java 2008-10-21 02:39:24 UTC (rev 4601)
@@ -0,0 +1,84 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.remoting.protocol.multiplex;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import org.jboss.xnio.BufferAllocator;
+import org.jboss.xnio.log.Logger;
+import org.jboss.xnio.channels.WritableMessageChannel;
+
+/**
+ *
+ */
+public final class SimpleWriteHandler implements WriteHandler {
+ private static final Logger log = Logger.getLogger(SimpleWriteHandler.class);
+
+ private final BufferAllocator<ByteBuffer> allocator;
+ private final ByteBuffer[] buffers;
+
+ public SimpleWriteHandler(final BufferAllocator<ByteBuffer> allocator, final List<ByteBuffer> buffers) {
+ this.allocator = allocator;
+ this.buffers = buffers.toArray(new ByteBuffer[buffers.size()]);
+ logBufferSize();
+ }
+
+ public SimpleWriteHandler(final BufferAllocator<ByteBuffer> allocator, final ByteBuffer[] buffers) {
+ this.allocator = allocator;
+ this.buffers = buffers;
+ logBufferSize();
+ }
+
+ public SimpleWriteHandler(final BufferAllocator<ByteBuffer> allocator, final ByteBuffer buffer) {
+ this.allocator = allocator;
+ buffers = new ByteBuffer[] { buffer };
+ logBufferSize();
+ }
+
+ private void logBufferSize() {
+ if (log.isTrace()) {
+ long t = 0L;
+ for (ByteBuffer buf : buffers) {
+ t += (long)buf.remaining();
+ }
+ log.trace("Writing a message of size %d", Long.valueOf(t));
+ }
+ }
+
+ public boolean handleWrite(final WritableMessageChannel channel) {
+ boolean done = true;
+ try {
+ return (done = channel.send(buffers));
+ } catch (IOException e) {
+ log.trace(e, "Write failed");
+ return true;
+ } finally {
+ if (done) {
+ for (ByteBuffer buffer : buffers) {
+ allocator.free(buffer);
+ }
+ }
+ }
+ }
+}
Added: remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/StreamContextImpl.java
===================================================================
--- remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/StreamContextImpl.java (rev 0)
+++ remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/StreamContextImpl.java 2008-10-21 02:39:24 UTC (rev 4601)
@@ -0,0 +1,59 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.remoting.protocol.multiplex;
+
+import org.jboss.remoting.spi.stream.StreamContext;
+import org.jboss.marshalling.Marshaller;
+import org.jboss.marshalling.Unmarshaller;
+import org.jboss.marshalling.Configuration;
+import org.jboss.marshalling.MarshallerFactory;
+import java.util.concurrent.Executor;
+import java.io.IOException;
+
+/**
+ *
+ */
+public final class StreamContextImpl implements StreamContext {
+
+ private final Executor executor;
+ private final MarshallerFactory marshallerFactory;
+ private final Configuration marshallerConfiguration;
+
+ StreamContextImpl(final Executor executor, final MarshallerFactory marshallerFactory, final Configuration marshallerConfiguration) {
+ this.executor = executor;
+ this.marshallerFactory = marshallerFactory;
+ this.marshallerConfiguration = marshallerConfiguration;
+ }
+
+ public Executor getExecutor() {
+ return executor;
+ }
+
+ public Marshaller createMarshaller() throws IOException {
+ return marshallerFactory.createMarshaller(marshallerConfiguration);
+ }
+
+ public Unmarshaller createUnmarshaller() throws IOException {
+ return marshallerFactory.createUnmarshaller(marshallerConfiguration);
+ }
+}
Deleted: remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/WriteHandler.java
===================================================================
--- remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/WriteHandler.java 2008-09-18 03:06:39 UTC (rev 4573)
+++ remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/WriteHandler.java 2008-10-21 02:39:24 UTC (rev 4601)
@@ -1,32 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2008, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.remoting.protocol.basic;
-
-import org.jboss.xnio.channels.WritableMessageChannel;
-
-/**
- *
- */
-public interface WriteHandler {
- boolean handleWrite(WritableMessageChannel channel);
-}
Copied: remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/WriteHandler.java (from rev 4600, remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/WriteHandler.java)
===================================================================
--- remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/WriteHandler.java (rev 0)
+++ remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/WriteHandler.java 2008-10-21 02:39:24 UTC (rev 4601)
@@ -0,0 +1,32 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.remoting.protocol.multiplex;
+
+import org.jboss.xnio.channels.WritableMessageChannel;
+
+/**
+ *
+ */
+public interface WriteHandler {
+ boolean handleWrite(WritableMessageChannel channel);
+}
Copied: remoting3/trunk/protocol/multiplex/src/test/java/org/jboss/remoting/protocol/multiplex (from rev 4573, remoting3/trunk/protocol/basic/src/test/java/org/jboss/remoting/protocol/basic)
Deleted: remoting3/trunk/protocol/multiplex/src/test/java/org/jboss/remoting/protocol/multiplex/ConnectionTestCase.java
===================================================================
--- remoting3/trunk/protocol/basic/src/test/java/org/jboss/remoting/protocol/basic/ConnectionTestCase.java 2008-09-18 03:06:39 UTC (rev 4573)
+++ remoting3/trunk/protocol/multiplex/src/test/java/org/jboss/remoting/protocol/multiplex/ConnectionTestCase.java 2008-10-21 02:39:24 UTC (rev 4601)
@@ -1,81 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2008, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.remoting.protocol.basic;
-
-import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import junit.framework.TestCase;
-import org.jboss.remoting.core.EndpointImpl;
-import org.jboss.remoting.test.support.LoggingHelper;
-import org.jboss.xnio.BufferAllocator;
-import org.jboss.xnio.IoUtils;
-import org.jboss.xnio.Xnio;
-import org.jboss.xnio.CloseableExecutor;
-import org.jboss.xnio.nio.NioXnio;
-
-/**
- *
- */
-public final class ConnectionTestCase extends TestCase {
- static {
- LoggingHelper.init();
- }
-
- public void testConnection() throws Throwable {
- final String REQUEST = "request";
- final String REPLY = "reply";
- final List<Throwable> problems = Collections.synchronizedList(new LinkedList<Throwable>());
- final CloseableExecutor closeableExecutor = IoUtils.closeableExecutor(Executors.newCachedThreadPool(), 500L, TimeUnit.MILLISECONDS);
- try {
- final BufferAllocator<ByteBuffer> allocator = new BufferAllocator<ByteBuffer>() {
- public ByteBuffer allocate() {
- return ByteBuffer.allocate(1024);
- }
-
- public void free(final ByteBuffer buffer) {
- }
- };
- final Xnio xnio = NioXnio.create();
- try {
- final EndpointImpl endpoint = new EndpointImpl();
- endpoint.setExecutor(closeableExecutor);
- endpoint.start();
- try {
- } finally {
- endpoint.stop();
- }
- } finally {
- IoUtils.safeClose(xnio);
- }
- } finally {
- IoUtils.safeClose(closeableExecutor);
- }
- for (Throwable t : problems) {
- throw t;
- }
- }
-}
Copied: remoting3/trunk/protocol/multiplex/src/test/java/org/jboss/remoting/protocol/multiplex/ConnectionTestCase.java (from rev 4600, remoting3/trunk/protocol/basic/src/test/java/org/jboss/remoting/protocol/basic/ConnectionTestCase.java)
===================================================================
--- remoting3/trunk/protocol/multiplex/src/test/java/org/jboss/remoting/protocol/multiplex/ConnectionTestCase.java (rev 0)
+++ remoting3/trunk/protocol/multiplex/src/test/java/org/jboss/remoting/protocol/multiplex/ConnectionTestCase.java 2008-10-21 02:39:24 UTC (rev 4601)
@@ -0,0 +1,81 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.remoting.protocol.multiplex;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import junit.framework.TestCase;
+import org.jboss.remoting.core.EndpointImpl;
+import org.jboss.remoting.test.support.LoggingHelper;
+import org.jboss.xnio.BufferAllocator;
+import org.jboss.xnio.IoUtils;
+import org.jboss.xnio.Xnio;
+import org.jboss.xnio.CloseableExecutor;
+import org.jboss.xnio.nio.NioXnio;
+
+/**
+ *
+ */
+public final class ConnectionTestCase extends TestCase {
+ static {
+ LoggingHelper.init();
+ }
+
+ public void testConnection() throws Throwable {
+ final String REQUEST = "request";
+ final String REPLY = "reply";
+ final List<Throwable> problems = Collections.synchronizedList(new LinkedList<Throwable>());
+ final CloseableExecutor closeableExecutor = IoUtils.closeableExecutor(Executors.newCachedThreadPool(), 500L, TimeUnit.MILLISECONDS);
+ try {
+ final BufferAllocator<ByteBuffer> allocator = new BufferAllocator<ByteBuffer>() {
+ public ByteBuffer allocate() {
+ return ByteBuffer.allocate(1024);
+ }
+
+ public void free(final ByteBuffer buffer) {
+ }
+ };
+ final Xnio xnio = NioXnio.create();
+ try {
+ final EndpointImpl endpoint = new EndpointImpl();
+ endpoint.setExecutor(closeableExecutor);
+ endpoint.start();
+ try {
+ } finally {
+ endpoint.stop();
+ }
+ } finally {
+ IoUtils.safeClose(xnio);
+ }
+ } finally {
+ IoUtils.safeClose(closeableExecutor);
+ }
+ for (Throwable t : problems) {
+ throw t;
+ }
+ }
+}
Modified: remoting3/trunk/testing-support/src/main/resources/testing.policy
===================================================================
--- remoting3/trunk/testing-support/src/main/resources/testing.policy 2008-10-20 13:51:33 UTC (rev 4600)
+++ remoting3/trunk/testing-support/src/main/resources/testing.policy 2008-10-21 02:39:24 UTC (rev 4601)
@@ -15,7 +15,6 @@
{
permission java.lang.RuntimePermission "modifyThread"; // for executor control
permission java.net.SocketPermission "*:*", "accept, connect, resolve";
- permission java.util.PropertyPermission "xnio.provider", "read"; // todo - fixed in XNIO trunk...
};
// Permissions for Remoting itself
@@ -27,14 +26,11 @@
grant codeBase "file:${build.home}/core/target/main/classes/-"
{
- // TODO: this is for the marshallers, which ought to be in their own module/module set
- permission java.io.SerializablePermission "enableSubstitution";
permission java.util.PropertyPermission "jboss.remoting.*", "read";
};
grant codeBase "file:${build.home}/protocol/basic/target/main/classes/-"
{
- permission java.net.SocketPermission "*:*", "accept, connect, resolve"; // todo - need a better solution
permission java.util.PropertyPermission "jboss.remoting.*", "read";
};
@@ -66,7 +62,7 @@
permission java.security.AllPermission;
};
-grant codeBase "file:${lib.xnio-standalone.local}"
+grant codeBase "file:${lib.xnio-nio.local}"
{
permission java.security.AllPermission;
};
Modified: remoting3/trunk/util/src/main/java/org/jboss/remoting/util/OrderedExecutor.java
===================================================================
--- remoting3/trunk/util/src/main/java/org/jboss/remoting/util/OrderedExecutor.java 2008-10-20 13:51:33 UTC (rev 4600)
+++ remoting3/trunk/util/src/main/java/org/jboss/remoting/util/OrderedExecutor.java 2008-10-21 02:39:24 UTC (rev 4601)
@@ -24,6 +24,7 @@
import java.util.concurrent.Executor;
import java.util.LinkedList;
+import org.jboss.xnio.log.Logger;
/**
* An executor that always runs all tasks in order, using a delegate executor to run the tasks.
@@ -32,6 +33,8 @@
* same method, will result in B's task running after A's.
*/
public final class OrderedExecutor implements Executor {
+ private static final Logger log = Logger.getLogger(OrderedExecutor.class);
+
// @protectedby tasks
private final LinkedList<Runnable> tasks = new LinkedList<Runnable>();
// @protectedby tasks
@@ -60,7 +63,7 @@
try {
task.run();
} catch (Throwable t) {
- // eat it!
+ log.error(t, "Runnable task %s failed", task);
}
}
}
@@ -77,7 +80,15 @@
tasks.add(command);
if (! running) {
running = true;
- parent.execute(runner);
+ boolean ok = false;
+ try {
+ parent.execute(runner);
+ ok = true;
+ } finally {
+ if (! ok) {
+ running = false;
+ }
+ }
}
}
}
16 years, 2 months
JBoss Remoting SVN: r4600 - remoting3/trunk/api/src/main/resources/META-INF.
by jboss-remoting-commits@lists.jboss.org
Author: david.lloyd(a)jboss.com
Date: 2008-10-20 09:51:33 -0400 (Mon, 20 Oct 2008)
New Revision: 4600
Modified:
remoting3/trunk/api/src/main/resources/META-INF/jboss-classloading.xml
Log:
Use the same naming convention as other jbossas projects
Modified: remoting3/trunk/api/src/main/resources/META-INF/jboss-classloading.xml
===================================================================
--- remoting3/trunk/api/src/main/resources/META-INF/jboss-classloading.xml 2008-10-17 04:10:05 UTC (rev 4599)
+++ remoting3/trunk/api/src/main/resources/META-INF/jboss-classloading.xml 2008-10-20 13:51:33 UTC (rev 4600)
@@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
-<classloading name="JBossRemotingAPI" version="3.0.0" xmlns="urn:jboss:classloading:1.0">
+<classloading name="jboss-remoting-api" version="3.0.0" xmlns="urn:jboss:classloading:1.0">
<capabilities>
<package name="org.jboss.remoting" version="3.0.0"/>
<package name="org.jboss.remoting.spi" version="3.0.0"/>
@@ -10,7 +10,7 @@
<package name="org.jboss.remoting.stream" version="3.0.0"/>
</capabilities>
<requirements>
- <module name="XnioAPI" from-inclusive="true" from="1.1.0" reExport="true"/>
- <module name="MarshallingAPI" from-inclusive="true" from="1.0.0" reExport="true"/>
+ <module name="xnio-api" from-inclusive="true" from="1.1.0" reExport="true"/>
+ <module name="marshalling-api" from-inclusive="true" from="1.0.0" reExport="true"/>
</requirements>
</classloading>
16 years, 2 months
JBoss Remoting SVN: r4599 - remoting3/trunk/util/src/main/java/org/jboss/remoting/util.
by jboss-remoting-commits@lists.jboss.org
Author: david.lloyd(a)jboss.com
Date: 2008-10-17 00:10:05 -0400 (Fri, 17 Oct 2008)
New Revision: 4599
Modified:
remoting3/trunk/util/src/main/java/org/jboss/remoting/util/CollectionUtil.java
Log:
javadocs
Modified: remoting3/trunk/util/src/main/java/org/jboss/remoting/util/CollectionUtil.java
===================================================================
--- remoting3/trunk/util/src/main/java/org/jboss/remoting/util/CollectionUtil.java 2008-10-14 06:04:22 UTC (rev 4598)
+++ remoting3/trunk/util/src/main/java/org/jboss/remoting/util/CollectionUtil.java 2008-10-17 04:10:05 UTC (rev 4599)
@@ -32,6 +32,12 @@
private CollectionUtil() {
}
+ /**
+ * Create an enum map for the given key type.
+ *
+ * @param keyType the key type
+ * @return the new map
+ */
public static <K extends Enum<K>, V> EnumMap<K, V> enumMap(Class<K> keyType) {
return new EnumMap<K, V>(keyType);
}
@@ -63,6 +69,11 @@
return new ConcurrentReferenceHashMap<K, V>(16, STRONG, WEAK);
}
+ /**
+ * Create a concurrent integer-keyed map.
+ *
+ * @return a concurrent integer-keyed map
+ */
public static <V> ConcurrentIntegerMap<V> concurrentIntegerMap() {
return new EmulatedConcurrentIntegerHashMap<V>(CollectionUtil.<Integer, V>concurrentMap());
}
@@ -227,6 +238,53 @@
}
/**
+ * Create an immutable map entry.
+ *
+ * @param key the key
+ * @param value the value
+ * @return the entry
+ */
+ public static <K, V> Map.Entry<K, V> entry(final K key, final V value) {
+ return new Map.Entry<K, V>() {
+ public K getKey() {
+ return key;
+ }
+
+ public V getValue() {
+ return value;
+ }
+
+ public V setValue(final V value) {
+ throw new UnsupportedOperationException("setValue");
+ }
+ };
+ }
+
+ /**
+ * Create a prepopulated hash map. The map will be sized for the number of elements given.
+ *
+ * @param entries the map entries
+ * @return the prepopulated hash map
+ */
+ public static <K, V> Map<K, V> hashMap(Map.Entry<K, V>... entries) {
+ final Map<K, V> map = new HashMap<K, V>(entries.length);
+ for (Map.Entry<K,V> e : entries) {
+ map.put(e.getKey(), e.getValue());
+ }
+ return map;
+ }
+
+ /**
+ * Create an unmodifiable prepopulated hash map.
+ *
+ * @param entries the map entries
+ * @return the unmodifiable prepopulated hash map
+ */
+ public static <K, V> Map<K, V> unmodifiableHashMap(Map.Entry<K, V>... entries) {
+ return Collections.unmodifiableMap(hashMap(entries));
+ }
+
+ /**
* Create an {@code Iterable} view of another {@code Iterable} that exposes no other methods.
*
* @param original the wrapped instance
@@ -563,6 +621,11 @@
}
}
+ /**
+ * Get the empty iterable.
+ *
+ * @return the empty iterable
+ */
@SuppressWarnings ({"unchecked"})
public static <T> Iterable<T> emptyIterable() {
return (Iterable<T>) EMPTY_ITERABLE;
@@ -576,6 +639,11 @@
}
}
+ /**
+ * Get the empty iterator.
+ *
+ * @return the empty iterator
+ */
@SuppressWarnings ({"unchecked"})
public static <T> Iterator<T> emptyIterator() {
return (Iterator<T>) EMPTY_ITERATOR;
@@ -599,6 +667,12 @@
}
+ /**
+ * Get a reversed view of a list iterator.
+ *
+ * @param original the original iterator
+ * @return the reversed view
+ */
public static <T> ListIterator<T> reverse(ListIterator<T> original) {
if (original instanceof ReverseListIterator) {
return ((ReverseListIterator<T>)original).original;
@@ -607,6 +681,12 @@
}
}
+ /**
+ * Get an iterable reversed view of a list.
+ *
+ * @param list the list
+ * @return the reversed view
+ */
public static <T> Iterable<T> reverse(final List<T> list) {
return new Iterable<T>() {
public Iterator<T> iterator() {
16 years, 2 months
JBoss Remoting SVN: r4598 - remoting3/trunk.
by jboss-remoting-commits@lists.jboss.org
Author: trustin
Date: 2008-10-14 02:04:22 -0400 (Tue, 14 Oct 2008)
New Revision: 4598
Modified:
remoting3/trunk/build.xml
Log:
Fixed a problem where 'ant clean' doesn't clean all directories (i.e. srp and version)
Modified: remoting3/trunk/build.xml
===================================================================
--- remoting3/trunk/build.xml 2008-10-08 23:36:42 UTC (rev 4597)
+++ remoting3/trunk/build.xml 2008-10-14 06:04:22 UTC (rev 4598)
@@ -1360,7 +1360,7 @@
<target name="all" description="Build everything" depends="all-core,all-jars,api-javadoc"/>
- <target name="clean" description="Clean out all build files" depends="clean-core,clean-http"/>
+ <target name="clean" description="Clean out all build files" depends="clean-core,clean-http,version.clean,srp.clean"/>
<target name="test" description="Run all tests" depends="api.test,core.test,protocol.basic.test"/>
16 years, 2 months
JBoss Remoting SVN: r4597 - in remoting3/trunk: core/src/main/java/org/jboss/remoting/core and 1 other directory.
by jboss-remoting-commits@lists.jboss.org
Author: david.lloyd(a)jboss.com
Date: 2008-10-08 19:36:42 -0400 (Wed, 08 Oct 2008)
New Revision: 4597
Modified:
remoting3/trunk/api/src/main/resources/META-INF/jboss-classloading.xml
remoting3/trunk/core/src/main/java/org/jboss/remoting/core/EndpointImpl.java
Log:
2 minor fixes
Modified: remoting3/trunk/api/src/main/resources/META-INF/jboss-classloading.xml
===================================================================
--- remoting3/trunk/api/src/main/resources/META-INF/jboss-classloading.xml 2008-10-07 22:57:23 UTC (rev 4596)
+++ remoting3/trunk/api/src/main/resources/META-INF/jboss-classloading.xml 2008-10-08 23:36:42 UTC (rev 4597)
@@ -11,6 +11,6 @@
</capabilities>
<requirements>
<module name="XnioAPI" from-inclusive="true" from="1.1.0" reExport="true"/>
- <module name="RiverAPI" from-inclusive="true" from="1.0.0" reExport="true"/>
+ <module name="MarshallingAPI" from-inclusive="true" from="1.0.0" reExport="true"/>
</requirements>
</classloading>
Modified: remoting3/trunk/core/src/main/java/org/jboss/remoting/core/EndpointImpl.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/remoting/core/EndpointImpl.java 2008-10-07 22:57:23 UTC (rev 4596)
+++ remoting3/trunk/core/src/main/java/org/jboss/remoting/core/EndpointImpl.java 2008-10-08 23:36:42 UTC (rev 4597)
@@ -215,7 +215,6 @@
}
public <I, O> IoFuture<ClientSource<I, O>> locateService(final URI serviceUri) throws IllegalArgumentException {
- // todo - should this be typesafe?
if (serviceUri == null) {
throw new NullPointerException("serviceUri is null");
}
16 years, 2 months
JBoss Remoting SVN: r4596 - remoting3/trunk/core/src/main/java/org/jboss/remoting/core.
by jboss-remoting-commits@lists.jboss.org
Author: david.lloyd(a)jboss.com
Date: 2008-10-07 18:57:23 -0400 (Tue, 07 Oct 2008)
New Revision: 4596
Added:
remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientExternalizer.java
remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientSourceExternalizer.java
Modified:
remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientImpl.java
remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientSourceImpl.java
Log:
Separated externalizers for Remoting elements
Added: remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientExternalizer.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientExternalizer.java (rev 0)
+++ remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientExternalizer.java 2008-10-07 22:57:23 UTC (rev 4596)
@@ -0,0 +1,57 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.remoting.core;
+
+import org.jboss.marshalling.Externalizer;
+import org.jboss.marshalling.Creator;
+import org.jboss.remoting.spi.remote.RequestHandler;
+import java.io.ObjectOutput;
+import java.io.IOException;
+import java.io.ObjectInput;
+
+/**
+ *
+ */
+public final class ClientExternalizer implements Externalizer {
+
+ private static final long serialVersionUID = 814228455390899997L;
+
+ private final EndpointImpl endpoint;
+
+ ClientExternalizer(final EndpointImpl endpoint) {
+ this.endpoint = endpoint;
+ }
+
+ public void writeExternal(final Object o, final ObjectOutput output) throws IOException {
+ output.writeObject(((ClientImpl)o).getRequestHandlerHandle().getResource());
+ }
+
+ public Object createExternal(final Class<?> aClass, final ObjectInput input, final Creator creator) throws IOException, ClassNotFoundException {
+ final RequestHandler handler = (RequestHandler) input.readObject();
+ return new ClientImpl(handler.getHandle(), endpoint.getExecutor());
+ }
+
+ public void readExternal(final Object o, final ObjectInput input) throws IOException, ClassNotFoundException {
+ // no op
+ }
+}
Modified: remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientImpl.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientImpl.java 2008-10-07 03:06:01 UTC (rev 4595)
+++ remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientImpl.java 2008-10-07 22:57:23 UTC (rev 4596)
@@ -93,27 +93,7 @@
return "client instance <" + Integer.toString(hashCode()) + ">";
}
- static final class ExternalizerImpl implements Externalizer {
-
- private static final long serialVersionUID = 814228455390899997L;
-
- private final EndpointImpl endpoint;
-
- ExternalizerImpl(final EndpointImpl endpoint) {
- this.endpoint = endpoint;
- }
-
- public void writeExternal(final Object o, final ObjectOutput output) throws IOException {
- output.writeObject(((ClientImpl)o).handle.getResource());
- }
-
- public Object createExternal(final Class<?> aClass, final ObjectInput input, final Creator creator) throws IOException, ClassNotFoundException {
- final RequestHandler handler = (RequestHandler) input.readObject();
- return new ClientImpl(handler.getHandle(), endpoint.getExecutor());
- }
-
- public void readExternal(final Object o, final ObjectInput input) throws IOException, ClassNotFoundException {
- // no op
- }
+ Handle<RequestHandler> getRequestHandlerHandle() {
+ return handle;
}
}
Added: remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientSourceExternalizer.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientSourceExternalizer.java (rev 0)
+++ remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientSourceExternalizer.java 2008-10-07 22:57:23 UTC (rev 4596)
@@ -0,0 +1,57 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.remoting.core;
+
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import org.jboss.marshalling.Creator;
+import org.jboss.marshalling.Externalizer;
+import org.jboss.remoting.spi.remote.RequestHandlerSource;
+
+/**
+ *
+ */
+public final class ClientSourceExternalizer implements Externalizer {
+
+ private static final long serialVersionUID = 814228455390899997L;
+
+ private final EndpointImpl endpoint;
+
+ ClientSourceExternalizer(final EndpointImpl endpoint) {
+ this.endpoint = endpoint;
+ }
+
+ public void writeExternal(final Object o, final ObjectOutput output) throws IOException {
+ output.writeObject(((ClientSourceImpl) o).getRequestHandlerSourceHandle().getResource());
+ }
+
+ public Object createExternal(final Class<?> aClass, final ObjectInput input, final Creator creator) throws IOException, ClassNotFoundException {
+ final RequestHandlerSource handler = (RequestHandlerSource) input.readObject();
+ return new ClientSourceImpl(handler.getHandle(), endpoint);
+ }
+
+ public void readExternal(final Object o, final ObjectInput input) throws IOException, ClassNotFoundException {
+ // no op
+ }
+}
\ No newline at end of file
Modified: remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientSourceImpl.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientSourceImpl.java 2008-10-07 03:06:01 UTC (rev 4595)
+++ remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientSourceImpl.java 2008-10-07 22:57:23 UTC (rev 4596)
@@ -66,4 +66,8 @@
public String toString() {
return "client source instance <" + Integer.toString(hashCode()) + ">";
}
+
+ Handle<RequestHandlerSource> getRequestHandlerSourceHandle() {
+ return handle;
+ }
}
16 years, 2 months
JBoss Remoting SVN: r4595 - in remoting3/trunk: util/src/main/java/org/jboss/remoting/util and 1 other directory.
by jboss-remoting-commits@lists.jboss.org
Author: david.lloyd(a)jboss.com
Date: 2008-10-06 23:06:01 -0400 (Mon, 06 Oct 2008)
New Revision: 4595
Modified:
remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/BasicHandler.java
remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/RemotingChannelConfiguration.java
remoting3/trunk/util/src/main/java/org/jboss/remoting/util/ConcurrentIntegerMap.java
remoting3/trunk/util/src/main/java/org/jboss/remoting/util/EmulatedConcurrentIntegerHashMap.java
Log:
Factor out classloaders; simplify concurrent integer map
Modified: remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/BasicHandler.java
===================================================================
--- remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/BasicHandler.java 2008-10-07 00:08:21 UTC (rev 4594)
+++ remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/BasicHandler.java 2008-10-07 03:06:01 UTC (rev 4595)
@@ -65,10 +65,9 @@
//--== Connection configuration items ==--
private final MarshallerFactory marshallerFactory;
- private final Configuration configuration;
+ private final Configuration marshallingConfiguration;
private final int linkMetric;
private final Executor executor;
- private final ClassLoader classLoader;
// buffer allocator for outbound message assembly
private final BufferAllocator<ByteBuffer> allocator;
@@ -102,9 +101,8 @@
public BasicHandler(final RemotingChannelConfiguration configuration) {
allocator = configuration.getAllocator();
executor = configuration.getExecutor();
- classLoader = configuration.getClassLoader();
marshallerFactory = configuration.getMarshallerFactory();
- this.configuration = new Configuration();
+ marshallingConfiguration = configuration.getMarshallingConfiguration();
linkMetric = configuration.getLinkMetric();
}
@@ -151,7 +149,7 @@
}
final Object payload;
try {
- final Unmarshaller unmarshaller = marshallerFactory.createUnmarshaller(configuration);
+ final Unmarshaller unmarshaller = marshallerFactory.createUnmarshaller(marshallingConfiguration);
try {
unmarshaller.start(createByteInput(buffer, true));
try {
@@ -186,7 +184,7 @@
final int requestId = buffer.getInt();
final Object payload;
try {
- final Unmarshaller unmarshaller = marshallerFactory.createUnmarshaller(configuration);
+ final Unmarshaller unmarshaller = marshallerFactory.createUnmarshaller(marshallingConfiguration);
try {
unmarshaller.start(createByteInput(buffer, true));
try {
@@ -218,7 +216,7 @@
}
final Object payload;
try {
- final Unmarshaller unmarshaller = marshallerFactory.createUnmarshaller(configuration);
+ final Unmarshaller unmarshaller = marshallerFactory.createUnmarshaller(marshallingConfiguration);
try {
unmarshaller.start(createByteInput(buffer, true));
try {
@@ -277,7 +275,7 @@
}
final Throwable cause;
try {
- final Unmarshaller unmarshaller = marshallerFactory.createUnmarshaller(configuration);
+ final Unmarshaller unmarshaller = marshallerFactory.createUnmarshaller(marshallingConfiguration);
try {
unmarshaller.start(createByteInput(buffer, true));
try {
@@ -442,7 +440,7 @@
buffer.put((byte) MessageType.REPLY.getId());
buffer.putInt(requestId);
try {
- final org.jboss.marshalling.Marshaller marshaller = marshallerFactory.createMarshaller(configuration);
+ final org.jboss.marshalling.Marshaller marshaller = marshallerFactory.createMarshaller(marshallingConfiguration);
try {
final List<ByteBuffer> bufferList = new ArrayList<ByteBuffer>();
final ByteOutput output = createByteOutput(allocator, bufferList);
@@ -471,7 +469,7 @@
buffer.put((byte) MessageType.REQUEST_FAILED.getId());
buffer.putInt(requestId);
try {
- final org.jboss.marshalling.Marshaller marshaller = marshallerFactory.createMarshaller(configuration);
+ final org.jboss.marshalling.Marshaller marshaller = marshallerFactory.createMarshaller(marshallingConfiguration);
try {
final List<ByteBuffer> bufferList = new ArrayList<ByteBuffer>();
final ByteOutput output = createByteOutput(allocator, bufferList);
@@ -612,7 +610,7 @@
log.trace("Sending outbound one-way request of type %s", request == null ? "null" : request.getClass());
try {
final List<ByteBuffer> bufferList;
- final Marshaller marshaller = marshallerFactory.createMarshaller(configuration);
+ final Marshaller marshaller = marshallerFactory.createMarshaller(marshallingConfiguration);
try {
bufferList = new ArrayList<ByteBuffer>();
final ByteOutput output = createByteOutput(allocator, bufferList);
@@ -646,7 +644,7 @@
log.trace("Sending outbound request of type %s", request == null ? "null" : request.getClass());
try {
final List<ByteBuffer> bufferList;
- final Marshaller marshaller = marshallerFactory.createMarshaller(configuration);
+ final Marshaller marshaller = marshallerFactory.createMarshaller(marshallingConfiguration);
try {
bufferList = new ArrayList<ByteBuffer>();
final ByteOutput output = createByteOutput(allocator, bufferList);
Modified: remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/RemotingChannelConfiguration.java
===================================================================
--- remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/RemotingChannelConfiguration.java 2008-10-07 00:08:21 UTC (rev 4594)
+++ remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/RemotingChannelConfiguration.java 2008-10-07 03:06:01 UTC (rev 4595)
@@ -26,15 +26,16 @@
import java.nio.ByteBuffer;
import org.jboss.xnio.BufferAllocator;
import org.jboss.marshalling.MarshallerFactory;
+import org.jboss.marshalling.Configuration;
/**
*
*/
public final class RemotingChannelConfiguration {
private MarshallerFactory marshallerFactory;
+ private Configuration marshallingConfiguration;
private int linkMetric;
private Executor executor;
- private ClassLoader classLoader;
private BufferAllocator<ByteBuffer> allocator;
public RemotingChannelConfiguration() {
@@ -48,6 +49,14 @@
this.marshallerFactory = marshallerFactory;
}
+ public Configuration getMarshallingConfiguration() {
+ return marshallingConfiguration;
+ }
+
+ public void setMarshallingConfiguration(final Configuration marshallingConfiguration) {
+ this.marshallingConfiguration = marshallingConfiguration;
+ }
+
public int getLinkMetric() {
return linkMetric;
}
@@ -64,14 +73,6 @@
this.executor = executor;
}
- public ClassLoader getClassLoader() {
- return classLoader;
- }
-
- public void setClassLoader(final ClassLoader classLoader) {
- this.classLoader = classLoader;
- }
-
public BufferAllocator<ByteBuffer> getAllocator() {
return allocator;
}
Modified: remoting3/trunk/util/src/main/java/org/jboss/remoting/util/ConcurrentIntegerMap.java
===================================================================
--- remoting3/trunk/util/src/main/java/org/jboss/remoting/util/ConcurrentIntegerMap.java 2008-10-07 00:08:21 UTC (rev 4594)
+++ remoting3/trunk/util/src/main/java/org/jboss/remoting/util/ConcurrentIntegerMap.java 2008-10-07 03:06:01 UTC (rev 4595)
@@ -22,25 +22,17 @@
package org.jboss.remoting.util;
-import java.util.Set;
-import java.util.Collection;
-
/**
- *
+ * A concurrent map that is keyed by integer.
*/
public interface ConcurrentIntegerMap<V> {
- boolean containsKey(int key);
- boolean containsValue(Object value);
-
V get(int key);
V put(int key, V value);
V putIfAbsent(int key, V value);
- void putAll(ConcurrentIntegerMap<? extends V> m);
-
V remove(int key);
boolean remove(int key, Object oldValue);
@@ -51,27 +43,9 @@
void clear();
- int size();
-
boolean isEmpty();
- Set<Entry<V>> entrySet();
-
- Collection<V> values();
-
boolean equals(Object other);
int hashCode();
-
- interface Entry<V> {
- int getKey();
-
- V getValue();
-
- V setValue(V value);
-
- int hashCode();
-
- boolean equals(Object other);
- }
}
Modified: remoting3/trunk/util/src/main/java/org/jboss/remoting/util/EmulatedConcurrentIntegerHashMap.java
===================================================================
--- remoting3/trunk/util/src/main/java/org/jboss/remoting/util/EmulatedConcurrentIntegerHashMap.java 2008-10-07 00:08:21 UTC (rev 4594)
+++ remoting3/trunk/util/src/main/java/org/jboss/remoting/util/EmulatedConcurrentIntegerHashMap.java 2008-10-07 03:06:01 UTC (rev 4595)
@@ -22,10 +22,6 @@
package org.jboss.remoting.util;
-import java.util.Set;
-import java.util.Collection;
-import java.util.Map;
-import java.util.Iterator;
import java.util.concurrent.ConcurrentMap;
/**
@@ -39,14 +35,6 @@
this.delegate = delegate;
}
- public boolean containsKey(final int key) {
- return delegate.containsKey(Integer.valueOf(key));
- }
-
- public boolean containsValue(final Object value) {
- return delegate.containsValue(value);
- }
-
public V get(final int key) {
return delegate.get(Integer.valueOf(key));
}
@@ -59,10 +47,6 @@
return delegate.putIfAbsent(Integer.valueOf(key), value);
}
- public void putAll(final ConcurrentIntegerMap<? extends V> m) {
- throw new UnsupportedOperationException("maybe later");
- }
-
public V remove(final int key) {
return delegate.remove(Integer.valueOf(key));
}
@@ -83,22 +67,10 @@
delegate.clear();
}
- public int size() {
- return delegate.size();
- }
-
public boolean isEmpty() {
return delegate.isEmpty();
}
- public Set<Entry<V>> entrySet() {
- return new EmulatedEntrySet<V>(delegate.entrySet());
- }
-
- public Collection<V> values() {
- return delegate.values();
- }
-
public boolean equals(final Object obj) {
return super.equals(obj);
}
@@ -106,92 +78,4 @@
public int hashCode() {
return super.hashCode();
}
-
- private static class EmulatedEntrySet<V> implements Set<Entry<V>> {
-
- private final Set<Map.Entry<Integer, V>> delegate;
-
- public EmulatedEntrySet(final Set<Map.Entry<Integer,V>> delegate) {
- this.delegate = delegate;
- }
-
- public int size() {
- return delegate.size();
- }
-
- public boolean isEmpty() {
- return delegate.isEmpty();
- }
-
- public boolean contains(final Object o) {
- // todo
- return false;
- }
-
- public Iterator<Entry<V>> iterator() {
- final Iterator<Map.Entry<Integer, V>> i = delegate.iterator();
- return new Iterator<Entry<V>>() {
- public boolean hasNext() {
- return i.hasNext();
- }
-
- public Entry<V> next() {
- final Map.Entry<Integer, V> n = i.next();
- return new Entry<V>() {
- public int getKey() {
- return n.getKey().intValue();
- }
-
- public V getValue() {
- return n.getValue();
- }
-
- public V setValue(final V value) {
- return n.setValue(value);
- }
- };
- }
-
- public void remove() {
- i.remove();
- }
- };
- }
-
- public Object[] toArray() {
- throw new UnsupportedOperationException("maybe later");
- }
-
- public <T> T[] toArray(final T[] a) {
- throw new UnsupportedOperationException("maybe later");
- }
-
- public boolean add(final Entry<V> o) {
- throw new UnsupportedOperationException("add() not supported");
- }
-
- public boolean remove(final Object o) {
- throw new UnsupportedOperationException("maybe later");
- }
-
- public boolean containsAll(final Collection<?> c) {
- throw new UnsupportedOperationException("maybe later");
- }
-
- public boolean addAll(final Collection<? extends Entry<V>> c) {
- throw new UnsupportedOperationException("addAll() not supported");
- }
-
- public boolean retainAll(final Collection<?> c) {
- throw new UnsupportedOperationException("maybe later");
- }
-
- public boolean removeAll(final Collection<?> c) {
- throw new UnsupportedOperationException("maybe later");
- }
-
- public void clear() {
- delegate.clear();
- }
- }
}
16 years, 2 months
JBoss Remoting SVN: r4594 - in remoting3/trunk: api/src/main/resources/META-INF and 3 other directories.
by jboss-remoting-commits@lists.jboss.org
Author: david.lloyd(a)jboss.com
Date: 2008-10-06 20:08:21 -0400 (Mon, 06 Oct 2008)
New Revision: 4594
Modified:
remoting3/trunk/api/src/main/resources/META-INF/jboss-classloading.xml
remoting3/trunk/build.properties
remoting3/trunk/build.xml
remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientImpl.java
remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/BasicHandler.java
remoting3/trunk/testing-support/src/main/resources/testing.policy
Log:
Move to 1.0.0 external marshalling API
Modified: remoting3/trunk/api/src/main/resources/META-INF/jboss-classloading.xml
===================================================================
--- remoting3/trunk/api/src/main/resources/META-INF/jboss-classloading.xml 2008-10-02 05:25:36 UTC (rev 4593)
+++ remoting3/trunk/api/src/main/resources/META-INF/jboss-classloading.xml 2008-10-07 00:08:21 UTC (rev 4594)
@@ -11,6 +11,6 @@
</capabilities>
<requirements>
<module name="XnioAPI" from-inclusive="true" from="1.1.0" reExport="true"/>
- <module name="JBossMarshallingAPI" from-inclusive="true" from="1.0.0" reExport="true"/>
+ <module name="RiverAPI" from-inclusive="true" from="1.0.0" reExport="true"/>
</requirements>
</classloading>
Modified: remoting3/trunk/build.properties
===================================================================
--- remoting3/trunk/build.properties 2008-10-02 05:25:36 UTC (rev 4593)
+++ remoting3/trunk/build.properties 2008-10-07 00:08:21 UTC (rev 4594)
@@ -115,23 +115,21 @@
lib.jboss-managed.local=${local.repository}/${lib.jboss-managed.local-path}
lib.jboss-managed.remote=${remote.repository}/${lib.jboss-managed.remote-path}
-lib.jboss-marshalling-api.version=1.0.0.Alpha.20080923-345
-lib.jboss-marshalling-api.name=marshalling-api.jar
-lib.jboss-marshalling-api.license=lgpl
-lib.jboss-marshalling-api.dir=jboss/marshalling/${lib.jboss-marshalling-api.version}/lib
-lib.jboss-marshalling-api.path=${lib.jboss-marshalling-api.dir}/${lib.jboss-marshalling-api.name}
-lib.jboss-marshalling-api.local=${local.repository}/${lib.jboss-marshalling-api.path}
-lib.jboss-marshalling-api.remote=${remote.repository}/${lib.jboss-marshalling-api.path}
+lib.marshalling-api.version=1.0.0.Beta1
+lib.marshalling-api.name=marshalling-api.jar
+lib.marshalling-api.license=lgpl
+lib.marshalling-api.dir=jboss/marshalling/${lib.marshalling-api.version}/lib
+lib.marshalling-api.path=${lib.marshalling-api.dir}/${lib.marshalling-api.name}
+lib.marshalling-api.local=${local.repository}/${lib.marshalling-api.path}
+lib.marshalling-api.remote=${remote.repository}/${lib.marshalling-api.path}
-lib.jboss-serialization.version=1.1.0-snapshot-r358
-lib.jboss-serialization.name=jboss-serialization.jar
-lib.jboss-serialization.license=lgpl
-lib.jboss-serialization.local-dir=jboss-serialization/${lib.jboss-serialization.version}/lib
-lib.jboss-serialization.remote-dir=jboss/serialization/${lib.jboss-serialization.version}/lib
-lib.jboss-serialization.local-path=${lib.jboss-serialization.local-dir}/${lib.jboss-serialization.name}
-lib.jboss-serialization.remote-path=${lib.jboss-serialization.remote-dir}/${lib.jboss-serialization.name}
-lib.jboss-serialization.local=${local.repository}/${lib.jboss-serialization.local-path}
-lib.jboss-serialization.remote=${remote.repository}/${lib.jboss-serialization.remote-path}
+lib.river.version=${lib.marshalling-api.version}
+lib.river.name=river.jar
+lib.river.license=${lib.marshalling-api.license}
+lib.river.dir=${lib.marshalling-api.dir}
+lib.river.path=${lib.river.dir}/${lib.river.name}
+lib.river.local=${local.repository}/${lib.river.path}
+lib.river.remote=${remote.repository}/${lib.river.path}
lib.jbossxb.version=2.0.0.CR5
lib.jbossxb.name=jboss-xml-binding.jar
@@ -181,7 +179,7 @@
lib.trove.local=${local.repository}/${lib.trove.path}
lib.trove.remote=${remote.repository}/${lib.trove.path}
-lib.xnio.version=1.1.0.Alpha2008072901
+lib.xnio.version=1.1.0.CR1
lib.xnio-api.name=xnio-api-${lib.xnio.version}.jar
lib.xnio-api.license=lgpl
Modified: remoting3/trunk/build.xml
===================================================================
--- remoting3/trunk/build.xml 2008-10-02 05:25:36 UTC (rev 4593)
+++ remoting3/trunk/build.xml 2008-10-07 00:08:21 UTC (rev 4594)
@@ -127,30 +127,6 @@
<get src="${remote.license.dir}/${lib.jboss-managed.license}.txt" dest="${lib.jboss-managed.local}.license.txt" usetimestamp="true" ignoreerrors="false"/>
</target>
- <!-- External library: JBoss Marshalling -->
-
- <target name="lib.jboss-marshalling-api-check">
- <available property="lib.jboss-marshalling-api.exists" file="${lib.jboss-marshalling-api.local}"/>
- </target>
-
- <target name="lib.jboss-marshalling-api" depends="lib.jboss-marshalling-api-check" unless="lib.jboss-marshalling-api.exists">
- <mkdir dir="${local.repository}/${lib.jboss-marshalling-api.dir}"/>
- <get src="${lib.jboss-marshalling-api.remote}" dest="${lib.jboss-marshalling-api.local}" usetimestamp="true" ignoreerrors="false"/>
- <get src="${remote.license.dir}/${lib.jboss-marshalling-api.license}.txt" dest="${lib.jboss-marshalling-api.local}.license.txt" usetimestamp="true" ignoreerrors="false"/>
- </target>
-
- <!-- External library: JBoss Serialization -->
-
- <target name="lib.jboss-serialization-check">
- <available property="lib.jboss-serialization.exists" file="${lib.jboss-serialization.local}"/>
- </target>
-
- <target name="lib.jboss-serialization" depends="lib.jboss-serialization-check" unless="lib.jboss-serialization.exists">
- <mkdir dir="${local.repository}/${lib.jboss-serialization.local-dir}"/>
- <get src="${lib.jboss-serialization.remote}" dest="${lib.jboss-serialization.local}" usetimestamp="true" ignoreerrors="false"/>
- <get src="${remote.license.dir}/${lib.jboss-serialization.license}.txt" dest="${lib.jboss-serialization.local}.license.txt" usetimestamp="true" ignoreerrors="false"/>
- </target>
-
<!-- External library: JBossXB -->
<target name="lib.jbossxb-check">
@@ -163,6 +139,8 @@
<get src="${remote.license.dir}/${lib.jbossxb.license}.txt" dest="${lib.jbossxb.local}.license.txt" usetimestamp="true" ignoreerrors="false"/>
</target>
+ <!-- External library: JUnit -->
+
<target name="lib.junit-check">
<available property="lib.junit.exists" file="${lib.junit.local}"/>
</target>
@@ -173,6 +151,30 @@
<get src="${remote.license.dir}/${lib.junit.license}.txt" dest="${lib.junit.local}.license.txt" usetimestamp="true" ignoreerrors="false"/>
</target>
+ <!-- External library: Marshalling API -->
+
+ <target name="lib.marshalling-api-check">
+ <available property="lib.marshalling-api.exists" file="${lib.marshalling-api.local}"/>
+ </target>
+
+ <target name="lib.marshalling-api" depends="lib.marshalling-api-check" unless="lib.marshalling-api.exists">
+ <mkdir dir="${local.repository}/${lib.marshalling-api.dir}"/>
+ <get src="${lib.marshalling-api.remote}" dest="${lib.marshalling-api.local}" usetimestamp="true" ignoreerrors="false"/>
+ <get src="${remote.license.dir}/${lib.marshalling-api.license}.txt" dest="${lib.marshalling-api.local}.license.txt" usetimestamp="true" ignoreerrors="false"/>
+ </target>
+
+ <!-- External library: River -->
+
+ <target name="lib.river-check">
+ <available property="lib.river.exists" file="${lib.river.local}"/>
+ </target>
+
+ <target name="lib.river" depends="lib.river-check" unless="lib.river.exists">
+ <mkdir dir="${local.repository}/${lib.river.dir}"/>
+ <get src="${lib.river.remote}" dest="${lib.river.local}" usetimestamp="true" ignoreerrors="false"/>
+ <get src="${remote.license.dir}/${lib.river.license}.txt" dest="${lib.river.local}.license.txt" usetimestamp="true" ignoreerrors="false"/>
+ </target>
+
<!-- External library: Servlet API 2.4 -->
<target name="lib.servlet-check">
@@ -248,7 +250,7 @@
debug="true">
<compilerarg value="-Xlint:unchecked"/>
<classpath>
- <pathelement location="${lib.jboss-marshalling-api.local}"/>
+ <pathelement location="${lib.marshalling-api.local}"/>
<pathelement location="${lib.xnio-api.local}"/>
<path refid="util.classpath"/>
</classpath>
@@ -280,7 +282,7 @@
<path refid="api.classpath"/>
<path refid="testing-support.classpath"/>
<pathelement location="${lib.junit.local}"/>
- <pathelement location="${lib.jboss-marshalling-api.local}"/>
+ <pathelement location="${lib.marshalling-api.local}"/>
<pathelement location="${lib.xnio-api.local}"/>
</classpath>
</javac>
@@ -296,6 +298,7 @@
<sysproperty key="build.home" value="${basedir}"/>
<sysproperty key="ant.library.dir" value="${ant.home}/lib"/>
<sysproperty key="lib.junit.local" value="${lib.junit.local}"/>
+ <sysproperty key="lib.marshalling-api.local" value="${lib.marshalling-api.local}"/>
<sysproperty key="lib.xnio-api.local" value="${lib.xnio-api.local}"/>
<jvmarg line="${test.jvmargs}"/>
<formatter type="plain" extension="${extension}"/>
@@ -305,6 +308,7 @@
<path refid="util.classpath"/>
<pathelement location="api/target/test/classes"/>
<pathelement location="${lib.junit.local}"/>
+ <pathelement location="${lib.marshalling-api.local}"/>
<pathelement location="${lib.xnio-api.local}"/>
</classpath>
<batchtest fork="yes" todir="api/target/test-results"
@@ -325,7 +329,7 @@
<antcall inheritall="true" inheritrefs="true" target="api.test.pseudotarget">
<param name="extension" value="-security.txt"/>
<param name="message" value="Running with security manager"/>
- <param name="test.jvmargs" value="-Djava.security.manager=org.jboss.cx.remoting.test.support.LoggingSecurityManager -Djava.security.policy=${basedir}/testing-support/src/main/resources/testing.policy"/>
+ <param name="test.jvmargs" value="-Djava.security.manager=org.jboss.remoting.test.support.LoggingSecurityManager -Djava.security.policy=${basedir}/testing-support/src/main/resources/testing.policy"/>
</antcall>
</target>
@@ -333,7 +337,7 @@
<delete dir="api/target"/>
</target>
- <target name="api" description="Build the API module" depends="lib.jboss-marshalling-api,lib.xnio-api,util,api.compile">
+ <target name="api" description="Build the API module" depends="lib.marshalling-api,lib.xnio-api,util,api.compile">
<path id="api.classpath">
<pathelement location="api/target/main/classes"/>
</path>
@@ -404,7 +408,7 @@
<path refid="api.classpath"/>
<path refid="util.classpath"/>
<path refid="version.classpath"/>
- <pathelement location="${lib.jboss-marshalling-api.local}"/>
+ <pathelement location="${lib.marshalling-api.local}"/>
<pathelement location="${lib.xnio-api.local}"/>
</classpath>
</javac>
@@ -436,7 +440,7 @@
<path refid="core.classpath"/>
<path refid="util.classpath"/>
<path refid="testing-support.classpath"/>
- <pathelement location="${lib.jboss-marshalling-api.local}"/>
+ <pathelement location="${lib.marshalling-api.local}"/>
<pathelement location="${lib.junit.local}"/>
<pathelement location="${lib.xnio-api.local}"/>
</classpath>
@@ -453,6 +457,7 @@
<sysproperty key="build.home" value="${basedir}"/>
<sysproperty key="ant.library.dir" value="${ant.home}/lib"/>
<sysproperty key="lib.junit.local" value="${lib.junit.local}"/>
+ <sysproperty key="lib.marshalling-api.local" value="${lib.marshalling-api.local}"/>
<sysproperty key="lib.xnio-api.local" value="${lib.xnio-api.local}"/>
<jvmarg line="${test.jvmargs}"/>
<formatter type="plain" extension="${extension}"/>
@@ -463,6 +468,7 @@
<path refid="util.classpath"/>
<pathelement location="core/target/test/classes"/>
<pathelement location="${lib.junit.local}"/>
+ <pathelement location="${lib.marshalling-api.local}"/>
<pathelement location="${lib.xnio-api.local}"/>
</classpath>
<batchtest fork="yes" todir="core/target/test-results"
@@ -483,7 +489,7 @@
<antcall inheritall="true" inheritrefs="true" target="core.test.pseudotarget">
<param name="extension" value="-security.txt"/>
<param name="message" value="Running with security manager"/>
- <param name="test.jvmargs" value="-Djava.security.manager=org.jboss.cx.remoting.test.support.LoggingSecurityManager -Djava.security.policy=${basedir}/testing-support/src/main/resources/testing.policy"/>
+ <param name="test.jvmargs" value="-Djava.security.manager=org.jboss.remoting.test.support.LoggingSecurityManager -Djava.security.policy=${basedir}/testing-support/src/main/resources/testing.policy"/>
</antcall>
</target>
@@ -758,7 +764,7 @@
<!-- TODO: marshallers should be moved to their own module -->
<path refid="core.classpath"/>
<path refid="util.classpath"/>
- <pathelement location="${lib.jboss-marshalling-api.local}"/>
+ <pathelement location="${lib.marshalling-api.local}"/>
<pathelement location="${lib.xnio-api.local}"/>
</classpath>
</javac>
@@ -792,7 +798,8 @@
<path refid="util.classpath"/>
<path refid="testing-support.classpath"/>
<pathelement location="${lib.junit.local}"/>
- <pathelement location="${lib.jboss-marshalling-api.local}"/>
+ <pathelement location="${lib.marshalling-api.local}"/>
+ <pathelement location="${lib.river.local}"/>
<pathelement location="${lib.xnio-standalone.local}"/>
</classpath>
</javac>
@@ -808,6 +815,7 @@
<sysproperty key="build.home" value="${basedir}"/>
<sysproperty key="ant.library.dir" value="${ant.home}/lib"/>
<sysproperty key="lib.junit.local" value="${lib.junit.local}"/>
+ <sysproperty key="lib.marshalling-api.local" value="${lib.marshalling-api.local}"/>
<sysproperty key="lib.xnio-standalone.local" value="${lib.xnio-standalone.local}"/>
<jvmarg line="${test.jvmargs}"/>
<formatter type="plain" extension="${extension}"/>
@@ -819,7 +827,8 @@
<path refid="util.classpath"/>
<pathelement location="protocol/basic/target/test/classes"/>
<pathelement location="${lib.junit.local}"/>
- <pathelement location="${lib.jboss-marshalling-api.local}"/>
+ <pathelement location="${lib.marshalling-api.local}"/>
+ <pathelement location="${lib.river.local}"/>
<pathelement location="${lib.xnio-standalone.local}"/>
</classpath>
<batchtest fork="yes" todir="protocol/basic/target/test-results"
@@ -840,7 +849,7 @@
<antcall inheritall="true" inheritrefs="true" target="protocol.basic.test.pseudotarget">
<param name="extension" value="-security.txt"/>
<param name="message" value="Running with security manager"/>
- <param name="test.jvmargs" value="-Djava.security.manager=org.jboss.cx.remoting.test.support.LoggingSecurityManager -Djava.security.policy=${basedir}/testing-support/src/main/resources/testing.policy -Dsecurity.debug=policy"/>
+ <param name="test.jvmargs" value="-Djava.security.manager=org.jboss.remoting.test.support.LoggingSecurityManager -Djava.security.policy=${basedir}/testing-support/src/main/resources/testing.policy -Dsecurity.debug=policy"/>
</antcall>
</target>
@@ -848,7 +857,7 @@
<delete dir="protocol/basic/target"/>
</target>
- <target name="protocol.basic" description="Build the protocol.basic module" depends="lib.jboss-marshalling-api,lib.xnio-api,api,core,util,protocol.basic.compile">
+ <target name="protocol.basic" description="Build the protocol.basic module" depends="lib.xnio-api,api,core,util,protocol.basic.compile">
<path id="protocol.basic.classpath">
<pathelement location="protocol/basic/target/main/classes"/>
</path>
@@ -1300,7 +1309,7 @@
<!-- JAVADOCS -->
<!-- ============================================== -->
- <target name="api-javadoc" depends="api,core,standalone,util,lib.apiviz,lib.jboss-marshalling-api,lib.xnio-api">
+ <target name="api-javadoc" depends="api,core,standalone,util,lib.apiviz,lib.marshalling-api,lib.xnio-api">
<delete dir="api/target/main/docs"/>
<mkdir dir="api/target/main/docs"/>
<javadoc destdir="api/target/main/docs" author="false" version="false" use="false" windowtitle="JBoss Remoting API">
@@ -1311,14 +1320,15 @@
<doctitle><![CDATA[<h1>JBoss Remoting 3</h1>]]></doctitle>
<bottom><![CDATA[<i>Copyright © 2008 JBoss, a division of Red Hat, Inc.</i>]]></bottom>
<link href="http://java.sun.com/j2se/1.5.0/docs/api/"/>
- <link href="http://docs.jboss.org/xnio/1.0/api/"/>
+ <link href="http://docs.jboss.org/xnio/${lib.xnio.version}/api/"/>
+ <link href="http://docs.jboss.org/river/${lib.marshalling-api.version}/api/"/>
<classpath>
<path refid="core.classpath"/>
<path refid="api.classpath"/>
<path refid="standalone.classpath"/>
<path refid="util.classpath"/>
<pathelement location="${lib.xnio-api.local}"/>
- <pathelement location="${lib.jboss-marshalling-api.local}"/>
+ <pathelement location="${lib.marshalling-api.local}"/>
</classpath>
</javadoc>
</target>
@@ -1344,7 +1354,7 @@
<!-- fetch: These should be the second-to-last targets in the file -->
- <target name="all-fetch" description="Pre-fetch all external libraries" depends="lib.jboss-common-core,lib.jboss-common-logging-spi,lib.jboss-deployers-core-spi,lib.jboss-deployers-spi,lib.jboss-deployers-structure-spi,lib.jbossmc-kernel,lib.jboss-managed,lib.jboss-marshalling-api,lib.jbossxb,lib.servlet"/>
+ <target name="all-fetch" description="Pre-fetch all external libraries" depends="lib.jboss-common-core,lib.jboss-common-logging-spi,lib.jboss-deployers-core-spi,lib.jboss-deployers-spi,lib.jboss-deployers-structure-spi,lib.jbossmc-kernel,lib.jboss-managed,lib.jbossxb,lib.marshalling-api,lib.servlet"/>
<!-- all: These should be the last targets in the file -->
Modified: remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientImpl.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientImpl.java 2008-10-02 05:25:36 UTC (rev 4593)
+++ remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientImpl.java 2008-10-07 00:08:21 UTC (rev 4594)
@@ -93,24 +93,26 @@
return "client instance <" + Integer.toString(hashCode()) + ">";
}
- static final class ExternalizerImpl implements Externalizer<ClientImpl<?, ?>> {
+ static final class ExternalizerImpl implements Externalizer {
+ private static final long serialVersionUID = 814228455390899997L;
+
private final EndpointImpl endpoint;
ExternalizerImpl(final EndpointImpl endpoint) {
this.endpoint = endpoint;
}
- public void writeExternal(final ClientImpl<?, ?> client, final ObjectOutput output) throws IOException {
- output.writeObject(client.handle.getResource());
+ public void writeExternal(final Object o, final ObjectOutput output) throws IOException {
+ output.writeObject(((ClientImpl)o).handle.getResource());
}
- public ClientImpl<?, ?> createExternal(final Class<ClientImpl<?, ?>> clientClass, final ObjectInput input, final Creator creator) throws IOException, ClassNotFoundException {
+ public Object createExternal(final Class<?> aClass, final ObjectInput input, final Creator creator) throws IOException, ClassNotFoundException {
final RequestHandler handler = (RequestHandler) input.readObject();
return new ClientImpl(handler.getHandle(), endpoint.getExecutor());
}
- public void readExternal(final ClientImpl<?, ?> client, final ObjectInput input) throws IOException, ClassNotFoundException {
+ public void readExternal(final Object o, final ObjectInput input) throws IOException, ClassNotFoundException {
// no op
}
}
Modified: remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/BasicHandler.java
===================================================================
--- remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/BasicHandler.java 2008-10-02 05:25:36 UTC (rev 4593)
+++ remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/BasicHandler.java 2008-10-07 00:08:21 UTC (rev 4594)
@@ -45,6 +45,7 @@
import org.jboss.marshalling.Unmarshaller;
import org.jboss.marshalling.ByteOutput;
import org.jboss.marshalling.Marshaller;
+import org.jboss.marshalling.Configuration;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
@@ -64,6 +65,7 @@
//--== Connection configuration items ==--
private final MarshallerFactory marshallerFactory;
+ private final Configuration configuration;
private final int linkMetric;
private final Executor executor;
private final ClassLoader classLoader;
@@ -102,6 +104,7 @@
executor = configuration.getExecutor();
classLoader = configuration.getClassLoader();
marshallerFactory = configuration.getMarshallerFactory();
+ this.configuration = new Configuration();
linkMetric = configuration.getLinkMetric();
}
@@ -148,17 +151,18 @@
}
final Object payload;
try {
- final Unmarshaller unmarshaller = marshallerFactory.createUnmarshaller();
+ final Unmarshaller unmarshaller = marshallerFactory.createUnmarshaller(configuration);
try {
unmarshaller.start(createByteInput(buffer, true));
try {
payload = unmarshaller.readObject();
+ unmarshaller.finish();
} catch (ClassNotFoundException e) {
log.trace("Class not found in one-way request for client ID %d", Integer.valueOf(clientId));
break;
}
} finally {
- IoUtils.safeClose(unmarshaller);
+ // TODO: IoUtils.safeClose(unmarshaller);
}
} catch (IOException ex) {
log.error(ex, "Failed to unmarshal a one-way request");
@@ -182,18 +186,19 @@
final int requestId = buffer.getInt();
final Object payload;
try {
- final Unmarshaller unmarshaller = marshallerFactory.createUnmarshaller();
+ final Unmarshaller unmarshaller = marshallerFactory.createUnmarshaller(configuration);
try {
unmarshaller.start(createByteInput(buffer, true));
try {
payload = unmarshaller.readObject();
+ unmarshaller.finish();
} catch (ClassNotFoundException e) {
log.trace("Class not found in request ID %d for client ID %d", Integer.valueOf(requestId), Integer.valueOf(clientId));
// todo - send request receive failed message
break;
}
} finally {
- IoUtils.safeClose(unmarshaller);
+ // TODO: IoUtils.safeClose(unmarshaller);
}
} catch (IOException ex) {
log.trace("Failed to unmarshal a request (%s), sending %s", ex, MessageType.REQUEST_RECEIVE_FAILED);
@@ -213,18 +218,19 @@
}
final Object payload;
try {
- final Unmarshaller unmarshaller = marshallerFactory.createUnmarshaller();
+ final Unmarshaller unmarshaller = marshallerFactory.createUnmarshaller(configuration);
try {
unmarshaller.start(createByteInput(buffer, true));
try {
payload = unmarshaller.readObject();
+ unmarshaller.finish();
} catch (ClassNotFoundException e) {
replyHandler.handleException("Reply unmarshalling failed", e);
log.trace("Class not found in reply to request ID %d", Integer.valueOf(requestId));
break;
}
} finally {
- IoUtils.safeClose(unmarshaller);
+ // TODO: IoUtils.safeClose(unmarshaller);
}
} catch (IOException ex) {
log.trace("Failed to unmarshal a reply (%s), sending a ReplyException");
@@ -271,7 +277,7 @@
}
final Throwable cause;
try {
- final Unmarshaller unmarshaller = marshallerFactory.createUnmarshaller();
+ final Unmarshaller unmarshaller = marshallerFactory.createUnmarshaller(configuration);
try {
unmarshaller.start(createByteInput(buffer, true));
try {
@@ -286,7 +292,7 @@
break;
}
} finally {
- IoUtils.safeClose(unmarshaller);
+ // TODO: IoUtils.safeClose(unmarshaller);
}
} catch (IOException ex) {
log.trace("Failed to unmarshal an exception reply (%s), sending a generic execution exception");
@@ -436,7 +442,7 @@
buffer.put((byte) MessageType.REPLY.getId());
buffer.putInt(requestId);
try {
- final org.jboss.marshalling.Marshaller marshaller = marshallerFactory.createMarshaller();
+ final org.jboss.marshalling.Marshaller marshaller = marshallerFactory.createMarshaller(configuration);
try {
final List<ByteBuffer> bufferList = new ArrayList<ByteBuffer>();
final ByteOutput output = createByteOutput(allocator, bufferList);
@@ -450,7 +456,7 @@
IoUtils.safeClose(output);
}
} finally {
- IoUtils.safeClose(marshaller);
+ // TODO: IoUtils.safeClose(marshaller);
}
} catch (IOException e) {
log.error(e, "Failed to send a reply to the remote side");
@@ -465,7 +471,7 @@
buffer.put((byte) MessageType.REQUEST_FAILED.getId());
buffer.putInt(requestId);
try {
- final org.jboss.marshalling.Marshaller marshaller = marshallerFactory.createMarshaller();
+ final org.jboss.marshalling.Marshaller marshaller = marshallerFactory.createMarshaller(configuration);
try {
final List<ByteBuffer> bufferList = new ArrayList<ByteBuffer>();
final ByteOutput output = createByteOutput(allocator, bufferList);
@@ -479,7 +485,7 @@
IoUtils.safeClose(output);
}
} finally {
- IoUtils.safeClose(marshaller);
+ // TODO: IoUtils.safeClose(marshaller);
}
} catch (IOException e) {
log.error(e, "Failed to send an exception to the remote side");
@@ -515,41 +521,6 @@
}
}
- private int writeUTFZ(ByteBuffer buffer, CharSequence s) {
- final int len = s.length();
- for (int i = 0; i < len; i++) {
- char c = s.charAt(i);
- if (1 <= c && c < 0x80) {
- if (buffer.hasRemaining()) {
- buffer.put((byte) c);
- } else {
- return i;
- }
- } else if (c < 0x0800) {
- if (buffer.remaining() >= 2) {
- buffer.put((byte) (0xc0 | (c >> 6)));
- buffer.put((byte) (0x80 | (c & 0x3f)));
- } else {
- return i;
- }
- } else {
- if (buffer.remaining() >= 3) {
- buffer.put((byte) (0xe0 | (c >> 12)));
- buffer.put((byte) (0x80 | ((c >> 6) & 0x3f)));
- buffer.put((byte) (0x80 | (c & 0x3f)));
- } else {
- return i;
- }
- }
- }
- if (buffer.hasRemaining()) {
- buffer.put((byte) 0);
- return -1;
- } else {
- return len;
- }
- }
-
// Reader utils
private String readUTFZ(ByteBuffer buffer) {
@@ -641,7 +612,7 @@
log.trace("Sending outbound one-way request of type %s", request == null ? "null" : request.getClass());
try {
final List<ByteBuffer> bufferList;
- final Marshaller marshaller = marshallerFactory.createMarshaller();
+ final Marshaller marshaller = marshallerFactory.createMarshaller(configuration);
try {
bufferList = new ArrayList<ByteBuffer>();
final ByteOutput output = createByteOutput(allocator, bufferList);
@@ -655,7 +626,7 @@
IoUtils.safeClose(output);
}
} finally {
- IoUtils.safeClose(marshaller);
+ // TODO: IoUtils.safeClose(marshaller);
}
try {
registerWriter(channel, new SimpleWriteHandler(allocator, bufferList));
@@ -675,7 +646,7 @@
log.trace("Sending outbound request of type %s", request == null ? "null" : request.getClass());
try {
final List<ByteBuffer> bufferList;
- final Marshaller marshaller = marshallerFactory.createMarshaller();
+ final Marshaller marshaller = marshallerFactory.createMarshaller(configuration);
try {
bufferList = new ArrayList<ByteBuffer>();
final ByteOutput output = createByteOutput(allocator, bufferList);
@@ -708,7 +679,7 @@
IoUtils.safeClose(output);
}
} finally {
- IoUtils.safeClose(marshaller);
+ // TODO: IoUtils.safeClose(marshaller);
}
} catch (final IOException t) {
log.trace(t, "receiveRequest failed with an exception");
Modified: remoting3/trunk/testing-support/src/main/resources/testing.policy
===================================================================
--- remoting3/trunk/testing-support/src/main/resources/testing.policy 2008-10-02 05:25:36 UTC (rev 4593)
+++ remoting3/trunk/testing-support/src/main/resources/testing.policy 2008-10-07 00:08:21 UTC (rev 4594)
@@ -70,3 +70,8 @@
{
permission java.security.AllPermission;
};
+
+grant codeBase "file:${lib.marshalling-api.local}"
+{
+ permission java.security.AllPermission;
+};
16 years, 2 months
JBoss Remoting SVN: r4593 - remoting2/tags.
by jboss-remoting-commits@lists.jboss.org
Author: ron.sigal(a)jboss.com
Date: 2008-10-02 01:25:36 -0400 (Thu, 02 Oct 2008)
New Revision: 4593
Added:
remoting2/tags/2.2.2-SP10/
Log:
Copied: remoting2/tags/2.2.2-SP10 (from rev 4592, remoting2/branches/2.2)
16 years, 3 months
JBoss Remoting SVN: r4592 - in remoting2/branches/2.2/src/etc/lib: remoting_2_2_2_SP9 and 1 other directory.
by jboss-remoting-commits@lists.jboss.org
Author: ron.sigal(a)jboss.com
Date: 2008-10-02 01:19:12 -0400 (Thu, 02 Oct 2008)
New Revision: 4592
Added:
remoting2/branches/2.2/src/etc/lib/remoting_2_2_2_SP9/
remoting2/branches/2.2/src/etc/lib/remoting_2_2_2_SP9/jboss-remoting.jar
Log:
JBREM-1038: Added 2.2.2.SP9 jboss-remoting.jar for versioning tests.
Added: remoting2/branches/2.2/src/etc/lib/remoting_2_2_2_SP9/jboss-remoting.jar
===================================================================
(Binary files differ)
Property changes on: remoting2/branches/2.2/src/etc/lib/remoting_2_2_2_SP9/jboss-remoting.jar
___________________________________________________________________
Name: svn:mime-type
+ application/octet-stream
16 years, 3 months