Author: david.lloyd(a)jboss.com
Date: 2008-03-27 00:46:56 -0400 (Thu, 27 Mar 2008)
New Revision: 3792
Modified:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreSession.java
remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/JrppConnection.java
Log:
Switch back to jboss serialization (yay), enable proper context forwarding...
Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreSession.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreSession.java 2008-03-27 04:07:05 UTC (rev 3791)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreSession.java 2008-03-27 04:46:56 UTC (rev 3792)
@@ -39,6 +39,10 @@
import org.jboss.cx.remoting.spi.stream.StreamDetector;
import org.jboss.cx.remoting.spi.stream.StreamSerializer;
import org.jboss.cx.remoting.spi.stream.StreamSerializerFactory;
+import org.jboss.serial.io.JBossObjectOutputStream;
+import org.jboss.serial.io.JBossObjectInputStream;
+import org.jboss.serial.io.JBossObjectOutputStreamSharedTree;
+import org.jboss.serial.io.JBossObjectInputStreamSharedTree;
/**
@@ -473,7 +477,11 @@
throw new NullPointerException("data is null");
}
final CoreStream coreStream = streams.get(streamIdentifier);
- coreStream.receiveStreamData(data);
+ if (coreStream == null) {
+ log.trace("Received stream data on an unknown context %s", streamIdentifier);
+ } else {
+ coreStream.receiveStreamData(data);
+ }
}
@SuppressWarnings ({"unchecked"})
@@ -512,7 +520,7 @@
// message output
- private final class ObjectMessageOutputImpl extends ObjectOutputStream implements ObjectMessageOutput {
+ private final class ObjectMessageOutputImpl extends JBossObjectOutputStream implements ObjectMessageOutput {
private final ByteMessageOutput target;
private final List<StreamDetector> streamDetectors;
private final List<StreamSerializer> streamSerializers = new ArrayList<StreamSerializer>();
@@ -573,10 +581,21 @@
return target.getBytesWritten();
}
- protected void writeObjectOverride(Object obj) throws IOException {
- super.writeObjectOverride(obj);
+ protected final void writeObjectOverride(Object obj) throws IOException {
+ if (obj instanceof AbstractRealContext) {
+ super.writeObjectOverride(doContextReplace(((AbstractRealContext<?, ?>)obj).getContextServer()));
+ } else {
+ super.writeObjectOverride(obj);
+ }
}
+ private final <I, O> ContextMarker doContextReplace(ContextServer<I, O> contextServer) throws IOException {
+ final ContextIdentifier contextIdentifier = protocolHandler.openContext();
+ final ProtocolContextClientImpl<I, O> contextClient = new ProtocolContextClientImpl<I, O>(contextIdentifier);
+ new ServerContextPair<I, O>(contextClient, contextServer);
+ return new ContextMarker(contextIdentifier);
+ }
+
protected Object replaceObject(Object obj) throws IOException {
final Object testObject = super.replaceObject(obj);
for (StreamDetector detector : streamDetectors) {
@@ -602,16 +621,16 @@
// message input
- private final class ObjectInputImpl extends ObjectInputStream {
+ private final class ObjectInputImpl extends JBossObjectInputStream {
private ClassLoader classLoader;
public ObjectInputImpl(final InputStream is) throws IOException {
super(is);
- super.enableResolveObject(true);
+ enableResolveObject(true);
}
- protected Object resolveObject(Object obj) throws IOException {
+ public Object resolveObject(Object obj) throws IOException {
final Object testObject = super.resolveObject(obj);
if (testObject instanceof StreamMarker) {
StreamMarker marker = (StreamMarker) testObject;
Modified:
remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/JrppConnection.java
===================================================================
--- remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/JrppConnection.java 2008-03-27 04:07:05 UTC (rev 3791)
+++ remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/JrppConnection.java 2008-03-27 04:46:56 UTC (rev 3792)
@@ -339,6 +339,7 @@
final IoBuffer buffer = newBuffer(rawMsgData.length + 100, false);
final ObjectMessageOutput output = protocolContext.getMessageOutput(new IoBufferByteMessageOutput(buffer, ioSession));
write(output, MessageType.SASL_RESPONSE);
+ output.writeInt(rawMsgData.length);
output.write(rawMsgData);
output.commit();
}
@@ -347,6 +348,7 @@
final IoBuffer buffer = newBuffer(rawMsgData.length + 100, false);
final ObjectMessageOutput output = protocolContext.getMessageOutput(new IoBufferByteMessageOutput(buffer, ioSession));
write(output, MessageType.SASL_CHALLENGE);
+ output.writeInt(rawMsgData.length);
output.write(rawMsgData);
output.commit();
}
@@ -718,7 +720,8 @@
if (trace) {
log.trace("Recevied SASL response from client");
}
- byte[] bytes = new byte[input.remaining()];
+ int len = input.readInt();
+ byte[] bytes = new byte[len];
input.readFully(bytes);
SaslServerFilter saslServerFilter = getSaslServerFilter();
try {
@@ -820,7 +823,8 @@
case AWAITING_SERVER_CHALLENGE: {
switch (type) {
case SASL_CHALLENGE: {
- byte[] bytes = new byte[input.remaining()];
+ int len = input.readInt();
+ byte[] bytes = new byte[len];
input.readFully(bytes);
SaslClientFilter saslClientFilter = getSaslClientFilter();
try {
Show replies by date