[jboss-cvs] JBossRemoting/src/main/org/jboss/remoting/callback ...

Ron Sigal ron_sigal at yahoo.com
Tue Jun 19 00:45:37 EDT 2007


  User: rsigal  
  Date: 07/06/19 00:45:37

  Modified:    src/main/org/jboss/remoting/callback  Tag: remoting_2_x
                        CallbackPoller.java
  Log:
  JBREM-641, JBREM-756: (1) Changed default polling mode to nonblocking; (2) added option to do nonsynchronized shutdown; (3) added ability to shut down if errors exceed some maximum.
  
  Revision  Changes    Path
  No                   revision
  
  
  No                   revision
  
  
  1.5.2.10  +141 -30   JBossRemoting/src/main/org/jboss/remoting/callback/CallbackPoller.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: CallbackPoller.java
  ===================================================================
  RCS file: /cvsroot/jboss/JBossRemoting/src/main/org/jboss/remoting/callback/CallbackPoller.java,v
  retrieving revision 1.5.2.9
  retrieving revision 1.5.2.10
  diff -u -b -r1.5.2.9 -r1.5.2.10
  --- CallbackPoller.java	22 May 2007 03:42:29 -0000	1.5.2.9
  +++ CallbackPoller.java	19 Jun 2007 04:45:37 -0000	1.5.2.10
  @@ -72,6 +72,20 @@
      public static final int DEFAULT_BLOCKING_TIMEOUT = 5000;
      
      /**
  +    * Default number of exceptions before callback polling will be shut down.
  +    * Default is 5.
  +    */
  +   public static final int DEFAULT_MAX_ERROR_COUNT = 5;
  +   
  +   /**
  +    * The key value to use to specify whether stop() should wait for the call to
  +    * org.jboss.remoting.Client.getCallbacks() to return.  The default 
  +    * behavior is to do a synchronized shutdown for nonblocking callbacks and
  +    * a nonsynchronized shutdown for blocking callbacks.
  +    */
  +   public static final String SYNCHRONIZED_SHUTDOWN = "doSynchronizedShutdown";
  +   
  +   /**
       * The key value to use to specify the desired poll period
       * within the metadata Map.
       */
  @@ -83,6 +97,12 @@
      /** Use java.util.timer.scheduleAtFixedRate(). */
      public static final String SCHEDULE_FIXED_DELAY = "scheduleFixedDelay";
   
  +   /**
  +    * The key to use to specify the number of errors before callback polling
  +    * will be shut down.
  +    */
  +   public static final String MAX_ERROR_COUNT = "maxErrorCount";
  +
      /** The key to use in metadata Map to request statistics.  The associated
       *  value is ignored. */
      public static final String REPORT_STATISTICS = "reportStatistics";
  @@ -91,12 +111,16 @@
      private InvokerCallbackHandler callbackHandler = null;
      private Map metadata = null;
      private Object callbackHandlerObject = null;
  -   private boolean blocking = true;
  +   private boolean blocking = false;
  +   private boolean synchronizedShutdown = false;
      private long pollPeriod = DEFAULT_POLL_PERIOD;
      private Timer timer;
      private String scheduleMode = SCHEDULE_FIXED_RATE;
      private boolean reportStatistics;
      private boolean running;
  +   private int maxErrorCount = -1;
  +   private int errorCount;
  +   
   
      private ArrayList toHandleList = new ArrayList();
      private ArrayList toAcknowledgeList = new ArrayList();
  @@ -137,6 +161,11 @@
         if (log.isTraceEnabled()) log.trace("blocking: " + blocking);
         if (blocking)
         {
  +         if (maxErrorCount == -1)
  +            maxErrorCount = DEFAULT_MAX_ERROR_COUNT;
  +         
  +         running = true;
  +         metadata.put(Client.THROW_CALLBACK_EXCEPTION, "true");
            blockingPollerThread = new BlockingPollerThread();
            blockingPollerThread.start();
         }
  @@ -155,9 +184,9 @@
         // need to pull callbacks from server and give them to callback handler
         try
         {
  -         if (log.isTraceEnabled()) log.trace(this + " getting callbacks");
  +         if (log.isTraceEnabled()) log.trace(this + " getting callbacks for " + callbackHandler);
            List callbacks = client.getCallbacks(callbackHandler, metadata);
  -         if (log.isTraceEnabled()) log.trace("callback count: " + (callbacks == null ? 0 : callbacks.size()));
  +         if (log.isTraceEnabled()) log.trace(this + " callback count: " + (callbacks == null ? 0 : callbacks.size()));
   
            if (callbacks != null && callbacks.size() > 0)
            {
  @@ -174,7 +203,36 @@
         }
         catch (Throwable throwable)
         {
  -         log.error("Error getting callbacks from server.", throwable);
  +         log.error(this + " Error getting callbacks from server.", throwable);
  +         String errorMessage = throwable.getMessage();
  +         if (errorMessage != null)
  +         {
  +            if (errorMessage.startsWith("Could not find listener id"))
  +            {
  +               log.error("Client no longer has InvokerCallbackHandler (" + 
  +                          callbackHandler +
  +                         ") registered.  Shutting down callback polling");
  +               stop();
  +               return;
  +            }
  +            if (errorMessage.startsWith("Can not make remoting client invocation " +
  +                                        "due to not being connected to server."))
  +            {
  +               log.error("Client no longer connected.  Shutting down callback polling");
  +               stop();
  +               return;
  +            }
  +         }
  +         if (maxErrorCount >= 0)
  +         {
  +            if (++errorCount > maxErrorCount)
  +            {
  +               log.error("Error limit of " + maxErrorCount + 
  +                          " exceeded.  Shutting down callback polling");
  +               stop();
  +               return;
  +            }
  +         }
         }
      }
      
  @@ -193,17 +251,41 @@
         log.debug(this + " is shutting down");
         running = false;
         
  +      if (!blocking)
  +      {
  +         cancel();
  +         
  +         if (timer != null)
  +         {
  +            timer.cancel();
  +            timer = null;
  +         }
  +      }
  +      
         if (timeout == 0)
            return;
         
  -      synchronized (this)
  +      if (synchronizedShutdown)
         {
  -
            // run() and stop() are synchronized so that stop() will wait until run() has finished
            // adding any callbacks it has received to toHandleList.  Therefore, once cancel()
            // returns, no more callbacks will arrive from the server.
  -         cancel();
  +         synchronized (this)
  +         {
  +            shutdown();
  +         }
  +      }
  +      else
  +      {
  +         shutdown();
  +      }
  +
  +      log.debug(this + " has shut down");
  +   }
  +
   
  +   private void shutdown()
  +   {
            // HandleThread.shutdown() will not return until all received callbacks have been
            // processed and, if necessary, added to toAcknowledgeList.
            if (handleThread != null)
  @@ -219,15 +301,6 @@
               acknowledgeThread.shutdown();
               acknowledgeThread = null;
            }
  -
  -         if (timer != null)
  -         {
  -            timer.cancel();
  -            timer = null;
  -         }
  -      }
  -
  -      log.debug(this + " has shut down");
      }
   
      
  @@ -242,7 +315,9 @@
               threadNumber = threadName.substring(i+1);
            else
               threadNumber = Long.toString(System.currentTimeMillis());
  -         setName("CallbackPoller:" + threadNumber);
  +         String pollerString = CallbackPoller.this.toString();
  +         String address = pollerString.substring(pollerString.indexOf('@'));
  +         setName("CallbackPoller:" + threadNumber + "[" + address + "]");
            setDaemon(true);
         }
   
  @@ -350,10 +425,10 @@
                     toHandleList.wait();
                  }
                  catch (InterruptedException ignored) {}
  -               return;
               }
            }
            log.debug(this + " has shut down");
  +         return;
         }
      }
   
  @@ -457,10 +532,10 @@
                     toAcknowledgeList.wait();
                  }
                  catch (InterruptedException ignored) {}
  -               return;
               }
            }
            log.debug(this + " has shut down");
  +         return;
         }
      }
   
  @@ -511,10 +586,12 @@
                  if (ServerInvoker.BLOCKING.equals(val))
                  {
                     blocking = true;
  +                  synchronizedShutdown = false;
                  }
                  else if (ServerInvoker.NONBLOCKING.equals(val))
                  {
                     blocking = false;
  +                  synchronizedShutdown = true;
                  }
                  else
                  {
  @@ -558,6 +635,20 @@
               }
            }
            
  +         val = metadata.get(SYNCHRONIZED_SHUTDOWN);
  +         if (val != null)
  +         {
  +            if (val instanceof String)
  +            {
  +               synchronizedShutdown = Boolean.valueOf((String) val).booleanValue();
  +            }
  +            else
  +            {
  +               log.warn("Value for " + SYNCHRONIZED_SHUTDOWN + " must be of type " + String.class.getName() +
  +                     " and is " + val.getClass().getName());
  +            }
  +         }
  +         
            val = metadata.get(CALLBACK_POLL_PERIOD);
            if (val != null)
            {
  @@ -599,6 +690,26 @@
                        " and is " + val.getClass().getName());
               }
            }
  +         val = metadata.get(MAX_ERROR_COUNT);
  +         if (val != null)
  +         {
  +            if (val instanceof String)
  +            {
  +               try
  +               {
  +                  maxErrorCount = Integer.parseInt((String) val);
  +               }
  +               catch (NumberFormatException e)
  +               {
  +                  log.warn("Error converting " + MAX_ERROR_COUNT + " to type int.  " + e.getMessage());
  +               }
  +            }
  +            else
  +            {
  +               log.warn("Value for " + MAX_ERROR_COUNT + " configuration must be of type " + String.class.getName() +
  +                        " and is " + val.getClass().getName());
  +            }
  +         }
            if (metadata.get(REPORT_STATISTICS) != null)
            {
               reportStatistics = true;
  
  
  



More information about the jboss-cvs-commits mailing list