[jboss-cvs] JBossRemoting/src/main/org/jboss/remoting/callback ...

Ron Sigal ron_sigal at yahoo.com
Wed Nov 15 23:06:31 EST 2006


  User: rsigal  
  Date: 06/11/15 23:06:31

  Modified:    src/main/org/jboss/remoting/callback 
                        ServerInvokerCallbackHandler.java
  Log:
  JBREM-605:  (1) Callback acknowledgements can now return a response. (2) Preprocess callback acknowledgements eliminated.  (3) Application acknowledgements supported for push and pull callbacks.
  
  Revision  Changes    Path
  1.16      +81 -133   JBossRemoting/src/main/org/jboss/remoting/callback/ServerInvokerCallbackHandler.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: ServerInvokerCallbackHandler.java
  ===================================================================
  RCS file: /cvsroot/jboss/JBossRemoting/src/main/org/jboss/remoting/callback/ServerInvokerCallbackHandler.java,v
  retrieving revision 1.15
  retrieving revision 1.16
  diff -u -b -r1.15 -r1.16
  --- ServerInvokerCallbackHandler.java	28 Oct 2006 19:59:34 -0000	1.15
  +++ ServerInvokerCallbackHandler.java	16 Nov 2006 04:06:31 -0000	1.16
  @@ -93,15 +93,15 @@
   
      /**
       * The key used for storing a CallbackListener in the return metadata map of a Callback.
  -    * This one will be called when the Callback is retrieved in getCallbacks().
       */
  -   public static final String CALLBACK_PREPROCESS_LISTENER = "callbackPreprocessListener";
  +   public static final String CALLBACK_LISTENER = "callbackListener";
      
  -   /**
  -    * The key used for storing a CallbackListener in the return metadata map of a Callback.
  -    * This one will be called after the Callback is delivered on the client side.
  +   /*
  +    * The key used to indicate if callback acknowledgement should be handled by Remoting
  +    * or the application.  If it is present in the callback's return payload and
  +    * set to true, Remoting will handle the acknowledgement for push callbacks.
       */
  -   public static final String CALLBACK_POSTPROCESS_LISTENER = "callbackPostprocessListener";
  +   public static final String REMOTING_ACKNOWLEDGES_PUSH_CALLBACKS = "remotingAcknowledgesPushCallbackss";
      
      /**
       * This key used to identify a Callback to be acknowledged.
  @@ -109,16 +109,6 @@
      public static final String CALLBACK_ID = "callbackId";
      
      /**
  -    * Passed to CallbackListner to indicate preprocess acknowledgement
  -    */
  -   public static final int CALLBACK_ACK_PREPROCESS = 1;
  -   
  -   /**
  -    * Passed to CallbackListner to indicate postprocess acknowledgement
  -    */
  -   public static final int CALLBACK_ACK_POSTPROCESS = 2;
  -   
  -   /**
       * The percentage number of used memory before should persist messages.
       * For example, if 64MB available and only 30MB free mem and memPercentCeiling
       * is 50, then would trigger persisting of messages.
  @@ -126,14 +116,9 @@
      private double memPercentCeiling = 20; // 20% by default
      
      /**
  -    * Maps an ID to a preprocess CallbackListener for a Callback waiting to be acknowledged.
  +    * Maps an ID to a CallbackListener for a Callback waiting to be acknowledged.
       */
  -   private Map idToPreprocessListenerMap = Collections.synchronizedMap(new HashMap());
  -   
  -   /**
  -    * Maps an ID to a postprocess CallbackListener for a Callback waiting to be acknowledged.
  -    */
  -   private Map idToPostprocessListenerMap = Collections.synchronizedMap(new HashMap());
  +   private Map idToListenerMap = Collections.synchronizedMap(new HashMap());
      
      private static final Logger log = Logger.getLogger(ServerInvokerCallbackHandler.class);
   
  @@ -521,9 +506,6 @@
            throw new RuntimeException("Error getting callbacks", e);
         }
         callbackList.addAll(persistedCallbacks);
  -      
  -      notifyPreprocessListeners(callbackList);
  -
         return callbackList;
      }
   
  @@ -570,10 +552,10 @@
      {
         try
         {
  +         Object callbackId = checkForCallbackListener(callback);
  +         
            if(callBackClient == null)
            {
  -            checkForPullCallbackListeners(callback);
  - 
               // need to check if shoudl persist callback instead of keeping in memory
               if(shouldPersist())
               {
  @@ -617,10 +599,7 @@
                     log.debug("push callback.  Calling client now.");
                  }
                  
  -               Object preprocessCallbackListener = null;
  -               Object postprocessCallbackListener = null;
  -               Object callbackId = null;
  -               
  +               boolean handleAcknowledgement = false;
                  if(callback != null)
                  {
                     Map returnPayload = callback.getReturnPayload();
  @@ -630,10 +609,12 @@
                     }
                     else
                     {
  -                     preprocessCallbackListener = returnPayload.remove(CALLBACK_PREPROCESS_LISTENER);
  -                     postprocessCallbackListener = returnPayload.remove(CALLBACK_POSTPROCESS_LISTENER);
  -                     callbackId = returnPayload.get(CALLBACK_ID);
  +                     Object o = returnPayload.get(REMOTING_ACKNOWLEDGES_PUSH_CALLBACKS);
  +                     if (o instanceof String  && Boolean.parseBoolean((String)o) ||
  +                         o instanceof Boolean && (Boolean) o)
  +                        handleAcknowledgement = true;
                     }
  +
                     returnPayload.put(Callback.SERVER_LOCATOR_KEY, serverLocator);
                     callback.setReturnPayload(returnPayload);
                  }
  @@ -645,9 +626,7 @@
                  callBackClient.setSessionId(sessionId);
                  callBackClient.invoke(internalInvocation, callback.getRequestPayload());
                  
  -               handlePushCallbackAcknowledgements(callbackId,
  -                                                  preprocessCallbackListener,
  -                                                  postprocessCallbackListener);
  +               handlePushCallbackAcknowledgement(callbackId, handleAcknowledgement);
               }
               catch(Throwable ex)
               {
  @@ -707,136 +686,105 @@
         }
      }
      
  -   private void checkForPullCallbackListeners(Callback callback)
  +   Object checkForCallbackListener(Callback callback)
      {
         Map returnPayload = callback.getReturnPayload();
         if (returnPayload == null)
  -         return;
  -      
  -      Object preprocessObject = returnPayload.remove(CALLBACK_PREPROCESS_LISTENER);
  -      Object postprocessObject = returnPayload.remove(CALLBACK_POSTPROCESS_LISTENER);
  +         return null;
         
  -      if (preprocessObject == null && postprocessObject == null)
  -         return;
  +      Object listenerObject = returnPayload.remove(CALLBACK_LISTENER);
  +      if (listenerObject == null)
  +         return null;
         
         Object callbackId = returnPayload.get(CALLBACK_ID);
         if (callbackId == null)
         {
  -         log.error("CALLBACK_ID is null");
  -         if (preprocessObject != null)
  -            log.error("Unable to contact callback preprocess listener");
  -         if (postprocessObject != null)
  -            log.error("Unable to contact callback postprocess listener");
  -         return;
  +         log.error("CALLBACK_ID is null: unable to acknowledge callback");
  +         return null;
         }
         
  -      if (preprocessObject != null)
  +      if (listenerObject instanceof CallbackListener)
         {
  -         if (preprocessObject instanceof CallbackListener)
  +         Map metadata = invocation.getRequestPayload();
  +         if(metadata != null)
  +         {
  +            String listenerId = (String) metadata.get(Client.LISTENER_ID_KEY);
  +            if(listenerId != null)
            {
  -            idToPreprocessListenerMap.put(callbackId, preprocessObject);
  +               returnPayload.put(Client.LISTENER_ID_KEY, listenerId);
  +               idToListenerMap.put(callbackId, listenerObject);
  +               return callbackId;
            }
            else
            {
  -            log.error("callback preprocess listener has wrong type: " + preprocessObject);
  +               log.error("LISTENER_ID_KEY is null: unable to acknowledge callback");
  +               return null;
            }
         }
  -      
  -      if (postprocessObject != null)
  -      {
  -         if (postprocessObject instanceof CallbackListener)
  +         else
            {
  -            idToPostprocessListenerMap.put(callbackId, postprocessObject);
  +            log.error("LISTENER_ID_KEY is null: unable to acknowledge callback");
  +            return null;
  +         }
            }
            else
            {
  -            log.error("callback postprocess listener has wrong type: " + postprocessObject);
  -         }
  +         log.error("callback preprocess listener has wrong type: " + listenerObject);
  +         return null;
         }
      }
      
  -   private void handlePushCallbackAcknowledgements(Object callbackId,
  -                                                   Object preprocessObject,
  -                                                   Object postprocessObject)
  +   private void handlePushCallbackAcknowledgement(Object callbackId, boolean handleAck)
      {
  -      if (preprocessObject == null && postprocessObject == null)
  +      if (!handleAck)
            return;
         
         if (callbackId == null)
         {
  -         log.error("CALLBACK_ID is null");
  -         if (preprocessObject != null)
  -            log.error("Unable to contact callback preprocess listener");
  -         if (postprocessObject != null)
  -            log.error("Unable to contact callback postprocess listener");
  +         log.error("Unable to acknowledge push callback: callback id is null");
            return;
         }
         
  -      if (preprocessObject != null)
  -      {
  -         if (preprocessObject instanceof CallbackListener)
  -         {
  -            CallbackListener listener = (CallbackListener) preprocessObject;
  -            listener.callbackSent(callbackId, CALLBACK_ACK_PREPROCESS);
  -         }
  -         else
  +      CallbackListener listener = (CallbackListener) idToListenerMap.get(callbackId);
  +      if (listener == null)
            {
  -            log.error("callback preprocess listener has wrong type: " + preprocessObject);
  -         }
  +         log.error("Unable to acknowledge push callback: listener is null");
  +         return;
         }
         
  -      if (postprocessObject != null)
  -      {
  -         if (postprocessObject instanceof CallbackListener)
  -         {
  -            CallbackListener listener = (CallbackListener) postprocessObject;
  -            listener.callbackSent(callbackId, CALLBACK_ACK_POSTPROCESS);
  -         }
  -         else
  -         {
  -            log.error("callback postprocess listener has wrong type: " + postprocessObject);
  -         }
  -      }
  -   }
  -   
  -   private void notifyPreprocessListeners(List callbacks)
  -   {
  -      Iterator it = callbacks.iterator();
  -      while (it.hasNext())
  -      {
  -         Callback callback = (Callback) it.next();
  -         Map returnPayload = callback.getReturnPayload();
  -         if (returnPayload != null)
  -         {
  -            Object callbackId = returnPayload.get(CALLBACK_ID);
  -            if (callbackId == null)
  -               continue;
  -            
  -            CallbackListener listener = (CallbackListener) idToPreprocessListenerMap.remove(callbackId);
  -            if (listener == null)
  -               continue;
  -            
  -            listener.callbackSent(callbackId, CALLBACK_ACK_PREPROCESS);
  -         }
  -      }
  +      listener.acknowledgeCallback(this, callbackId, null);
      }
      
      /**
  -    * Calls a postprocess listener
  -    * @param callbackId identity of Callback to acknowledge
  +    * Calls listeners to acknowledge callbacks
  +    * @param invocation carries identities of Callbacks to acknowledge and,
  +    *                   optionally, responses 
       */
      public void acknowledgeCallbacks(InternalInvocation invocation) throws Exception 
      {
  -      Object[] callbackIds = invocation.getParameters();
  -      if (callbackIds == null)
  -      {
  +      Object[] params = invocation.getParameters();
  +      if (params == null)
            return;
  -      }
         
  -      for (int i = 0; i < callbackIds.length; i++)
  +      List callbackIds = (List) params[0];
  +      List responses = (List) params[1];
  +      if (callbackIds == null || callbackIds.size() == 0)
  +         return;
  +      
  +      Iterator idsIterator = callbackIds.iterator();
  +      Iterator responseIterator = null;
  +      if (responses != null)
  +         responseIterator = responses.iterator();
  +      
  +      Object callbackId = null;
  +      Object response = null;
  +      while(idsIterator.hasNext())
         {
  -         Object callbackId = callbackIds[i];
  -         CallbackListener listener = (CallbackListener) idToPostprocessListenerMap.remove(callbackId);
  +         callbackId = idsIterator.next();
  +         if (responseIterator != null)
  +            response = responseIterator.next();
  +         
  +         CallbackListener listener = (CallbackListener) idToListenerMap.remove(callbackId);
            
            if (listener == null)
            {
  @@ -844,7 +792,7 @@
               continue;
            }
            
  -         listener.callbackSent(callbackId, CALLBACK_ACK_POSTPROCESS);
  +         listener.acknowledgeCallback(this, callbackId, response);
         }
      }
   
  
  
  



More information about the jboss-cvs-commits mailing list