[jboss-cvs] JBossRemoting/src/main/org/jboss/remoting/callback ...

Ron Sigal ron_sigal at yahoo.com
Thu Oct 12 01:04:51 EDT 2006


  User: rsigal  
  Date: 06/10/12 01:04:50

  Modified:    src/main/org/jboss/remoting/callback  CallbackPoller.java
  Log:
  JBREM-610: All callbacks are delivered on the same thread.
  
  Revision  Changes    Path
  1.4       +137 -42   JBossRemoting/src/main/org/jboss/remoting/callback/CallbackPoller.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: CallbackPoller.java
  ===================================================================
  RCS file: /cvsroot/jboss/JBossRemoting/src/main/org/jboss/remoting/callback/CallbackPoller.java,v
  retrieving revision 1.3
  retrieving revision 1.4
  diff -u -b -r1.3 -r1.4
  --- CallbackPoller.java	25 Sep 2006 23:43:47 -0000	1.3
  +++ CallbackPoller.java	12 Oct 2006 05:04:50 -0000	1.4
  @@ -21,15 +21,15 @@
   */
   package org.jboss.remoting.callback;
   
  -import org.jboss.logging.Logger;
  -import org.jboss.remoting.Client;
  -import org.jboss.remoting.invocation.InternalInvocation;
  -import org.jboss.remoting.util.TimerUtil;
  -
  +import java.util.ArrayList;
   import java.util.List;
   import java.util.Map;
   import java.util.TimerTask;
   
  +import org.jboss.logging.Logger;
  +import org.jboss.remoting.Client;
  +import org.jboss.remoting.util.TimerUtil;
  +
   /**
    * @author <a href="mailto:tom.elrod at jboss.com">Tom Elrod</a>
    */
  @@ -53,6 +53,11 @@
      private Object callbackHandlerObject = null;
      private long pollPeriod = DEFAULT_POLL_PERIOD;
   
  +   private ArrayList toHandleList = new ArrayList();
  +   private ArrayList toAcknowledgeList = new ArrayList();
  +   private HandleThread handleThread;
  +   private AcknowledgeThread acknowledgeThread;
  +
      private static final Logger log = Logger.getLogger(CallbackPoller.class);
   
   
  @@ -74,6 +79,10 @@
         {
            client.connect();
         }
  +      else
  +      {
  +         throw new NullPointerException("Can not poll for callbacks when Client is null.");
  +      }
   
         if (metadata != null)
         {
  @@ -99,6 +108,9 @@
            }
         }
   
  +      handleThread = new HandleThread();
  +      handleThread.start();
  +
         TimerUtil.schedule(this, pollPeriod);
   
      }
  @@ -109,59 +121,127 @@
         try
         {
            List callbacks = client.getCallbacks(callbackHandler);
  +
            if (callbacks != null && callbacks.size() > 0)
            {
  -            final Callback[] callbackArray = (Callback[]) callbacks.toArray(new Callback[callbacks.size()]);
  +            synchronized (toHandleList)
  +            {
  +               toHandleList.addAll(callbacks);
  +               if (toHandleList.size() == callbacks.size())
  +                  toHandleList.notify();
  +            }
  +         }
  +      }
  +      catch (Throwable throwable)
  +      {
  +         log.error("Error getting callbacks from server.", throwable);
  +      }
  +   }
  +   
  +   public void stop()
  +   {
  +      cancel();
  +      client = null;
  +      callbackHandler = null;
  +      handleThread.shutdown();
  +      handleThread = null;
  +      if (acknowledgeThread != null)
  +      {
  +         acknowledgeThread.shutdown();
  +         acknowledgeThread = null;
  +      }
  +   }
   
  -            // delivering callbacks on new thread as don't want to allow the callback handler
  -            // to hijack timer task thread as could slow down timer otherwise.
  -            new Thread(new Runnable()
  +   class HandleThread extends Thread
               {
  +      boolean running = true;
  +      Callback callback;
  +      
                  public void run()
                  {
  -                  for (int x = 0; x < callbackArray.length; x++)
  +         while (running)
                     {
  -                     Callback callbackObject = callbackArray[x];
  -                     callbackObject.setCallbackHandleObject(callbackHandlerObject);
  -                     try
  +            synchronized (toHandleList)
  +            {
  +               if (toHandleList.isEmpty())
                        {
  -                        callbackHandler.handleCallback(callbackObject);
                           try
                           {
  -                           checkForAcknowledgementRequest(callbackHandler, callbackObject);
  +                     toHandleList.wait();
                           }
  -                        catch (Throwable t)
  +                  catch (InterruptedException e)
                           {
  -                           log.error("Error acknowledging callback to callback handler (" + callbackHandler + ").", t);
  +                     log.warn("unexpected interrupt");
                           }
                        }
  +               callback = (Callback) toHandleList.remove(0);
  +            }
  +            
  +            try
  +            {
  +               callback.setCallbackHandleObject(callbackHandlerObject);
  +               callbackHandler.handleCallback(callback);
  +            }
                        catch (HandleCallbackException e)
                        {
                           log.error("Error delivering callback to callback handler (" + callbackHandler + ").", e);
                        }
   
  +            checkForAcknowledgeRequest(callback);
                     }
                  }
  -            }).start();
  +      
  +      public void shutdown()
  +      {
  +         running = false;
  +      }
            }
   
  +   class AcknowledgeThread extends Thread
  +   {
  +      boolean running = true;
  +      Callback callback;
  +      
  +      public void run()
  +      {
  +         while (running)
  +         {
  +            synchronized (toAcknowledgeList)
  +            {
  +               if (toAcknowledgeList.isEmpty())
  +               {
  +                  try
  +                  {
  +                     toAcknowledgeList.wait();
         }
  -      catch (Throwable throwable)
  +                  catch (InterruptedException e)
         {
  -         log.error("Error getting callbacks from server.", throwable);
  +                     log.warn("unexpected interrupt");
  +                     continue;
         }
      }
  +               callback = (Callback) toAcknowledgeList.remove(0);
  +            }
   
  -   public void stop()
  +            try
      {
  -      cancel();
  -      client = null;
  -      callbackHandler = null;
  -      callbackHandlerObject = null;
  +               client.acknowledgeCallback(callbackHandler, callback);
      }
  +            catch (Throwable t)
  +            {
  +               log.error("Error acknowledging callback for callback handler (" + callbackHandler + ").", t);
  +            }
  +         }
  +      }
  +      
  +      public void shutdown()
  +      {
  +         running = false;
  +      }
  +   }
  +
      
  -   private void checkForAcknowledgementRequest(InvokerCallbackHandler callbackhandler, Callback callback)
  -   throws Throwable
  +   private void checkForAcknowledgeRequest(Callback callback)
      {
         Map returnPayload = callback.getReturnPayload();
         if (returnPayload != null)
  @@ -169,7 +249,22 @@
            Object callbackId = returnPayload.get(ServerInvokerCallbackHandler.CALLBACK_ID);
            if (callbackId != null)
            {
  -            client.acknowledgeCallback(callbackHandler, callback);
  +            synchronized (toAcknowledgeList)
  +            {
  +               toAcknowledgeList.add(callback);
  +               if (toAcknowledgeList.size() == 1)
  +               {
  +                  if (acknowledgeThread == null)
  +                  {
  +                     acknowledgeThread = new AcknowledgeThread();
  +                     acknowledgeThread.start();
  +                  }
  +                  else
  +                  {
  +                     toAcknowledgeList.notify();
  +                  }
  +               }
  +            }
            }
         }
      }
  
  
  



More information about the jboss-cvs-commits mailing list