[jboss-cvs] JBossRemoting/src/main/org/jboss/remoting/callback ...

Ron Sigal ron_sigal at yahoo.com
Wed Aug 22 23:04:20 EDT 2007


  User: rsigal  
  Date: 07/08/22 23:04:20

  Modified:    src/main/org/jboss/remoting/callback  Tag: remoting_2_2_0_GA
                        CallbackPoller.java
  Log:
  JBREM-641, JBREM-756:  Merged changes from branch remoting_2_2_2_experimental.
  
  Revision  Changes    Path
  No                   revision
  
  
  No                   revision
  
  
  1.5.2.6.4.1 +315 -75   JBossRemoting/src/main/org/jboss/remoting/callback/CallbackPoller.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: CallbackPoller.java
  ===================================================================
  RCS file: /cvsroot/jboss/JBossRemoting/src/main/org/jboss/remoting/callback/CallbackPoller.java,v
  retrieving revision 1.5.2.6
  retrieving revision 1.5.2.6.4.1
  diff -u -b -r1.5.2.6 -r1.5.2.6.4.1
  --- CallbackPoller.java	16 Feb 2007 04:19:01 -0000	1.5.2.6
  +++ CallbackPoller.java	23 Aug 2007 03:04:20 -0000	1.5.2.6.4.1
  @@ -22,6 +22,7 @@
   package org.jboss.remoting.callback;
   
   import java.util.ArrayList;
  +import java.util.HashMap;
   import java.util.Iterator;
   import java.util.List;
   import java.util.Map;
  @@ -30,6 +31,7 @@
   
   import org.jboss.logging.Logger;
   import org.jboss.remoting.Client;
  +import org.jboss.remoting.ServerInvoker;
   
   /**
    * CallbackPoller is used to simulate push callbacks on transports that don't support
  @@ -39,7 +41,7 @@
    * @author <a href="mailto:tom.elrod at jboss.com">Tom Elrod</a>
    * @author <a href="mailto:ron.sigal at jboss.com">Ron Sigal</a>
    */
  -public class CallbackPoller extends TimerTask
  +public class CallbackPoller extends TimerTask implements Runnable
   {
      /*
       * Implementation note.
  @@ -64,20 +66,43 @@
      public static final long DEFAULT_POLL_PERIOD = 5000;
   
      /**
  +    * Default timeout for getting callbacks in blocking mode.
  +    * Default is 5000 milliseconds.
  +    */
  +   public static final int DEFAULT_BLOCKING_TIMEOUT = 5000;
  +   
  +   /**
  +    * Default number of exceptions before callback polling wil be shut down.
  +    * Default is 5.
  +    */
  +   public static final int DEFAULT_MAX_ERROR_COUNT = 5;
  +   
  +   /**
  +    * The key value to use to specify if stop() should wait for the call to
  +    * org.jboss.remoting.Client.getCallbacks() should return.  The default 
  +    * behavior is do a synchronized shutdown for nonblocking callbacks and
  +    * a nonsynchronized shutdown for blocking callbacks.
  +    */
  +   public static final String SYNCHRONIZED_SHUTDOWN = "doSynchronizedShutdown";
  +   
  +   /**
       * The key value to use to specify the desired poll period
       * within the metadata Map.
       */
      public static final String CALLBACK_POLL_PERIOD = "callbackPollPeriod";
   
  -   /** The key value to use in metadata Map to specify the desired scheduling mode. */
  -   public static final String CALLBACK_SCHEDULE_MODE = "scheduleMode";
  -
      /** Use java.util.timer.schedule(). */
      public static final String SCHEDULE_FIXED_RATE = "scheduleFixedRate";
   
      /** Use java.util.timer.scheduleAtFixedRate(). */
      public static final String SCHEDULE_FIXED_DELAY = "scheduleFixedDelay";
   
  +   /**
  +    * The key to use to specify the number of errors before callback polling
  +    * will be shut down.
  +    */
  +   public static final String MAX_ERROR_COUNT = "maxErrorCount";
  +
      /** The key to use in metadata Map to request statistics.  The associated
       *  is ignored. */
      public static final String REPORT_STATISTICS = "reportStatistics";
  @@ -86,15 +111,22 @@
      private InvokerCallbackHandler callbackHandler = null;
      private Map metadata = null;
      private Object callbackHandlerObject = null;
  +   private boolean blocking = false;
  +   private boolean synchronizedShutdown = false;
      private long pollPeriod = DEFAULT_POLL_PERIOD;
      private Timer timer;
      private String scheduleMode = SCHEDULE_FIXED_RATE;
      private boolean reportStatistics;
  +   private boolean running;
  +   private int maxErrorCount = -1;
  +   private int errorCount;
  +   
   
      private ArrayList toHandleList = new ArrayList();
      private ArrayList toAcknowledgeList = new ArrayList();
      private HandleThread handleThread;
      private AcknowledgeThread acknowledgeThread;
  +   private BlockingPollerThread blockingPollerThread;
   
      private static final Logger log = Logger.getLogger(CallbackPoller.class);
   
  @@ -103,7 +135,7 @@
      {
         this.client = client;
         this.callbackHandler = callbackhandler;
  -      this.metadata = metadata;
  +      this.metadata = new HashMap(metadata);
         this.callbackHandlerObject = callbackHandlerObject;
      }
   
  @@ -122,72 +154,39 @@
            throw new NullPointerException("Can not poll for callbacks when Client is null.");
         }
   
  -      if (metadata != null)
  -      {
  -         Object val = metadata.get(CALLBACK_POLL_PERIOD);
  -         if (val != null)
  -         {
  -            if (val instanceof String)
  -            {
  -               try
  -               {
  -                  pollPeriod = Long.parseLong((String) val);
  -               }
  -               catch (NumberFormatException e)
  -               {
  -                  log.warn("Error converting " + CALLBACK_POLL_PERIOD + " to type long.  " + e.getMessage());
  -               }
  -            }
  -            else
  -            {
  -               log.warn("Value for " + CALLBACK_POLL_PERIOD + " configuration must be of type " + String.class.getName() +
  -                        " and is " + val.getClass().getName());
  -            }
  -         }
  -         val = metadata.get(CALLBACK_SCHEDULE_MODE);
  -         if (val != null)
  -         {
  -            if (val instanceof String)
  -            {
  -               if (SCHEDULE_FIXED_DELAY.equals(val) || SCHEDULE_FIXED_RATE.equals(val))
  -               {
  -                  scheduleMode = (String) val;
  -               }
  -               else
  -               {
  -                  log.warn("Unrecognized value for " + CALLBACK_SCHEDULE_MODE + ": " + val);
  -                  log.warn("Using " + scheduleMode);
  -               }
  -            }
  -            else
  -            {
  -               log.warn("Value for " + CALLBACK_SCHEDULE_MODE + " must be of type " + String.class.getName() +
  -                     " and is " + val.getClass().getName());
  -            }
  -         }
  -         if (metadata.get(REPORT_STATISTICS) != null)
  -         {
  -            reportStatistics = true;
  -         }
  -      }
  +      configureParameters();
   
         handleThread = new HandleThread("HandleThread");
         handleThread.start();
  +      if (log.isTraceEnabled()) log.trace("blocking: " + blocking);
  +      if (blocking)
  +      {
  +         if (maxErrorCount == -1)
  +            maxErrorCount = DEFAULT_MAX_ERROR_COUNT;
   
  +         running = true;
  +         metadata.put(Client.THROW_CALLBACK_EXCEPTION, "true");
  +         blockingPollerThread = new BlockingPollerThread();
  +         blockingPollerThread.start();
  +      }
  +      else
  +      {
         timer = new Timer(true);
  -
         if (SCHEDULE_FIXED_DELAY.equals(scheduleMode))
            timer.schedule(this, pollPeriod, pollPeriod);
         else
            timer.scheduleAtFixedRate(this, pollPeriod, pollPeriod);
      }
  +   }
   
      public synchronized void run()
      {
         // need to pull callbacks from server and give them to callback handler
         try
         {
  -         List callbacks = client.getCallbacks(callbackHandler);
  +         if (log.isTraceEnabled()) log.trace(this + " getting callbacks for " + callbackHandler);
  +         List callbacks = client.getCallbacks(callbackHandler, metadata);
  +         if (log.isTraceEnabled()) log.trace(this + " callback count: " + (callbacks == null ? 0 : callbacks.size()));
   
            if (callbacks != null && callbacks.size() > 0)
            {
  @@ -204,24 +203,95 @@
         }
         catch (Throwable throwable)
         {
  -         log.error("Error getting callbacks from server.", throwable);
  +    	 if (!running)
  +         {
  +            stop();
  +            return;
  +         }
  +         
  +         log.error(this + " Error getting callbacks from server.", throwable);
  +         String errorMessage = throwable.getMessage();
  +         if (errorMessage != null)
  +         {
  +            if (errorMessage.startsWith("Could not find listener id"))
  +            {
  +               log.error("Client no longer has InvokerCallbackHandler (" + 
  +                          callbackHandler +
  +                         ") registered.  Shutting down callback polling");
  +               stop();
  +               return;
  +            }
  +            if (errorMessage.startsWith("Can not make remoting client invocation " +
  +                                        "due to not being connected to server."))
  +            {
  +               log.error("Client no longer connected.  Shutting down callback polling");
  +               stop();
  +               return;
         }
      }
  +         if (maxErrorCount >= 0)
  +         {
  +            if (++errorCount > maxErrorCount)
  +            {
  +               log.error("Error limit of " + maxErrorCount + 
  +                          " exceeded.  Shutting down callback polling");
  +               stop();
  +               return;
  +            }
  +         }
  +      }
  +   }
  +   
  +   public void stop()
  +   {
  +      stop(-1);
  +   }
   
      /**
       * stop() will not return until all received callbacks have been processed
       * by the CallbackHandler and acknowledgements have been sent for all callbacks for
       * which acknowledgements have been requested.
       */
  -   public synchronized void stop()
  +   public void stop(int timeout)
      {
         log.debug(this + " is shutting down");
  +      running = false;
  +      
  +      if (!blocking)
  +      {
  +         cancel();
  +         
  +         if (timer != null)
  +         {
  +            timer.cancel();
  +            timer = null;
  +         }
  +      }
         
  +      if (timeout == 0)
  +         return;
  +      
  +      if (synchronizedShutdown)
  +      {
         // run() and stop() are synchronized so that stop() will wait until run() has finished
         // adding any callbacks it has received to toHandleList.  Therefore, once cancel()
         // returns, no more callbacks will arrive from the server.
  -      cancel();
  +         synchronized (this)
  +         {
  +            shutdown();
  +         }
  +      }
  +      else
  +      {
  +         shutdown();
  +      }
   
  +      log.debug(this + " has shut down");
  +   }
  +
  +   
  +   private void shutdown()
  +   {
         // HandleThread.shutdown() will not return until all received callbacks have been
         // processed and, if necessary, added to toAcknowledgeList.
         if (handleThread != null)
  @@ -237,15 +307,35 @@
            acknowledgeThread.shutdown();
            acknowledgeThread = null;
         }
  +   }
   
  -      if (timer != null)
  +   
  +   class BlockingPollerThread extends Thread
         {
  -         timer.cancel();
  -         timer = null;
  +      public BlockingPollerThread()
  +      {
  +         String threadName = getName();
  +         int i = threadName.indexOf('-');
  +         String threadNumber = null;
  +         if (i >= 0)
  +            threadNumber = threadName.substring(i+1);
  +         else
  +            threadNumber = Long.toString(System.currentTimeMillis());
  +         String pollerString = CallbackPoller.this.toString();
  +         String address = pollerString.substring(pollerString.indexOf('@'));
  +         setName("CallbackPoller:" + threadNumber + "[" + address + "]");
  +         setDaemon(true);
         }
         
  -      log.debug(this + " has shut down");
  +      public void run()
  +      {
  +         while (running)
  +         {
  +            CallbackPoller.this.run();
  +         }
      }
  +   }  
  +
   
      class HandleThread extends Thread
      {
  @@ -340,10 +430,10 @@
                     toHandleList.wait();
                  }
                  catch (InterruptedException ignored) {}
  -               return;
               }
            }
            log.debug(this + " has shut down");
  +         return;
         }
      }
   
  @@ -447,10 +537,10 @@
                     toAcknowledgeList.wait();
                  }
                  catch (InterruptedException ignored) {}
  -               return;
               }
            }
            log.debug(this + " has shut down");
  +         return;
         }
      }
   
  @@ -489,6 +579,150 @@
      }
   
   
  +   private void configureParameters()
  +   {
  +      if (metadata != null)
  +      {
  +         Object val = metadata.get(ServerInvoker.BLOCKING_MODE);
  +         if (val != null)
  +         {
  +            if (val instanceof String)
  +            {
  +               if (ServerInvoker.BLOCKING.equals(val))
  +               {
  +                  blocking = true;
  +                  synchronizedShutdown = false;
  +               }
  +               else if (ServerInvoker.NONBLOCKING.equals(val))
  +               {
  +                  blocking = false;
  +                  synchronizedShutdown = true;
  +               }
  +               else
  +               {
  +                  log.warn("Value for " + ServerInvoker.BLOCKING_MODE + 
  +                           " configuration is " + val + ". Must be either " +
  +                           ServerInvoker.BLOCKING + " or " + ServerInvoker.NONBLOCKING +
  +                           ". Using " + ServerInvoker.BLOCKING + ".");
  +               }
  +            }
  +            else
  +            {
  +               log.warn("Value for " + ServerInvoker.BLOCKING_MODE + 
  +                     " configuration must be of type " + String.class.getName() +
  +                        " and is of type " + val.getClass().getName());
  +            }
  +         }
  +         
  +         // Default blocking mode on server is nonblocking.
  +         if (blocking)
  +            metadata.put(ServerInvoker.BLOCKING_MODE, ServerInvoker.BLOCKING);
  +         
  +         val = metadata.get(ServerInvoker.BLOCKING_TIMEOUT);
  +         if (val != null)
  +         {
  +            if (val instanceof String)
  +            {
  +               try
  +               {
  +                  int blockingTimeout = Integer.parseInt((String) val);
  +                  metadata.put(ServerInvoker.TIMEOUT, Integer.toString(blockingTimeout));
  +               }
  +               catch (NumberFormatException e)
  +               {
  +                  log.warn("Error converting " + ServerInvoker.BLOCKING_TIMEOUT + " to type long.  " + e.getMessage());
  +               }
  +            }
  +            else
  +            {
  +               log.warn("Value for " + ServerInvoker.BLOCKING_TIMEOUT + " configuration must be of type " + String.class.getName() +
  +                        " and is " + val.getClass().getName());
  +            }
  +         }
  +         
  +         val = metadata.get(SYNCHRONIZED_SHUTDOWN);
  +         if (val != null)
  +         {
  +            if (val instanceof String)
  +            {
  +               synchronizedShutdown = Boolean.valueOf((String) val).booleanValue();
  +            }
  +            else
  +            {
  +               log.warn("Value for " + SYNCHRONIZED_SHUTDOWN + " must be of type " + String.class.getName() +
  +                     " and is " + val.getClass().getName());
  +            }
  +         }
  +         
  +         val = metadata.get(CALLBACK_POLL_PERIOD);
  +         if (val != null)
  +         {
  +            if (val instanceof String)
  +            {
  +               try
  +               {
  +                  pollPeriod = Long.parseLong((String) val);
  +               }
  +               catch (NumberFormatException e)
  +               {
  +                  log.warn("Error converting " + CALLBACK_POLL_PERIOD + " to type long.  " + e.getMessage());
  +               }
  +            }
  +            else
  +            {
  +               log.warn("Value for " + CALLBACK_POLL_PERIOD + " configuration must be of type " + String.class.getName() +
  +                        " and is " + val.getClass().getName());
  +            }
  +         }
  +         val = metadata.get(CALLBACK_SCHEDULE_MODE);
  +         if (val != null)
  +         {
  +            if (val instanceof String)
  +            {
  +               if (SCHEDULE_FIXED_DELAY.equals(val) || SCHEDULE_FIXED_RATE.equals(val))
  +               {
  +                  scheduleMode = (String) val;
  +               }
  +               else
  +               {
  +                  log.warn("Unrecognized value for " + CALLBACK_SCHEDULE_MODE + ": " + val);
  +                  log.warn("Using " + scheduleMode);
  +               }
  +            }
  +            else
  +            {
  +               log.warn("Value for " + CALLBACK_SCHEDULE_MODE + " must be of type " + String.class.getName() +
  +                     " and is " + val.getClass().getName());
  +            }
  +         }
  +         val = metadata.get(MAX_ERROR_COUNT);
  +         if (val != null)
  +         {
  +            if (val instanceof String)
  +            {
  +               try
  +               {
  +                  maxErrorCount = Integer.parseInt((String) val);
  +               }
  +               catch (NumberFormatException e)
  +               {
  +                  log.warn("Error converting " + MAX_ERROR_COUNT + " to type int.  " + e.getMessage());
  +               }
  +            }
  +            else
  +            {
  +               log.warn("Value for " + MAX_ERROR_COUNT + " configuration must be of type " + String.class.getName() +
  +                        " and is " + val.getClass().getName());
  +            }
  +         }
  +         if (metadata.get(REPORT_STATISTICS) != null)
  +         {
  +            reportStatistics = true;
  +         }
  +      }
  +   }
  +
  +
      private void reportStatistics(List callbacks)
      {
         int toHandle;
  @@ -513,4 +747,10 @@
                .append("================================");
         log.info(message);
      }
  +
  +
  +   /**
  +    * The key value to use in metadata Map to specify the desired scheduling mode. 
  +    */
  +   public static final String CALLBACK_SCHEDULE_MODE = "scheduleMode";
   }
  \ No newline at end of file
  
  
  



More information about the jboss-cvs-commits mailing list