[jboss-cvs] JBoss Messaging SVN: r1483 - branches/Branch_HTTP_Experiment/src/main/org/jboss/jms/server/remoting
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Oct 17 03:53:11 EDT 2006
Author: ron_sigal
Date: 2006-10-17 03:53:10 -0400 (Tue, 17 Oct 2006)
New Revision: 1483
Modified:
branches/Branch_HTTP_Experiment/src/main/org/jboss/jms/server/remoting/JMSWireFormat.java
Log:
JBMESSAGING-207: Needs to unwrap Callbacks to allow JMSWireFormat access to messages in payload.
Modified: branches/Branch_HTTP_Experiment/src/main/org/jboss/jms/server/remoting/JMSWireFormat.java
===================================================================
--- branches/Branch_HTTP_Experiment/src/main/org/jboss/jms/server/remoting/JMSWireFormat.java 2006-10-17 07:46:05 UTC (rev 1482)
+++ branches/Branch_HTTP_Experiment/src/main/org/jboss/jms/server/remoting/JMSWireFormat.java 2006-10-17 07:53:10 UTC (rev 1483)
@@ -50,6 +50,8 @@
import org.jboss.messaging.core.plugin.IdBlock;
import org.jboss.remoting.InvocationRequest;
import org.jboss.remoting.InvocationResponse;
+import org.jboss.remoting.callback.Callback;
+import org.jboss.remoting.invocation.InternalInvocation;
import org.jboss.remoting.marshal.Marshaller;
import org.jboss.remoting.marshal.UnMarshaller;
import org.jboss.serial.io.JBossObjectInputStream;
@@ -81,7 +83,7 @@
private static final Logger log = Logger.getLogger(JMSWireFormat.class);
- private static boolean usingJBossSerialization;
+ private static boolean usingJBossSerialization = true;
// The request codes - start from zero
@@ -129,14 +131,28 @@
public void write(Object obj, OutputStream out) throws IOException
{
- //Sanity check
- if (!(out instanceof MessagingObjectOutputStream))
+// //Sanity check
+// if (!(out instanceof MessagingObjectOutputStream))
+// {
+// throw new IllegalStateException("Must be MessagingObjectOutputStream");
+// }
+ log.info("write(): " + obj);
+ DataOutputStream dos = null;
+
+ // This won't be necessary: see JBREM-597.
+ if (out instanceof MessagingObjectOutputStream)
{
- throw new IllegalStateException("Must be MessagingObjectOutputStream");
+ dos = (DataOutputStream)(((MessagingObjectOutputStream)out).getUnderlyingStream());
}
+ else if (out instanceof DataOutputStream)
+ {
+ dos = (DataOutputStream) out;
+ }
+ else
+ {
+ dos = new DataOutputStream(out);
+ }
- DataOutputStream dos = (DataOutputStream)(((MessagingObjectOutputStream)out).getUnderlyingStream());
-
handleVersion(obj, dos);
try
@@ -148,9 +164,27 @@
InvocationRequest req = (InvocationRequest)obj;
Object param;
-
- if (req.getParameter() instanceof MessagingMarshallable)
+
+ // Unwrap Callback.
+ if (req.getParameter() instanceof InternalInvocation)
{
+ log.info("InternalInvocation: " + req.getParameter());
+ InternalInvocation ii = (InternalInvocation) req.getParameter();
+ Object[] params = ii.getParameters();
+
+ if (params != null && params.length > 0 && params[0] instanceof Callback)
+ {
+ Callback callback = (Callback) params[0];
+ MessagingMarshallable mm = (MessagingMarshallable)callback.getParameter();
+ param = mm.getLoad();
+ }
+ else
+ {
+ param = req.getParameter();
+ }
+ }
+ else if (req.getParameter() instanceof MessagingMarshallable)
+ {
param = ((MessagingMarshallable)req.getParameter()).getLoad();
}
else
@@ -299,6 +333,8 @@
ClientDelivery dr = (ClientDelivery)param;
dos.writeByte(CALLBACK);
+
+ dos.writeUTF(req.getSessionId());
dr.write(dos);
@@ -441,14 +477,28 @@
public Object read(InputStream in, Map map) throws IOException, ClassNotFoundException
{
- // Sanity check
- if (!(in instanceof MessagingObjectInputStream))
+// // Sanity check
+// if (!(in instanceof MessagingObjectInputStream))
+// {
+// throw new IllegalStateException("Must be MessagingObjectInputStream");
+// }
+
+ DataInputStream dis = null;
+
+ // This won't be necessary: see JBREM-597.
+ if (in instanceof MessagingObjectInputStream)
{
- throw new IllegalStateException("Must be MessagingObjectInputStream");
+ dis = (DataInputStream)(((MessagingObjectInputStream)in).getUnderlyingStream());
}
+ else if (in instanceof DataInputStream)
+ {
+ dis = (DataInputStream) in;
+ }
+ else
+ {
+ dis = new DataInputStream(in);
+ }
- DataInputStream dis = (DataInputStream)(((MessagingObjectInputStream)in).getUnderlyingStream());
-
// First byte read is always version
byte version = dis.readByte();
@@ -693,14 +743,20 @@
}
case CALLBACK:
{
+ String sessionId = dis.readUTF();
ClientDelivery dr = new ClientDelivery();
dr.read(dis);
+
+ // Recreate Callback.
+ MessagingMarshallable mm = new MessagingMarshallable(version, dr);
+ Callback callback = new Callback(mm);
+ InternalInvocation ii
+ = new InternalInvocation(InternalInvocation.HANDLECALLBACK, new Object[]{callback});
+ InvocationRequest request
+ = new InvocationRequest(sessionId, CallbackServerFactory.JMS_CALLBACK_SUBSYSTEM,
+ ii, null, null, null);
- InvocationRequest request =
- new InvocationRequest(null, CallbackServerFactory.JMS_CALLBACK_SUBSYSTEM,
- new MessagingMarshallable(version, dr), null, null, null);
-
if (trace) { log.trace("read callback()"); }
return request;
More information about the jboss-cvs-commits
mailing list