[jboss-cvs] JBossRemoting/src/main/org/jboss/remoting/transport/socket ...

Ron Sigal ron_sigal at yahoo.com
Sat Feb 3 00:17:21 EST 2007


  User: rsigal  
  Date: 07/02/03 00:17:21

  Modified:    src/main/org/jboss/remoting/transport/socket 
                        ServerThread.java
  Log:
  JBREM-597, JBREM-690, JBREM-692: sync with remoting_2_x.
  
  Revision  Changes    Path
  1.33      +508 -404  JBossRemoting/src/main/org/jboss/remoting/transport/socket/ServerThread.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: ServerThread.java
  ===================================================================
  RCS file: /cvsroot/jboss/JBossRemoting/src/main/org/jboss/remoting/transport/socket/ServerThread.java,v
  retrieving revision 1.32
  retrieving revision 1.33
  diff -u -b -r1.32 -r1.33
  --- ServerThread.java	12 Jan 2007 18:56:22 -0000	1.32
  +++ ServerThread.java	3 Feb 2007 05:17:21 -0000	1.33
  @@ -26,12 +26,14 @@
   import org.jboss.logging.Logger;
   import org.jboss.remoting.InvocationRequest;
   import org.jboss.remoting.InvocationResponse;
  +import org.jboss.remoting.InvokerLocator;
   import org.jboss.remoting.ServerInvoker;
   import org.jboss.remoting.Version;
  +import org.jboss.remoting.Client;
  +import org.jboss.remoting.serialization.ClassLoaderUtility;
   import org.jboss.remoting.marshal.MarshalFactory;
   import org.jboss.remoting.marshal.Marshaller;
   import org.jboss.remoting.marshal.UnMarshaller;
  -import org.jboss.remoting.serialization.ClassLoaderUtility;
   
   import java.io.EOFException;
   import java.io.IOException;
  @@ -43,6 +45,7 @@
   import java.net.SocketAddress;
   import java.net.SocketException;
   import java.net.SocketTimeoutException;
  +import java.util.HashMap;
   import java.util.LinkedList;
   import java.util.Map;
   
  @@ -59,65 +62,84 @@
    *
    * @author <a href="mailto:bill at jboss.org">Bill Burke</a>
    * @author <a href="mailto:tom at jboss.org">Tom Elrod</a>
  - * @version $Revision: 1.32 $
  + * @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
  + *
  + * @version $Revision: 1.33 $
    */
   public class ServerThread extends Thread
   {
  +   // Constants ------------------------------------------------------------------------------------
  +
      final static private Logger log = Logger.getLogger(ServerThread.class);
   
  -   protected SocketServerInvoker invoker;
  +   // Static ---------------------------------------------------------------------------------------
  +
  +   private static boolean trace = log.isTraceEnabled();
  +
  +   private static int idGenerator = 0;
  +
  +   public static synchronized int nextID()
  +   {
  +      return idGenerator++;
  +   }
  +
  +   // Attributes -----------------------------------------------------------------------------------
  +
  +   protected volatile boolean running;
  +   protected volatile boolean handlingResponse;
  +   protected volatile boolean shutdown;
  +
      protected LRUPool clientpool;
      protected LinkedList threadpool;
  -   protected volatile boolean running = true;
  -   protected volatile boolean handlingResponse = true; // start off as true so that nobody can interrupt us
  -   protected volatile boolean shutdown = false;
  -   protected static int id = 0;
  -
  -   private SocketWrapper socketWrapper = null;
  -   protected String serverSocketClassName = ServerSocketWrapper.class.getName();
  -   protected Class serverSocketClass = null;
  -   private Constructor serverSocketConstructor = null;
   
  -   private static final boolean isTrace = log.isTraceEnabled();
  -   private static final boolean isDebug = log.isDebugEnabled();
  +   protected String serverSocketClassName;
  +   protected Class serverSocketClass;
   
  -   /**
  -    * Indicates if will check the socket connection when
  -    * getting from pool by sending byte over the connection
  -    * to validate is still good.
  -    */
  -   private boolean shouldCheckConnection = false;
  +   private Socket socket;
  +   private int timeout;
  +   protected SocketServerInvoker invoker;
  +   private Constructor serverSocketConstructor;
   
  -   /**
  -    * Will indicate when the last request has been processed (used in determining
  -    * idle connection/thread timeout)
  -    */
  +   protected Marshaller marshaller;
  +   protected UnMarshaller unmarshaller;
  +
  +   // the unique identity of the thread, which won't change during the life of the thread. The
  +   // thread may get associated with different IP addresses though.
  +   private int id = Integer.MIN_VALUE;
  +
  +   // Indicates if will check the socket connection when getting from pool by sending byte over the
  +   // connection to validate is still good.
  +    private boolean shouldCheckConnection;
  +
  +   // Will indicate when the last request has been processed (used in determining idle
  +   // connection/thread timeout)
      private long lastRequestHandledTimestamp = System.currentTimeMillis();
   
  -   public static synchronized int nextID()
  -   {
  -      int nextID = id++;
  -      return nextID;
  -   }
  +   // Constructors ---------------------------------------------------------------------------------
   
      public ServerThread(Socket socket, SocketServerInvoker invoker, LRUPool clientpool,
  -                       LinkedList threadpool, int timeout, String serverSocketClassName) throws Exception
  +                       LinkedList threadpool, int timeout, String serverSocketClassName)
  +      throws Exception
      {
  -      super("SocketServerInvokerThread-" + socket.getInetAddress().getHostAddress() + "-" + nextID());
  +      super();
  +
  +      running = true;
  +      handlingResponse = true; // start off as true so that nobody can interrupt us
  +
  +      setName(getWorkerThreadName(socket));
  +
  +      this.socket = socket;
  +      this.timeout = timeout;
         this.serverSocketClassName = serverSocketClassName;
  -      this.socketWrapper = createServerSocket(socket, timeout, invoker.getLocator().getParameters());
         this.invoker = invoker;
         this.clientpool = clientpool;
         this.threadpool = threadpool;
  -      init();
  -   }
  +      processNewSocket();
   
  -   private void init()
  -   {
         if (invoker != null)
         {
            Map configMap = invoker.getConfiguration();
  -         String checkValue = (String) configMap.get(SocketServerInvoker.CHECK_CONNECTION_KEY);
  +         String checkValue = (String)configMap.get(SocketServerInvoker.CHECK_CONNECTION_KEY);
            if (checkValue != null && checkValue.length() > 0)
            {
               shouldCheckConnection = Boolean.valueOf(checkValue).booleanValue();
  @@ -125,6 +147,101 @@
         }
      }
   
  +   // Thread overrides -----------------------------------------------------------------------------
  +
  +   public void run()
  +   {
  +      try
  +      {
  +         while (true)
  +         {
  +            dorun();
  +
  +            // The following code has been changed to eliminate a race condition with
  +            // SocketServerInvoker.cleanup().
  +            //
  +            // A ServerThread can shutdown for two reasons:
  +            // 1. the client shuts down, and
  +            // 2. the server shuts down.
  +            //
  +            // If both occur around the same time, a problem arises.  If a ServerThread starts to
  +            // shut down because the client shut down, it will test shutdown, and if it gets to the
  +            // test before SocketServerInvoker.cleanup() calls ServerThread.stop() to set shutdown
  +            // to true, it will return itself to threadpool.  If it moves from clientpool to
  +            // threadpool at just the right time, SocketServerInvoker could miss it in both places
  +            // and never call stop(), leaving it alive, resulting in a memory leak.  The solution is
  +            // to synchronize parts of ServerThread.run() and SocketServerInvoker.cleanup() so that
  +            // they interact atomically.
  +
  +            synchronized (this)
  +            {
  +               synchronized (clientpool)
  +               {
  +                  synchronized (threadpool)
  +                  {
  +                     if (shutdown)
  +                     {
  +                        invoker = null;
  +                        return; // exit thread
  +                     }
  +                     else
  +                     {
  +                        if(trace) { log.trace(this + " removing itself from clientpool and going to threadpool"); }
  +                        clientpool.remove(this);
  +                        threadpool.add(this);
  +                        Thread.interrupted(); // clear any interruption so that we can be pooled.
  +                        clientpool.notify();
  +                     }
  +                  }
  +               }
  +
  +               try
  +               {
  +                  if(trace) { log.trace(this + " begins to wait"); }
  +
  +                  wait();
  +
  +                  if(trace) { log.trace(this + " woke up after wait"); }
  +               }
  +               catch (InterruptedException e)
  +               {
  +                  if (shutdown)
  +                  {
  +                     invoker = null;
  +                     return; // exit thread
  +                  }
  +
  +                  throw e;
  +               }
  +            }
  +         }
  +      }
  +      catch (Exception e)
  +      {
  +         log.debug(this + " exiting run on exception, definitively thrown out of the threadpool", e);
  +      }
  +   }
  +
  +   // Public ---------------------------------------------------------------------------------------
  +
  +   public synchronized void wakeup(Socket socket, int timeout, SocketServerInvoker invoker)
  +      throws Exception
  +   {
  +      // rename the worker thread to reflect the new socket it is handling
  +      setName(getWorkerThreadName(socket));
  +
  +      this.socket = socket;
  +      this.timeout = timeout;
  +      this.invoker = invoker;
  +
  +      running = true;
  +      handlingResponse = true;
  +      processNewSocket();
  +      notify();
  +
  +      if(trace) { log.trace(this + " has notified on mutex"); }
  +   }
  +
      public long getLastRequestTimestamp()
      {
         return lastRequestHandledTimestamp;
  @@ -134,14 +251,13 @@
      {
         shutdown = true;
         running = false;
  -      // This is a race and there is a chance
  -      // that a invocation is going on at the time
  -      // of the interrupt.  But I see no way right
  -      // now to protect for this.
  -
  -      // NOTE ALSO!:
  -      // Shutdown should never be synchronized.
  -      // We don't want to hold up accept() thread! (via LRUpool)
  +
  +      // This is a race and there is a chance that a invocation is going on at the time of the
  +      // interrupt.  But I see no way right now to protect for this.
  +
  +      // NOTE ALSO!: Shutdown should never be synchronized. We don't want to hold up accept()
  +      // thread! (via LRUpool)
  +
         if (!handlingResponse)
         {
            try
  @@ -153,15 +269,12 @@
            {
            }
         }
  -
      }
   
      /**
  -    * Sets if server thread should check connection before continue to process on
  -    * next invocation request.  If is set to true, will send an ACK to client to
  -    * verify client is still connected on same socket.
  -    *
  -    * @param checkConnection
  +    * Sets if server thread should check connection before continue to process on next invocation
  +    * request.  If is set to true, will send an ACK to client to verify client is still connected
  +    * on same socket.
       */
      public void shouldCheckConnection(boolean checkConnection)
      {
  @@ -169,64 +282,24 @@
      }
   
      /**
  -    * Indicates if server will check with client (via an ACK) to
  -    * see if is still there.
  -    *
  -    * @return
  +    * Indicates if server will check with client (via an ACK) to see if is still there.
       */
      public boolean getCheckingConnection()
      {
         return this.shouldCheckConnection;
      }
   
  -   private SocketWrapper createServerSocket(Socket socket, int timeout, Map metadata) throws Exception
  -   {
  -      if (serverSocketConstructor == null)
  -      {
  -         if(serverSocketClass == null)
  -         {
  -            serverSocketClass = ClassLoaderUtility.loadClass(serverSocketClassName, getClass());
  -         }
  -
  -         try
  -         {
  -            serverSocketConstructor = serverSocketClass.getConstructor(new Class[]{Socket.class, Map.class, Integer.class});
  -         }
  -         catch (NoSuchMethodException e)
  -         {
  -            serverSocketConstructor = serverSocketClass.getConstructor(new Class[]{Socket.class});
  -         }
  -
  -      }
  -
  -      SocketWrapper serverSocketWrapper = null;
  -      if (serverSocketConstructor.getParameterTypes().length == 3)
  -      {
  -         serverSocketWrapper = (SocketWrapper) serverSocketConstructor.newInstance(new Object[]{socket, metadata, new Integer(timeout)});
  -      }
  -      else
  +   public void evict()
         {
  -         serverSocketWrapper = (SocketWrapper) serverSocketConstructor.newInstance(new Object[]{socket});
  -         serverSocketWrapper.setTimeout(timeout);
  -      }
  +      running = false;
   
  -      return serverSocketWrapper;
  -   }
  +      // This is a race and there is a chance that a invocation is going on at the time of the
  +      // interrupt.  But I see no way right now to protect for this. There may not be a problem
  +      // because interrupt only effects threads blocking on IO.
   
  +      // NOTE ALSO!: Shutdown should never be synchronized. We don't want to hold up accept()
  +      // thread! (via LRUpool)
   
  -   public void evict()
  -   {
  -      running = false;
  -      // This is a race and there is a chance
  -      // that a invocation is going on at the time
  -      // of the interrupt.  But I see no way right
  -      // now to protect for this.
  -      // There may not be a problem because interrupt only effects
  -      // threads blocking on IO.
  -
  -      // NOTE ALSO!:
  -      // Shutdown should never be synchronized.
  -      // We don't want to hold up accept() thread! (via LRUpool)
         if (!handlingResponse)
         {
            try
  @@ -241,16 +314,14 @@
      }
   
      /**
  -    * This method is intended to be used when need to unblock
  -    * I/O read, which the thread will automatically loop back to
  -    * do after processing a request.  Calling this method will cause
  -    * the underlying socket to be closed.
  +    * This method is intended to be used when need to unblock I/O read, which the thread will
  +    * automatically loop back to do after processing a request.
       */
      public void unblock()
      {
         try
         {
  -         socketWrapper.close();
  +         socket.close();
         }
         catch (IOException e)
         {
  @@ -258,216 +329,206 @@
         }
      }
   
  -   public synchronized void wakeup(Socket socket, int timeout, Map metadata) throws Exception
  -   {
  -      this.socketWrapper = createServerSocket(socket, timeout, metadata);
  -      String name = "SocketServerInvokerThread-" + socket.getInetAddress().getHostAddress() + "-" + nextID();
  -      super.setName(name);
  -      running = true;
  -      handlingResponse = true;
  -      this.notify();
  -      if(isTrace)
  +   public String toString()
         {
  -         log.trace("Have woken up " + getName());
  -      }
  +      return getName();
      }
   
  -   public void run()
  +   // Package protected ----------------------------------------------------------------------------
  +
  +   // Protected ------------------------------------------------------------------------------------
  +
  +   /**
  +    * This is needed because Object*Streams leak
  +    */
  +   protected void dorun()
      {
  +      if(trace) { log.trace("beginning dorun()"); }
  +
  +      running = true;
  +      handlingResponse = true;
  +
  +      // lazy initialize the socketWrapper on the worker thread itself. We do this to avoid to have
  +      // it done on the acceptor thread (prone to lockup)
  +      SocketWrapper socketWrapper = null;
  +
         try
         {
  -         while (true)
  -         {
  -            dorun();
  +         if(trace) { log.trace("creating the socket wrapper"); }
   
  -/*
  -* The following code has been changed to eliminate a race condition with SocketServerInvoker.cleanup
  -().
  -* A ServerThread can shutdown for two reasons:
  -*
  -*   1. the client shuts down, and
  -*   2. the server shuts down.
  -*
  -* If both occur around the same time, a problem arises.  If a ServerThread starts to shut
  -* down because the client shut down, it will test shutdown, and if it gets to the test
  -* before SocketServerInvoker.cleanup() calls ServerThread.stop() to set shutdown to true, it
  -* will return itself to threadpool.  If it moves from clientpool to threadpool at just the
  -* right time, SocketServerInvoker could miss it in both places and never call stop(), leaving
  -* it alive, resulting in a memory leak.  The solution is to synchronize parts of
  -* ServerThread.run() and SocketServerInvoker.cleanup() so that they interact atomically.
  -*/
  -            synchronized (this)
  -            {
  -               synchronized (clientpool)
  +         socketWrapper =
  +            createServerSocketWrapper(socket, timeout, invoker.getLocator().getParameters());
  +
  +         // Always do first one without an ACK because its not needed
  +         if(trace) { log.trace("processing first invocation without acknowledging"); }
  +         processInvocation(socketWrapper);
  +      }
  +      catch (Exception ex)
                  {
  -                  synchronized (threadpool)
  +         log.error("Worker thread initialization failure", ex);
  +         running = false;
  +      }
  +
  +      // Re-use loop
  +      while (running)
                     {
  -                     if (shutdown)
  +         try
                        {
  -                        invoker = null;
  -                        return; // exit thread
  +            acknowledge(socketWrapper);
  +            processInvocation(socketWrapper);
                        }
  -                     else
  +         catch (AcknowledgeFailure e)
                        {
  -                        if(isTrace)
  +            if (!shutdown && trace)
                           {
  -                           log.trace("removing from clientpool and adding to threadpool " + this);
  -                        }
  -                        clientpool.remove(this);
  -                        threadpool.add(this);
  -                        Thread.interrupted(); // clear any interruption so that we can be pooled.
  -                        clientpool.notify();
  -                     }
  +               log.trace("keep alive acknowledge failed!");
                     }
  +            running = false;
                  }
  -
  -               try
  +         catch(SocketTimeoutException ste)
                  {
  -                  if(isTrace)
  +            if(!shutdown)
                     {
  -                     log.trace("begin thread wait");
  -                  }
  -                  this.wait();
  -                  if(isTrace)
  +               if(trace)
                     {
  -                     log.trace("WAKEUP in SERVER THREAD");
  +                  log.trace(ste);
                     }
                  }
  -               catch (InterruptedException e)
  +            running = false;
  +         }
  +         catch (InterruptedIOException e)
                  {
  -                  if (shutdown)
  +            if (!shutdown)
                     {
  -                     invoker = null;
  -                     return; // exit thread
  +               log.error("Socket IO interrupted", e);
                     }
  +            running = false;
   
  -                  throw e;
                  }
  +         catch (InterruptedException e)
  +         {
  +            if(trace)
  +            {
  +               log.trace(e);
               }
  +            if (!shutdown)
  +            {
  +               log.error("interrupted", e);
            }
         }
  -      catch (Exception ignored)
  +         catch (EOFException eof)
         {
  -         if(isDebug)
  +            if (!shutdown && trace)
            {
  -            log.debug("Exiting run on exception", ignored);
  -         }
  +               log.trace("EOFException received. This is likely due to client finishing communication.", eof);
         }
  +            running = false;
      }
  -
  -
  -   protected void acknowledge() throws Exception
  +         catch (SocketException sex)
      {
  -      if (shouldCheckConnection)
  +            if (!shutdown && trace)
         {
  -
  -         // HERE IS THE RACE between ACK received and handlingResponse = true
  -         // We can't synchronize because readByte blocks and client is expecting
  -         // a response and we don't want to hang client.
  -         // see shutdown and evict for more details
  -         // There may not be a problem because interrupt only effects
  -         // threads blocking on IO. and this thread will just continue.
  -         handlingResponse = true;
  -
  -         try
  -         {
  -            socketWrapper.checkConnection();
  +               log.trace("SocketException received. This is likely due to client disconnecting and resetting connection.", sex);
            }
  -         catch (EOFException e)
  -         {
  -            throw new AcknowledgeFailure();
  +            running = false;
            }
  -         catch (SocketException se)
  +         catch (Exception ex)
            {
  -            throw new AcknowledgeFailure();
  -         }
  -         catch (IOException ioe)
  +            if (!shutdown)
            {
  -            throw new AcknowledgeFailure();
  +               log.error("failed", ex);
  +               running = false;
            }
  -
  -         handlingResponse = false;
         }
  +         // clear any interruption so that thread can be pooled.
  +         handlingResponse = false;
  +         Thread.interrupted();
      }
   
  -   protected Object versionedRead(InputStream inputStream, ServerInvoker invoker,
  -                                  ClassLoader classLoader, int version) throws IOException, ClassNotFoundException
  -   {
  -      //TODO: -TME - Should I even botther to check for version here?  Only one way to do processing at this
  -      // point, regardless of version.
  -      switch (version)
  -      {
  -         case Version.VERSION_1:
  -         case Version.VERSION_2:
  -         {
  +      // Ok, we've been shutdown.  Do appropriate cleanups.
  +      // The stream close code has been moved to SocketWrapper.close().
  +//      try
  +//      {
  +//         if (socketWrapper != null)
  +//         {
  +//            InputStream in = socketWrapper.getInputStream();
  +//            if (in != null)
  +//            {
  +//               in.close();
  +//            }
  +//            OutputStream out = socketWrapper.getOutputStream();
  +//            if (out != null)
  +//            {
  +//               out.close();
  +//            }
  +//         }
  +//      }
  +//      catch (Exception ex)
  +//      {
  +//         log.debug("failed to close in/out", ex);
  +//      }
   
  -            //TODO: -TME Need better way to get the unmarshaller (via config)
  -            UnMarshaller unmarshaller = MarshalFactory.getUnMarshaller(invoker.getLocator(), classLoader);
  -            if (unmarshaller == null)
  +      try
               {
  -               unmarshaller = MarshalFactory.getUnMarshaller(invoker.getDataType(), invoker.getSerializationType());
  +         if (socketWrapper != null)
  +         {
  +            socketWrapper.close();
               }
  -            return unmarshaller.read(inputStream, null);
            }
  -         default:
  +      catch (Exception ex)
            {
  -            throw new IOException("Can not read data for version " + version + ".  Supported versions: " + Version.VERSION_1 + "," + Version.VERSION_2);
  -         }
  +         log.error("failed to close socket wrapper", ex);
         }
  +      socketWrapper = null;
      }
   
  -   protected void processInvocation() throws Exception
  -   {
  -      if(isTrace)
  +   protected void processInvocation(SocketWrapper socketWrapper) throws Exception
         {
  -         log.trace("processing invocation in server thread.");
  -      }
  +      if(trace) { log.trace("preparing to process next invocation invocation"); }
  +
         handlingResponse = true;
  +
         // Ok, now read invocation and invoke
   
         //TODO: -TME This needs to be done by ServerInvoker
         int version = Version.getDefaultVersion();
         boolean performVersioning = Version.performVersioning();
         InputStream inputStream = socketWrapper.getInputStream();
  +
         if (performVersioning)
         {
  -//         if(!inputStream.markSupported())
  -//         {
  -//            inputStream = new BufferedInputStream(inputStream);
  -//         }
  -//         // marking the stream in case the version is invalid
  -//         inputStream.mark(2);
  -
            version = readVersion(inputStream);
  +
            //TODO: -TME Should I be checking for -1?
   
  -         /**
  -          * This is a best attempt to determine if is old version.  Typically, the first
  -          * byte will be -1, so if is, will reset stream and process as though is older version
  -          */
  -         /**
  -          * Originally this code (now uncommented) and the other commented code was to try to
  -          * make so could automatically detect older version that would not be sending a byte
  -          * for the version.  However, due to the way the serialization stream manager handles
  -          * the stream, resetting it does not work, so will probably have to throw away that idea.
  -          * However, for now, am uncommenting this section because if are using the flag to turn
  -          * off connection checking (ack back to client), then will get a -1 when the client
  -          * closes connection.  Then when stream passed onto the versionedRead, will get EOFException
  -          * thrown and will process normally (as though came from the acknowledge, as would have happened
  -          * if connection checking was turned on).  Am hoping this is not a mistake...
  -          */
  +         // This is a best attempt to determine if is old version.  Typically, the first byte will
  +         // be -1, so if is, will reset stream and process as though is older version.
  +
  +         // Originally this code (now uncommented) and the other commented code was to try to make
  +         // so could automatically detect older version that would not be sending a byte for the
  +         // version.  However, due to the way the serialization stream manager handles the stream,
  +         // resetting it does not work, so will probably have to throw away that idea. However, for
  +         // now, am uncommenting this section because if are using the flag to turn off connection
  +         // checking (ack back to client), then will get a -1 when the client closes connection.
  +         // Then when stream passed onto the versionedRead, will get EOFException thrown and will
  +         // process normally (as though came from the acknowledge, as would have happened if
  +         // connection checking was turned on).  Am hoping this is not a mistake...
  +
            if(version == -1)
            {
               version = Version.VERSION_1;
  -//            inputStream.reset();
            }
         }
   
  -      Object obj = versionedRead(inputStream, invoker, this.getClass().getClassLoader(), version);
  +      Object obj = versionedRead(inputStream, invoker, getClass().getClassLoader(), version);
  +
  +      // setting timestamp since about to start processing
  +      lastRequestHandledTimestamp = System.currentTimeMillis();
   
         InvocationRequest req = null;
         boolean createdInvocationRequest = false;
         boolean isError = false;
  +
         if(obj instanceof InvocationRequest)
         {
            req = (InvocationRequest)obj;
  @@ -479,28 +540,40 @@
            performVersioning = false;
         }
   
  -
         Object resp = null;
  +
         try
         {
            // Make absolutely sure thread interrupted is cleared.
  -         boolean interrupted = Thread.interrupted();
  +         Thread.interrupted();
  +
  +         if(trace) { log.trace("about to call " + invoker + ".invoke()"); }
  +
            // call transport on the subclass, get the result to handback
            resp = invoker.invoke(req);
  +
  +         if(trace) { log.trace(invoker + ".invoke() returned " + resp); }
         }
         catch (Throwable ex)
         {
            resp = ex;
            isError = true;
  +         log.error(invoker + ".invoke() call failed: " + ex.getMessage());
  +         log.debug(invoker + ".invoke() call failed", ex);
         }
   
         Thread.interrupted(); // clear interrupted state so we don't fail on socket writes
   
  -      if(!isOneway(req.getRequestPayload()))
  +      if(isOneway(req.getRequestPayload()))
  +      {
  +         if(trace) { log.trace("oneway request, writing no reply on the wire"); }
  +      }
  +      else
         {
            if(!createdInvocationRequest)
            {
               // need to return invocation response
  +            if(trace) { log.trace("creating response instance"); }
               resp = new InvocationResponse(req.getSessionId(), resp, isError, req.getReturnPayload());
            }
   
  @@ -512,214 +585,245 @@
   
            versionedWrite(outputStream, invoker, this.getClass().getClassLoader(), resp, version);
         }
  +
         handlingResponse = false;
  +
         // set the timestamp for last successful processed request
         lastRequestHandledTimestamp = System.currentTimeMillis();
      }
   
  -   private boolean isOneway(Map metadata)
  -   {
  -      boolean isOneway = false;
  -
  -      if (metadata != null)
  +   protected void acknowledge(SocketWrapper socketWrapper) throws Exception
         {
  -         Object val = metadata.get(org.jboss.remoting.Client.ONEWAY_FLAG);
  -         if (val != null && val instanceof String && Boolean.valueOf((String) val).booleanValue())
  +      if (shouldCheckConnection)
            {
  -            if (isTrace)
  +         // HERE IS THE RACE between ACK received and handlingResponse = true. We can't synchronize
  +         // because readByte blocks and client is expecting a response and we don't want to hang
  +         // client. See shutdown and evict for more details. There may not be a problem because
  +         // interrupt only effects threads blocking on IO. and this thread will just continue.
  +
  +         handlingResponse = true;
  +
  +         try
               {
  -               log.trace("Oneway invocation, so not waiting for response.  Returning null.");
  -            }
  -            isOneway = true;
  -         }
  -      }
  -      return isOneway;
  +            if(trace) { log.trace("checking connection"); }
  +            socketWrapper.checkConnection();
      }
  -
  -   private InvocationRequest createInvocationRequest(Object obj, SocketWrapper socketWrapper)
  +         catch (EOFException e)
      {
  -      if(obj instanceof InvocationRequest)
  +            throw new AcknowledgeFailure();
  +         }
  +         catch (SocketException se)
         {
  -         return (InvocationRequest)obj;
  +            throw new AcknowledgeFailure();
         }
  -      else
  +         catch (IOException ioe)
         {
  -         // need to wrap request with invocation request
  -         SocketAddress remoteAddress = socketWrapper.getSocket().getRemoteSocketAddress();
  -
  -         InvocationRequest request = new InvocationRequest(remoteAddress.toString(), invoker.getSupportedSubsystems()[0], obj, null, null, null);
  -         return request;
  +            throw new AcknowledgeFailure();
         }
   
  +         handlingResponse = false;
  +      }
      }
   
  -   private void versionedWrite(OutputStream outputStream, SocketServerInvoker invoker,
  -                               ClassLoader classLoader, Object resp, int version) throws IOException
  +   protected Object versionedRead(InputStream inputStream, ServerInvoker invoker,
  +                                  ClassLoader classLoader, int version)
  +      throws IOException, ClassNotFoundException
      {
  -      //TODO: -TME - Should I ever worry about checking version here?  Only one way to send data at this point.
  +      //TODO: -TME - Should I even botther to check for version here?  Only one way to do processing
  +      //             at this point, regardless of version.
         switch (version)
         {
            case Version.VERSION_1:
            case Version.VERSION_2:
            {
  -            Marshaller marshaller = MarshalFactory.getMarshaller(invoker.getLocator(), classLoader);
  +            if(trace) { log.trace("blocking to read invocation from unmarshaller"); }
   
  -            if (marshaller == null)
  -            {
  -               marshaller = MarshalFactory.getMarshaller(invoker.getDataType(), invoker.getSerializationType());
  -            }
  +            Object o = unmarshaller.read(inputStream, null);
   
  -            marshaller.write(resp, outputStream);
  -            return;
  +            if(trace) { log.trace("read " + o + " from unmarshaller"); }
  +
  +            return o;
            }
            default:
            {
  -            throw new IOException("Can not write data for version " + version + ".  Supported version: " + Version.VERSION_1 + ", " + Version.VERSION_2);
  +            throw new IOException("Can not read data for version " + version +
  +               ".  Supported versions: " + Version.VERSION_1 + "," + Version.VERSION_2);
            }
         }
  -
      }
   
  -   private int readVersion(InputStream inputStream) throws IOException
  -   {
  -      return inputStream.read();
  -   }
  +   // Private --------------------------------------------------------------------------------------
   
  -   private void writeVersion(OutputStream outputStream, int version) throws IOException
  +   private SocketWrapper createServerSocketWrapper(Socket socket, int timeout, Map metadata)
  +      throws Exception
      {
  -      outputStream.write(version);
  -   }
  -
  -   /**
  -    * This is needed because Object*Streams leak
  -    */
  -   protected void dorun()
  +      if (serverSocketConstructor == null)
      {
  -      if(isTrace)
  +         if(serverSocketClass == null)
         {
  -         log.trace("beginning dorun");
  +            serverSocketClass = ClassLoaderUtility.loadClass(serverSocketClassName, getClass());
         }
  -      running = true;
  -      handlingResponse = true;
   
  -      // Always do first one without an ACK because its not needed
         try
         {
  -         processInvocation();
  +            serverSocketConstructor = serverSocketClass.
  +               getConstructor(new Class[]{Socket.class, Map.class, Integer.class});
         }
  -      catch (Exception ex)
  +         catch (NoSuchMethodException e)
         {
  -         log.error("failed to process invocation.", ex);
  -         running = false;
  +            serverSocketConstructor = serverSocketClass.getConstructor(new Class[]{Socket.class});
         }
   
  -      // Re-use loop
  -      while (running)
  +      }
  +
  +      SocketWrapper serverSocketWrapper = null;
  +
  +      if (serverSocketConstructor.getParameterTypes().length == 3)
         {
  -         try
  +         Map localMetadata = null;
  +         if (metadata == null)
            {
  -            acknowledge();
  -            processInvocation();
  +            localMetadata = new HashMap(2);
            }
  -         catch (AcknowledgeFailure e)
  -         {
  -            if (!shutdown && isTrace)
  +         else
               {
  -               log.trace("Keep alive acknowledge failed.");
  +            localMetadata = new HashMap(metadata);   
               }
  -            running = false;
  +         localMetadata.put(SocketWrapper.MARSHALLER, marshaller);
  +         localMetadata.put(SocketWrapper.UNMARSHALLER, unmarshaller);
   
  +         serverSocketWrapper = (SocketWrapper)serverSocketConstructor.
  +            newInstance(new Object[]{socket, localMetadata, new Integer(timeout)});
            }
  -         catch(SocketTimeoutException ste)
  -         {
  -            if(!shutdown)
  -            {
  -               if(isTrace)
  +      else
                  {
  -                  log.trace(ste);
  -               }
  +         serverSocketWrapper =
  +            (SocketWrapper)serverSocketConstructor.newInstance(new Object[]{socket});
  +
  +         serverSocketWrapper.setTimeout(timeout);
               }
  -            running = false;
  +      return serverSocketWrapper;
            }
  -         catch (InterruptedIOException e)
  -         {
  -            if (!shutdown)
  +
  +   private boolean isOneway(Map metadata)
               {
  -               log.error("Socket IO interrupted", e);
  -            }
  -            running = false;
  +      boolean isOneway = false;
   
  -         }
  -         catch (InterruptedException e)
  +      if (metadata != null)
            {
  -            if(isTrace)
  +         Object val = metadata.get(Client.ONEWAY_FLAG);
  +         if (val != null && val instanceof String && Boolean.valueOf((String) val).booleanValue())
               {
  -               log.trace(e);
  +            isOneway = true;
               }
  -            if (!shutdown)
  -            {
  -               log.error("interrupted", e);
               }
  +      return isOneway;
            }
  -         catch (EOFException eof)
  +
  +   private InvocationRequest createInvocationRequest(Object obj, SocketWrapper socketWrapper)
            {
  -            if (!shutdown && isTrace)
  +      if(obj instanceof InvocationRequest)
               {
  -               log.trace("EOF received.  This is likely due to client finishing comminication.");
  +         return (InvocationRequest)obj;
               }
  -            running = false;
  +      else
  +      {
  +         // need to wrap request with invocation request
  +         SocketAddress remoteAddress = socketWrapper.getSocket().getRemoteSocketAddress();
   
  +         return new InvocationRequest(remoteAddress.toString(),
  +                                      invoker.getSupportedSubsystems()[0],
  +                                      obj, null, null, null);
            }
  -         catch (SocketException sex)
  +   }
  +   
  +   private void processNewSocket()
            {
  -            if (!shutdown && isTrace)
  +      InvokerLocator locator = invoker.getLocator();
  +      ClassLoader classLoader = getClass().getClassLoader();
  +      String dataType = invoker.getDataType();
  +      String serializationType = invoker.getSerializationType();
  +      
  +      //TODO: -TME Need better way to get the unmarshaller (via config)
  +      
  +      if (unmarshaller == null)
               {
  -               log.trace("SocketException received.  This is likely due to client disconnecting and resetting connection.");
  +         unmarshaller = MarshalFactory.getUnMarshaller(locator, classLoader);
               }
  -            running = false;
  -         }
  -         catch (Exception ex)
  +      if (unmarshaller == null)
            {
  -            if (!shutdown)
  +         unmarshaller = MarshalFactory.getUnMarshaller(dataType, serializationType);
  +      }
  +     
  +      if (marshaller == null)
               {
  -               log.error("failed", ex);
  -               running = false;
  +         marshaller = MarshalFactory.getMarshaller(locator, classLoader);
               }
  +      if (marshaller == null)
  +      {
  +         marshaller = MarshalFactory.getMarshaller(dataType, serializationType);
            }
  -         // clear any interruption so that thread can be pooled.
  -         handlingResponse = false;
  -         Thread.interrupted();
  +      
  +      
         }
  -      // Ok, we've been shutdown.  Do appropriate cleanups.
  -      try
  +
  +   private void versionedWrite(OutputStream outputStream, SocketServerInvoker invoker,
  +                               ClassLoader classLoader, Object resp, int version) throws IOException
  +   {
  +      //TODO: -TME - Should I ever worry about checking version here?  Only one way to send data at this point.
  +      switch (version)
         {
  -         InputStream in = socketWrapper.getInputStream();
  -         if (in != null)
  +         case Version.VERSION_1:
  +         case Version.VERSION_2:
            {
  -            in.close();
  +            marshaller.write(resp, outputStream);
  +            if (trace) { log.trace("wrote response to the output stream"); }
  +            return;
            }
  -         OutputStream out = socketWrapper.getOutputStream();
  -         if (out != null)
  +         default:
            {
  -            out.close();
  +            throw new IOException("Can not write data for version " + version +
  +               ".  Supported version: " + Version.VERSION_1 + ", " + Version.VERSION_2);
            }
         }
  -      catch (Exception ex)
  +   }
  +
  +   private int readVersion(InputStream inputStream) throws IOException
         {
  +      if(trace) { log.trace("blocking to read version from input stream"); }
  +
  +      int version = inputStream.read();
  +
  +      if(trace) { log.trace("read version " + version + " from input stream"); }
  +
  +      return version;
         }
  -      try
  +
  +   private void writeVersion(OutputStream outputStream, int version) throws IOException
         {
  -         socketWrapper.close();
  +      outputStream.write(version);
         }
  -      catch (Exception ex)
  +
  +   private String getWorkerThreadName(Socket currentSocket)
  +   {
  +      if (id == Integer.MIN_VALUE)
         {
  -         log.error("Failed cleanup", ex);
  +         id = nextID();
         }
  -      socketWrapper = null;
  +
  +      StringBuffer sb = new StringBuffer("WorkerThread#");
  +      sb.append(id).append('[');
  +      sb.append(currentSocket.getInetAddress().getHostAddress());
  +      sb.append(':');
  +      sb.append(currentSocket.getPort());
  +      sb.append(']');
  +
  +      return sb.toString();
      }
   
  +   // Inner classes --------------------------------------------------------------------------------
  +
      public static class AcknowledgeFailure extends Exception
      {
  -
      }
   }
  
  
  



More information about the jboss-cvs-commits mailing list