[jboss-cvs] JBossRemoting/src/main/org/jboss/remoting/transport/socket ...

Ovidiu Feodorov ovidiu.feodorov at jboss.com
Tue Jan 16 09:17:15 EST 2007


  User: ovidiu  
  Date: 07/01/16 09:17:15

  Modified:    src/main/org/jboss/remoting/transport/socket   Tag:
                        remoting_2_x ServerThread.java
                        SocketServerInvoker.java
  Log:
  Fixed http://jira.jboss.org/jira/browse/JBREM-666 by moving all I/O initialization-related
  operations from the acceptor thread to the worker thread.
  
  Revision  Changes    Path
  No                   revision
  
  
  No                   revision
  
  
  1.29.2.9  +398 -377  JBossRemoting/src/main/org/jboss/remoting/transport/socket/ServerThread.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: ServerThread.java
  ===================================================================
  RCS file: /cvsroot/jboss/JBossRemoting/src/main/org/jboss/remoting/transport/socket/ServerThread.java,v
  retrieving revision 1.29.2.8
  retrieving revision 1.29.2.9
  diff -u -b -r1.29.2.8 -r1.29.2.9
  --- ServerThread.java	16 Jan 2007 08:15:05 -0000	1.29.2.8
  +++ ServerThread.java	16 Jan 2007 14:17:15 -0000	1.29.2.9
  @@ -61,50 +61,55 @@
    * @author <a href="mailto:tom at jboss.org">Tom Elrod</a>
    * @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
    *
  - * @version $Revision: 1.29.2.8 $
  + * @version $Revision: 1.29.2.9 $
    */
   public class ServerThread extends Thread
   {
  +   // Constants ------------------------------------------------------------------------------------
  +
      final static private Logger log = Logger.getLogger(ServerThread.class);
   
  -   private static final boolean trace = log.isTraceEnabled();
  +   // Static ---------------------------------------------------------------------------------------
  +
  +   private static boolean trace = log.isTraceEnabled();
  +
  +   private static int idGenerator = 0;
  +
  +   public static synchronized int nextID()
  +   {
  +      return idGenerator++;
  +   }
  +
  +   // Attributes -----------------------------------------------------------------------------------
  +
  +   protected volatile boolean running;
  +   protected volatile boolean handlingResponse;
  +   protected volatile boolean shutdown;
   
  -   protected SocketServerInvoker invoker;
      protected LRUPool clientpool;
      protected LinkedList threadpool;
  -   protected volatile boolean running = true;
  -   protected volatile boolean handlingResponse = true; // start off as true so that nobody can interrupt us
  -   protected volatile boolean shutdown = false;
  -
  -   protected static int idGenerator = 0;
  -
  -   private SocketWrapper socketWrapper = null;
  -   protected String serverSocketClassName = ServerSocketWrapper.class.getName();
  -   protected Class serverSocketClass = null;
  -   private Constructor serverSocketConstructor = null;
  +
  +   protected String serverSocketClassName;
  +   protected Class serverSocketClass;
  +
  +   private Socket socket;
  +   private int timeout;
  +   protected SocketServerInvoker invoker;
  +   private Constructor serverSocketConstructor;
   
      // the unique identity of the thread, which won't change during the life of the thread. The
      // thread may get associated with different IP addresses though.
      private int id = Integer.MIN_VALUE;
   
  -   /**
  -    * Indicates if will check the socket connection when
  -    * getting from pool by sending byte over the connection
  -    * to validate is still good.
  -    */
  -   private boolean shouldCheckConnection = false;
  +   // Indicates if will check the socket connection when getting from pool by sending byte over the
  +   // connection to validate is still good.
  +    private boolean shouldCheckConnection;
   
  -   /**
  -    * Will indicate when the last request has been processed (used in determining
  -    * idle connection/thread timeout)
  -    */
  +   // Will indicate when the last request has been processed (used in determining idle
  +   // connection/thread timeout)
      private long lastRequestHandledTimestamp = System.currentTimeMillis();
   
  -
  -   public static synchronized int nextID()
  -   {
  -      return idGenerator++;
  -   }
  +   // Constructors ---------------------------------------------------------------------------------
   
      public ServerThread(Socket socket, SocketServerInvoker invoker, LRUPool clientpool,
                          LinkedList threadpool, int timeout, String serverSocketClassName)
  @@ -112,10 +117,14 @@
      {
         super();
   
  +      running = true;
  +      handlingResponse = true; // start off as true so that nobody can interrupt us
  +
         setName(getWorkerThreadName(socket));
   
  +      this.socket = socket;
  +      this.timeout = timeout;
         this.serverSocketClassName = serverSocketClassName;
  -      this.socketWrapper = createServerSocketWrapper(socket, timeout, invoker.getLocator().getParameters());
         this.invoker = invoker;
         this.clientpool = clientpool;
         this.threadpool = threadpool;
  @@ -131,6 +140,100 @@
         }
      }
   
  +   // Thread overrides -----------------------------------------------------------------------------
  +
  +   public void run()
  +   {
  +      try
  +      {
  +         while (true)
  +         {
  +            dorun();
  +
  +            // The following code has been changed to eliminate a race condition with
  +            // SocketServerInvoker.cleanup().
  +            //
  +            // A ServerThread can shutdown for two reasons:
  +            // 1. the client shuts down, and
  +            // 2. the server shuts down.
  +            //
  +            // If both occur around the same time, a problem arises.  If a ServerThread starts to
  +            // shut down because the client shut down, it will test shutdown, and if it gets to the
  +            // test before SocketServerInvoker.cleanup() calls ServerThread.stop() to set shutdown
  +            // to true, it will return itself to threadpool.  If it moves from clientpool to
  +            // threadpool at just the right time, SocketServerInvoker could miss it in both places
  +            // and never call stop(), leaving it alive, resulting in a memory leak.  The solution is
  +            // to synchronize parts of ServerThread.run() and SocketServerInvoker.cleanup() so that
  +            // they interact atomically.
  +
  +            synchronized (this)
  +            {
  +               synchronized (clientpool)
  +               {
  +                  synchronized (threadpool)
  +                  {
  +                     if (shutdown)
  +                     {
  +                        invoker = null;
  +                        return; // exit thread
  +                     }
  +                     else
  +                     {
  +                        if(trace) { log.trace(this + " removing itself from clientpool and going to threadpool"); }
  +                        clientpool.remove(this);
  +                        threadpool.add(this);
  +                        Thread.interrupted(); // clear any interruption so that we can be pooled.
  +                        clientpool.notify();
  +                     }
  +                  }
  +               }
  +
  +               try
  +               {
  +                  if(trace) { log.trace(this + " begins to wait"); }
  +
  +                  wait();
  +
  +                  if(trace) { log.trace(this + " woke up after wait"); }
  +               }
  +               catch (InterruptedException e)
  +               {
  +                  if (shutdown)
  +                  {
  +                     invoker = null;
  +                     return; // exit thread
  +                  }
  +
  +                  throw e;
  +               }
  +            }
  +         }
  +      }
  +      catch (Exception e)
  +      {
  +         log.debug(this + " exiting run on exception, definitively thrown out of the threadpool", e);
  +      }
  +   }
  +
  +   // Public ---------------------------------------------------------------------------------------
  +
  +   public synchronized void wakeup(Socket socket, int timeout, SocketServerInvoker invoker)
  +      throws Exception
  +   {
  +      // rename the worker thread to reflect the new socket it is handling
  +      setName(getWorkerThreadName(socket));
  +
  +      this.socket = socket;
  +      this.timeout = timeout;
  +      this.invoker = invoker;
  +
  +      running = true;
  +      handlingResponse = true;
  +      notify();
  +
  +      if(trace) { log.trace(this + " has notified on mutex"); }
  +   }
  +
      public long getLastRequestTimestamp()
      {
         return lastRequestHandledTimestamp;
  @@ -140,14 +243,13 @@
      {
         shutdown = true;
         running = false;
  -      // This is a race and there is a chance
  -      // that a invocation is going on at the time
  -      // of the interrupt.  But I see no way right
  -      // now to protect for this.
  -
  -      // NOTE ALSO!:
  -      // Shutdown should never be synchronized.
  -      // We don't want to hold up accept() thread! (via LRUpool)
  +
  +      // This is a race and there is a chance that a invocation is going on at the time of the
  +      // interrupt.  But I see no way right now to protect for this.
  +
  +      // NOTE ALSO!: Shutdown should never be synchronized. We don't want to hold up accept()
  +      // thread! (via LRUpool)
  +
         if (!handlingResponse)
         {
            try
  @@ -159,15 +261,12 @@
            {
            }
         }
  -
      }
   
      /**
  -    * Sets if server thread should check connection before continue to process on
  -    * next invocation request.  If is set to true, will send an ACK to client to
  -    * verify client is still connected on same socket.
  -    *
  -    * @param checkConnection
  +    * Sets if server thread should check connection before continue to process on next invocation
  +    * request.  If is set to true, will send an ACK to client to verify client is still connected
  +    * on same socket.
       */
      public void shouldCheckConnection(boolean checkConnection)
      {
  @@ -175,67 +274,24 @@
      }
   
      /**
  -    * Indicates if server will check with client (via an ACK) to
  -    * see if is still there.
  +    * Indicates if server will check with client (via an ACK) to see if is still there.
       */
      public boolean getCheckingConnection()
      {
         return this.shouldCheckConnection;
      }
   
  -   private SocketWrapper createServerSocketWrapper(Socket socket, int timeout, Map metadata)
  -      throws Exception
  -   {
  -      if (serverSocketConstructor == null)
  -      {
  -         if(serverSocketClass == null)
  -         {
  -            serverSocketClass = ClassLoaderUtility.loadClass(serverSocketClassName, getClass());
  -         }
  -
  -         try
  -         {
  -            serverSocketConstructor = serverSocketClass.
  -               getConstructor(new Class[]{Socket.class, Map.class, Integer.class});
  -         }
  -         catch (NoSuchMethodException e)
  -         {
  -            serverSocketConstructor = serverSocketClass.getConstructor(new Class[]{Socket.class});
  -         }
  -
  -      }
  -
  -      SocketWrapper serverSocketWrapper = null;
  -
  -      if (serverSocketConstructor.getParameterTypes().length == 3)
  -      {
  -         serverSocketWrapper = (SocketWrapper)serverSocketConstructor.
  -            newInstance(new Object[]{socket, metadata, new Integer(timeout)});
  -      }
  -      else
  +   public void evict()
         {
  -         serverSocketWrapper =
  -            (SocketWrapper)serverSocketConstructor.newInstance(new Object[]{socket});
  -         serverSocketWrapper.setTimeout(timeout);
  -      }
  +      running = false;
   
  -      return serverSocketWrapper;
  -   }
  +      // This is a race and there is a chance that a invocation is going on at the time of the
  +      // interrupt.  But I see no way right now to protect for this. There may not be a problem
  +      // because interrupt only effects threads blocking on IO.
   
  +      // NOTE ALSO!: Shutdown should never be synchronized. We don't want to hold up accept()
  +      // thread! (via LRUpool)
   
  -   public void evict()
  -   {
  -      running = false;
  -      // This is a race and there is a chance
  -      // that a invocation is going on at the time
  -      // of the interrupt.  But I see no way right
  -      // now to protect for this.
  -      // There may not be a problem because interrupt only effects
  -      // threads blocking on IO.
  -
  -      // NOTE ALSO!:
  -      // Shutdown should never be synchronized.
  -      // We don't want to hold up accept() thread! (via LRUpool)
         if (!handlingResponse)
         {
            try
  @@ -250,16 +306,14 @@
      }
   
      /**
  -    * This method is intended to be used when need to unblock
  -    * I/O read, which the thread will automatically loop back to
  -    * do after processing a request.  Calling this method will cause
  -    * the underlying socket to be closed.
  +    * This method is intended to be used when need to unblock I/O read, which the thread will
  +    * automatically loop back to do after processing a request.
       */
      public void unblock()
      {
         try
         {
  -         socketWrapper.close();
  +         socket.close();
         }
         catch (IOException e)
         {
  @@ -267,196 +321,159 @@
         }
      }
   
  -
  -   public synchronized void wakeup(Socket socket, int timeout, Map metadata) throws Exception
  +   public String toString()
      {
  -      // rename the worker thread to reflect the new socket it is handling
  -      setName(getWorkerThreadName(socket));
  +      return getName();
  +   }
  +
  +   // Package protected ----------------------------------------------------------------------------
  +
  +   // Protected ------------------------------------------------------------------------------------
   
  -      socketWrapper = createServerSocketWrapper(socket, timeout, metadata);
  +   /**
  +    * This is needed because Object*Streams leak
  +    */
  +   protected void dorun()
  +   {
  +      if(trace) { log.trace("beginning dorun()"); }
   
         running = true;
         handlingResponse = true;
  -      notify();
   
  -      if(trace) { log.trace(this + " has woken up"); }
  -   }
  +      // lazy initialize the socketWrapper on the worker thread itself. We do this to avoid to have
  +      // it done on the acceptor thread (prone to lockup)
  +      SocketWrapper socketWrapper = null;
   
  -   public void run()
  -   {
         try
         {
  -         while (true)
  +         if(trace) { log.trace("creating the socket wrapper"); }
  +
  +         socketWrapper =
  +            createServerSocketWrapper(socket, timeout, invoker.getLocator().getParameters());
  +
  +         // Always do first one without an ACK because its not needed
  +         if(trace) { log.trace("processing first invocation without acknowledging"); }
  +         processInvocation(socketWrapper);
  +      }
  +      catch (Exception ex)
            {
  -            dorun();
  +         log.error("Worker thread initialization failure", ex);
  +         running = false;
  +      }
   
  -/*
  -* The following code has been changed to eliminate a race condition with SocketServerInvoker.cleanup
  -().
  -* A ServerThread can shutdown for two reasons:
  -*
  -*   1. the client shuts down, and
  -*   2. the server shuts down.
  -*
  -* If both occur around the same time, a problem arises.  If a ServerThread starts to shut
  -* down because the client shut down, it will test shutdown, and if it gets to the test
  -* before SocketServerInvoker.cleanup() calls ServerThread.stop() to set shutdown to true, it
  -* will return itself to threadpool.  If it moves from clientpool to threadpool at just the
  -* right time, SocketServerInvoker could miss it in both places and never call stop(), leaving
  -* it alive, resulting in a memory leak.  The solution is to synchronize parts of
  -* ServerThread.run() and SocketServerInvoker.cleanup() so that they interact atomically.
  -*/
  -            synchronized (this)
  +      // Re-use loop
  +      while (running)
               {
  -               synchronized (clientpool)
  +         try
                  {
  -                  synchronized (threadpool)
  +            acknowledge(socketWrapper);
  +            processInvocation(socketWrapper);
  +         }
  +         catch (AcknowledgeFailure e)
                     {
  -                     if (shutdown)
  +            if (!shutdown && trace)
                        {
  -                        invoker = null;
  -                        return; // exit thread
  +               log.trace("keep alive acknowledge failed!");
                        }
  -                     else
  +            running = false;
  +         }
  +         catch(SocketTimeoutException ste)
  +         {
  +            if(!shutdown)
                        {
                           if(trace)
                           {
  -                           log.trace("removing from clientpool and adding to threadpool " + this);
  +                  log.trace(ste);
                           }
  -                        clientpool.remove(this);
  -                        threadpool.add(this);
  -                        Thread.interrupted(); // clear any interruption so that we can be pooled.
  -                        clientpool.notify();
                        }
  +            running = false;
                     }
  +         catch (InterruptedIOException e)
  +         {
  +            if (!shutdown)
  +            {
  +               log.error("Socket IO interrupted", e);
                  }
  +            running = false;
   
  -               try
  +         }
  +         catch (InterruptedException e)
                  {
                     if(trace)
                     {
  -                     log.trace("begin thread wait");
  +               log.trace(e);
                     }
  -                  this.wait();
  -                  if(trace)
  +            if (!shutdown)
                     {
  -                     log.trace("WAKEUP in SERVER THREAD");
  +               log.error("interrupted", e);
                     }
                  }
  -               catch (InterruptedException e)
  +         catch (EOFException eof)
                  {
  -                  if (shutdown)
  +            if (!shutdown && trace)
                     {
  -                     invoker = null;
  -                     return; // exit thread
  -                  }
  -
  -                  throw e;
  -               }
  -            }
  +               log.trace("EOFException received. This is likely due to client finishing communication.");
            }
  +            running = false;
         }
  -      catch (Exception ignored)
  +         catch (SocketException sex)
         {
  -         log.debug("Exiting run on exception", ignored);
  -      }
  -   }
  -
  -   public String toString()
  +            if (!shutdown && trace)
      {
  -      return getName();
  +               log.trace("SocketException received. This is likely due to client disconnecting and resetting connection.");
      }
  -
  -   private String getWorkerThreadName(Socket currentSocket)
  +            running = false;
  +         }
  +         catch (Exception ex)
      {
  -      if (id == Integer.MIN_VALUE)
  +            if (!shutdown)
         {
  -         id = nextID();
  +               log.error("failed", ex);
  +               running = false;
         }
  -
  -      StringBuffer sb = new StringBuffer("WorkerThread#");
  -      sb.append(id).append('[');
  -      sb.append(currentSocket.getInetAddress().getHostAddress());
  -      sb.append(':');
  -      sb.append(currentSocket.getPort());
  -      sb.append(']');
  -
  -      return sb.toString();
  +         }
  +         // clear any interruption so that thread can be pooled.
  +         handlingResponse = false;
  +         Thread.interrupted();
      }
   
  -   protected void acknowledge() throws Exception
  -   {
  -      if (shouldCheckConnection)
  -      {
  -
  -         // HERE IS THE RACE between ACK received and handlingResponse = true
  -         // We can't synchronize because readByte blocks and client is expecting
  -         // a response and we don't want to hang client.
  -         // see shutdown and evict for more details
  -         // There may not be a problem because interrupt only effects
  -         // threads blocking on IO. and this thread will just continue.
  -         handlingResponse = true;
  -
  +      // Ok, we've been shutdown.  Do appropriate cleanups.
            try
            {
  -            if(trace) { log.trace("checking connection"); }
  -            socketWrapper.checkConnection();
  -         }
  -         catch (EOFException e)
  +         if (socketWrapper != null)
            {
  -            throw new AcknowledgeFailure();
  -         }
  -         catch (SocketException se)
  +            InputStream in = socketWrapper.getInputStream();
  +            if (in != null)
            {
  -            throw new AcknowledgeFailure();
  +               in.close();
            }
  -         catch (IOException ioe)
  +            OutputStream out = socketWrapper.getOutputStream();
  +            if (out != null)
            {
  -            throw new AcknowledgeFailure();
  -         }
  -
  -         handlingResponse = false;
  +               out.close();
         }
      }
  -
  -   protected Object versionedRead(InputStream inputStream, ServerInvoker invoker,
  -                                  ClassLoader classLoader, int version)
  -      throws IOException, ClassNotFoundException
  -   {
  -      //TODO: -TME - Should I even botther to check for version here?  Only one way to do processing
  -      //             at this point, regardless of version.
  -      switch (version)
  -      {
  -         case Version.VERSION_1:
  -         case Version.VERSION_2:
  +      }
  +      catch (Exception ex)
            {
  -            //TODO: -TME Need better way to get the unmarshaller (via config)
  -            UnMarshaller unmarshaller = MarshalFactory.
  -               getUnMarshaller(invoker.getLocator(), classLoader);
  +         log.debug("failed to close in/out", ex);
  +      }
   
  -            if (unmarshaller == null)
  +      try
               {
  -               unmarshaller = MarshalFactory.
  -                  getUnMarshaller(invoker.getDataType(), invoker.getSerializationType());
  +         if (socketWrapper != null)
  +         {
  +            socketWrapper.close();
               }
  -
  -            if(trace) { log.trace("blocking to read invocation from the input stream"); }
  -
  -            Object o = unmarshaller.read(inputStream, null);
  -
  -            if(trace) { log.trace("read " + o + " from input stream"); }
  -
  -            return o;
            }
  -         default:
  +      catch (Exception ex)
            {
  -            throw new IOException("Can not read data for version " + version +
  -               ".  Supported versions: " + Version.VERSION_1 + "," + Version.VERSION_2);
  -         }
  +         log.error("failed to close socket wrapper", ex);
         }
  +      socketWrapper = null;
      }
   
  -   protected void processInvocation() throws Exception
  +   protected void processInvocation(SocketWrapper socketWrapper) throws Exception
      {
         if(trace) { log.trace("processing invocation"); }
   
  @@ -558,6 +575,117 @@
         lastRequestHandledTimestamp = System.currentTimeMillis();
      }
   
  +   protected void acknowledge(SocketWrapper socketWrapper) throws Exception
  +   {
  +      if (shouldCheckConnection)
  +      {
  +         // HERE IS THE RACE between ACK received and handlingResponse = true. We can't synchronize
  +         // because readByte blocks and client is expecting a response and we don't want to hang
  +         // client. See shutdown and evict for more details. There may not be a problem because
  +         // interrupt only effects threads blocking on IO. and this thread will just continue.
  +
  +         handlingResponse = true;
  +
  +         try
  +         {
  +            if(trace) { log.trace("checking connection"); }
  +            socketWrapper.checkConnection();
  +         }
  +         catch (EOFException e)
  +         {
  +            throw new AcknowledgeFailure();
  +         }
  +         catch (SocketException se)
  +         {
  +            throw new AcknowledgeFailure();
  +         }
  +         catch (IOException ioe)
  +         {
  +            throw new AcknowledgeFailure();
  +         }
  +
  +         handlingResponse = false;
  +      }
  +   }
  +
  +   protected Object versionedRead(InputStream inputStream, ServerInvoker invoker,
  +                                  ClassLoader classLoader, int version)
  +      throws IOException, ClassNotFoundException
  +   {
  +      //TODO: -TME - Should I even botther to check for version here?  Only one way to do processing
  +      //             at this point, regardless of version.
  +      switch (version)
  +      {
  +         case Version.VERSION_1:
  +         case Version.VERSION_2:
  +         {
  +            //TODO: -TME Need better way to get the unmarshaller (via config)
  +            UnMarshaller unmarshaller = MarshalFactory.
  +               getUnMarshaller(invoker.getLocator(), classLoader);
  +
  +            if (unmarshaller == null)
  +            {
  +               unmarshaller = MarshalFactory.
  +                  getUnMarshaller(invoker.getDataType(), invoker.getSerializationType());
  +            }
  +
  +            if(trace) { log.trace("blocking to read invocation from the input stream"); }
  +
  +            Object o = unmarshaller.read(inputStream, null);
  +
  +            if(trace) { log.trace("read " + o + " from input stream"); }
  +
  +            return o;
  +         }
  +         default:
  +         {
  +            throw new IOException("Can not read data for version " + version +
  +               ".  Supported versions: " + Version.VERSION_1 + "," + Version.VERSION_2);
  +         }
  +      }
  +   }
  +
  +   // Private --------------------------------------------------------------------------------------
  +
  +   private SocketWrapper createServerSocketWrapper(Socket socket, int timeout, Map metadata)
  +      throws Exception
  +   {
  +      if (serverSocketConstructor == null)
  +      {
  +         if(serverSocketClass == null)
  +         {
  +            serverSocketClass = ClassLoaderUtility.loadClass(serverSocketClassName, getClass());
  +         }
  +
  +         try
  +         {
  +            serverSocketConstructor = serverSocketClass.
  +               getConstructor(new Class[]{Socket.class, Map.class, Integer.class});
  +         }
  +         catch (NoSuchMethodException e)
  +         {
  +            serverSocketConstructor = serverSocketClass.getConstructor(new Class[]{Socket.class});
  +         }
  +
  +      }
  +
  +      SocketWrapper serverSocketWrapper = null;
  +
  +      if (serverSocketConstructor.getParameterTypes().length == 3)
  +      {
  +         serverSocketWrapper = (SocketWrapper)serverSocketConstructor.
  +            newInstance(new Object[]{socket, metadata, new Integer(timeout)});
  +      }
  +      else
  +      {
  +         serverSocketWrapper =
  +            (SocketWrapper)serverSocketConstructor.newInstance(new Object[]{socket});
  +
  +         serverSocketWrapper.setTimeout(timeout);
  +      }
  +      return serverSocketWrapper;
  +   }
  +
      private boolean isOneway(Map metadata)
      {
         boolean isOneway = false;
  @@ -592,7 +720,6 @@
                                         invoker.getSupportedSubsystems()[0],
                                         obj, null, null, null);
         }
  -
      }
   
      private void versionedWrite(OutputStream outputStream, SocketServerInvoker invoker,
  @@ -608,7 +735,8 @@
   
               if (marshaller == null)
               {
  -               marshaller = MarshalFactory.getMarshaller(invoker.getDataType(), invoker.getSerializationType());
  +               marshaller = MarshalFactory.
  +                  getMarshaller(invoker.getDataType(), invoker.getSerializationType());
               }
   
               marshaller.write(resp, outputStream);
  @@ -616,10 +744,10 @@
            }
            default:
            {
  -            throw new IOException("Can not write data for version " + version + ".  Supported version: " + Version.VERSION_1 + ", " + Version.VERSION_2);
  +            throw new IOException("Can not write data for version " + version +
  +               ".  Supported version: " + Version.VERSION_1 + ", " + Version.VERSION_2);
            }
         }
  -
      }
   
      private int readVersion(InputStream inputStream) throws IOException
  @@ -638,131 +766,24 @@
         outputStream.write(version);
      }
   
  -   /**
  -    * This is needed because Object*Streams leak
  -    */
  -   protected void dorun()
  -   {
  -      if(trace) { log.trace("beginning dorun()"); }
  -
  -      running = true;
  -      handlingResponse = true;
  -
  -      // Always do first one without an ACK because its not needed
  -      try
  +   private String getWorkerThreadName(Socket currentSocket)
         {
  -         if(trace) { log.trace("processing first invocation without acknowledging"); }
  -         processInvocation();
  -      }
  -      catch (Exception ex)
  +      if (id == Integer.MIN_VALUE)
         {
  -         log.error("failed to process invocation!", ex);
  -         running = false;
  +         id = nextID();
         }
   
  -      // Re-use loop
  -      while (running)
  -      {
  -         try
  -         {
  -            acknowledge();
  -            processInvocation();
  -         }
  -         catch (AcknowledgeFailure e)
  -         {
  -            if (!shutdown && trace)
  -            {
  -               log.trace("keep alive acknowledge failed!");
  -            }
  -            running = false;
  -         }
  -         catch(SocketTimeoutException ste)
  -         {
  -            if(!shutdown)
  -            {
  -               if(trace)
  -               {
  -                  log.trace(ste);
  -               }
  -            }
  -            running = false;
  -         }
  -         catch (InterruptedIOException e)
  -         {
  -            if (!shutdown)
  -            {
  -               log.error("Socket IO interrupted", e);
  -            }
  -            running = false;
  +      StringBuffer sb = new StringBuffer("WorkerThread#");
  +      sb.append(id).append('[');
  +      sb.append(currentSocket.getInetAddress().getHostAddress());
  +      sb.append(':');
  +      sb.append(currentSocket.getPort());
  +      sb.append(']');
   
  -         }
  -         catch (InterruptedException e)
  -         {
  -            if(trace)
  -            {
  -               log.trace(e);
  -            }
  -            if (!shutdown)
  -            {
  -               log.error("interrupted", e);
  -            }
  -         }
  -         catch (EOFException eof)
  -         {
  -            if (!shutdown && trace)
  -            {
  -               log.trace("EOFException received. This is likely due to client finishing communication.");
  -            }
  -            running = false;
  -         }
  -         catch (SocketException sex)
  -         {
  -            if (!shutdown && trace)
  -            {
  -               log.trace("SocketException received. This is likely due to client disconnecting and resetting connection.");
  -            }
  -            running = false;
  -         }
  -         catch (Exception ex)
  -         {
  -            if (!shutdown)
  -            {
  -               log.error("failed", ex);
  -               running = false;
  -            }
  -         }
  -         // clear any interruption so that thread can be pooled.
  -         handlingResponse = false;
  -         Thread.interrupted();
  +      return sb.toString();
         }
   
  -      // Ok, we've been shutdown.  Do appropriate cleanups.
  -      try
  -      {
  -         InputStream in = socketWrapper.getInputStream();
  -         if (in != null)
  -         {
  -            in.close();
  -         }
  -         OutputStream out = socketWrapper.getOutputStream();
  -         if (out != null)
  -         {
  -            out.close();
  -         }
  -      }
  -      catch (Exception ex)
  -      {
  -      }
  -      try
  -      {
  -         socketWrapper.close();
  -      }
  -      catch (Exception ex)
  -      {
  -         log.error("Failed cleanup", ex);
  -      }
  -      socketWrapper = null;
  -   }
  +   // Inner classes --------------------------------------------------------------------------------
   
      public static class AcknowledgeFailure extends Exception
      {
  
  
  
  1.30.2.8  +2 -2      JBossRemoting/src/main/org/jboss/remoting/transport/socket/SocketServerInvoker.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: SocketServerInvoker.java
  ===================================================================
  RCS file: /cvsroot/jboss/JBossRemoting/src/main/org/jboss/remoting/transport/socket/SocketServerInvoker.java,v
  retrieving revision 1.30.2.7
  retrieving revision 1.30.2.8
  diff -u -b -r1.30.2.7 -r1.30.2.8
  --- SocketServerInvoker.java	16 Jan 2007 08:15:05 -0000	1.30.2.7
  +++ SocketServerInvoker.java	16 Jan 2007 14:17:15 -0000	1.30.2.8
  @@ -49,7 +49,7 @@
    * @author <a href="mailto:tom.elrod at jboss.com">Tom Elrod</a>
    * @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
    *
  - * @version $Revision: 1.30.2.7 $
  + * @version $Revision: 1.30.2.8 $
    * @jmx:mbean
    */
   public class SocketServerInvoker extends ServerInvoker implements Runnable, SocketServerInvokerMBean
  @@ -603,7 +603,7 @@
         else
         {
            if(trace) { log.trace(this + " reusing " + worker); }
  -         worker.wakeup(socket, getTimeout(), locator.getParameters());
  +         worker.wakeup(socket, getTimeout(), this);
         }
      }
   
  
  
  



More information about the jboss-cvs-commits mailing list