[jboss-cvs] JBoss Messaging SVN: r1483 - branches/Branch_HTTP_Experiment/src/main/org/jboss/jms/server/remoting

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Oct 17 03:53:11 EDT 2006


Author: ron_sigal
Date: 2006-10-17 03:53:10 -0400 (Tue, 17 Oct 2006)
New Revision: 1483

Modified:
   branches/Branch_HTTP_Experiment/src/main/org/jboss/jms/server/remoting/JMSWireFormat.java
Log:
JBMESSAGING-207:  Needs to unwrap Callbacks to allow JMSWireFormat access to messages in payload.

Modified: branches/Branch_HTTP_Experiment/src/main/org/jboss/jms/server/remoting/JMSWireFormat.java
===================================================================
--- branches/Branch_HTTP_Experiment/src/main/org/jboss/jms/server/remoting/JMSWireFormat.java	2006-10-17 07:46:05 UTC (rev 1482)
+++ branches/Branch_HTTP_Experiment/src/main/org/jboss/jms/server/remoting/JMSWireFormat.java	2006-10-17 07:53:10 UTC (rev 1483)
@@ -50,6 +50,8 @@
 import org.jboss.messaging.core.plugin.IdBlock;
 import org.jboss.remoting.InvocationRequest;
 import org.jboss.remoting.InvocationResponse;
+import org.jboss.remoting.callback.Callback;
+import org.jboss.remoting.invocation.InternalInvocation;
 import org.jboss.remoting.marshal.Marshaller;
 import org.jboss.remoting.marshal.UnMarshaller;
 import org.jboss.serial.io.JBossObjectInputStream;
@@ -81,7 +83,7 @@
 
    private static final Logger log = Logger.getLogger(JMSWireFormat.class);
    
-   private static boolean usingJBossSerialization;
+   private static boolean usingJBossSerialization = true;
 
    // The request codes  - start from zero
 
@@ -129,14 +131,28 @@
 
    public void write(Object obj, OutputStream out) throws IOException
    {          
-      //Sanity check
-      if (!(out instanceof MessagingObjectOutputStream))
+//      //Sanity check
+//      if (!(out instanceof MessagingObjectOutputStream))
+//      {
+//         throw new IllegalStateException("Must be MessagingObjectOutputStream");
+//      }
+      log.info("write(): " + obj);
+      DataOutputStream dos = null;
+      
+      // This won't be necessary: see JBREM-597.
+      if (out instanceof MessagingObjectOutputStream)
       {
-         throw new IllegalStateException("Must be MessagingObjectOutputStream");
+         dos = (DataOutputStream)(((MessagingObjectOutputStream)out).getUnderlyingStream());
       }
+      else if (out instanceof DataOutputStream)
+      {
+         dos = (DataOutputStream) out;
+      }
+      else
+      {
+         dos = new DataOutputStream(out);
+      }
       
-      DataOutputStream dos = (DataOutputStream)(((MessagingObjectOutputStream)out).getUnderlyingStream());
-            
       handleVersion(obj, dos);
 
       try
@@ -148,9 +164,27 @@
             InvocationRequest req = (InvocationRequest)obj;
    
             Object param;
-   
-            if (req.getParameter() instanceof MessagingMarshallable)
+            
+            // Unwrap Callback.
+            if (req.getParameter() instanceof InternalInvocation)
             {
+               log.info("InternalInvocation: " + req.getParameter());
+               InternalInvocation ii = (InternalInvocation) req.getParameter();
+               Object[] params = ii.getParameters();
+               
+               if (params != null && params.length > 0 && params[0] instanceof Callback)
+               {
+                  Callback callback = (Callback) params[0];
+                  MessagingMarshallable mm = (MessagingMarshallable)callback.getParameter();
+                  param = mm.getLoad();
+               }
+               else
+               {
+                  param = req.getParameter();
+               }
+            }
+            else if (req.getParameter() instanceof MessagingMarshallable)
+            {
                param = ((MessagingMarshallable)req.getParameter()).getLoad();
             }
             else
@@ -299,6 +333,8 @@
                ClientDelivery dr = (ClientDelivery)param;
    
                dos.writeByte(CALLBACK);
+               
+               dos.writeUTF(req.getSessionId());
    
                dr.write(dos);
    
@@ -441,14 +477,28 @@
 
    public Object read(InputStream in, Map map) throws IOException, ClassNotFoundException
    {      
-      // Sanity check
-      if (!(in instanceof MessagingObjectInputStream))
+//      // Sanity check
+//      if (!(in instanceof MessagingObjectInputStream))
+//      {
+//         throw new IllegalStateException("Must be MessagingObjectInputStream");
+//      }
+      
+      DataInputStream dis = null;
+      
+      // This won't be necessary: see JBREM-597.
+      if (in instanceof MessagingObjectInputStream)
       {
-         throw new IllegalStateException("Must be MessagingObjectInputStream");
+         dis = (DataInputStream)(((MessagingObjectInputStream)in).getUnderlyingStream());
       }
+      else if (in instanceof DataInputStream)
+      {
+         dis = (DataInputStream) in;
+      }
+      else
+      {
+         dis = new DataInputStream(in);
+      }
       
-      DataInputStream dis = (DataInputStream)(((MessagingObjectInputStream)in).getUnderlyingStream());
-
       // First byte read is always version
 
       byte version = dis.readByte();
@@ -693,14 +743,20 @@
             }
             case CALLBACK:
             {
+               String sessionId = dis.readUTF();
                ClientDelivery dr = new ClientDelivery();
                
                dr.read(dis);
+
+               // Recreate Callback.
+               MessagingMarshallable mm = new MessagingMarshallable(version, dr);
+               Callback callback = new Callback(mm);
+               InternalInvocation ii
+                  = new InternalInvocation(InternalInvocation.HANDLECALLBACK, new Object[]{callback});
+               InvocationRequest request
+                  = new InvocationRequest(sessionId, CallbackServerFactory.JMS_CALLBACK_SUBSYSTEM,
+                                          ii, null, null, null);
    
-               InvocationRequest request =
-                  new InvocationRequest(null, CallbackServerFactory.JMS_CALLBACK_SUBSYSTEM,
-                                        new MessagingMarshallable(version, dr), null, null, null);
-   
                if (trace) { log.trace("read callback()"); }
    
                return request;




More information about the jboss-cvs-commits mailing list