JBoss Remoting SVN: r4383 - remoting2/branches/2.2/src/main/org/jboss/remoting.
by jboss-remoting-commits@lists.jboss.org
Author: ron.sigal(a)jboss.com
Date: 2008-07-18 01:22:21 -0400 (Fri, 18 Jul 2008)
New Revision: 4383
Modified:
remoting2/branches/2.2/src/main/org/jboss/remoting/Version.java
Log:
JBREM-1011: Updated version to 2.2.2.SP9.
Modified: remoting2/branches/2.2/src/main/org/jboss/remoting/Version.java
===================================================================
--- remoting2/branches/2.2/src/main/org/jboss/remoting/Version.java 2008-07-18 05:21:38 UTC (rev 4382)
+++ remoting2/branches/2.2/src/main/org/jboss/remoting/Version.java 2008-07-18 05:22:21 UTC (rev 4383)
@@ -32,7 +32,7 @@
public static final byte VERSION_2 = 2;
public static final byte VERSION_2_2 = 22;
- public static final String VERSION = "2.2.2.SP8";
+ public static final String VERSION = "2.2.2.SP9";
private static final byte byteVersion = VERSION_2_2;
private static byte defaultByteVersion = byteVersion;
private static boolean performVersioning = true;
16 years, 5 months
JBoss Remoting SVN: r4382 - remoting2/branches/2.2/src/main/org/jboss/remoting.
by jboss-remoting-commits@lists.jboss.org
Author: ron.sigal(a)jboss.com
Date: 2008-07-18 01:21:38 -0400 (Fri, 18 Jul 2008)
New Revision: 4382
Modified:
remoting2/branches/2.2/src/main/org/jboss/remoting/ServerInvoker.java
Log:
JBREM-1010: Enable injection of ConnectionListener.
Modified: remoting2/branches/2.2/src/main/org/jboss/remoting/ServerInvoker.java
===================================================================
--- remoting2/branches/2.2/src/main/org/jboss/remoting/ServerInvoker.java 2008-07-18 01:48:26 UTC (rev 4381)
+++ remoting2/branches/2.2/src/main/org/jboss/remoting/ServerInvoker.java 2008-07-18 05:21:38 UTC (rev 4382)
@@ -54,8 +54,6 @@
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.net.InetAddress;
-import java.net.MalformedURLException;
-import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
@@ -213,6 +211,8 @@
public static final String REGISTER_CALLBACK_LISTENER = "registerCallbackListener";
public static final String INVOKER_SESSION_ID = "invokerSessionId";
+
+ public static final String CONNECTION_LISTENER = "connectionListener";
// Static ---------------------------------------------------------------------------------------
@@ -340,7 +340,64 @@
throw new IllegalArgumentException("Can not add null ConnectionListener.");
}
}
+
+ public void setConnectionListener(Object listener)
+ {
+ if (listener == null)
+ {
+ log.error("ConnectionListener is null");
+ return;
+ }
+
+ if (listener instanceof ConnectionListener)
+ {
+ addConnectionListener((ConnectionListener) listener);
+ return;
+ }
+ if (!(listener instanceof String))
+ {
+ log.error("Object supplied as ConnectionListener is neither String nor ConnectionListener");
+ return;
+ }
+
+ ConnectionListener connectionListener = null;
+ try
+ {
+ MBeanServer server = getMBeanServer();
+ ObjectName objName = new ObjectName((String) listener);
+ Class c = ConnectionListener.class;
+ Object o = MBeanServerInvocationHandler.newProxyInstance(server, objName, c, false);
+ connectionListener = (ConnectionListener) o;
+ }
+ catch (MalformedObjectNameException e)
+ {
+ log.debug("Object supplied as ConnectionListener is not an object name.");
+ }
+
+ if (connectionListener == null)
+ {
+ try
+ {
+ Class listenerClass = ClassLoaderUtility.loadClass((String) listener, ServerInvoker.class);
+ connectionListener = (ConnectionListener) listenerClass.newInstance();
+ }
+ catch (Exception e)
+ {
+ log.error("Unable to instantiate " + listener + ": " + e.getMessage());
+ return;
+ }
+ }
+
+ if (connectionListener == null)
+ {
+ log.error("Unable to create ConnectionListener from " + listener);
+ return;
+ }
+
+ addConnectionListener(connectionListener);
+ }
+
public void removeConnectionListener(ConnectionListener listener)
{
if(connectionNotifier != null)
@@ -1120,6 +1177,13 @@
}
}
+ // Inject ConnectionListener
+ String connectionListener = (String)config.get(CONNECTION_LISTENER);
+ if (connectionListener != null)
+ {
+ setConnectionListener(connectionListener);
+ }
+
String registerCallbackListenersString = (String)config.get(REGISTER_CALLBACK_LISTENER);
if(registerCallbackListenersString != null)
{
16 years, 5 months
JBoss Remoting SVN: r4381 - remoting3/trunk/testing-support/src/main/java/org/jboss/cx/remoting/test/support.
by jboss-remoting-commits@lists.jboss.org
Author: david.lloyd(a)jboss.com
Date: 2008-07-17 21:48:26 -0400 (Thu, 17 Jul 2008)
New Revision: 4381
Modified:
remoting3/trunk/testing-support/src/main/java/org/jboss/cx/remoting/test/support/LoggingHelper.java
Log:
Print relative times in the test log
Modified: remoting3/trunk/testing-support/src/main/java/org/jboss/cx/remoting/test/support/LoggingHelper.java
===================================================================
--- remoting3/trunk/testing-support/src/main/java/org/jboss/cx/remoting/test/support/LoggingHelper.java 2008-07-17 22:49:31 UTC (rev 4380)
+++ remoting3/trunk/testing-support/src/main/java/org/jboss/cx/remoting/test/support/LoggingHelper.java 2008-07-18 01:48:26 UTC (rev 4381)
@@ -36,6 +36,7 @@
*/
public final class LoggingHelper {
private static final class Once {
+ private static final long startTime = System.currentTimeMillis();
static {
AccessController.doPrivileged(new PrivilegedAction<Void>() {
public Void run() {
@@ -47,6 +48,12 @@
handler.setFormatter(new Formatter() {
public String format(final LogRecord record) {
StringBuilder builder = new StringBuilder();
+ long offs = record.getMillis() - startTime;
+ final String sign = offs < 0 ? "-" : "+";
+ offs = Math.abs(offs);
+ int ms = (int) (offs % 1000L);
+ long s = offs / 1000L;
+ builder.append(String.format("%s%04d.%03d ", sign, Long.valueOf(s), Long.valueOf(ms)));
builder.append(record.getLevel().toString());
builder.append(" [").append(record.getLoggerName()).append("] ");
builder.append(String.format(record.getMessage(), record.getParameters()));
16 years, 5 months
JBoss Remoting SVN: r4380 - in remoting3/trunk/api/src: test/java/org/jboss/cx/remoting/spi and 1 other directory.
by jboss-remoting-commits@lists.jboss.org
Author: david.lloyd(a)jboss.com
Date: 2008-07-17 18:49:31 -0400 (Thu, 17 Jul 2008)
New Revision: 4380
Added:
remoting3/trunk/api/src/test/java/org/jboss/cx/remoting/spi/CloseableTestCase.java
Modified:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractAutoCloseable.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractCloseable.java
Log:
Add test case for closeables
Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractAutoCloseable.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractAutoCloseable.java 2008-07-17 16:21:03 UTC (rev 4379)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractAutoCloseable.java 2008-07-17 22:49:31 UTC (rev 4380)
@@ -55,8 +55,10 @@
protected void dec() throws RemotingException {
final int v = refcount.decrementAndGet();
+ log.trace("Clearing reference to %s to %d", this, Integer.valueOf(v));
if (v == 0) {
// we dropped the refcount to zero
+ log.trace("Refcount of %s dropped to zero, closing", this);
if (refcount.compareAndSet(0, -65536)) {
// we are closing
close();
@@ -71,6 +73,7 @@
protected void inc() throws RemotingException {
final int v = refcount.getAndIncrement();
+ log.trace("Adding reference to %s to %d", this, Integer.valueOf(v + 1));
if (v < 0) {
// was already closed
refcount.decrementAndGet();
@@ -95,6 +98,10 @@
inc();
}
+ public void close() throws RemotingException {
+ dec();
+ }
+
@SuppressWarnings({ "unchecked" })
public T getResource() {
return (T) AbstractAutoCloseable.this;
Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractCloseable.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractCloseable.java 2008-07-17 16:21:03 UTC (rev 4379)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractCloseable.java 2008-07-17 22:49:31 UTC (rev 4380)
@@ -58,6 +58,7 @@
public void close() throws RemotingException {
if (! closed.getAndSet(true)) {
+ log.trace("Closed %s", this);
synchronized (closeLock) {
if (closeHandlers != null) {
for (final CloseHandler<? super T> handler : closeHandlers) {
Added: remoting3/trunk/api/src/test/java/org/jboss/cx/remoting/spi/CloseableTestCase.java
===================================================================
--- remoting3/trunk/api/src/test/java/org/jboss/cx/remoting/spi/CloseableTestCase.java (rev 0)
+++ remoting3/trunk/api/src/test/java/org/jboss/cx/remoting/spi/CloseableTestCase.java 2008-07-17 22:49:31 UTC (rev 4380)
@@ -0,0 +1,179 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.cx.remoting.spi;
+
+import junit.framework.TestCase;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.jboss.xnio.IoUtils;
+import org.jboss.cx.remoting.CloseHandler;
+import org.jboss.cx.remoting.test.support.LoggingHelper;
+import org.jboss.cx.remoting.spi.remote.Handle;
+
+/**
+ *
+ */
+public final class CloseableTestCase extends TestCase {
+ static {
+ LoggingHelper.init();
+ }
+
+ public void testBasic() throws Throwable {
+ final ExecutorService executorService = Executors.newCachedThreadPool();
+ try {
+ final AtomicBoolean closed = new AtomicBoolean();
+ final CountDownLatch latch = new CountDownLatch(1);
+ final AbstractCloseable<Object> closeable = new AbstractCloseable<Object>(executorService) {
+ // empty
+ };
+ try {
+ closeable.addCloseHandler(new CloseHandler<Object>() {
+ public void handleClose(final Object x) {
+ closed.set(true);
+ latch.countDown();
+ }
+ });
+ assertTrue(closeable.isOpen());
+ assertFalse(closed.get());
+ closeable.close();
+ assertTrue(latch.await(500L, TimeUnit.MILLISECONDS));
+ assertFalse(closeable.isOpen());
+ assertTrue(closed.get());
+ } finally {
+ IoUtils.safeClose(closeable);
+ }
+ } finally {
+ executorService.shutdownNow();
+ }
+ }
+
+ public void testAutoClose() throws Throwable {
+ final ExecutorService executorService = Executors.newCachedThreadPool();
+ try {
+ final AtomicBoolean closed = new AtomicBoolean();
+ final CountDownLatch latch = new CountDownLatch(1);
+ final AbstractAutoCloseable<Object> closeable = new AbstractAutoCloseable<Object>(executorService) {
+ // empty
+ };
+ try {
+ closeable.addCloseHandler(new CloseHandler<Object>() {
+ public void handleClose(final Object x) {
+ closed.set(true);
+ latch.countDown();
+ }
+ });
+ assertTrue(closeable.isOpen());
+ assertFalse(closed.get());
+ closeable.autoClose();
+ assertTrue(latch.await(500L, TimeUnit.MILLISECONDS));
+ assertFalse(closeable.isOpen());
+ assertTrue(closed.get());
+ } finally {
+ IoUtils.safeClose(closeable);
+ }
+ } finally {
+ executorService.shutdownNow();
+ }
+ }
+
+ public void testAutoCloseWithOneRef() throws Throwable {
+ final ExecutorService executorService = Executors.newCachedThreadPool();
+ try {
+ final AtomicBoolean closed = new AtomicBoolean();
+ final CountDownLatch latch = new CountDownLatch(1);
+ final AbstractAutoCloseable<Object> closeable = new AbstractAutoCloseable<Object>(executorService) {
+ // empty
+ };
+ try {
+ closeable.addCloseHandler(new CloseHandler<Object>() {
+ public void handleClose(final Object x) {
+ closed.set(true);
+ latch.countDown();
+ }
+ });
+ assertTrue(closeable.isOpen());
+ assertFalse(closed.get());
+ final Handle<Object> h1 = closeable.getHandle();
+ assertTrue(closeable.isOpen());
+ assertFalse(closed.get());
+ closeable.autoClose();
+ assertTrue(closeable.isOpen());
+ assertFalse(closed.get());
+ h1.close();
+ assertTrue(latch.await(500L, TimeUnit.MILLISECONDS));
+ assertFalse(closeable.isOpen());
+ assertTrue(closed.get());
+ } finally {
+ IoUtils.safeClose(closeable);
+ }
+ } finally {
+ executorService.shutdownNow();
+ }
+ }
+
+ public void testAutoCloseWithThreeRefs() throws Throwable {
+ final ExecutorService executorService = Executors.newCachedThreadPool();
+ try {
+ final AtomicBoolean closed = new AtomicBoolean();
+ final CountDownLatch latch = new CountDownLatch(1);
+ final AbstractAutoCloseable<Object> closeable = new AbstractAutoCloseable<Object>(executorService) {
+ // empty
+ };
+ try {
+ closeable.addCloseHandler(new CloseHandler<Object>() {
+ public void handleClose(final Object x) {
+ closed.set(true);
+ latch.countDown();
+ }
+ });
+ assertTrue(closeable.isOpen());
+ assertFalse(closed.get());
+ final Handle<Object> h1 = closeable.getHandle();
+ final Handle<Object> h2 = closeable.getHandle();
+ final Handle<Object> h3 = closeable.getHandle();
+ assertTrue(closeable.isOpen());
+ assertFalse(closed.get());
+ closeable.autoClose();
+ assertTrue(closeable.isOpen());
+ assertFalse(closed.get());
+ h1.close();
+ assertTrue(closeable.isOpen());
+ assertFalse(closed.get());
+ h2.close();
+ assertTrue(closeable.isOpen());
+ assertFalse(closed.get());
+ h3.close();
+ assertTrue(latch.await(500L, TimeUnit.MILLISECONDS));
+ assertFalse(closeable.isOpen());
+ assertTrue(closed.get());
+ } finally {
+ IoUtils.safeClose(closeable);
+ }
+ } finally {
+ executorService.shutdownNow();
+ }
+ }
+}
16 years, 5 months
JBoss Remoting SVN: r4379 - in remoting3/trunk: protocol and 18 other directories.
by jboss-remoting-commits@lists.jboss.org
Author: david.lloyd(a)jboss.com
Date: 2008-07-17 12:21:03 -0400 (Thu, 17 Jul 2008)
New Revision: 4379
Added:
remoting3/trunk/protocol/
remoting3/trunk/protocol/basic/
remoting3/trunk/protocol/basic/src/
remoting3/trunk/protocol/basic/src/main/
remoting3/trunk/protocol/basic/src/main/java/
remoting3/trunk/protocol/basic/src/main/java/org/
remoting3/trunk/protocol/basic/src/main/java/org/jboss/
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicHandler.java
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicProtocol.java
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/MessageType.java
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/SimpleWriteHandler.java
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/WriteHandler.java
remoting3/trunk/protocol/basic/src/test/
remoting3/trunk/protocol/basic/src/test/java/
remoting3/trunk/protocol/basic/src/test/java/org/
remoting3/trunk/protocol/basic/src/test/java/org/jboss/
remoting3/trunk/protocol/basic/src/test/java/org/jboss/cx/
remoting3/trunk/protocol/basic/src/test/java/org/jboss/cx/remoting/
remoting3/trunk/protocol/basic/src/test/java/org/jboss/cx/remoting/protocol/
remoting3/trunk/protocol/basic/src/test/java/org/jboss/cx/remoting/protocol/basic/
remoting3/trunk/protocol/basic/src/test/java/org/jboss/cx/remoting/protocol/basic/ConnectionTestCase.java
Log:
Basic protocol support. Needs more work, though.
Property changes on: remoting3/trunk/protocol/basic
___________________________________________________________________
Name: svn:ignore
+ target
Added: remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicHandler.java
===================================================================
--- remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicHandler.java (rev 0)
+++ remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicHandler.java 2008-07-17 16:21:03 UTC (rev 4379)
@@ -0,0 +1,618 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.cx.remoting.protocol.basic;
+
+import org.jboss.xnio.channels.AllocatedMessageChannel;
+import org.jboss.xnio.IoHandler;
+import org.jboss.xnio.BufferAllocator;
+import org.jboss.xnio.IoUtils;
+import org.jboss.xnio.log.Logger;
+import static org.jboss.xnio.Buffers.*;
+import org.jboss.cx.remoting.spi.remote.RemoteClientEndpoint;
+import org.jboss.cx.remoting.spi.remote.RemoteServiceEndpoint;
+import org.jboss.cx.remoting.spi.remote.ReplyHandler;
+import org.jboss.cx.remoting.spi.remote.RemoteRequestContext;
+import org.jboss.cx.remoting.spi.remote.Handle;
+import org.jboss.cx.remoting.spi.remote.RemoteClientEndpointListener;
+import org.jboss.cx.remoting.spi.marshal.Unmarshaller;
+import org.jboss.cx.remoting.spi.marshal.Marshaller;
+import org.jboss.cx.remoting.spi.marshal.MarshallerFactory;
+import org.jboss.cx.remoting.spi.marshal.ObjectResolver;
+import org.jboss.cx.remoting.spi.marshal.IdentityResolver;
+import org.jboss.cx.remoting.spi.SpiUtils;
+import org.jboss.cx.remoting.spi.AbstractAutoCloseable;
+import static org.jboss.cx.remoting.util.CollectionUtil.concurrentMap;
+import org.jboss.cx.remoting.util.CollectionUtil;
+import static org.jboss.cx.remoting.protocol.basic.MessageType.REQUEST_ONEWAY;
+import static org.jboss.cx.remoting.protocol.basic.MessageType.REQUEST;
+import static org.jboss.cx.remoting.protocol.basic.MessageType.REPLY;
+import static org.jboss.cx.remoting.protocol.basic.MessageType.CLIENT_CLOSE;
+import static org.jboss.cx.remoting.protocol.basic.MessageType.CLIENT_OPEN;
+import static org.jboss.cx.remoting.protocol.basic.MessageType.SERVICE_CLOSE;
+import static org.jboss.cx.remoting.protocol.basic.MessageType.REQUEST_FAILED;
+import static org.jboss.cx.remoting.protocol.basic.MessageType.CANCEL_ACK;
+import org.jboss.cx.remoting.RemotingException;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.List;
+import java.util.ArrayList;
+import java.nio.ByteBuffer;
+import java.nio.BufferUnderflowException;
+import java.io.IOException;
+
+/**
+ *
+ */
+public final class BasicHandler implements IoHandler<AllocatedMessageChannel> {
+
+ private static final Logger log = Logger.getLogger(BasicHandler.class);
+
+ // clients whose requests get forwarded to the remote side
+ private final ConcurrentMap<Integer, RemoteClientEndpoint<?, ?>> remoteClients = concurrentMap();
+ // running on remote node
+ private final ConcurrentMap<Integer, ReplyHandler<?>> outstandingRequests = concurrentMap();
+ // forwarded to remote side
+ private final ConcurrentMap<Integer, Handle<RemoteClientEndpoint<?, ?>>> forwardedClients = concurrentMap();
+ // forwarded to remote side
+ private final ConcurrentMap<Integer, Handle<RemoteServiceEndpoint<?, ?>>> forwardedServices = concurrentMap();
+
+ private final boolean server;
+ private final BufferAllocator<ByteBuffer> allocator;
+
+ private final AtomicBoolean isnew = new AtomicBoolean(true);
+ private volatile AllocatedMessageChannel channel;
+ private final Executor executor;
+ private final MarshallerFactory<ByteBuffer> marshallerFactory;
+ private final ObjectResolver resolver;
+ private final ClassLoader classLoader;
+
+ @SuppressWarnings({ "unchecked" })
+ public <I, O> BasicHandler(final boolean server, final BufferAllocator<ByteBuffer> allocator, final RemoteClientEndpoint<I, O> root, final Executor executor, final RemoteClientEndpointListener remoteListener, final MarshallerFactory<ByteBuffer> marshallerFactory) throws RemotingException {
+ this.server = server;
+ this.allocator = allocator;
+ this.executor = executor;
+ forwardedClients.put(Integer.valueOf(0), ((RemoteClientEndpoint)root).getHandle());
+ final RemoteClientEndpointImpl<Object, Object> endpoint = new RemoteClientEndpointImpl<Object, Object>(0, marshallerFactory, allocator);
+ remoteClients.put(Integer.valueOf(0), endpoint);
+ if (remoteListener != null) {
+ remoteListener.notifyCreated(endpoint);
+ }
+ this.marshallerFactory = marshallerFactory;
+ // todo
+ resolver = IdentityResolver.getInstance();
+ classLoader = getClass().getClassLoader();
+ }
+
+ /**
+ * Sequence number of requests originating locally.
+ */
+ private final AtomicInteger localRequestIdSeq = new AtomicInteger();
+ /**
+ * Sequence number of local clients forwarded to the remote side.
+ */
+ private final AtomicInteger localClientIdSeq = new AtomicInteger(1);
+ /**
+ * Sequence number of remote clients opened locally from services from the remote side.
+ */
+ private final AtomicInteger remoteClientIdSeq = new AtomicInteger(1);
+ /**
+ * Sequence number of services forwarded to the remote side.
+ */
+ private final AtomicInteger localServiceIdSeq = new AtomicInteger();
+
+ public void handleOpened(final AllocatedMessageChannel channel) {
+ if (isnew.getAndSet(false)) {
+ this.channel = channel;
+ }
+ channel.resumeReads();
+ }
+
+ public void handleReadable(final AllocatedMessageChannel channel) {
+ for (;;) try {
+ final ByteBuffer buffer = channel.receive();
+ if (buffer == null) {
+ // todo release all handles...
+ IoUtils.safeClose(channel);
+ return;
+ }
+ if (! buffer.hasRemaining()) {
+ // would block
+ channel.resumeReads();
+ return;
+ }
+ int msgType = buffer.get() & 0xff;
+ log.trace("Received message %s, type %d", buffer, Integer.valueOf(msgType));
+ switch (msgType) {
+ case REQUEST_ONEWAY: {
+ final int clientId = buffer.getInt();
+ final Handle<RemoteClientEndpoint<?, ?>> handle = getForwardedClient(clientId);
+ if (handle == null) {
+ log.trace("Request on invalid client ID %d", Integer.valueOf(clientId));
+ return;
+ }
+ final Unmarshaller<ByteBuffer> unmarshaller = marshallerFactory.createUnmarshaller(resolver, classLoader);
+ if (! unmarshaller.unmarshal(buffer)) {
+ log.trace("Incomplete one-way request for client ID %d", Integer.valueOf(clientId));
+ break;
+ }
+ final Object payload;
+ try {
+ payload = unmarshaller.get();
+ } catch (ClassNotFoundException e) {
+ log.trace("Class not found in one-way request for client ID %d", Integer.valueOf(clientId));
+ break;
+ }
+ final RemoteClientEndpoint<?, ?> clientEndpoint = handle.getResource();
+ receiveRequest(clientEndpoint, payload);
+ break;
+ }
+ case REQUEST: {
+ final int clientId = buffer.getInt();
+ final Handle<RemoteClientEndpoint<?, ?>> handle = getForwardedClient(clientId);
+ if (handle == null) {
+ log.trace("Request on invalid client ID %d", Integer.valueOf(clientId));
+ break;
+ }
+ final int requestId = buffer.getInt();
+ final Unmarshaller<ByteBuffer> unmarshaller = marshallerFactory.createUnmarshaller(resolver, classLoader);
+ if (! unmarshaller.unmarshal(buffer)) {
+ log.trace("Incomplete request ID %d for client ID %d", Integer.valueOf(requestId), Integer.valueOf(clientId));
+ new ReplyHandlerImpl(channel, requestId, allocator).handleException("Incomplete request", null);
+ break;
+ }
+ final Object payload;
+ try {
+ payload = unmarshaller.get();
+ } catch (ClassNotFoundException e) {
+ log.trace("Class not found in request ID %d for client ID %d", Integer.valueOf(requestId), Integer.valueOf(clientId));
+ break;
+ }
+ final RemoteClientEndpoint<?, ?> clientEndpoint = handle.getResource();
+ receiveRequest(clientEndpoint, new ReplyHandlerImpl(channel, requestId, allocator), payload);
+ break;
+ }
+ case REPLY: {
+ final int requestId = buffer.getInt();
+ final ReplyHandler<?> replyHandler = takeOutstandingReqeust(requestId);
+ if (replyHandler == null) {
+ log.trace("Got reply to unknown request %d", Integer.valueOf(requestId));
+ break;
+ }
+ final Unmarshaller<ByteBuffer> unmarshaller = marshallerFactory.createUnmarshaller(resolver, classLoader);
+ if (! unmarshaller.unmarshal(buffer)) {
+ replyHandler.handleException("Incomplete reply", null);
+ log.trace("Incomplete reply to request ID %d", Integer.valueOf(requestId));
+ break;
+ }
+ final Object payload;
+ try {
+ payload = unmarshaller.get();
+ } catch (ClassNotFoundException e) {
+ replyHandler.handleException("Reply unmarshalling failed", e);
+ log.trace("Class not found in reply to request ID %d", Integer.valueOf(requestId));
+ break;
+ }
+ handleReply(replyHandler, payload);
+ break;
+ }
+ case REQUEST_FAILED: {
+ final int requestId = buffer.getInt();
+ final ReplyHandler<?> replyHandler = takeOutstandingReqeust(requestId);
+ if (replyHandler == null) {
+ log.trace("Got reply to unknown request %d", Integer.valueOf(requestId));
+ break;
+ }
+ final Unmarshaller<ByteBuffer> unmarshaller = marshallerFactory.createUnmarshaller(resolver, classLoader);
+ if (! unmarshaller.unmarshal(buffer)) {
+ replyHandler.handleException("Incomplete exception reply", null);
+ log.trace("Incomplete exception reply to request ID %d", Integer.valueOf(requestId));
+ break;
+ }
+ final Object message;
+ try {
+ message = unmarshaller.get();
+ } catch (ClassNotFoundException e) {
+ replyHandler.handleException("Exception reply unmarshalling failed", e);
+ log.trace("Class not found in exception reply to request ID %d", Integer.valueOf(requestId));
+ break;
+ }
+ final Object cause;
+ try {
+ cause = unmarshaller.get();
+ } catch (ClassNotFoundException e) {
+ replyHandler.handleException("Exception reply unmarshalling failed", e);
+ log.trace("Class not found in exception reply to request ID %d", Integer.valueOf(requestId));
+ break;
+ }
+ handleException(replyHandler, message, cause);
+ break;
+ }
+ case CLIENT_CLOSE: {
+ break;
+ }
+ case CLIENT_OPEN: {
+ final int serviceId = buffer.getInt();
+ final int clientId = buffer.getInt();
+ final Handle<RemoteServiceEndpoint<?, ?>> handle = getForwardedService(serviceId);
+ if (handle == null) {
+ // todo log invalid request
+ break;
+ }
+ final RemoteServiceEndpoint<?, ?> serviceEndpoint = handle.getResource();
+ final RemoteClientEndpoint<?, ?> clientEndpoint = serviceEndpoint.createClientEndpoint();
+
+ break;
+ }
+ case SERVICE_CLOSE: {
+ break;
+ }
+ default: {
+ log.trace("Received invalid message type %d", Integer.valueOf(msgType));
+ }
+ }
+ } catch (IOException e) {
+ log.error(e, "I/O error in protocol channel");
+ IoUtils.safeClose(channel);
+ return;
+ } catch (BufferUnderflowException e) {
+ log.error(e, "Malformed packet");
+ } catch (Throwable t) {
+ log.error(t, "Handler failed");
+ }
+ }
+
+ public void handleWritable(final AllocatedMessageChannel channel) {
+ for (;;) {
+ final WriteHandler handler = outputQueue.peek();
+ if (handler == null) {
+ return;
+ }
+ try {
+ if (handler.handleWrite(channel)) {
+ log.trace("Handled write with handler %s", handler);
+ pending.decrementAndGet();
+ outputQueue.remove();
+ } else {
+ channel.resumeWrites();
+ return;
+ }
+ } catch (Throwable t) {
+ pending.decrementAndGet();
+ outputQueue.remove();
+ }
+ }
+ }
+
+ public void handleClosed(final AllocatedMessageChannel channel) {
+ }
+
+ private <I, O> ReplyHandler<O> createReplyHandler(final AllocatedMessageChannel channel, final int requestId) {
+ return new ReplyHandlerImpl<O>(channel, requestId, allocator);
+ }
+
+ RemoteClientEndpoint<?, ?> getRemoteClient(final int i) {
+ return remoteClients.get(Integer.valueOf(i));
+ }
+
+ private final class ReplyHandlerImpl<O> implements ReplyHandler<O> {
+
+ private final AllocatedMessageChannel channel;
+ private final int requestId;
+ private final BufferAllocator<ByteBuffer> allocator;
+
+ private ReplyHandlerImpl(final AllocatedMessageChannel channel, final int requestId, final BufferAllocator<ByteBuffer> allocator) {
+ if (channel == null) {
+ throw new NullPointerException("channel is null");
+ }
+ if (allocator == null) {
+ throw new NullPointerException("allocator is null");
+ }
+ this.channel = channel;
+ this.requestId = requestId;
+ this.allocator = allocator;
+ }
+
+ public void handleReply(final O reply) {
+ ByteBuffer buffer = allocator.allocate();
+ buffer.put((byte) REPLY);
+ buffer.putInt(requestId);
+ try {
+ final Marshaller<ByteBuffer> marshaller = marshallerFactory.createMarshaller(resolver);
+ marshaller.start(reply);
+ final List<ByteBuffer> bufferList = new ArrayList<ByteBuffer>();
+ while (! marshaller.marshal(buffer)) {
+ bufferList.add(flip(buffer));
+ buffer = allocator.allocate();
+ }
+ bufferList.add(flip(buffer));
+ registerWriter(channel, new SimpleWriteHandler(allocator, bufferList));
+ } catch (IOException e) {
+ // todo log
+ } catch (InterruptedException e) {
+ // todo log
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ public void handleException(final String msg, final Throwable cause) {
+ ByteBuffer buffer = allocator.allocate();
+ buffer.put((byte) REQUEST_FAILED);
+ buffer.putInt(requestId);
+ try {
+ final Marshaller<ByteBuffer> marshaller = marshallerFactory.createMarshaller(resolver);
+ final List<ByteBuffer> bufferList = new ArrayList<ByteBuffer>();
+ marshaller.start(msg);
+ while (! marshaller.marshal(buffer)) {
+ bufferList.add(flip(buffer));
+ buffer = allocator.allocate();
+ }
+ marshaller.start(cause);
+ while (! marshaller.marshal(buffer)) {
+ bufferList.add(flip(buffer));
+ buffer = allocator.allocate();
+ }
+ bufferList.add(flip(buffer));
+ registerWriter(channel, new SimpleWriteHandler(allocator, bufferList));
+ } catch (IOException e) {
+ // todo log
+ } catch (InterruptedException e) {
+ // todo log
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ public void handleCancellation() {
+ final ByteBuffer buffer = allocator.allocate();
+ buffer.put((byte) CANCEL_ACK);
+ buffer.putInt(requestId);
+ buffer.flip();
+ try {
+ registerWriter(channel, new SimpleWriteHandler(allocator, buffer));
+ } catch (InterruptedException e) {
+ // todo log
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ // Session mgmt
+
+ /**
+  * Register a reply handler under a freshly allocated local request ID.
+  * <p>
+  * Candidate IDs are drawn from {@code localRequestIdSeq}; a candidate that is already present
+  * in {@code outstandingRequests} is skipped and the next one is tried.
+  *
+  * @param handler the handler to associate with the new request ID
+  * @return the request ID under which the handler was registered
+  */
+ public int openRequest(ReplyHandler<?> handler) {
+     for (;;) {
+         final int candidate = localRequestIdSeq.getAndIncrement();
+         final Integer key = Integer.valueOf(candidate);
+         if (outstandingRequests.putIfAbsent(key, handler) == null) {
+             return candidate;
+         }
+     }
+ }
+
+ /**
+  * Allocate an ID for a client opened from a service and register an endpoint for it in
+  * {@code remoteClients}.
+  * <p>
+  * The ID is the next value of {@code remoteClientIdSeq} shifted left by one bit, with the low
+  * bit set when {@code server} is true.  Allocation is retried until an unused ID is found.
+  *
+  * @return the newly registered client ID
+  */
+ public int openClientFromService() {
+ int id;
+ do {
+ id = remoteClientIdSeq.getAndIncrement() << 1 | (server ? 1 : 0);
+ // NOTE(review): RemoteClientEndpointImpl's constructor throws NullPointerException when its
+ // marshallerFactory argument is null, and null is passed here - as written this call looks
+ // like it can never succeed.  Confirm and pass a real marshaller factory.
+ // A new endpoint instance is also constructed on every retry of the loop.
+ } while (remoteClients.putIfAbsent(Integer.valueOf(id), new RemoteClientEndpointImpl<Object, Object>(id, null, allocator)) != null);
+ return id;
+ }
+
+ /**
+  * Record a handle to the given client endpoint in {@code forwardedClients} under the given ID.
+  * <p>
+  * If a handle cannot be acquired, the failure is logged and no entry is stored, so a later
+  * {@link #getForwardedClient(int)} for the same ID returns {@code null}.
+  *
+  * @param id the ID under which the client is registered
+  * @param clientEndpoint the client endpoint to acquire a handle from
+  */
+ @SuppressWarnings({ "unchecked" })
+ public void openClientForForwardedService(int id, RemoteClientEndpoint<?, ?> clientEndpoint) {
+     try {
+         forwardedClients.put(Integer.valueOf(id), ((RemoteClientEndpoint)clientEndpoint).getHandle());
+     } catch (RemotingException e) {
+         // Report through the logger instead of dumping the stack trace to stderr
+         log.error(e, "Failed to acquire a handle for forwarded client %d", Integer.valueOf(id));
+     }
+ }
+
+ /**
+  * Look up the handle registered in {@code forwardedClients} for the given ID.
+  *
+  * @param id the forwarded client ID
+  * @return the registered handle, or {@code null} if the map has no entry for the ID
+  */
+ public Handle<RemoteClientEndpoint<?, ?>> getForwardedClient(int id) {
+     final Integer key = Integer.valueOf(id);
+     return forwardedClients.get(key);
+ }
+
+ /**
+  * Remove and return the reply handler registered for the given request ID.
+  * (The misspelling in the method name is kept because callers depend on it.)
+  *
+  * @param id the request ID
+  * @return the handler that was registered, or {@code null} if there was none
+  */
+ public ReplyHandler<?> takeOutstandingReqeust(int id) {
+     final Integer key = Integer.valueOf(id);
+     return outstandingRequests.remove(key);
+ }
+
+ /**
+  * Look up the handle registered in {@code forwardedServices} for the given ID.
+  *
+  * @param id the forwarded service ID
+  * @return the registered handle, or {@code null} if the map has no entry for the ID
+  */
+ public Handle<RemoteServiceEndpoint<?, ?>> getForwardedService(final int id) {
+     final Integer key = Integer.valueOf(id);
+     return forwardedServices.get(key);
+ }
+
+ /**
+  * Deliver a one-way request to a client endpoint, casting the untyped request object to the
+  * endpoint's request type.  The cast is unchecked.
+  *
+  * @param clientEndpoint the endpoint which receives the request
+  * @param request the request object
+  */
+ @SuppressWarnings({ "unchecked" })
+ private static <I, O> void receiveRequest(RemoteClientEndpoint<I, O> clientEndpoint, Object request) {
+     final I typedRequest = (I) request;
+     clientEndpoint.receiveRequest(typedRequest);
+ }
+
+ /**
+  * Deliver a request with a reply handler to a client endpoint, casting the untyped request
+  * object to the endpoint's request type.  The cast is unchecked.
+  *
+  * @param clientEndpoint the endpoint which receives the request
+  * @param replyHandler the handler passed along with the request
+  * @param request the request object
+  * @return the request context returned by the endpoint
+  */
+ @SuppressWarnings({ "unchecked" })
+ private static <I, O> RemoteRequestContext receiveRequest(RemoteClientEndpoint<I, O> clientEndpoint, ReplyHandler<O> replyHandler, Object request) {
+     final I typedRequest = (I) request;
+     final RemoteRequestContext context = clientEndpoint.receiveRequest(typedRequest, replyHandler);
+     return context;
+ }
+
+ /**
+  * Pass an untyped reply object to a reply handler via {@code SpiUtils.safeHandleReply},
+  * casting it to the handler's reply type.  The cast is unchecked.
+  *
+  * @param replyHandler the handler which receives the reply
+  * @param reply the reply object
+  */
+ @SuppressWarnings({ "unchecked" })
+ private static <O> void handleReply(final ReplyHandler<O> replyHandler, final Object reply) {
+     final O typedReply = (O) reply;
+     SpiUtils.safeHandleReply(replyHandler, typedReply);
+ }
+
+ /**
+  * Pass a failure to a reply handler via {@code SpiUtils.safeHandleException}.
+  * <p>
+  * The message is converted with {@code toString()} ({@code null} stays {@code null}); the
+  * cause is passed on only if it actually is a {@code Throwable}, otherwise {@code null} is used.
+  *
+  * @param handler the handler which receives the failure
+  * @param message the failure message object, may be {@code null}
+  * @param cause the failure cause object, may be {@code null} or a non-throwable
+  */
+ private static void handleException(final ReplyHandler<?> handler, final Object message, final Object cause) {
+     final String text = message != null ? message.toString() : null;
+     final Throwable throwable;
+     if (cause instanceof Throwable) {
+         throwable = (Throwable) cause;
+     } else {
+         throwable = null;
+     }
+     SpiUtils.safeHandleException(handler, text, throwable);
+ }
+
+ // Writer members
+
+ // Write handlers waiting to be run; presumably bounded to 64 entries (argument to
+ // CollectionUtil.blockingQueue) - TODO confirm against CollectionUtil
+ private final BlockingQueue<WriteHandler> outputQueue = CollectionUtil.blockingQueue(64);
+ // Incremented once for every handler queued by registerWriter; the matching decrement is not
+ // part of this section
+ private final AtomicInteger pending = new AtomicInteger();
+
+ /**
+  * Queue a write handler and make sure the channel is asked for write notifications.
+  * <p>
+  * {@code put} blocks while the queue has no free slot.  Writes are resumed only by the caller
+  * that moves {@code pending} from zero to one; the handler is enqueued before the counter is
+  * incremented, so the order of the two statements matters.
+  *
+  * @param channel the channel on which writes are resumed
+  * @param writeHandler the handler to queue
+  * @throws InterruptedException if the thread is interrupted while waiting for queue space
+  */
+ private void registerWriter(final AllocatedMessageChannel channel, final WriteHandler writeHandler) throws InterruptedException {
+ outputQueue.put(writeHandler);
+ if (pending.getAndIncrement() == 0) {
+ channel.resumeWrites();
+ }
+ }
+
+ // client endpoint
+
+ /**
+  * Client endpoint which marshals each request it receives and queues it for writing on the
+  * channel, tagged with this endpoint's identifier.
+  *
+  * @param <I> the request type
+  * @param <O> the reply type
+  */
+ private final class RemoteClientEndpointImpl<I, O> extends AbstractAutoCloseable<RemoteClientEndpoint<I, O>> implements RemoteClientEndpoint<I, O> {
+
+     private final int identifier;
+     private final MarshallerFactory<ByteBuffer> marshallerFactory;
+     private final BufferAllocator<ByteBuffer> allocator;
+
+     /**
+      * @param identifier the client identifier written into every outgoing request
+      * @param marshallerFactory the factory for request marshallers; must not be {@code null}
+      * @param allocator the allocator for outgoing buffers; must not be {@code null}
+      * @throws NullPointerException if {@code marshallerFactory} or {@code allocator} is {@code null}
+      */
+     public RemoteClientEndpointImpl(final int identifier, final MarshallerFactory<ByteBuffer> marshallerFactory, final BufferAllocator<ByteBuffer> allocator) {
+         super(executor);
+         if (marshallerFactory == null) {
+             throw new NullPointerException("marshallerFactory is null");
+         }
+         if (allocator == null) {
+             throw new NullPointerException("allocator is null");
+         }
+         this.identifier = identifier;
+         this.marshallerFactory = marshallerFactory;
+         this.allocator = allocator;
+     }
+
+     /**
+      * Send a one-way request: type byte {@code REQUEST_ONEWAY}, the client identifier, then
+      * the marshalled request.  Any failure is logged at trace level and otherwise ignored.
+      *
+      * @param request the request to send
+      */
+     public void receiveRequest(final I request) {
+         log.trace("Received one-way request of type %s", request == null ? "null" : request.getClass());
+         try {
+             final Marshaller<ByteBuffer> marshaller = marshallerFactory.createMarshaller(null);
+             final List<ByteBuffer> bufferList = new ArrayList<ByteBuffer>();
+             ByteBuffer buffer = allocator.allocate();
+             buffer.put((byte) MessageType.REQUEST_ONEWAY);
+             buffer.putInt(identifier);
+             marshaller.start(request);
+             // Spill into a fresh buffer each time the current one fills up
+             while (! marshaller.marshal(buffer)) {
+                 bufferList.add(flip(buffer));
+                 buffer = allocator.allocate();
+             }
+             bufferList.add(flip(buffer));
+             try {
+                 registerWriter(channel, new SimpleWriteHandler(allocator, bufferList));
+             } catch (InterruptedException e) {
+                 log.trace(e, "receiveRequest was interrupted");
+                 Thread.currentThread().interrupt();
+             }
+         } catch (Throwable t) {
+             // one-way requests have nobody to report to
+             log.trace(t, "receiveRequest failed with an exception");
+         }
+     }
+
+     /**
+      * Send a request which expects a reply: type byte {@code REQUEST}, the client identifier,
+      * a newly opened request ID, then the marshalled request.
+      * <p>
+      * If the request cannot be marshalled or queued, the request ID opened for it is released
+      * again so that the handler does not linger among the outstanding requests, and the
+      * handler is notified of the failure (or of cancellation, if the thread was interrupted).
+      *
+      * @param request the request to send
+      * @param handler the handler which receives the reply, failure or cancellation
+      * @return a context for cancelling the request, or a blank context if sending failed
+      */
+     public RemoteRequestContext receiveRequest(final I request, final ReplyHandler<O> handler) {
+         log.trace("Received request of type %s", request == null ? "null" : request.getClass());
+         int id = 0;
+         // true while an ID has been opened but its message has not been queued yet
+         boolean needsRelease = false;
+         try {
+             final Marshaller<ByteBuffer> marshaller = marshallerFactory.createMarshaller(null);
+             final List<ByteBuffer> bufferList = new ArrayList<ByteBuffer>();
+             ByteBuffer buffer = allocator.allocate();
+             buffer.put((byte) MessageType.REQUEST);
+             buffer.putInt(identifier);
+             id = openRequest(handler);
+             needsRelease = true;
+             buffer.putInt(id);
+             marshaller.start(request);
+             while (! marshaller.marshal(buffer)) {
+                 bufferList.add(flip(buffer));
+                 buffer = allocator.allocate();
+             }
+             bufferList.add(flip(buffer));
+             try {
+                 registerWriter(channel, new SimpleWriteHandler(allocator, bufferList));
+             } catch (InterruptedException e) {
+                 log.trace(e, "receiveRequest was interrupted");
+                 Thread.currentThread().interrupt();
+                 needsRelease = false;
+                 takeOutstandingReqeust(id);
+                 SpiUtils.safeHandleCancellation(handler);
+                 return SpiUtils.getBlankRemoteRequestContext();
+             }
+             // The message is queued; from here on the entry must stay until a reply arrives
+             needsRelease = false;
+             log.trace("Sent request %s", request);
+             return new RemoteRequestContextImpl(id, allocator, channel);
+         } catch (Throwable t) {
+             log.trace(t, "receiveRequest failed with an exception");
+             if (needsRelease) {
+                 takeOutstandingReqeust(id);
+             }
+             SpiUtils.safeHandleException(handler, "Failed to build request", t);
+             return SpiUtils.getBlankRemoteRequestContext();
+         }
+     }
+ }
+
+ /**
+  * Request context for a request that has been queued for sending; lets the caller ask the
+  * remote side to cancel that request.
+  */
+ public final class RemoteRequestContextImpl implements RemoteRequestContext {
+
+     private final BufferAllocator<ByteBuffer> allocator;
+     private final int id;
+     private final AllocatedMessageChannel channel;
+
+     /**
+      * @param id the ID of the request this context belongs to
+      * @param allocator the allocator for the cancel message buffer
+      * @param channel the channel on which the cancel message is written
+      */
+     public RemoteRequestContextImpl(final int id, final BufferAllocator<ByteBuffer> allocator, final AllocatedMessageChannel channel) {
+         this.id = id;
+         this.allocator = allocator;
+         this.channel = channel;
+     }
+
+     /**
+      * Queue a {@code CANCEL_REQUEST} message: type byte, request ID, then one byte which is
+      * 1 if the remote side may interrupt the request and 0 otherwise.
+      * <p>
+      * A failed attempt is logged at trace level and otherwise ignored; an interrupt is
+      * restored on the calling thread.
+      *
+      * @param mayInterrupt whether the remote side may interrupt the running request
+      */
+     public void cancel(final boolean mayInterrupt) {
+         try {
+             final ByteBuffer buffer = allocator.allocate();
+             buffer.put((byte) MessageType.CANCEL_REQUEST);
+             buffer.putInt(id);
+             buffer.put((byte) (mayInterrupt ? 1 : 0));
+             buffer.flip();
+             registerWriter(channel, new SimpleWriteHandler(allocator, buffer));
+         } catch (InterruptedException e) {
+             log.trace(e, "Interrupted while queueing a cancel request");
+             Thread.currentThread().interrupt();
+         } catch (Throwable t) {
+             log.trace(t, "Failed to send a cancel request");
+         }
+     }
+ }
+
+ /**
+  * Service endpoint which creates client endpoints by announcing a newly allocated client ID
+  * to the remote side.
+  *
+  * @param <I> the request type
+  * @param <O> the reply type
+  */
+ public final class RemoteServiceEndpointImpl<I, O> extends AbstractAutoCloseable<RemoteServiceEndpoint<I, O>> implements RemoteServiceEndpoint<I, O> {
+
+     private final MarshallerFactory<ByteBuffer> marshallerFactory;
+     private final BufferAllocator<ByteBuffer> allocator;
+     private final int identifier;
+
+     /**
+      * @param marshallerFactory the marshaller factory handed to created client endpoints
+      * @param allocator the allocator for outgoing buffers
+      * @param identifier the service identifier written into outgoing messages
+      */
+     protected RemoteServiceEndpointImpl(final MarshallerFactory<ByteBuffer> marshallerFactory, final BufferAllocator<ByteBuffer> allocator, final int identifier) {
+         super(executor);
+         this.marshallerFactory = marshallerFactory;
+         this.allocator = allocator;
+         this.identifier = identifier;
+     }
+
+     /**
+      * Open a new client on this service.
+      * <p>
+      * One client ID is allocated; that same ID is written to the remote side (after the
+      * service identifier) and used for the returned endpoint.  Queueing the message is
+      * retried if the thread is interrupted; the interrupt status is restored before returning.
+      *
+      * @return the new client endpoint
+      * @throws RemotingException declared by the interface; not thrown by the code in this method
+      */
+     public RemoteClientEndpoint<I, O> createClientEndpoint() throws RemotingException {
+         final int id = openClientFromService();
+         final ByteBuffer buffer = allocator.allocate();
+         // NOTE(review): no MessageType byte is written ahead of the two identifiers, unlike the
+         // other messages built in this file - confirm the intended wire format.
+         buffer.putInt(identifier);
+         // Send the ID allocated above; allocating a second one here would make the two sides
+         // disagree about the client's ID and leak the extra registration.
+         buffer.putInt(id);
+         buffer.flip();
+         boolean intr = false;
+         try {
+             for (;;) {
+                 try {
+                     registerWriter(channel, new SimpleWriteHandler(allocator, buffer));
+                     return new RemoteClientEndpointImpl<I, O>(id, marshallerFactory, allocator);
+                 } catch (InterruptedException e) {
+                     // remember the interrupt and try again
+                     intr = true;
+                 }
+             }
+         } finally {
+             if (intr) {
+                 Thread.currentThread().interrupt();
+             }
+         }
+     }
+ }
+}
Added: remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicProtocol.java
===================================================================
--- remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicProtocol.java (rev 0)
+++ remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicProtocol.java 2008-07-17 16:21:03 UTC (rev 4379)
@@ -0,0 +1,106 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.cx.remoting.protocol.basic;
+
+import org.jboss.cx.remoting.spi.remote.RemoteClientEndpoint;
+import org.jboss.cx.remoting.spi.remote.RemoteServiceEndpoint;
+import org.jboss.cx.remoting.spi.remote.RemoteClientEndpointListener;
+import org.jboss.cx.remoting.RemotingException;
+import org.jboss.cx.remoting.core.marshal.JBossSerializationMarshallerFactory;
+import org.jboss.cx.remoting.core.marshal.JavaSerializationMarshallerFactory;
+import org.jboss.xnio.IoHandlerFactory;
+import org.jboss.xnio.ChannelSource;
+import org.jboss.xnio.IoFuture;
+import org.jboss.xnio.AbstractConvertingIoFuture;
+import org.jboss.xnio.IoHandler;
+import org.jboss.xnio.BufferAllocator;
+import org.jboss.xnio.log.Logger;
+import org.jboss.xnio.channels.AllocatedMessageChannel;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.Executor;
+
+/**
+ * Static factory methods for the two sides of the basic protocol: {@link #createServer} builds
+ * the handler factory for inbound connections and {@link #connect} opens an outbound connection.
+ * This class cannot be instantiated.
+ */
+public final class BasicProtocol {
+
+ private static final Logger log = Logger.getLogger(BasicProtocol.class);
+
+ // Utility class; no instances
+ private BasicProtocol() {
+ }
+
+ /**
+ * Create a request server for the basic protocol.
+ *
+ * @param executor the executor to use for invocations
+ * @param localRootSource the service to draw client endpoints from for root clients on inbound connections
+ * @param allocator the buffer allocator to use
+ * @param remoteListener a listener which receives notification of the remote root client of the incoming connection
+ * @return a handler factory for passing to an XNIO server
+ */
+ public static IoHandlerFactory<AllocatedMessageChannel> createServer(final Executor executor, final RemoteServiceEndpoint<?, ?> localRootSource, final BufferAllocator<ByteBuffer> allocator, final RemoteClientEndpointListener remoteListener) {
+ return new IoHandlerFactory<AllocatedMessageChannel>() {
+ public IoHandler<? super AllocatedMessageChannel> createHandler() {
+ try {
+ // Each handler gets its own root client endpoint drawn from the local root service
+ final RemoteClientEndpoint<?, ?> remoteClientEndpoint = localRootSource.createClientEndpoint();
+ try {
+ return new BasicHandler(true, allocator, remoteClientEndpoint, executor, remoteListener, new JavaSerializationMarshallerFactory(executor));
+ } finally {
+ // autoClose() is called whether or not the handler could be constructed;
+ // a failure here is only logged
+ try {
+ remoteClientEndpoint.autoClose();
+ } catch (RemotingException e) {
+ log.error(e, "Error setting auto-close mode");
+ }
+ }
+ } catch (RemotingException e) {
+ // createHandler() declares no checked exception, so the failure is rethrown unchecked
+ throw new IllegalStateException("The local root endpoint is unusable", e);
+ }
+ }
+ };
+ }
+
+ /**
+ * Create a request client for the basic protocol.
+ *
+ * @param <I> the request type of the new remote root service endpoint
+ * @param <O> the reply type of the new remote root service endpoint
+ * @param executor the executor to use for invocations
+ * @param localRoot the client endpoint to use as the local root client
+ * @param channelSource the XNIO channel source to use to establish the connection
+ * @param allocator the buffer allocator to use
+ * @return the future client endpoint of the remote side's root client
+ * @throws IOException if an error occurs
+ */
+ public static <I, O> IoFuture<RemoteClientEndpoint<I, O>> connect(final Executor executor, final RemoteClientEndpoint<?, ?> localRoot, final ChannelSource<AllocatedMessageChannel> channelSource, final BufferAllocator<ByteBuffer> allocator) throws IOException {
+ // Client side: first constructor argument is false and no remote listener is supplied
+ final BasicHandler basicHandler = new BasicHandler(false, allocator, localRoot, executor, null, new JavaSerializationMarshallerFactory(executor));
+ final IoFuture<AllocatedMessageChannel> futureChannel = channelSource.open(basicHandler);
+ // Wrap the channel future so that callers receive the remote root client instead of the channel
+ return new AbstractConvertingIoFuture<RemoteClientEndpoint<I, O>, AllocatedMessageChannel>(futureChannel) {
+ @SuppressWarnings({ "unchecked" })
+ protected RemoteClientEndpoint<I, O> convert(final AllocatedMessageChannel channel) throws RemotingException {
+ // The remote root client is looked up under ID 0; the cast to <I, O> is unchecked
+ final RemoteClientEndpoint<?, ?> remoteClientEndpoint = basicHandler.getRemoteClient(0);
+ return (RemoteClientEndpoint) remoteClientEndpoint;
+ }
+ };
+ }
+}
Added: remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/MessageType.java
===================================================================
--- remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/MessageType.java (rev 0)
+++ remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/MessageType.java 2008-07-17 16:21:03 UTC (rev 4379)
@@ -0,0 +1,44 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.cx.remoting.protocol.basic;
+
+/**
+ * Message type codes of the basic protocol.  Senders write the code as a single byte at the
+ * start of a message.  This class only holds constants and cannot be instantiated.
+ */
+public final class MessageType {
+ // Request, reply and cancellation messages
+ public static final int REQUEST_ONEWAY = 0;
+ public static final int REQUEST = 1;
+ public static final int REPLY = 2;
+ public static final int CANCEL_REQUEST = 3;
+ public static final int CANCEL_ACK = 4;
+ public static final int REQUEST_FAILED = 5;
+ // Remote side called .close() on a forwarded RemoteClientEndpoint
+ public static final int CLIENT_CLOSE = 6;
+ // NOTE(review): the original comment here was a copy of the CLIENT_CLOSE one; presumably this
+ // code announces that the remote side opened a new client - TODO confirm
+ public static final int CLIENT_OPEN = 7;
+ // presumably the service-level counterpart of CLIENT_CLOSE - TODO confirm
+ public static final int SERVICE_CLOSE = 8;
+
+ // Constants holder; no instances
+ private MessageType() {
+ }
+}
Added: remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/SimpleWriteHandler.java
===================================================================
--- remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/SimpleWriteHandler.java (rev 0)
+++ remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/SimpleWriteHandler.java 2008-07-17 16:21:03 UTC (rev 4379)
@@ -0,0 +1,84 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.cx.remoting.protocol.basic;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import org.jboss.xnio.BufferAllocator;
+import org.jboss.xnio.log.Logger;
+import org.jboss.xnio.channels.WritableMessageChannel;
+
+/**
+ * A {@link WriteHandler} which sends a fixed set of buffers as one message and returns the
+ * buffers to the allocator once the send has completed or failed.
+ */
+public final class SimpleWriteHandler implements WriteHandler {
+    private static final Logger log = Logger.getLogger(SimpleWriteHandler.class);
+
+    private final BufferAllocator<ByteBuffer> allocator;
+    private final ByteBuffer[] buffers;
+
+    /**
+     * @param allocator the allocator to which the buffers are returned after the write
+     * @param buffers the buffers making up the message, in order; copied into an array
+     */
+    public SimpleWriteHandler(final BufferAllocator<ByteBuffer> allocator, final List<ByteBuffer> buffers) {
+        this(allocator, buffers.toArray(new ByteBuffer[buffers.size()]));
+    }
+
+    /**
+     * @param allocator the allocator to which the buffers are returned after the write
+     * @param buffers the buffers making up the message, in order; the array is used as is
+     */
+    public SimpleWriteHandler(final BufferAllocator<ByteBuffer> allocator, final ByteBuffer[] buffers) {
+        this.allocator = allocator;
+        this.buffers = buffers;
+        logBufferSize();
+    }
+
+    /**
+     * @param allocator the allocator to which the buffer is returned after the write
+     * @param buffer the single buffer making up the message
+     */
+    public SimpleWriteHandler(final BufferAllocator<ByteBuffer> allocator, final ByteBuffer buffer) {
+        this(allocator, new ByteBuffer[] { buffer });
+    }
+
+    /** Trace-log the total number of bytes remaining across all buffers. */
+    private void logBufferSize() {
+        if (! log.isTrace()) {
+            return;
+        }
+        long total = 0L;
+        for (int i = 0; i < buffers.length; i++) {
+            total += (long) buffers[i].remaining();
+        }
+        log.trace("Writing a message of size %d", Long.valueOf(total));
+    }
+
+    /**
+     * Try to send the message on the given channel.
+     *
+     * @param channel the channel to send on
+     * @return the result of {@code channel.send}, or {@code true} if the send failed with an
+     *         {@code IOException}; whenever this handler is finished the buffers are freed
+     */
+    public boolean handleWrite(final WritableMessageChannel channel) {
+        // Stays true unless send() reports that the message could not be sent yet
+        boolean finished = true;
+        try {
+            finished = channel.send(buffers);
+            return finished;
+        } catch (IOException e) {
+            log.trace(e, "Write failed");
+            return true;
+        } finally {
+            if (finished) {
+                for (int i = 0; i < buffers.length; i++) {
+                    allocator.free(buffers[i]);
+                }
+            }
+        }
+    }
+}
Added: remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/WriteHandler.java
===================================================================
--- remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/WriteHandler.java (rev 0)
+++ remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/WriteHandler.java 2008-07-17 16:21:03 UTC (rev 4379)
@@ -0,0 +1,32 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.cx.remoting.protocol.basic;
+
+import org.jboss.xnio.channels.WritableMessageChannel;
+
+/**
+ * A unit of pending output for a message channel.
+ */
+public interface WriteHandler {
+ /**
+ * Attempt to write this handler's output to the given channel.
+ *
+ * @param channel the channel to write to
+ * @return presumably {@code true} when the handler is finished and need not be called again
+ * (SimpleWriteHandler returns {@code true} after a completed or failed send) - TODO confirm
+ * against the caller
+ */
+ boolean handleWrite(WritableMessageChannel channel);
+}
Added: remoting3/trunk/protocol/basic/src/test/java/org/jboss/cx/remoting/protocol/basic/ConnectionTestCase.java
===================================================================
--- remoting3/trunk/protocol/basic/src/test/java/org/jboss/cx/remoting/protocol/basic/ConnectionTestCase.java (rev 0)
+++ remoting3/trunk/protocol/basic/src/test/java/org/jboss/cx/remoting/protocol/basic/ConnectionTestCase.java 2008-07-17 16:21:03 UTC (rev 4379)
@@ -0,0 +1,177 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.cx.remoting.protocol.basic;
+
+import junit.framework.TestCase;
+import org.jboss.xnio.Xnio;
+import org.jboss.xnio.IoUtils;
+import org.jboss.xnio.BufferAllocator;
+import org.jboss.xnio.ConfigurableFactory;
+import org.jboss.xnio.ChannelSource;
+import org.jboss.xnio.TcpClient;
+import org.jboss.xnio.IoFuture;
+import org.jboss.xnio.channels.Channels;
+import org.jboss.xnio.channels.AllocatedMessageChannel;
+import org.jboss.cx.remoting.core.EndpointImpl;
+import org.jboss.cx.remoting.RequestContext;
+import org.jboss.cx.remoting.RemoteExecutionException;
+import org.jboss.cx.remoting.RequestListener;
+import org.jboss.cx.remoting.ClientContext;
+import org.jboss.cx.remoting.ServiceContext;
+import org.jboss.cx.remoting.Client;
+import org.jboss.cx.remoting.RemotingException;
+import org.jboss.cx.remoting.test.support.LoggingHelper;
+import org.jboss.cx.remoting.spi.remote.RemoteServiceEndpoint;
+import org.jboss.cx.remoting.spi.remote.RemoteClientEndpointListener;
+import org.jboss.cx.remoting.spi.remote.Handle;
+import org.jboss.cx.remoting.spi.remote.RemoteClientEndpoint;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.nio.ByteBuffer;
+import java.net.InetSocketAddress;
+import java.io.Closeable;
+
+/**
+ * Round-trip test for the basic protocol: starts a TCP server on port 12345, connects a client
+ * to it over localhost, sends the request "Test", expects the reply "response", and finally
+ * checks that the open and close callbacks of the request listener were invoked.
+ */
+public final class ConnectionTestCase extends TestCase {
+ static {
+ LoggingHelper.init();
+ }
+
+ public void testConnection() throws Throwable {
+ // Flags set by the listener callbacks below; asserted at the end of the test
+ final AtomicBoolean clientOpened = new AtomicBoolean(false);
+ final AtomicBoolean serviceOpened = new AtomicBoolean(false);
+ final AtomicBoolean clientClosed = new AtomicBoolean(false);
+ final AtomicBoolean serviceClosed = new AtomicBoolean(false);
+ final CountDownLatch clientCloseLatch = new CountDownLatch(1);
+ final ExecutorService executorService = Executors.newCachedThreadPool();
+ try {
+ // Trivial allocator: 1024-byte heap buffers, nothing to do on free
+ final BufferAllocator<ByteBuffer> allocator = new BufferAllocator<ByteBuffer>() {
+ public ByteBuffer allocate() {
+ return ByteBuffer.allocate(1024);
+ }
+
+ public void free(final ByteBuffer buffer) {
+ }
+ };
+ final Xnio xnio = Xnio.createNio();
+ try {
+ final EndpointImpl endpoint = new EndpointImpl();
+ endpoint.setExecutor(executorService);
+ endpoint.start();
+ try {
+ // Service under test: answers every request with "response" and records lifecycle events
+ final RemoteServiceEndpoint<Object,Object> serverServiceEndpoint = endpoint.createServiceEndpoint(new RequestListener<Object, Object>() {
+ public void handleClientOpen(final ClientContext context) {
+ clientOpened.set(true);
+ }
+
+ public void handleServiceOpen(final ServiceContext context) {
+ serviceOpened.set(true);
+ }
+
+ public void handleRequest(final RequestContext<Object> context, final Object request) throws RemoteExecutionException {
+ try {
+ System.out.println("Received request; sending response!");
+ context.sendReply("response");
+ } catch (RemotingException e) {
+ // Sending the reply failed; try to report that as a failure instead
+ try {
+ context.sendFailure("failed", e);
+ } catch (RemotingException e1) {
+ System.out.println("Double fault!");
+ }
+ }
+ }
+
+ public void handleServiceClose(final ServiceContext context) {
+ serviceClosed.set(true);
+ }
+
+ public void handleClientClose(final ClientContext context) {
+ clientClosed.set(true);
+ clientCloseLatch.countDown();
+ }
+ });
+ try {
+ // Take a handle first, then switch the endpoint to auto-close mode
+ final Handle<RemoteServiceEndpoint<Object,Object>> handle = serverServiceEndpoint.getHandle();
+ serverServiceEndpoint.autoClose();
+ try {
+ // Listener that ignores the notification
+ final RemoteClientEndpointListener remoteListener = new RemoteClientEndpointListener() {
+
+ public <I, O> void notifyCreated(final RemoteClientEndpoint<I, O> endpoint) {
+
+ }
+ };
+ final ConfigurableFactory<Closeable> tcpServer = xnio.createTcpServer(executorService, Channels.convertStreamToAllocatedMessage(BasicProtocol.createServer(executorService, serverServiceEndpoint, allocator, remoteListener), 32768, 32768), new InetSocketAddress(12345));
+ final Closeable tcpServerCloseable = tcpServer.create();
+ try {
+ // now create a client to connect to it
+ final RemoteClientEndpoint<?,?> localRoot = serverServiceEndpoint.createClientEndpoint();
+ final InetSocketAddress destAddr = new InetSocketAddress("localhost", 12345);
+ final TcpClient tcpClient = xnio.createTcpConnector().create().createChannelSource(destAddr);
+ final ChannelSource<AllocatedMessageChannel> messageChannelSource = Channels.convertStreamToAllocatedMessage(tcpClient, 32768, 32768);
+ final IoFuture<RemoteClientEndpoint<Object,Object>> futureClient = BasicProtocol.connect(executorService, localRoot, messageChannelSource, allocator);
+ final RemoteClientEndpoint<Object, Object> clientEndpoint = futureClient.get();
+ try {
+ final Client<Object,Object> client = endpoint.createClient(clientEndpoint);
+ try {
+ clientEndpoint.autoClose();
+ // The actual round trip
+ final Object result = client.send("Test").get();
+ assertEquals("response", result);
+ client.close();
+ tcpServerCloseable.close();
+ handle.close();
+ } finally {
+ IoUtils.safeClose(client);
+ // Wait up to 500 ms for the client-close callback; the result of await() is
+ // not checked here, the clientClosed flag is asserted at the end instead
+ clientCloseLatch.await(500L, TimeUnit.MILLISECONDS);
+ }
+ } finally {
+ IoUtils.safeClose(clientEndpoint);
+ }
+ } finally {
+ IoUtils.safeClose(tcpServerCloseable);
+ }
+ } finally {
+ IoUtils.safeClose(handle);
+ }
+ } finally {
+ IoUtils.safeClose(serverServiceEndpoint);
+ }
+ } finally {
+ endpoint.stop();
+ }
+ } finally {
+ IoUtils.safeClose(xnio);
+ }
+ } finally {
+ executorService.shutdownNow();
+ }
+ // All resources are released; verify that every lifecycle callback was seen
+ assertTrue(serviceOpened.get());
+ assertTrue(clientOpened.get());
+ assertTrue(clientClosed.get());
+ assertTrue(serviceClosed.get());
+ }
+}
16 years, 5 months
JBoss Remoting SVN: r4378 - remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal.
by jboss-remoting-commits@lists.jboss.org
Author: david.lloyd(a)jboss.com
Date: 2008-07-17 12:09:45 -0400 (Thu, 17 Jul 2008)
New Revision: 4378
Modified:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/OneBufferInputStream.java
Log:
Fix counter problem
Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/OneBufferInputStream.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/OneBufferInputStream.java 2008-07-17 04:39:05 UTC (rev 4377)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/OneBufferInputStream.java 2008-07-17 16:09:45 UTC (rev 4378)
@@ -26,12 +26,15 @@
import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
+import org.jboss.xnio.log.Logger;
/**
*
*/
public final class OneBufferInputStream extends InputStream {
+ private static final Logger log = Logger.getLogger(OneBufferInputStream.class);
+
private final Object lock;
private ByteBuffer buffer;
private boolean eof;
@@ -101,6 +104,7 @@
buffer.get(b, off, rem);
off += rem;
len -= rem;
+ c += rem;
}
return c;
}
16 years, 5 months
JBoss Remoting SVN: r4377 - remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal.
by jboss-remoting-commits@lists.jboss.org
Author: david.lloyd(a)jboss.com
Date: 2008-07-17 00:39:05 -0400 (Thu, 17 Jul 2008)
New Revision: 4377
Modified:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/AbstractSerializationUnmarshaller.java
Log:
Add missing notify()
Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/AbstractSerializationUnmarshaller.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/AbstractSerializationUnmarshaller.java 2008-07-17 02:25:47 UTC (rev 4376)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/AbstractSerializationUnmarshaller.java 2008-07-17 04:39:05 UTC (rev 4377)
@@ -71,6 +71,7 @@
log.trace(t, "Failed to unmarshal an object");
}
done = true;
+ resultLock.notify();
}
}
});
16 years, 5 months
JBoss Remoting SVN: r4376 - remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal.
by jboss-remoting-commits@lists.jboss.org
Author: david.lloyd(a)jboss.com
Date: 2008-07-16 22:25:47 -0400 (Wed, 16 Jul 2008)
New Revision: 4376
Modified:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/AbstractSerializationUnmarshaller.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JavaSerializationUnmarshaller.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/OneBufferInputStream.java
Log:
Add various marshaller bugfixes
Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/AbstractSerializationUnmarshaller.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/AbstractSerializationUnmarshaller.java 2008-07-15 20:02:34 UTC (rev 4375)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/AbstractSerializationUnmarshaller.java 2008-07-17 02:25:47 UTC (rev 4376)
@@ -29,11 +29,14 @@
import java.util.concurrent.Executor;
import org.jboss.cx.remoting.spi.marshal.Unmarshaller;
import org.jboss.cx.remoting.spi.marshal.ObjectResolver;
+import org.jboss.xnio.log.Logger;
/**
*
*/
public abstract class AbstractSerializationUnmarshaller implements Unmarshaller<ByteBuffer> {
+ private static final Logger log = Logger.getLogger(AbstractSerializationUnmarshaller.class);
+
private final Executor executor;
private final ObjectInputStream objectInputStream;
private final Object resultLock = new Object();
@@ -41,7 +44,7 @@
protected final ObjectResolver resolver;
protected final OneBufferInputStream inputStream = new OneBufferInputStream(resultLock);
- private boolean done;
+ private boolean done = true;
private Object result;
private Throwable cause;
@@ -54,19 +57,35 @@
protected abstract ObjectInputStream getObjectInputStream() throws IOException;
public boolean unmarshal(final ByteBuffer buffer) throws IOException {
- executor.execute(new Runnable() {
- public void run() {
- synchronized (resultLock) {
- try {
- result = objectInputStream.readObject();
- } catch (Throwable t) {
- cause = t;
+ synchronized (resultLock) {
+ if (done) {
+ done = false;
+ executor.execute(new Runnable() {
+ public void run() {
+ synchronized (resultLock) {
+ try {
+ result = objectInputStream.readObject();
+ log.trace("Successfully unmarshalled object %s", result);
+ } catch (Throwable t) {
+ cause = t;
+ log.trace(t, "Failed to unmarshal an object");
+ }
+ done = true;
+ }
}
- done = true;
+ });
+ }
+ inputStream.setBuffer(buffer, false);
+ try {
+ while (! inputStream.isWaiting() && ! done) {
+ resultLock.wait();
}
+ return done;
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new InterruptedIOException("unmarshal operation was interrupted");
}
- });
- return false;
+ }
}
public Object get() throws IOException, IllegalStateException, ClassNotFoundException {
Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JavaSerializationUnmarshaller.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JavaSerializationUnmarshaller.java 2008-07-15 20:02:34 UTC (rev 4375)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JavaSerializationUnmarshaller.java 2008-07-17 02:25:47 UTC (rev 4376)
@@ -58,6 +58,10 @@
this.resolver = resolver;
}
+ protected final void readStreamHeader() {
+ // no headers
+ }
+
protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {
final String name = desc.getName();
if (primitiveTypes.containsKey(name)) {
Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/OneBufferInputStream.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/OneBufferInputStream.java 2008-07-15 20:02:34 UTC (rev 4375)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/OneBufferInputStream.java 2008-07-17 02:25:47 UTC (rev 4376)
@@ -65,6 +65,12 @@
}
}
+ public boolean isWaiting() {
+ synchronized (lock) {
+ return buffer == null;
+ }
+ }
+
public void setBuffer(ByteBuffer buffer, boolean eof) {
synchronized (lock) {
if (this.buffer != null) {
16 years, 5 months
JBoss Remoting SVN: r4375 - in remoting3/trunk: api/src/main/java/org/jboss/cx/remoting/spi/marshal and 1 other directories.
by jboss-remoting-commits@lists.jboss.org
Author: david.lloyd(a)jboss.com
Date: 2008-07-15 16:02:34 -0400 (Tue, 15 Jul 2008)
New Revision: 4375
Added:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/marshal/IdentityResolver.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/AbstractSerializationMarshaller.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/AbstractSerializationUnmarshaller.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JavaSerializationMarhsaller.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JavaSerializationMarshallerFactory.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JavaSerializationUnmarshaller.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/MarshallingAction.java
Modified:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/SpiUtils.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JBossSerializationMarhsaller.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JBossSerializationMarshallerFactory.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JBossSerializationUnmarshaller.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/OneBufferInputStream.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/OneBufferOutputStream.java
Log:
Marshaller implementations; about 95% done (still have to figure out a way to defeat the header reading/writing)
Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/SpiUtils.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/SpiUtils.java 2008-07-14 21:04:18 UTC (rev 4374)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/SpiUtils.java 2008-07-15 20:02:34 UTC (rev 4375)
@@ -25,6 +25,7 @@
import org.jboss.cx.remoting.spi.remote.ReplyHandler;
import org.jboss.cx.remoting.spi.remote.RemoteRequestContext;
import org.jboss.cx.remoting.spi.remote.RemoteClientEndpoint;
+import org.jboss.cx.remoting.spi.remote.RemoteServiceEndpoint;
import org.jboss.cx.remoting.RequestCancelHandler;
import org.jboss.cx.remoting.RequestContext;
import org.jboss.cx.remoting.CloseHandler;
@@ -143,6 +144,14 @@
}
}
+ public static void safeAutoClose(final RemoteServiceEndpoint<Object, Object> remoteServiceEndpoint) {
+ try {
+ remoteServiceEndpoint.autoClose();
+ } catch (Throwable t) {
+ log.error("Failed to set autoClose on %s: %s", remoteServiceEndpoint, t);
+ }
+ }
+
private static final class BlankRemoteRequestContext implements RemoteRequestContext {
public void cancel(final boolean mayInterrupt) {
}
Added: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/marshal/IdentityResolver.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/marshal/IdentityResolver.java (rev 0)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/marshal/IdentityResolver.java 2008-07-15 20:02:34 UTC (rev 4375)
@@ -0,0 +1,48 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.cx.remoting.spi.marshal;
+
+import java.io.IOException;
+
+/**
+ *
+ */
+public final class IdentityResolver implements ObjectResolver {
+
+ private static final long serialVersionUID = -6980574391387456877L;
+ private static final IdentityResolver INSTANCE = new IdentityResolver();
+
+ private IdentityResolver() {}
+
+ public Object writeReplace(final Object original) throws IOException {
+ return original;
+ }
+
+ public Object readResolve(final Object original) throws IOException {
+ return original;
+ }
+
+ public static IdentityResolver getInstance() {
+ return INSTANCE;
+ }
+}
Added: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/AbstractSerializationMarshaller.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/AbstractSerializationMarshaller.java (rev 0)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/AbstractSerializationMarshaller.java 2008-07-15 20:02:34 UTC (rev 4375)
@@ -0,0 +1,93 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.cx.remoting.core.marshal;
+
+import java.nio.ByteBuffer;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.util.concurrent.Executor;
+import org.jboss.cx.remoting.spi.marshal.Marshaller;
+import org.jboss.cx.remoting.spi.marshal.ObjectResolver;
+import org.jboss.xnio.log.Logger;
+
+/**
+ *
+ */
+public abstract class AbstractSerializationMarshaller implements Marshaller<ByteBuffer> {
+ private static final Logger log = Logger.getLogger(AbstractSerializationMarshaller.class);
+
+ private final Executor executor;
+ private final ObjectOutputStream objectOutputStream;
+ private final Object resultLock = new Object();
+
+ protected final ObjectResolver resolver;
+ protected final OneBufferOutputStream outputStream = new OneBufferOutputStream(resultLock);
+
+ private boolean done = false;
+
+ protected AbstractSerializationMarshaller(final Executor executor, final ObjectResolver resolver) throws IOException {
+ this.executor = executor;
+ this.resolver = resolver;
+ objectOutputStream = getObjectOutputStream();
+ }
+
+ protected abstract ObjectOutputStream getObjectOutputStream() throws IOException;
+
+ public void start(final Object object) throws IOException, IllegalStateException {
+ executor.execute(new Runnable() {
+ public void run() {
+ try {
+ log.trace("Beginning serializing object %s", object);
+ synchronized (objectOutputStream) {
+ objectOutputStream.writeObject(object);
+ log.trace("Flushing stream");
+ objectOutputStream.flush();
+ synchronized (resultLock) {
+ outputStream.flush();
+ done = true;
+ resultLock.notify();
+ log.trace("Completed serializing object %s", object);
+ }
+ }
+ } catch (Throwable t) {
+ log.error(t, "Serialization error");
+ }
+ }
+ });
+ }
+
+ public boolean marshal(final ByteBuffer buffer) throws IOException {
+ log.trace("Marshalling to buffer %s", buffer);
+ outputStream.setBuffer(buffer);
+ synchronized (resultLock) {
+ outputStream.await();
+ return done;
+ }
+ }
+
+ public void clearClassPool() throws IOException {
+ synchronized (objectOutputStream) {
+ objectOutputStream.reset();
+ }
+ }
+}
Added: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/AbstractSerializationUnmarshaller.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/AbstractSerializationUnmarshaller.java (rev 0)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/AbstractSerializationUnmarshaller.java 2008-07-15 20:02:34 UTC (rev 4375)
@@ -0,0 +1,94 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.cx.remoting.core.marshal;
+
+import java.nio.ByteBuffer;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.InterruptedIOException;
+import java.util.concurrent.Executor;
+import org.jboss.cx.remoting.spi.marshal.Unmarshaller;
+import org.jboss.cx.remoting.spi.marshal.ObjectResolver;
+
+/**
+ *
+ */
+public abstract class AbstractSerializationUnmarshaller implements Unmarshaller<ByteBuffer> {
+ private final Executor executor;
+ private final ObjectInputStream objectInputStream;
+ private final Object resultLock = new Object();
+
+ protected final ObjectResolver resolver;
+ protected final OneBufferInputStream inputStream = new OneBufferInputStream(resultLock);
+
+ private boolean done;
+ private Object result;
+ private Throwable cause;
+
+ protected AbstractSerializationUnmarshaller(final Executor executor, final ObjectResolver resolver) throws IOException {
+ this.executor = executor;
+ this.resolver = resolver;
+ objectInputStream = getObjectInputStream();
+ }
+
+ protected abstract ObjectInputStream getObjectInputStream() throws IOException;
+
+ public boolean unmarshal(final ByteBuffer buffer) throws IOException {
+ executor.execute(new Runnable() {
+ public void run() {
+ synchronized (resultLock) {
+ try {
+ result = objectInputStream.readObject();
+ } catch (Throwable t) {
+ cause = t;
+ }
+ done = true;
+ }
+ }
+ });
+ return false;
+ }
+
+ public Object get() throws IOException, IllegalStateException, ClassNotFoundException {
+ synchronized (resultLock) {
+ while (! done) {
+ try {
+ resultLock.wait();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new InterruptedIOException("Interrupted while waiting for marshaling result");
+ }
+ }
+ if (cause != null) {
+ if (cause instanceof IOException) {
+ throw (IOException) cause;
+ } else if (cause instanceof ClassNotFoundException) {
+ throw (ClassNotFoundException) cause;
+ } else {
+ throw new RuntimeException("Unmarshalling failed unexpectedly", cause);
+ }
+ }
+ return result;
+ }
+ }
+}
Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JBossSerializationMarhsaller.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JBossSerializationMarhsaller.java 2008-07-14 21:04:18 UTC (rev 4374)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JBossSerializationMarhsaller.java 2008-07-15 20:02:34 UTC (rev 4375)
@@ -2,65 +2,27 @@
import java.io.IOException;
import java.io.OutputStream;
-import java.nio.ByteBuffer;
+import java.io.ObjectOutputStream;
import java.util.concurrent.Executor;
-import org.jboss.cx.remoting.spi.marshal.Marshaller;
import org.jboss.cx.remoting.spi.marshal.ObjectResolver;
import org.jboss.serial.io.JBossObjectOutputStream;
+import org.jboss.xnio.log.Logger;
/**
*
*/
-public class JBossSerializationMarhsaller implements Marshaller<ByteBuffer> {
+public class JBossSerializationMarhsaller extends AbstractSerializationMarshaller {
- private final Executor executor;
+ private static final Logger log = Logger.getLogger(JBossSerializationMarhsaller.class);
- private final OurObjectOutputStream objectOutputStream;
- private final OneBufferOutputStream outputStream;
-
- private final Object resultLock = new Object();
- private boolean done = false;
-
public JBossSerializationMarhsaller(final Executor executor, final ObjectResolver resolver) throws IOException {
- this.executor = executor;
- outputStream = new OneBufferOutputStream(new Object());
- objectOutputStream = new OurObjectOutputStream(outputStream, resolver);
+ super(executor, resolver);
}
- public void start(final Object object) throws IOException, IllegalStateException {
- executor.execute(new Runnable() {
- public void run() {
- try {
- synchronized (objectOutputStream) {
- objectOutputStream.writeObject(object);
- objectOutputStream.flush();
- synchronized (resultLock) {
- outputStream.flush();
- done = true;
- resultLock.notify();
- }
- }
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- });
+ protected ObjectOutputStream getObjectOutputStream() throws IOException {
+ return new OurObjectOutputStream(outputStream, resolver);
}
- public boolean marshal(final ByteBuffer buffer) throws IOException {
- outputStream.setBuffer(buffer);
- synchronized (resultLock) {
- outputStream.await();
- return done;
- }
- }
-
- public void clearClassPool() throws IOException {
- synchronized (objectOutputStream) {
- objectOutputStream.reset();
- }
- }
-
private static final class OurObjectOutputStream extends JBossObjectOutputStream {
private final ObjectResolver resolver;
Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JBossSerializationMarshallerFactory.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JBossSerializationMarshallerFactory.java 2008-07-14 21:04:18 UTC (rev 4374)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JBossSerializationMarshallerFactory.java 2008-07-15 20:02:34 UTC (rev 4375)
@@ -24,6 +24,6 @@
}
public Unmarshaller<ByteBuffer> createUnmarshaller(final ObjectResolver resolver, final ClassLoader classLoader) throws IOException {
- return null;
+ return new JBossSerializationUnmarshaller(executor, resolver, classLoader);
}
}
Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JBossSerializationUnmarshaller.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JBossSerializationUnmarshaller.java 2008-07-14 21:04:18 UTC (rev 4374)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JBossSerializationUnmarshaller.java 2008-07-15 20:02:34 UTC (rev 4375)
@@ -22,63 +22,32 @@
package org.jboss.cx.remoting.core.marshal;
-import org.jboss.cx.remoting.spi.marshal.Unmarshaller;
import org.jboss.cx.remoting.spi.marshal.ObjectResolver;
import org.jboss.serial.io.JBossObjectInputStream;
-import java.nio.ByteBuffer;
import java.io.IOException;
import java.io.ObjectStreamClass;
import java.io.InputStream;
+import java.io.ObjectInputStream;
import java.lang.reflect.Proxy;
import java.util.Map;
import java.util.HashMap;
import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicBoolean;
/**
*
*/
-public final class JBossSerializationUnmarshaller implements Unmarshaller<ByteBuffer> {
- private final Executor executor;
- private final OurObjectInputStream objectInputStream;
+public final class JBossSerializationUnmarshaller extends AbstractSerializationUnmarshaller {
private final ClassLoader classLoader;
- private final ObjectResolver resolver;
- private final AtomicBoolean running = new AtomicBoolean();
-
public JBossSerializationUnmarshaller(final Executor executor, final ObjectResolver resolver, final ClassLoader classLoader) throws IOException {
- this.executor = executor;
- this.resolver = resolver;
+ super(executor, resolver);
this.classLoader = classLoader;
- objectInputStream = new OurObjectInputStream(new OneBufferInputStream(), resolver, classLoader);
}
- public boolean unmarshal(final ByteBuffer buffer) throws IOException {
- if (! running.getAndSet(true)) {
- executor.execute(new Runnable() {
- public void run() {
- synchronized (objectInputStream) {
- try {
- final Object object = objectInputStream.readObject();
-
- } catch (IOException e) {
- e.printStackTrace();
- } catch (ClassNotFoundException e) {
- e.printStackTrace();
- } finally {
- running.set(false);
- }
- }
- }
- });
- }
- return false;
+ protected ObjectInputStream getObjectInputStream() throws IOException {
+ return new OurObjectInputStream(inputStream, resolver, classLoader);
}
- public Object get() throws IOException, ClassNotFoundException, IllegalStateException {
- return null;
- }
-
private static final class OurObjectInputStream extends JBossObjectInputStream {
private final ClassLoader classLoader;
private final ObjectResolver resolver;
Copied: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JavaSerializationMarhsaller.java (from rev 4371, remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JBossSerializationMarhsaller.java)
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JavaSerializationMarhsaller.java (rev 0)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JavaSerializationMarhsaller.java 2008-07-15 20:02:34 UTC (rev 4375)
@@ -0,0 +1,43 @@
+package org.jboss.cx.remoting.core.marshal;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.ObjectOutputStream;
+import java.util.concurrent.Executor;
+import org.jboss.cx.remoting.spi.marshal.ObjectResolver;
+import org.jboss.cx.remoting.spi.marshal.IdentityResolver;
+import org.jboss.xnio.log.Logger;
+
+/**
+ *
+ */
+public class JavaSerializationMarhsaller extends AbstractSerializationMarshaller {
+
+ private static final Logger log = Logger.getLogger(JBossSerializationMarhsaller.class);
+
+ public JavaSerializationMarhsaller(final Executor executor, final ObjectResolver resolver) throws IOException {
+ super(executor, resolver);
+ }
+
+ protected ObjectOutputStream getObjectOutputStream() throws IOException {
+ return new OurObjectOutputStream(outputStream, resolver == null ? IdentityResolver.getInstance() : resolver);
+ }
+
+ private static final class OurObjectOutputStream extends ObjectOutputStream {
+ private final ObjectResolver resolver;
+
+ private OurObjectOutputStream(final OutputStream outputStream, final ObjectResolver resolver) throws IOException {
+ super(outputStream);
+ enableReplaceObject(true);
+ this.resolver = resolver;
+ }
+
+ protected Object replaceObject(final Object obj) throws IOException {
+ return resolver.writeReplace(obj);
+ }
+
+ protected void writeStreamHeader() throws IOException {
+ // no headers
+ }
+ }
+}
\ No newline at end of file
Copied: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JavaSerializationMarshallerFactory.java (from rev 4371, remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JBossSerializationMarshallerFactory.java)
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JavaSerializationMarshallerFactory.java (rev 0)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JavaSerializationMarshallerFactory.java 2008-07-15 20:02:34 UTC (rev 4375)
@@ -0,0 +1,29 @@
+package org.jboss.cx.remoting.core.marshal;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.Executor;
+import org.jboss.cx.remoting.spi.marshal.Marshaller;
+import org.jboss.cx.remoting.spi.marshal.MarshallerFactory;
+import org.jboss.cx.remoting.spi.marshal.ObjectResolver;
+import org.jboss.cx.remoting.spi.marshal.Unmarshaller;
+
+/**
+ *
+ */
+public class JavaSerializationMarshallerFactory implements MarshallerFactory<ByteBuffer> {
+
+ private final Executor executor;
+
+ public JavaSerializationMarshallerFactory(final Executor executor) {
+ this.executor = executor;
+ }
+
+ public Marshaller<ByteBuffer> createMarshaller(final ObjectResolver resolver) throws IOException {
+ return new JavaSerializationMarhsaller(executor, resolver);
+ }
+
+ public Unmarshaller<ByteBuffer> createUnmarshaller(final ObjectResolver resolver, final ClassLoader classLoader) throws IOException {
+ return new JavaSerializationUnmarshaller(executor, resolver, classLoader);
+ }
+}
\ No newline at end of file
Added: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JavaSerializationUnmarshaller.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JavaSerializationUnmarshaller.java (rev 0)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/JavaSerializationUnmarshaller.java 2008-07-15 20:02:34 UTC (rev 4375)
@@ -0,0 +1,101 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.cx.remoting.core.marshal;
+
+import java.io.ObjectInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectStreamClass;
+import java.util.concurrent.Executor;
+import java.util.Map;
+import java.util.HashMap;
+import java.lang.reflect.Proxy;
+import org.jboss.cx.remoting.spi.marshal.ObjectResolver;
+
+/**
+ *
+ */
+public final class JavaSerializationUnmarshaller extends AbstractSerializationUnmarshaller {
+
+ private final ClassLoader classLoader;
+
+ public JavaSerializationUnmarshaller(final Executor executor, final ObjectResolver resolver, final ClassLoader classLoader) throws IOException {
+ super(executor, resolver);
+ this.classLoader = classLoader;
+ }
+
+ protected ObjectInputStream getObjectInputStream() throws IOException {
+ return new OurObjectInputStream(inputStream, resolver, classLoader);
+ }
+
+ private static final class OurObjectInputStream extends ObjectInputStream {
+ private final ClassLoader classLoader;
+ private final ObjectResolver resolver;
+
+ private OurObjectInputStream(final InputStream inputStream, final ObjectResolver resolver, final ClassLoader classLoader) throws IOException {
+ super(inputStream);
+ this.classLoader = classLoader;
+ this.resolver = resolver;
+ }
+
+ protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {
+ final String name = desc.getName();
+ if (primitiveTypes.containsKey(name)) {
+ return primitiveTypes.get(name);
+ } else {
+ return Class.forName(name, false, classLoader);
+ }
+ }
+
+ protected Class<?> resolveProxyClass(final String[] interfaceNames) throws IOException, ClassNotFoundException {
+ final int length = interfaceNames.length;
+ final Class<?>[] interfaces = new Class[length];
+ for (int i = 0; i < length; i ++) {
+ interfaces[i] = Class.forName(interfaceNames[i], false, classLoader);
+ }
+ return Proxy.getProxyClass(classLoader, interfaces);
+ }
+
+ protected Object resolveObject(final Object obj) throws IOException {
+ return resolver.readResolve(obj);
+ }
+
+ private static final Map<String, Class<?>> primitiveTypes = new HashMap<String, Class<?>>();
+
+ private static <T> void add(Class<T> type) {
+ primitiveTypes.put(type.getName(), type);
+ }
+
+ static {
+ add(void.class);
+ add(boolean.class);
+ add(byte.class);
+ add(short.class);
+ add(int.class);
+ add(long.class);
+ add(float.class);
+ add(double.class);
+ add(char.class);
+ }
+ }
+}
Added: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/MarshallingAction.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/MarshallingAction.java (rev 0)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/MarshallingAction.java 2008-07-15 20:02:34 UTC (rev 4375)
@@ -0,0 +1,33 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.cx.remoting.core.marshal;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ *
+ */
+public interface MarshallingAction {
+ boolean marshal(ByteBuffer buffer) throws IOException;
+}
Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/OneBufferInputStream.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/OneBufferInputStream.java 2008-07-14 21:04:18 UTC (rev 4374)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/OneBufferInputStream.java 2008-07-15 20:02:34 UTC (rev 4375)
@@ -32,15 +32,23 @@
*/
public final class OneBufferInputStream extends InputStream {
- private final Object lock = new Object();
+ private final Object lock;
private ByteBuffer buffer;
+ private boolean eof;
+ public OneBufferInputStream(final Object lock) {
+ this.lock = lock;
+ }
+
private ByteBuffer getBuffer() throws InterruptedIOException {
synchronized (lock) {
for (;;) {
final ByteBuffer buffer = this.buffer;
if (buffer != null) {
if (! buffer.hasRemaining()) {
+ if (eof) {
+ return null;
+ }
lock.notify();
this.buffer = null;
} else {
@@ -57,15 +65,38 @@
}
}
+ public void setBuffer(ByteBuffer buffer, boolean eof) {
+ synchronized (lock) {
+ if (this.buffer != null) {
+ throw new IllegalStateException("Buffer already set");
+ }
+ this.buffer = buffer;
+ this.eof = eof;
+ lock.notify();
+ }
+ }
+
public int read() throws IOException {
synchronized (lock) {
- return getBuffer().get() & 0xff;
+ final ByteBuffer buffer = getBuffer();
+ return buffer == null ? -1 : buffer.get() & 0xff;
}
}
public int read(final byte[] b, int off, int len) throws IOException {
+ int c = 0;
synchronized (lock) {
- return 0;
+ while (len > 0) {
+ final ByteBuffer buffer = getBuffer();
+ if (buffer == null) {
+ return c == 0 ? -1 : c;
+ }
+ int rem = Math.min(len, buffer.remaining());
+ buffer.get(b, off, rem);
+ off += rem;
+ len -= rem;
+ }
+ return c;
}
}
}
Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/OneBufferOutputStream.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/OneBufferOutputStream.java 2008-07-14 21:04:18 UTC (rev 4374)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/OneBufferOutputStream.java 2008-07-15 20:02:34 UTC (rev 4375)
@@ -55,8 +55,8 @@
final ByteBuffer buffer = this.buffer;
if (buffer != null) {
if (! buffer.hasRemaining()) {
+ this.buffer = null;
lock.notify();
- this.buffer = null;
} else {
return buffer;
}
16 years, 5 months
JBoss Remoting SVN: r4374 - remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote.
by jboss-remoting-commits@lists.jboss.org
Author: david.lloyd(a)jboss.com
Date: 2008-07-14 17:04:18 -0400 (Mon, 14 Jul 2008)
New Revision: 4374
Added:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteClientEndpointListener.java
Log:
add a listener type for client creation
Added: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteClientEndpointListener.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteClientEndpointListener.java (rev 0)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteClientEndpointListener.java 2008-07-14 21:04:18 UTC (rev 4374)
@@ -0,0 +1,38 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.cx.remoting.spi.remote;
+
+/**
+ * A listener that watches for creation of remote client endpoints.
+ */
+public interface RemoteClientEndpointListener {
+
+ /**
+ * Receive notification of the creation of a new endpoint.
+ *
+ * @param <I> the request type
+ * @param <O> the reply type
+ * @param endpoint the endpoint that was created
+ */
+ <I, O> void notifyCreated(RemoteClientEndpoint<I, O> endpoint);
+}
16 years, 5 months