[jboss-cvs] JBossRemoting/src/main/org/jboss/remoting/callback ...

Ron Sigal ron_sigal at yahoo.com
Mon Sep 25 19:34:47 EDT 2006


  User: rsigal  
  Date: 06/09/25 19:34:47

  Modified:    src/main/org/jboss/remoting/callback 
                        ServerInvokerCallbackHandler.java
  Log:
  JBREM-605: Added facility for informing a CallbackListener when a callback has been sent to client and processed.
  
  Revision  Changes    Path
  1.13      +117 -0    JBossRemoting/src/main/org/jboss/remoting/callback/ServerInvokerCallbackHandler.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: ServerInvokerCallbackHandler.java
  ===================================================================
  RCS file: /cvsroot/jboss/JBossRemoting/src/main/org/jboss/remoting/callback/ServerInvokerCallbackHandler.java,v
  retrieving revision 1.12
  retrieving revision 1.13
  diff -u -b -r1.12 -r1.13
  --- ServerInvokerCallbackHandler.java	11 Jul 2006 20:09:50 -0000	1.12
  +++ ServerInvokerCallbackHandler.java	25 Sep 2006 23:34:47 -0000	1.13
  @@ -33,6 +33,7 @@
   import org.jboss.remoting.security.SSLSocketBuilder;
   import org.jboss.remoting.security.SSLSocketBuilderMBean;
   import org.jboss.remoting.security.SSLSocketFactoryService;
  +import org.jboss.util.id.GUID;
   
   import javax.management.MBeanServer;
   import javax.management.MBeanServerInvocationHandler;
  @@ -42,7 +43,9 @@
   import javax.net.ssl.SSLServerSocketFactory;
   import java.io.IOException;
   import java.util.ArrayList;
  +import java.util.Collections;
   import java.util.HashMap;
  +import java.util.Iterator;
   import java.util.List;
   import java.util.Map;
   
  @@ -90,12 +93,39 @@
      public static final String CALLBACK_MEM_CEILING = "callbackMemCeiling";
   
      /**
  +    * The key used for storing a CallbackListener in the return metadata map of a Callback.
  +    * This one will be called when the Callback is retrieved in getCallbacks().
  +    */
  +   public static final String CALLBACK_PREPROCESS_LISTENER = "callbackPreprocessListener";
  +   
  +   /**
  +    * The key used for storing a CallbackListener in the return metadata map of a Callback.
  +    * This one will be called after the Callback is delivered on the client side.
  +    */
  +   public static final String CALLBACK_POSTPROCESS_LISTENER = "callbackPostprocessListener";
  +   
  +   /**
  +    * The key used to identify a Callback to be acknowledged.
  +    */
  +   public static final String CALLBACK_ID = "callbackId";
  +   
  +   /**
       * The percentage number of used memory before should persist messages.
       * For example, if 64MB available and only 30MB free mem and memPercentCeiling
       * is 50, then would trigger persisting of messages.
       */
      private double memPercentCeiling = 20; // 20% by default
   
  +   /**
  +    * Maps a GUID to a Callback waiting to be acknowledged.
  +    */
  +   private Map idToCallbackMap = Collections.synchronizedMap(new HashMap());
  +
  +   /**
  +    * Maps a GUID to a CallbackListener for a Callback waiting to be acknowledged.
  +    */
  +   private Map idToListenerMap = Collections.synchronizedMap(new HashMap());
  +   
      private static final Logger log = Logger.getLogger(ServerInvokerCallbackHandler.class);
   
   
  @@ -483,6 +513,8 @@
         }
         callbackList.addAll(persistedCallbacks);
   
  +      notifyPreprocessListeners(callbackList);
  +
         return callbackList;
      }
   
  @@ -512,6 +544,24 @@
         return callbacks;
      }
   
  +   private void notifyPreprocessListeners(List callbacks)
  +   {
  +      Iterator it = callbacks.iterator();
  +      while (it.hasNext())
  +      {
  +         Callback callback = (Callback) it.next();
  +         Map returnPayload = callback.getReturnPayload();
  +         if (returnPayload != null)
  +         {
  +            CallbackListener listener = (CallbackListener) returnPayload.remove(CALLBACK_PREPROCESS_LISTENER);
  +            if (listener != null)
  +            {
  +               listener.callbackSent(callback);
  +            }
  +         }
  +      }
  +   }
  +   
      public boolean isPullCallbackHandler()
      {
         return (callBackClient == null);
  @@ -564,6 +614,8 @@
                     callbacks.add(callback);
                  }
               }
  +            
  +            checkForPullPostprocessListener(callback);
            }
            else
            {
  @@ -573,6 +625,9 @@
                  {
                     log.debug("push callback.  Calling client now.");
                  }
  +               
  +               CallbackListener callbackListener = null;
  +               
                  if(callback != null)
                  {
                     Map returnPayload = callback.getReturnPayload();
  @@ -580,9 +635,14 @@
                     {
                        returnPayload = new HashMap();
                     }
  +                  else
  +                  {
  +                     callbackListener = (CallbackListener) returnPayload.remove(CALLBACK_POSTPROCESS_LISTENER);
  +                  }
                     returnPayload.put(Callback.SERVER_LOCATOR_KEY, serverLocator);
                     callback.setReturnPayload(returnPayload);
                  }
  +               
                  // sending internal invocation so server invoker we are sending to
                  // will know how pass onto it's client callback handler
                  InternalInvocation internalInvocation = new InternalInvocation(InternalInvocation.HANDLECALLBACK,
  @@ -590,6 +650,10 @@
                  callBackClient.setSessionId(sessionId);
                  callBackClient.invoke(internalInvocation,
                                        callback.getRequestPayload());
  +               
  +               if (callbackListener != null)
  +                  callbackListener.callbackSent(callback);
  +  
               }
               catch(Throwable ex)
               {
  @@ -649,6 +713,59 @@
         }
      }
   
  +   private void checkForPullPostprocessListener(Callback callback)
  +   {
  +      Map returnPayload = callback.getReturnPayload();
  +      if (returnPayload != null)
  +      {
  +         CallbackListener listener = (CallbackListener) returnPayload.remove(CALLBACK_POSTPROCESS_LISTENER);
  +         if (listener != null)
  +         {
  +            Object guid = new GUID();
  +            idToCallbackMap.put(guid, callback);
  +            idToListenerMap.put(guid, listener);
  +            returnPayload.put(CALLBACK_ID, guid);
  +         }
  +      }
  +   }
  +   
  +   private void checkForPushPostprocessListener(Callback callback)
  +   {
  +      Map returnPayload = callback.getReturnPayload();
  +      if (returnPayload != null)
  +      {
  +         CallbackListener listener = (CallbackListener) returnPayload.remove(CALLBACK_POSTPROCESS_LISTENER);
  +         if (listener != null)
  +         {
  +            listener.callbackSent(callback);
  +         }
  +      }
  +   }
  +   
  +   /**
  +    * Calls the postprocess listener registered for the Callback being acknowledged.
  +    * @param invocation invocation whose first parameter is the id of the Callback to acknowledge
  +    */
  +   public void acknowledgeCallback(InternalInvocation invocation) throws Exception 
  +   {
  +      Object callbackId = invocation.getParameters()[0];
  +      
  +      // sanity check
  +      if (!(callbackId instanceof GUID))
  +      {
  +         log.error("received callbackId with invalid type: " + callbackId.getClass());
  +         throw new Exception("Cannot acknowledge: invalid callbackId type: " + callbackId.getClass());
  +      }
  +      
  +      Callback callback = (Callback) idToCallbackMap.get(callbackId);
  +      CallbackListener listener = (CallbackListener) idToListenerMap.get(callbackId);
  +      
  +      if (callback == null || listener == null)
  +         throw new Exception("Cannot acknowledge Callback: unrecognized id: " + callbackId);
  +         
  +      listener.callbackSent(callback);   
  +   }
  +
      /**
       * Returns the id for this handler
       *
  
  
  



More information about the jboss-cvs-commits mailing list