[jboss-cvs] JBossRemoting/src/main/org/jboss/remoting/callback ...

Ron Sigal ron_sigal at yahoo.com
Fri Aug 17 21:13:18 EDT 2007


  User: rsigal  
  Date: 07/08/17 21:13:18

  Modified:    src/main/org/jboss/remoting/callback  Tag:
                        remoting_2_2_2_experimental
                        ServerInvokerCallbackHandler.java
  Log:
  JBREM-641: Synchronized with remoting_2_x branch, adding support for blocking mode for pull callbacks.
  
  Revision  Changes    Path
  No                   revision
  
  
  No                   revision
  
  
  1.15.2.10.2.2.2.1 +124 -29   JBossRemoting/src/main/org/jboss/remoting/callback/ServerInvokerCallbackHandler.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: ServerInvokerCallbackHandler.java
  ===================================================================
  RCS file: /cvsroot/jboss/JBossRemoting/src/main/org/jboss/remoting/callback/ServerInvokerCallbackHandler.java,v
  retrieving revision 1.15.2.10.2.2
  retrieving revision 1.15.2.10.2.2.2.1
  diff -u -b -r1.15.2.10.2.2 -r1.15.2.10.2.2.2.1
  --- ServerInvokerCallbackHandler.java	5 Aug 2007 21:04:19 -0000	1.15.2.10.2.2
  +++ ServerInvokerCallbackHandler.java	18 Aug 2007 01:13:18 -0000	1.15.2.10.2.2.2.1
  @@ -73,6 +73,8 @@
      private String listenerId;
      private String clientSessionId;
      private InvokerLocator serverLocator;
  +   private int blockingTimeout = ServerInvoker.DEFAULT_BLOCKING_TIMEOUT;
  +   
   
      private SerializableStore callbackStore = null;
      private CallbackErrorHandler callbackErrorHandler = null;
  @@ -146,47 +148,53 @@
      {
         clientSessionId = invocation.getSessionId();
         sessionId = invocation.getSessionId();
  -      Map metadata = invocation.getRequestPayload();
  -      if(metadata != null)
  +      
  +      Map metadata = null;
  +      if (owner.getConfiguration() == null)
  +      {
  +         metadata = new HashMap();
  +      }
  +      else
  +      {
  +         metadata = new HashMap(owner.getConfiguration());
  +      }
  +      if(invocation.getRequestPayload() != null)
         {
  +         metadata.putAll(invocation.getRequestPayload());
  +      }
  +
            listenerId = (String) metadata.get(Client.LISTENER_ID_KEY);
            if(listenerId != null)
            {
               sessionId = sessionId + "+" + listenerId;
            }
  -      }
  +      log.debug("Session id for callback handler is " + sessionId);
  +
         if(invocation.getLocator() != null)
         {
  -         Map clientConfig = new HashMap();
  -         if(owner.getConfiguration() != null)
  -         {
  -            clientConfig.putAll(owner.getConfiguration());
  -            clientConfig.putAll(metadata);
  -         }
  -         
  -         Object o = clientConfig.get(CALLBACK_TIMEOUT);
  -         if (o instanceof String)
  +         Object val = metadata.get(CALLBACK_TIMEOUT);
  +         if (val instanceof String)
            {
               try
               {
  -               Integer.parseInt((String) o);
  -               clientConfig.put(ServerInvoker.TIMEOUT, o);
  -               log.debug(this + " using callbackTimeout value " + o);
  +               Integer.parseInt((String) val);
  +               metadata.put(ServerInvoker.TIMEOUT, val);
  +               log.debug(this + " using callbackTimeout value " + val);
               }
               catch (NumberFormatException e)
               {
  -               log.warn("callbackTimeout value must have valid numeric format: " + o);
  +               log.warn("callbackTimeout value must have valid numeric format: " + val);
               }
            }
  -         else if (o != null)
  +         else if (val != null)
            {
  -            log.warn("callbackTimeout value must be a String: " + o);
  +            log.warn("callbackTimeout value must be a String: " + val);
            }
            
            // need to configure callback client with ssl config if one exists for server
  -         configureSocketFactory(clientConfig, owner);
  +         configureSocketFactory(metadata, owner);
   
  -         callBackClient = new Client(invocation.getLocator(), invocation.getSubsystem(), clientConfig);
  +         callBackClient = new Client(invocation.getLocator(), invocation.getSubsystem(), metadata);
            callBackClient.connect();
            createCallbackErrorHandler(owner, invocation.getSubsystem());
         }
  @@ -195,7 +203,26 @@
            createCallbackStore(owner, sessionId);
         }
   
  -      log.debug("Session id for callback handler is " + sessionId);
  +      Object val = metadata.get(ServerInvoker.BLOCKING_TIMEOUT);
  +      if (val != null)
  +      {
  +         if (val instanceof String)
  +         {
  +            try
  +            {
  +               blockingTimeout = Integer.parseInt((String) val);
  +            }
  +            catch (NumberFormatException e)
  +            {
  +               log.warn("Error converting " + ServerInvoker.BLOCKING_TIMEOUT + " to type long.  " + e.getMessage());
  +            }
  +         }
  +         else
  +         {
  +            log.warn("Value for " + ServerInvoker.BLOCKING_TIMEOUT + " configuration must be of type " + String.class.getName() +
  +                     " and is " + val.getClass().getName());
  +         }
  +      }
      }
   
      /**
  @@ -506,7 +533,69 @@
         return sessionId;
      }
   
  -   public List getCallbacks()
  +   public List getCallbacks(Map metadata)
  +   {
  +      log.trace("entering getCallbacks()");
  +      
  +      boolean blocking = false;
  +      int currentBlockingTimeout = blockingTimeout;
  +      
  +      if (metadata != null)
  +      {
  +         Object val = metadata.get(ServerInvoker.BLOCKING_MODE);
  +         if (ServerInvoker.BLOCKING.equals(val))
  +            blocking = true;
  +         
  +         val = metadata.get(ServerInvoker.BLOCKING_TIMEOUT);
  +         if (val != null)
  +         {
  +            if (val instanceof String)
  +            {
  +               try
  +               {
  +                  currentBlockingTimeout = Integer.parseInt((String) val);
  +               }
  +               catch (NumberFormatException e)
  +               {
  +                  log.warn("Error converting " + ServerInvoker.BLOCKING_TIMEOUT + " to type long.  " + e.getMessage());
  +               }
  +            }
  +            else
  +            {
  +               log.warn("Value for " + ServerInvoker.BLOCKING_TIMEOUT + " configuration must be of type " + String.class.getName() +
  +                        " and is " + val.getClass().getName());
  +            }
  +         }
  +      }
  +
  +      if (trace)
  +      {
  +         log.trace("block: " + blocking);
  +         log.trace("blocking timeout: " + currentBlockingTimeout);
  +      }
  +
  +      synchronized (callbacks)
  +      {
  +         List callbackList = constructCallbackList();
  +         if (blocking && callbackList.isEmpty())
  +         {
  +            try
  +            {
  +               callbacks.wait(currentBlockingTimeout);
  +               callbackList = constructCallbackList();
  +            }
  +            catch (InterruptedException e)
  +            {
  +               log.debug("unexpected interrupt");
  +            }
  +         }
  +         
  +         if (trace) log.trace("callbackList.size(): " + callbackList.size());
  +         return callbackList;
  +      }
  +   }
  +   
  +   private List constructCallbackList()
      {
         List callbackList = null;
         synchronized(callbacks)
  @@ -631,12 +720,17 @@
   
            if(callBackClient == null)
            {
  -            // need to check if shoudl persist callback instead of keeping in memory
  +            // need to check if should persist callback instead of keeping in memory
               if(shouldPersist())
               {
                  try
                  {
  +                  synchronized (callbacks)
  +                  {
                     persistCallback(callback);
  +                     callbacks.notify();
  +                  }
  +                  
                     callback = null;
                     // try to help out with the amount of memory usuage
                     new Thread()
  @@ -660,6 +754,7 @@
                  {
                     if(trace){ log.debug(this + " got PULL callback. Adding to callback list ..."); }
                     callbacks.add(callback);
  +                  callbacks.notify();
                  }
               }
            }
  
  
  



More information about the jboss-cvs-commits mailing list