[jboss-cvs] JBossRemoting/src/main/org/jboss/remoting/callback ...

Ron Sigal ron_sigal at yahoo.com
Sat May 5 03:40:41 EDT 2007


  User: rsigal  
  Date: 07/05/05 03:40:41

  Modified:    src/main/org/jboss/remoting/callback  Tag: remoting_2_x
                        CallbackPoller.java
  Log:
  JBREM-641: Added support for blocking mode for pull callbacks.
  
  Revision  Changes    Path
  No                   revision
  
  
  No                   revision
  
  
  1.5.2.7   +214 -84   JBossRemoting/src/main/org/jboss/remoting/callback/CallbackPoller.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: CallbackPoller.java
  ===================================================================
  RCS file: /cvsroot/jboss/JBossRemoting/src/main/org/jboss/remoting/callback/CallbackPoller.java,v
  retrieving revision 1.5.2.6
  retrieving revision 1.5.2.7
  diff -u -b -r1.5.2.6 -r1.5.2.7
  --- CallbackPoller.java	16 Feb 2007 04:19:01 -0000	1.5.2.6
  +++ CallbackPoller.java	5 May 2007 07:40:41 -0000	1.5.2.7
  @@ -22,6 +22,7 @@
   package org.jboss.remoting.callback;
   
   import java.util.ArrayList;
  +import java.util.HashMap;
   import java.util.Iterator;
   import java.util.List;
   import java.util.Map;
  @@ -30,6 +31,7 @@
   
   import org.jboss.logging.Logger;
   import org.jboss.remoting.Client;
  +import org.jboss.remoting.ServerInvoker;
   
   /**
    * CallbackPoller is used to simulate push callbacks on transports that don't support
  @@ -39,7 +41,7 @@
    * @author <a href="mailto:tom.elrod at jboss.com">Tom Elrod</a>
    * @author <a href="mailto:ron.sigal at jboss.com">Ron Sigal</a>
    */
  -public class CallbackPoller extends TimerTask
  +public class CallbackPoller extends TimerTask implements Runnable
   {
      /*
       * Implementation note.
  @@ -64,14 +66,17 @@
      public static final long DEFAULT_POLL_PERIOD = 5000;
   
      /**
  +    * Default timeout for getting callbacks in blocking mode.
  +    * Default is 5000 milliseconds.
  +    */
  +   public static final int DEFAULT_BLOCKING_TIMEOUT = 5000;
  +   
  +   /**
       * The key value to use to specify the desired poll period
       * within the metadata Map.
       */
      public static final String CALLBACK_POLL_PERIOD = "callbackPollPeriod";
   
  -   /** The key value to use in metadata Map to specify the desired scheduling mode. */
  -   public static final String CALLBACK_SCHEDULE_MODE = "scheduleMode";
  -
      /** Use java.util.timer.schedule(). */
      public static final String SCHEDULE_FIXED_RATE = "scheduleFixedRate";
   
  @@ -86,15 +91,18 @@
      private InvokerCallbackHandler callbackHandler = null;
      private Map metadata = null;
      private Object callbackHandlerObject = null;
  +   private boolean blocking = true;
      private long pollPeriod = DEFAULT_POLL_PERIOD;
      private Timer timer;
      private String scheduleMode = SCHEDULE_FIXED_RATE;
      private boolean reportStatistics;
  +   private boolean running;
   
      private ArrayList toHandleList = new ArrayList();
      private ArrayList toAcknowledgeList = new ArrayList();
      private HandleThread handleThread;
      private AcknowledgeThread acknowledgeThread;
  +   private BlockingPollerThread blockingPollerThread;
   
      private static final Logger log = Logger.getLogger(CallbackPoller.class);
   
  @@ -103,7 +111,7 @@
      {
         this.client = client;
         this.callbackHandler = callbackhandler;
  -      this.metadata = metadata;
  +      this.metadata = new HashMap(metadata);
         this.callbackHandlerObject = callbackHandlerObject;
      }
   
  @@ -122,72 +130,36 @@
            throw new NullPointerException("Can not poll for callbacks when Client is null.");
         }
   
  -      if (metadata != null)
  -      {
  -         Object val = metadata.get(CALLBACK_POLL_PERIOD);
  -         if (val != null)
  -         {
  -            if (val instanceof String)
  -            {
  -               try
  -               {
  -                  pollPeriod = Long.parseLong((String) val);
  -               }
  -               catch (NumberFormatException e)
  -               {
  -                  log.warn("Error converting " + CALLBACK_POLL_PERIOD + " to type long.  " + e.getMessage());
  -               }
  -            }
  -            else
  -            {
  -               log.warn("Value for " + CALLBACK_POLL_PERIOD + " configuration must be of type " + String.class.getName() +
  -                        " and is " + val.getClass().getName());
  -            }
  -         }
  -         val = metadata.get(CALLBACK_SCHEDULE_MODE);
  -         if (val != null)
  -         {
  -            if (val instanceof String)
  -            {
  -               if (SCHEDULE_FIXED_DELAY.equals(val) || SCHEDULE_FIXED_RATE.equals(val))
  -               {
  -                  scheduleMode = (String) val;
  -               }
  -               else
  +      configureParameters();
  +
  +      handleThread = new HandleThread("HandleThread");
  +      handleThread.start();
  +      log.error("blocking: " + blocking);
  +      if (blocking)
                  {
  -                  log.warn("Unrecognized value for " + CALLBACK_SCHEDULE_MODE + ": " + val);
  -                  log.warn("Using " + scheduleMode);
  -               }
  +         blockingPollerThread = new BlockingPollerThread();
  +         blockingPollerThread.start();
               }
               else
               {
  -               log.warn("Value for " + CALLBACK_SCHEDULE_MODE + " must be of type " + String.class.getName() +
  -                     " and is " + val.getClass().getName());
  -            }
  -         }
  -         if (metadata.get(REPORT_STATISTICS) != null)
  -         {
  -            reportStatistics = true;
  -         }
  -      }
  -
  -      handleThread = new HandleThread("HandleThread");
  -      handleThread.start();
  -
         timer = new Timer(true);
  -
         if (SCHEDULE_FIXED_DELAY.equals(scheduleMode))
            timer.schedule(this, pollPeriod, pollPeriod);
         else
            timer.scheduleAtFixedRate(this, pollPeriod, pollPeriod);
      }
  +   }
   
      public synchronized void run()
      {
         // need to pull callbacks from server and give them to callback handler
         try
         {
  -         List callbacks = client.getCallbacks(callbackHandler);
  +         if (log.isTraceEnabled()) log.trace(this + " getting callbacks");
  +         log.error("getting callbacks");
  +         List callbacks = client.getCallbacks(callbackHandler, metadata);
  +         if (log.isTraceEnabled()) log.trace("callback count: " + (callbacks == null ? 0 : callbacks.size()));
  +         log.error("callback count: " + (callbacks == null ? 0 : callbacks.size()));
   
            if (callbacks != null && callbacks.size() > 0)
            {
  @@ -208,14 +180,27 @@
         }
      }
   
  +   public void stop()
  +   {
  +      stop(-1);
  +   }
  +
      /**
       * stop() will not return until all received callbacks have been processed
       * by the CallbackHandler and acknowledgements have been sent for all callbacks for
       * which acknowledgements have been requested.
       */
  -   public synchronized void stop()
  +   public void stop(int timeout)
  +   {
  +      log.error(this + " is shutting down");
  +      running = false;
  +      log.error("running: " + running);
  +      
  +      if (timeout == 0)
  +         return;
  +      
  +      synchronized (this)
      {
  -      log.debug(this + " is shutting down");
         
         // run() and stop() are synchronized so that stop() will wait until run() has finished
         // adding any callbacks it has received to toHandleList.  Therefore, once cancel()
  @@ -243,10 +228,41 @@
            timer.cancel();
            timer = null;
         }
  +      }
         
         log.debug(this + " has shut down");
      }
   
  +   
  +   class BlockingPollerThread extends Thread
  +   {
  +      public BlockingPollerThread()
  +      {
  +         String threadName = getName();
  +         int i = threadName.indexOf('-');
  +         String threadNumber = null;
  +         if (i >= 0)
  +            threadNumber = threadName.substring(i+1);
  +         else
  +            threadNumber = Long.toString(System.currentTimeMillis());
  +         setName("CallbackPoller:" + threadNumber);
  +         setDaemon(true);
  +      }
  +
  +      public void run()
  +      {
  +         running = true;
  +         while (running)
  +         {
  +            log.error("running: " + running);
  +            log.error("calling CallbackPoller.this.run()");
  +            CallbackPoller.this.run();
  +            log.error("back from CallbackPoller.this.run()");
  +         }
  +      }
  +   }  
  +
  +   
      class HandleThread extends Thread
      {
         boolean running = true;
  @@ -489,6 +505,114 @@
      }
   
   
  +   private void configureParameters()
  +   {
  +      if (metadata != null)
  +      {
  +         Object val = metadata.get(ServerInvoker.BLOCKING_MODE);
  +         if (val != null)
  +         {
  +            if (val instanceof String)
  +            {
  +               if (ServerInvoker.BLOCK.equals(val))
  +               {
  +                  blocking = true;
  +               }
  +               else if (ServerInvoker.NONBLOCKING.equals(val))
  +               {
  +                  blocking = false;
  +               }
  +               else
  +               {
  +                  log.warn("Value for " + ServerInvoker.BLOCKING_MODE + 
  +                           " configuration is " + val + ". Must be either " +
  +                           ServerInvoker.BLOCK + " or " + ServerInvoker.NONBLOCKING +
  +                           ". Using " + ServerInvoker.BLOCK + ".");
  +               }
  +            }
  +            else
  +            {
  +               log.warn("Value for " + ServerInvoker.BLOCKING_MODE + 
  +                     " configuration must be of type " + String.class.getName() +
  +                        " and is of type " + val.getClass().getName());
  +            }
  +         }
  +         
  +         // Default blocking mode on server is nonblocking.
  +         if (blocking)
  +            metadata.put(ServerInvoker.BLOCKING_MODE, ServerInvoker.BLOCK);
  +         
  +         val = metadata.get(ServerInvoker.BLOCKING_TIMEOUT);
  +         if (val != null)
  +         {
  +            if (val instanceof String)
  +            {
  +               try
  +               {
  +                  int blockingTimeout = Integer.parseInt((String) val);
  +                  metadata.put(ServerInvoker.TIMEOUT, Integer.toString(blockingTimeout));
  +               }
  +               catch (NumberFormatException e)
  +               {
  +                  log.warn("Error converting " + ServerInvoker.BLOCKING_TIMEOUT + " to type long.  " + e.getMessage());
  +               }
  +            }
  +            else
  +            {
  +               log.warn("Value for " + ServerInvoker.BLOCKING_TIMEOUT + " configuration must be of type " + String.class.getName() +
  +                        " and is " + val.getClass().getName());
  +            }
  +         }
  +         
  +         val = metadata.get(CALLBACK_POLL_PERIOD);
  +         if (val != null)
  +         {
  +            if (val instanceof String)
  +            {
  +               try
  +               {
  +                  pollPeriod = Long.parseLong((String) val);
  +               }
  +               catch (NumberFormatException e)
  +               {
  +                  log.warn("Error converting " + CALLBACK_POLL_PERIOD + " to type long.  " + e.getMessage());
  +               }
  +            }
  +            else
  +            {
  +               log.warn("Value for " + CALLBACK_POLL_PERIOD + " configuration must be of type " + String.class.getName() +
  +                        " and is " + val.getClass().getName());
  +            }
  +         }
  +         val = metadata.get(CALLBACK_SCHEDULE_MODE);
  +         if (val != null)
  +         {
  +            if (val instanceof String)
  +            {
  +               if (SCHEDULE_FIXED_DELAY.equals(val) || SCHEDULE_FIXED_RATE.equals(val))
  +               {
  +                  scheduleMode = (String) val;
  +               }
  +               else
  +               {
  +                  log.warn("Unrecognized value for " + CALLBACK_SCHEDULE_MODE + ": " + val);
  +                  log.warn("Using " + scheduleMode);
  +               }
  +            }
  +            else
  +            {
  +               log.warn("Value for " + CALLBACK_SCHEDULE_MODE + " must be of type " + String.class.getName() +
  +                     " and is " + val.getClass().getName());
  +            }
  +         }
  +         if (metadata.get(REPORT_STATISTICS) != null)
  +         {
  +            reportStatistics = true;
  +         }
  +      }
  +   }
  +
  +
      private void reportStatistics(List callbacks)
      {
         int toHandle;
  @@ -513,4 +637,10 @@
                .append("================================");
         log.info(message);
      }
  +
  +
  +   /**
  +    * The key value to use in metadata Map to specify the desired scheduling mode. 
  +    */
  +   public static final String CALLBACK_SCHEDULE_MODE = "scheduleMode";
   }
  \ No newline at end of file
  
  
  



More information about the jboss-cvs-commits mailing list