[jboss-cvs] JBossRemoting/src/main/org/jboss/remoting/callback ...

Ron Sigal ron_sigal at yahoo.com
Sat Oct 28 15:59:34 EDT 2006


  User: rsigal  
  Date: 06/10/28 15:59:34

  Modified:    src/main/org/jboss/remoting/callback 
                        ServerInvokerCallbackHandler.java
  Log:
  JBREM-605:  (1) Changed acknowledgeCallback() to acknowledgeCallbacks(). (2) Require application to supply callback ID.  (3) Reorganized callback acknowledgement processing. 
  
  Revision  Changes    Path
  1.15      +148 -53   JBossRemoting/src/main/org/jboss/remoting/callback/ServerInvokerCallbackHandler.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: ServerInvokerCallbackHandler.java
  ===================================================================
  RCS file: /cvsroot/jboss/JBossRemoting/src/main/org/jboss/remoting/callback/ServerInvokerCallbackHandler.java,v
  retrieving revision 1.14
  retrieving revision 1.15
  diff -u -b -r1.14 -r1.15
  --- ServerInvokerCallbackHandler.java	11 Oct 2006 22:39:53 -0000	1.14
  +++ ServerInvokerCallbackHandler.java	28 Oct 2006 19:59:34 -0000	1.15
  @@ -33,7 +33,6 @@
   import org.jboss.remoting.security.SSLSocketBuilder;
   import org.jboss.remoting.security.SSLSocketBuilderMBean;
   import org.jboss.remoting.security.SSLSocketFactoryService;
  -import org.jboss.util.id.GUID;
   
   import javax.management.MBeanServer;
   import javax.management.MBeanServerInvocationHandler;
  @@ -110,6 +109,16 @@
      public static final String CALLBACK_ID = "callbackId";
      
      /**
  +    * Passed to CallbackListener to indicate preprocess acknowledgement
  +    */
  +   public static final int CALLBACK_ACK_PREPROCESS = 1;
  +   
  +   /**
  +    * Passed to CallbackListener to indicate postprocess acknowledgement
  +    */
  +   public static final int CALLBACK_ACK_POSTPROCESS = 2;
  +   
  +   /**
       * The percentage number of used memory before should persist messages.
       * For example, if 64MB available and only 30MB free mem and memPercentCeiling
       * is 50, then would trigger persisting of messages.
  @@ -117,14 +126,14 @@
      private double memPercentCeiling = 20; // 20% by default
      
      /**
  -    * Maps a GUID to a Callback waiting to be acknowledged.
  +    * Maps an ID to a preprocess CallbackListener for a Callback waiting to be acknowledged.
       */
  -   private Map idToCallbackMap = Collections.synchronizedMap(new HashMap());
  +   private Map idToPreprocessListenerMap = Collections.synchronizedMap(new HashMap());
   
      /**
  -    * Maps a GUID to a CallbackListener for a Callback waiting to be acknowledged.
  +    * Maps an ID to a postprocess CallbackListener for a Callback waiting to be acknowledged.
       */
  -   private Map idToListenerMap = Collections.synchronizedMap(new HashMap());
  +   private Map idToPostprocessListenerMap = Collections.synchronizedMap(new HashMap());
      
      private static final Logger log = Logger.getLogger(ServerInvokerCallbackHandler.class);
   
  @@ -544,24 +553,6 @@
         return callbacks;
      }
   
  -   private void notifyPreprocessListeners(List callbacks)
  -   {
  -      Iterator it = callbacks.iterator();
  -      while (it.hasNext())
  -      {
  -         Callback callback = (Callback) it.next();
  -         Map returnPayload = callback.getReturnPayload();
  -         if (returnPayload != null)
  -         {
  -            CallbackListener listener = (CallbackListener) returnPayload.remove(CALLBACK_PREPROCESS_LISTENER);
  -            if (listener != null)
  -            {
  -               listener.callbackSent(callback);
  -            }
  -         }
  -      }
  -   }
  -   
      public boolean isPullCallbackHandler()
      {
         return (callBackClient == null);
  @@ -581,7 +572,7 @@
         {
            if(callBackClient == null)
            {
  -            checkForPullPostprocessListener(callback);
  +            checkForPullCallbackListeners(callback);
    
               // need to check if shoudl persist callback instead of keeping in memory
               if(shouldPersist())
  @@ -626,7 +617,9 @@
                     log.debug("push callback.  Calling client now.");
                  }
                  
  -               CallbackListener callbackListener = null;
  +               Object preprocessCallbackListener = null;
  +               Object postprocessCallbackListener = null;
  +               Object callbackId = null;
                  
                  if(callback != null)
                  {
  @@ -637,7 +630,9 @@
                     }
                     else
                     {
  -                     callbackListener = (CallbackListener) returnPayload.remove(CALLBACK_POSTPROCESS_LISTENER);
  +                     preprocessCallbackListener = returnPayload.remove(CALLBACK_PREPROCESS_LISTENER);
  +                     postprocessCallbackListener = returnPayload.remove(CALLBACK_POSTPROCESS_LISTENER);
  +                     callbackId = returnPayload.get(CALLBACK_ID);
                     }
                     returnPayload.put(Callback.SERVER_LOCATOR_KEY, serverLocator);
                     callback.setReturnPayload(returnPayload);
  @@ -648,12 +643,11 @@
                  InternalInvocation internalInvocation = new InternalInvocation(InternalInvocation.HANDLECALLBACK,
                                                                                 new Object[]{callback});
                  callBackClient.setSessionId(sessionId);
  -               callBackClient.invoke(internalInvocation,
  -                                     callback.getRequestPayload());
  -               
  -               if (callbackListener != null)
  -                  callbackListener.callbackSent(callback);
  +               callBackClient.invoke(internalInvocation, callback.getRequestPayload());
     
  +               handlePushCallbackAcknowledgements(callbackId,
  +                                                  preprocessCallbackListener,
  +                                                  postprocessCallbackListener);
               }
               catch(Throwable ex)
               {
  @@ -713,18 +707,116 @@
         }
      }
      
  -   private void checkForPullPostprocessListener(Callback callback)
  +   private void checkForPullCallbackListeners(Callback callback)
      {
         Map returnPayload = callback.getReturnPayload();
  -      if (returnPayload != null)
  +      if (returnPayload == null)
  +         return;
  +      
  +      Object preprocessObject = returnPayload.remove(CALLBACK_PREPROCESS_LISTENER);
  +      Object postprocessObject = returnPayload.remove(CALLBACK_POSTPROCESS_LISTENER);
  +      
  +      if (preprocessObject == null && postprocessObject == null)
  +         return;
  +      
  +      Object callbackId = returnPayload.get(CALLBACK_ID);
  +      if (callbackId == null)
  +      {
  +         log.error("CALLBACK_ID is null");
  +         if (preprocessObject != null)
  +            log.error("Unable to contact callback preprocess listener");
  +         if (postprocessObject != null)
  +            log.error("Unable to contact callback postprocess listener");
  +         return;
  +      }
  +      
  +      if (preprocessObject != null)
  +      {
  +         if (preprocessObject instanceof CallbackListener)
         {
  -         CallbackListener listener = (CallbackListener) returnPayload.remove(CALLBACK_POSTPROCESS_LISTENER);
  -         if (listener != null)
  +            idToPreprocessListenerMap.put(callbackId, preprocessObject);
  +         }
  +         else
            {
  -            Object guid = new GUID();
  -            idToCallbackMap.put(guid, callback);
  -            idToListenerMap.put(guid, listener);
  -            returnPayload.put(CALLBACK_ID, guid);
  +            log.error("callback preprocess listener has wrong type: " + preprocessObject);
  +         }
  +      }
  +      
  +      if (postprocessObject != null)
  +      {
  +         if (postprocessObject instanceof CallbackListener)
  +         {
  +            idToPostprocessListenerMap.put(callbackId, postprocessObject);
  +         }
  +         else
  +         {
  +            log.error("callback postprocess listener has wrong type: " + postprocessObject);
  +         }
  +      }
  +   }
  +   
  +   private void handlePushCallbackAcknowledgements(Object callbackId,
  +                                                   Object preprocessObject,
  +                                                   Object postprocessObject)
  +   {
  +      if (preprocessObject == null && postprocessObject == null)
  +         return;
  +      
  +      if (callbackId == null)
  +      {
  +         log.error("CALLBACK_ID is null");
  +         if (preprocessObject != null)
  +            log.error("Unable to contact callback preprocess listener");
  +         if (postprocessObject != null)
  +            log.error("Unable to contact callback postprocess listener");
  +         return;
  +      }
  +      
  +      if (preprocessObject != null)
  +      {
  +         if (preprocessObject instanceof CallbackListener)
  +         {
  +            CallbackListener listener = (CallbackListener) preprocessObject;
  +            listener.callbackSent(callbackId, CALLBACK_ACK_PREPROCESS);
  +         }
  +         else
  +         {
  +            log.error("callback preprocess listener has wrong type: " + preprocessObject);
  +         }
  +      }
  +      
  +      if (postprocessObject != null)
  +      {
  +         if (postprocessObject instanceof CallbackListener)
  +         {
  +            CallbackListener listener = (CallbackListener) postprocessObject;
  +            listener.callbackSent(callbackId, CALLBACK_ACK_POSTPROCESS);
  +         }
  +         else
  +         {
  +            log.error("callback postprocess listener has wrong type: " + postprocessObject);
  +         }
  +      }
  +   }
  +   
  +   private void notifyPreprocessListeners(List callbacks)
  +   {
  +      Iterator it = callbacks.iterator();
  +      while (it.hasNext())
  +      {
  +         Callback callback = (Callback) it.next();
  +         Map returnPayload = callback.getReturnPayload();
  +         if (returnPayload != null)
  +         {
  +            Object callbackId = returnPayload.get(CALLBACK_ID);
  +            if (callbackId == null)
  +               continue;
  +            
  +            CallbackListener listener = (CallbackListener) idToPreprocessListenerMap.remove(callbackId);
  +            if (listener == null)
  +               continue;
  +            
  +            listener.callbackSent(callbackId, CALLBACK_ACK_PREPROCESS);
            }
         }
      }
  @@ -733,24 +825,27 @@
       * Calls a postprocess listener
       * @param callbackId identity of Callback to acknowledge
       */
  -   public void acknowledgeCallback(InternalInvocation invocation) throws Exception 
  +   public void acknowledgeCallbacks(InternalInvocation invocation) throws Exception 
      {
  -      Object callbackId = invocation.getParameters()[0];
  -      
  -      // sanity check
  -      if (!(callbackId instanceof GUID))
  +      Object[] callbackIds = invocation.getParameters();
  +      if (callbackIds == null)
         {
  -         log.error("received callbackId with invalid type: " + callbackId.getClass());
  -         throw new Exception("Cannot acknowledge: invalid callbackId type: " + callbackId.getClass());
  +         return;
         }
         
  -      Callback callback = (Callback) idToCallbackMap.get(callbackId);
  -      CallbackListener listener = (CallbackListener) idToListenerMap.get(callbackId);
  +      for (int i = 0; i < callbackIds.length; i++)
  +      {
  +         Object callbackId = callbackIds[i];
  +         CallbackListener listener = (CallbackListener) idToPostprocessListenerMap.remove(callbackId);
         
  -      if (callback == null || listener == null)
  -         throw new Exception("Cannot acknowledge Callback: unrecognized id: " + callbackId);
  +         if (listener == null)
  +         {
  +            log.warn("Cannot acknowledge callback: unrecognized id: " + callbackId);
  +            continue;
  +         }
            
  -      listener.callbackSent(callback);   
  +         listener.callbackSent(callbackId, CALLBACK_ACK_POSTPROCESS);
  +      }
      }
   
      /**
  
  
  



More information about the jboss-cvs-commits mailing list