[jboss-cvs] JBossRemoting/src/main/org/jboss/remoting/callback ...

Ron Sigal ron_sigal at yahoo.com
Sat May 5 03:41:12 EDT 2007


  User: rsigal  
  Date: 07/05/05 03:41:12

  Modified:    src/main/org/jboss/remoting/callback  Tag: remoting_2_x
                        ServerInvokerCallbackHandler.java
  Log:
  JBREM-641: Added support for blocking mode for pull callbacks.
  
  Revision  Changes    Path
  No                   revision
  
  
  No                   revision
  
  
  1.15.2.11 +122 -20   JBossRemoting/src/main/org/jboss/remoting/callback/ServerInvokerCallbackHandler.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: ServerInvokerCallbackHandler.java
  ===================================================================
  RCS file: /cvsroot/jboss/JBossRemoting/src/main/org/jboss/remoting/callback/ServerInvokerCallbackHandler.java,v
  retrieving revision 1.15.2.10
  retrieving revision 1.15.2.11
  diff -u -b -r1.15.2.10 -r1.15.2.11
  --- ServerInvokerCallbackHandler.java	7 Mar 2007 06:08:56 -0000	1.15.2.10
  +++ ServerInvokerCallbackHandler.java	5 May 2007 07:41:12 -0000	1.15.2.11
  @@ -73,6 +73,8 @@
      private String listenerId;
      private String clientSessionId;
      private InvokerLocator serverLocator;
  +   private int blockingTimeout = DEFAULT_BLOCKING_TIMEOUT;
  +   
   
      private SerializableStore callbackStore = null;
      private CallbackErrorHandler callbackErrorHandler = null;
  @@ -122,6 +124,12 @@
      private double memPercentCeiling = 20; // 20% by default
   
      /**
  +    * Default timeout for getting callbacks in blocking mode.
  +    * Default is 5000 milliseconds.
  +    */
  +   public static final int DEFAULT_BLOCKING_TIMEOUT = 5000;
  +
  +   /**
       * Maps an ID to a CallbackListener for a Callback waiting to be acknowledged.
       */
      private Map idToListenerMap = Collections.synchronizedMap(new HashMap());
  @@ -143,27 +151,34 @@
      {
         clientSessionId = invocation.getSessionId();
         sessionId = invocation.getSessionId();
  -      Map metadata = invocation.getRequestPayload();
  -      if(metadata != null)
  +      
  +      Map metadata = null;
  +      if (owner.getConfiguration() == null)
  +      {
  +         metadata = new HashMap();
  +      }
  +      else
  +      {
  +         metadata = new HashMap(owner.getConfiguration());
  +      }
  +      if(invocation.getRequestPayload() != null)
         {
  +         metadata.putAll(invocation.getRequestPayload());
  +      }
  +
            listenerId = (String) metadata.get(Client.LISTENER_ID_KEY);
            if(listenerId != null)
            {
               sessionId = sessionId + "+" + listenerId;
            }
  -      }
  +      log.debug("Session id for callback handler is " + sessionId);
  +
         if(invocation.getLocator() != null)
         {
  -         Map clientConfig = new HashMap();
  -         if(owner.getConfiguration() != null)
  -         {
  -            clientConfig.putAll(owner.getConfiguration());
  -            clientConfig.putAll(metadata);
  -         }
            // need to configure callback client with ssl config if one exists for server
  -         configureSocketFactory(clientConfig, owner);
  +         configureSocketFactory(metadata, owner);
   
  -         callBackClient = new Client(invocation.getLocator(), invocation.getSubsystem(), clientConfig);
  +         callBackClient = new Client(invocation.getLocator(), invocation.getSubsystem(), metadata);
            callBackClient.connect();
            createCallbackErrorHandler(owner, invocation.getSubsystem());
         }
  @@ -172,7 +187,26 @@
            createCallbackStore(owner, sessionId);
         }
   
  -      log.debug("Session id for callback handler is " + sessionId);
  +      Object val = metadata.get(ServerInvoker.BLOCKING_TIMEOUT);
  +      if (val != null)
  +      {
  +         if (val instanceof String)
  +         {
  +            try
  +            {
  +               blockingTimeout = Integer.parseInt((String) val);
  +            }
  +            catch (NumberFormatException e)
  +            {
  +               log.warn("Error converting " + ServerInvoker.BLOCKING_TIMEOUT + " to type long.  " + e.getMessage());
  +            }
  +         }
  +         else
  +         {
  +            log.warn("Value for " + ServerInvoker.BLOCKING_TIMEOUT + " configuration must be of type " + String.class.getName() +
  +                     " and is " + val.getClass().getName());
  +         }
  +      }
      }
   
      /**
  @@ -483,7 +517,69 @@
         return sessionId;
      }
   
  -   public List getCallbacks()
  +   public List getCallbacks(Map metadata)
  +   {
  +      log.trace("entering getCallbacks()");
  +      
  +      boolean blocking = false;
  +      int currentBlockingTimeout = blockingTimeout;
  +      
  +      if (metadata != null)
  +      {
  +         Object val = metadata.get(ServerInvoker.BLOCKING_MODE);
  +         if (ServerInvoker.BLOCK.equals(val))
  +            blocking = true;
  +         
  +         val = metadata.get(ServerInvoker.BLOCKING_TIMEOUT);
  +         if (val != null)
  +         {
  +            if (val instanceof String)
  +            {
  +               try
  +               {
  +                  currentBlockingTimeout = Integer.parseInt((String) val);
  +               }
  +               catch (NumberFormatException e)
  +               {
  +                  log.warn("Error converting " + ServerInvoker.BLOCKING_TIMEOUT + " to type long.  " + e.getMessage());
  +               }
  +            }
  +            else
  +            {
  +               log.warn("Value for " + ServerInvoker.BLOCKING_TIMEOUT + " configuration must be of type " + String.class.getName() +
  +                        " and is " + val.getClass().getName());
  +            }
  +         }
  +      }
  +
  +      if (true)
  +      {
  +         log.info("block: " + blocking);
  +         log.info("blocking timeout: " + currentBlockingTimeout);
  +      }
  +
  +      synchronized (callbacks)
  +      {
  +         List callbackList = constructCallbackList();
  +         if (blocking && callbackList.isEmpty())
  +         {
  +            try
  +            {
  +               callbacks.wait(currentBlockingTimeout);
  +               callbackList = constructCallbackList();
  +            }
  +            catch (InterruptedException e)
  +            {
  +               log.debug("unexpected interrupt");
  +            }
  +         }
  +         
  +         if (trace) log.trace("callbackList.size(): " + callbackList.size());
  +         return callbackList;
  +      }
  +   }
  +   
  +   private List constructCallbackList()
      {
         List callbackList = null;
         synchronized(callbacks)
  @@ -608,12 +704,17 @@
   
            if(callBackClient == null)
            {
  -            // need to check if shoudl persist callback instead of keeping in memory
  +            // need to check if should persist callback instead of keeping in memory
               if(shouldPersist())
               {
                  try
                  {
  +                  synchronized (callbacks)
  +                  {
                     persistCallback(callback);
  +                     callbacks.notify();
  +                  }
  +                  
                     callback = null;
                     // try to help out with the amount of memory usuage
                     new Thread()
  @@ -637,6 +738,7 @@
                  {
                     if(trace){ log.debug(this + " got PULL callback. Adding to callback list ..."); }
                     callbacks.add(callback);
  +                  callbacks.notify();
                  }
               }
            }
  
  
  



More information about the jboss-cvs-commits mailing list