[jboss-cvs] JBossRemoting/src/main/org/jboss/remoting/callback ...

Ron Sigal ron_sigal at yahoo.com
Wed Nov 15 23:09:36 EST 2006


  User: rsigal  
  Date: 06/11/15 23:09:36

  Modified:    src/main/org/jboss/remoting/callback  CallbackPoller.java
  Log:
  JBREM-605:  CallbackPoller will not shut down until all received callbacks have been processed and, if requested, acknowledged.
  
  Revision  Changes    Path
  1.8       +238 -44   JBossRemoting/src/main/org/jboss/remoting/callback/CallbackPoller.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: CallbackPoller.java
  ===================================================================
  RCS file: /cvsroot/jboss/JBossRemoting/src/main/org/jboss/remoting/callback/CallbackPoller.java,v
  retrieving revision 1.7
  retrieving revision 1.8
  diff -u -b -r1.7 -r1.8
  --- CallbackPoller.java	6 Nov 2006 07:19:25 -0000	1.7
  +++ CallbackPoller.java	16 Nov 2006 04:09:36 -0000	1.8
  @@ -22,6 +22,7 @@
   package org.jboss.remoting.callback;
   
   import java.util.ArrayList;
  +import java.util.Iterator;
   import java.util.List;
   import java.util.Map;
   import java.util.Timer;
  @@ -29,13 +30,33 @@
   
   import org.jboss.logging.Logger;
   import org.jboss.remoting.Client;
  -import org.jboss.remoting.util.TimerUtil;
   
   /**
  + * CallbackPoller is used to simulate push callbacks on transports that don't support
  + * bidirectional connections.  It will periodically pull callbacks from the server
  + * and pass them to the InvokerCallbackHandler.
  + * 
    * @author <a href="mailto:tom.elrod at jboss.com">Tom Elrod</a>
  + * @author <a href="mailto:ron.sigal at jboss.com">Ron Sigal</a>
    */
   public class CallbackPoller extends TimerTask
   {
  +   /*
  +    * Implementation note.
  +    * 
  +    * CallbackPoller uses two, or possibly three, threads.  The first thread is the 
  +    * Timer thread, which periodically pulls callbacks from the server and adds them
  +    * to toHandleList.  The second thread takes callbacks from toHandleList, passes
  +    * them to the CallbackHandler, and, if an acknowledgement is requested for a
  +    * callback, it adds the callback to toAcknowledgeList.  The third thread, which is
  +    * created in response to the first callback for which an acknowledgement is requested,
  +    * takes the contents of toAcknowledgeList and acknowledges them in a batch.
  +    * 
  +    * CallbackPoller will not shut down until all received callbacks have been processed
  +    * by the CallbackHandler and acknowledgements have been sent for all callbacks for
  +    * which acknowledgements have been requested.
  +    */   
  +   
      /**
       * Default polling period for getting callbacks from the server.
       * Default is 5000 milliseconds.
  @@ -48,12 +69,27 @@
       */
      public static final String CALLBACK_POLL_PERIOD = "callbackPollPeriod";
   
  +   /** The key value to use in metadata Map to specify the desired scheduling mode. */
  +   public static final String CALLBACK_SCHEDULE_MODE = "scheduleMode";
  +
  +   /** Use java.util.Timer.scheduleAtFixedRate(). */
  +   public static final String SCHEDULE_FIXED_RATE = "scheduleFixedRate";
  +   
  +   /** Use java.util.Timer.schedule(). */
  +   public static final String SCHEDULE_FIXED_DELAY = "scheduleFixedDelay";
  +   
  +   /** The key to use in metadata Map to request statistics.  The associated
  +    *  value is ignored. */
  +   public static final String REPORT_STATISTICS = "reportStatistics";
  +
      private Client client = null;
      private InvokerCallbackHandler callbackHandler = null;
      private Map metadata = null;
      private Object callbackHandlerObject = null;
      private long pollPeriod = DEFAULT_POLL_PERIOD;
      private Timer timer;
  +   private String scheduleMode = SCHEDULE_FIXED_RATE;
  +   private boolean reportStatistics;
      
      private ArrayList toHandleList = new ArrayList();
      private ArrayList toAcknowledgeList = new ArrayList();
  @@ -108,6 +144,31 @@
                           " and is " + val.getClass().getName());
               }
            }
  +         val = metadata.get(CALLBACK_SCHEDULE_MODE);
  +         if (val != null)
  +         {
  +            if (val instanceof String)
  +            {
  +               if (SCHEDULE_FIXED_DELAY.equals(val) || SCHEDULE_FIXED_RATE.equals(val))
  +               {
  +                  scheduleMode = (String) val;
  +               }
  +               else
  +               {
  +                  log.warn("Unrecognized value for " + CALLBACK_SCHEDULE_MODE + ": " + val);
  +                  log.warn("Using " + scheduleMode);
  +               }
  +            }
  +            else
  +            {
  +               log.warn("Value for " + CALLBACK_SCHEDULE_MODE + " must be of type " + String.class.getName() +
  +                     " and is " + val.getClass().getName());
  +            }
  +         }
  +         if (metadata.get(REPORT_STATISTICS) != null)
  +         {
  +            reportStatistics = true;  
  +         }
         }
         
         handleThread = new HandleThread("HandleThread");
  @@ -115,14 +176,13 @@
         
         timer = new Timer(true);
         
  -      // If CallbackPoller is getting behind, it's probably because it's getting a burst
  -      // of callbacks.  Calling scheduleAtFixedRate() will cause the task to be run
  -      // extra times to accomodate the increase load.  See JBREM-618.
  +      if (SCHEDULE_FIXED_DELAY.equals(scheduleMode))
  +         timer.schedule(this, pollPeriod, pollPeriod);
  +      else
         timer.scheduleAtFixedRate(this, pollPeriod, pollPeriod);
  -
      }
   
  -   public void run()
  +   public synchronized void run()
      {
         // need to pull callbacks from server and give them to callback handler
         try
  @@ -138,6 +198,9 @@
                     toHandleList.notify();
               }
            }
  +         
  +         if (reportStatistics)
  +            reportStatistics(callbacks);
         }
         catch (Throwable throwable)
         {
  @@ -145,21 +208,34 @@
         }
      }
      
  -   public void stop()
  +   /**
  +    * stop() will not return until all received callbacks have been processed
  +    * by the CallbackHandler and acknowledgements have been sent for all callbacks for
  +    * which acknowledgements have been requested.
  +    */
  +   public synchronized void stop()
      {
  +      // run() and stop() are synchronized so that stop() will wait until run() has finished
  +      // adding any callbacks it has received to toHandleList.  Therefore, once cancel()
  +      // returns, no more callbacks will arrive from the server.   
         cancel();
  -      client = null;
  -      callbackHandler = null;
  +      
  +      // HandleThread.shutdown() will not return until all received callbacks have been 
  +      // processed and, if necessary, added to toAcknowledgeList.
         if (handleThread != null)
         {
            handleThread.shutdown();
            handleThread = null;
         }
  +      
  +      // AcknowledgeThread.shutdown() will not return until acknowledgements have been sent
  +      // for all callbacks for which acknowledgements have been requested.
         if (acknowledgeThread != null)
         {
            acknowledgeThread.shutdown();
            acknowledgeThread = null;
         }
  +      
         if (timer != null)
         {
            timer.cancel();
  @@ -170,6 +246,8 @@
      class HandleThread extends Thread
      {
         boolean running = true;
  +      boolean done;
  +      ArrayList toHandleListCopy = new ArrayList();
         Callback callback;
         
         HandleThread(String name)
  @@ -178,11 +256,11 @@
         }
         public void run()
         {
  -         while (running)
  +         while (true)
            {
               synchronized (toHandleList)
               {
  -               if (toHandleList.isEmpty())
  +               if (toHandleList.isEmpty() && running)
                  {
                     try
                     {
  @@ -195,14 +273,24 @@
                     }
                  }
                  
  -               if (!running)
  +               // If toHandleList is empty, then running must be false.  We return
  +               // only when both conditions are true.
  +               if (toHandleList.isEmpty())
  +               {
  +                  done = true;
  +                  toHandleList.notify();
                     return;
  +               }
                  
  -               callback = (Callback) toHandleList.remove(0);
  +               toHandleListCopy.addAll(toHandleList);
  +               toHandleList.clear();
               }
               
  +            while (!toHandleListCopy.isEmpty())
  +            {
               try
               {
  +                  callback = (Callback) toHandleListCopy.remove(0);
                  callback.setCallbackHandleObject(callbackHandlerObject);
                  callbackHandler.handleCallback(callback);
               }
  @@ -214,22 +302,51 @@
               checkForAcknowledgeRequest(callback);
            }
         }
  +      }
         
  -      public void shutdown()
  +      /**
  +       *  Once CallbackPoller.stop() has called HandleThread.shutdown(), CallbackPoller.run()
  +       *  has terminated and no additional callbacks will be received.  shutdown() will
  +       *  not return until HandleThread has processed all received callbacks.
  +       *  
  +       *  Either run() or shutdown() will enter its own synchronized block first.
  +       *  
  +       *  case 1): run() enters its synchronized block first:
  +       *     If toHandleList is empty, then run() will reach toHandleList.wait(), shutdown()
  +       *     will wake up run(), and run() will exit.  If toHandleList is not empty, then run()
  +       *     will process all outstanding callbacks and return to its synchronized block.  At
  +       *     this point, either case 1) (with toHandleList empty) or case 2) applies.
  +       *     
  +       *  case 2): shutdown() enters its synchronized block first:
  +       *     run() will process all outstanding callbacks and return to its synchronized block.
  +       *     After shutdown() reaches toHandleList.wait(), run() will enter its synchronized
  +       *     block, find running == false and toHandleList empty, and it will exit.
  +       */
  +      protected void shutdown()
         {
  -         running = false;
            synchronized (toHandleList)
            {
  +            running = false;
               toHandleList.notify();
  +            while (!done)
  +            {
  +               try
  +               {
  +                  toHandleList.wait();
  +               }
  +               catch (InterruptedException ignored) {}
  +               return;
  +            }
            }
         }
      }
      
  +
      class AcknowledgeThread extends Thread
      {
         boolean running = true;
  -      Callback callback;
  -      ArrayList callbacksCopy = new ArrayList();
  +      boolean done;
  +      ArrayList toAcknowledgeListCopy = new ArrayList();
         
         AcknowledgeThread(String name)
         {
  @@ -237,11 +354,11 @@
         }
         public void run()
         {
  -         while (running)
  +         while (true)
            {
               synchronized (toAcknowledgeList)
               {
  -               if (toAcknowledgeList.isEmpty())
  +               if (toAcknowledgeList.isEmpty() && running)
                  {
                     try
                     {
  @@ -254,17 +371,33 @@
                     }
                  }
                  
  -               if (!running)
  +               // If toAcknowledgeList is empty, then running must be false.  We return
  +               // only when both conditions are true.
  +               if (toAcknowledgeList.isEmpty())
  +               {
  +                  done = true;
  +                  toAcknowledgeList.notify();
                     return;
  +               }
                  
  -               callbacksCopy.addAll(toAcknowledgeList);
  +               toAcknowledgeListCopy.addAll(toAcknowledgeList);
                  toAcknowledgeList.clear();
               }
               
               try
               {
  -               client.acknowledgeCallbacks(callbackHandler, callbacksCopy);
  -               callbacksCopy.clear();
  +               if (log.isTraceEnabled())
  +               {
  +                  Iterator it = toAcknowledgeListCopy.iterator();
  +                  while (it.hasNext())
  +                  {
  +                     Callback cb = (Callback) it.next();
  +                     Map map = cb.getReturnPayload();
  +                     log.trace("acknowledging: " + map.get(ServerInvokerCallbackHandler.CALLBACK_ID));
  +                  }
  +               }
  +               client.acknowledgeCallbacks(callbackHandler, toAcknowledgeListCopy);
  +               toAcknowledgeListCopy.clear();
               }
               catch (Throwable t)
               {
  @@ -273,12 +406,42 @@
            }
         }
         
  +      /**
  +       *  Once CallbackPoller.stop() has called AcknowledgeThread.shutdown(), HandleThread
  +       *  has terminated and no additional callbacks will be added to toAcknowledgeList.
  +       *  shutdown() will not return until AcknowledgeThread has acknowledged all callbacks
  +       *  in toAcknowledgeList.
  +       *  
  +       *  Either run() or shutdown() will enter its own synchronized block first.
  +       *  
  +       *  case 1): run() enters its synchronized block first:
  +       *     If toAcknowledgeList is empty, then run() will reach toAcknowledgeList.wait(),
  +       *     shutdown() will wake up run(), and run() will exit.  If toAcknowledgeList is not
  +       *     empty, then run() will process all callbacks in toAcknowledgeList and return to
  +       *     its synchronized block.  At this point, either case 1) (with toAcknowledgeList
  +       *     empty) or case 2) applies.
  +       *     
  +       *  case 2): shutdown() enters its synchronized block first:
  +       *     run() will process all callbacks in toAcknowledgeList and return to its
  +       *     synchronized block.  After shutdown() reaches toAcknowledgeList.wait(), run()
  +       *     will enter its synchronized block, find running == false and toAcknowledgeList
  +       *     empty, and it will exit.
  +       */
         public void shutdown()
         {
  -         running = false;
            synchronized (toAcknowledgeList)
            {
  +            running = false;
               toAcknowledgeList.notify();
  +            while (!done)
  +            {
  +               try
  +               {
  +                  toAcknowledgeList.wait();
  +               }
  +               catch (InterruptedException ignored) {}
  +               return;
  +            }
            }
         }
      }
  @@ -292,6 +455,10 @@
            Object callbackId = returnPayload.get(ServerInvokerCallbackHandler.CALLBACK_ID);
            if (callbackId != null)
            {
  +            Object o = returnPayload.get(ServerInvokerCallbackHandler.REMOTING_ACKNOWLEDGES_PUSH_CALLBACKS);
  +            if (o instanceof String  && Boolean.parseBoolean((String)o) ||
  +                o instanceof Boolean && (Boolean) o)
  +            {
               synchronized (toAcknowledgeList)
               {
                  toAcknowledgeList.add(callback);
  @@ -311,4 +478,31 @@
            }
         }
      }
  +   }
  +   
  +   
  +   private void reportStatistics(List callbacks)
  +   {
  +      int toHandle;
  +      int toAcknowledge = 0;
  +      
  +      synchronized (toHandleList)
  +      {
  +         toHandle = toHandleList.size() + handleThread.toHandleListCopy.size();
  +      }
  +      
  +      synchronized (toAcknowledgeList)
  +      {
  +         if (acknowledgeThread != null)
  +            toAcknowledge = toAcknowledgeList.size() + acknowledgeThread.toAcknowledgeListCopy.size();
  +      }
  +      
  +      StringBuffer message = new StringBuffer("\n");
  +      message.append("================================\n")
  +             .append("  retrieved " + callbacks.size() + " callbacks\n")
  +             .append("  callbacks waiting to be processed: " + toHandle + "\n")
  +             .append("  callbacks waiting to be acknowledged: " + toAcknowledge + "\n")
  +             .append("================================");
  +      log.info(message);
  +   }
   }
  \ No newline at end of file
  
  
  



More information about the jboss-cvs-commits mailing list