[jboss-cvs] JBossRemoting/src/main/org/jboss/remoting/transport/socket ...
Ovidiu Feodorov
ovidiu.feodorov at jboss.com
Tue Jan 16 09:17:15 EST 2007
User: ovidiu
Date: 07/01/16 09:17:15
Modified: src/main/org/jboss/remoting/transport/socket Tag: remoting_2_x
ServerThread.java SocketServerInvoker.java
Log:
Fixed http://jira.jboss.org/jira/browse/JBREM-666 by moving all I/O initialization-related
operations from the acceptor thread to the worker thread.
Revision Changes Path
No revision
No revision
1.29.2.9 +398 -377 JBossRemoting/src/main/org/jboss/remoting/transport/socket/ServerThread.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: ServerThread.java
===================================================================
RCS file: /cvsroot/jboss/JBossRemoting/src/main/org/jboss/remoting/transport/socket/ServerThread.java,v
retrieving revision 1.29.2.8
retrieving revision 1.29.2.9
diff -u -b -r1.29.2.8 -r1.29.2.9
--- ServerThread.java 16 Jan 2007 08:15:05 -0000 1.29.2.8
+++ ServerThread.java 16 Jan 2007 14:17:15 -0000 1.29.2.9
@@ -61,50 +61,55 @@
* @author <a href="mailto:tom at jboss.org">Tom Elrod</a>
* @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
*
- * @version $Revision: 1.29.2.8 $
+ * @version $Revision: 1.29.2.9 $
*/
public class ServerThread extends Thread
{
+ // Constants ------------------------------------------------------------------------------------
+
final static private Logger log = Logger.getLogger(ServerThread.class);
- private static final boolean trace = log.isTraceEnabled();
+ // Static ---------------------------------------------------------------------------------------
+
+ private static boolean trace = log.isTraceEnabled();
+
+ private static int idGenerator = 0;
+
+ public static synchronized int nextID()
+ {
+ return idGenerator++;
+ }
+
+ // Attributes -----------------------------------------------------------------------------------
+
+ protected volatile boolean running;
+ protected volatile boolean handlingResponse;
+ protected volatile boolean shutdown;
- protected SocketServerInvoker invoker;
protected LRUPool clientpool;
protected LinkedList threadpool;
- protected volatile boolean running = true;
- protected volatile boolean handlingResponse = true; // start off as true so that nobody can interrupt us
- protected volatile boolean shutdown = false;
-
- protected static int idGenerator = 0;
-
- private SocketWrapper socketWrapper = null;
- protected String serverSocketClassName = ServerSocketWrapper.class.getName();
- protected Class serverSocketClass = null;
- private Constructor serverSocketConstructor = null;
+
+ protected String serverSocketClassName;
+ protected Class serverSocketClass;
+
+ private Socket socket;
+ private int timeout;
+ protected SocketServerInvoker invoker;
+ private Constructor serverSocketConstructor;
// the unique identity of the thread, which won't change during the life of the thread. The
// thread may get associated with different IP addresses though.
private int id = Integer.MIN_VALUE;
- /**
- * Indicates if will check the socket connection when
- * getting from pool by sending byte over the connection
- * to validate is still good.
- */
- private boolean shouldCheckConnection = false;
+ // Indicates if will check the socket connection when getting from pool by sending byte over the
+ // connection to validate is still good.
+ private boolean shouldCheckConnection;
- /**
- * Will indicate when the last request has been processed (used in determining
- * idle connection/thread timeout)
- */
+ // Will indicate when the last request has been processed (used in determining idle
+ // connection/thread timeout)
private long lastRequestHandledTimestamp = System.currentTimeMillis();
-
- public static synchronized int nextID()
- {
- return idGenerator++;
- }
+ // Constructors ---------------------------------------------------------------------------------
public ServerThread(Socket socket, SocketServerInvoker invoker, LRUPool clientpool,
LinkedList threadpool, int timeout, String serverSocketClassName)
@@ -112,10 +117,14 @@
{
super();
+ running = true;
+ handlingResponse = true; // start off as true so that nobody can interrupt us
+
setName(getWorkerThreadName(socket));
+ this.socket = socket;
+ this.timeout = timeout;
this.serverSocketClassName = serverSocketClassName;
- this.socketWrapper = createServerSocketWrapper(socket, timeout, invoker.getLocator().getParameters());
this.invoker = invoker;
this.clientpool = clientpool;
this.threadpool = threadpool;
@@ -131,6 +140,100 @@
}
}
+ // Thread overrides -----------------------------------------------------------------------------
+
+ public void run()
+ {
+ try
+ {
+ while (true)
+ {
+ dorun();
+
+ // The following code has been changed to eliminate a race condition with
+ // SocketServerInvoker.cleanup().
+ //
+ // A ServerThread can shutdown for two reasons:
+ // 1. the client shuts down, and
+ // 2. the server shuts down.
+ //
+ // If both occur around the same time, a problem arises. If a ServerThread starts to
+ // shut down because the client shut down, it will test shutdown, and if it gets to the
+ // test before SocketServerInvoker.cleanup() calls ServerThread.stop() to set shutdown
+ // to true, it will return itself to threadpool. If it moves from clientpool to
+ // threadpool at just the right time, SocketServerInvoker could miss it in both places
+ // and never call stop(), leaving it alive, resulting in a memory leak. The solution is
+ // to synchronize parts of ServerThread.run() and SocketServerInvoker.cleanup() so that
+ // they interact atomically.
+
+ synchronized (this)
+ {
+ synchronized (clientpool)
+ {
+ synchronized (threadpool)
+ {
+ if (shutdown)
+ {
+ invoker = null;
+ return; // exit thread
+ }
+ else
+ {
+ if(trace) { log.trace(this + " removing itself from clientpool and going to threadpool"); }
+ clientpool.remove(this);
+ threadpool.add(this);
+ Thread.interrupted(); // clear any interruption so that we can be pooled.
+ clientpool.notify();
+ }
+ }
+ }
+
+ try
+ {
+ if(trace) { log.trace(this + " begins to wait"); }
+
+ wait();
+
+ if(trace) { log.trace(this + " woke up after wait"); }
+ }
+ catch (InterruptedException e)
+ {
+ if (shutdown)
+ {
+ invoker = null;
+ return; // exit thread
+ }
+
+ throw e;
+ }
+ }
+ }
+ }
+ catch (Exception e)
+ {
+ log.debug(this + " exiting run on exception, definitively thrown out of the threadpool", e);
+ }
+ }
+
+ // Public ---------------------------------------------------------------------------------------
+
+ public synchronized void wakeup(Socket socket, int timeout, SocketServerInvoker invoker)
+ throws Exception
+ {
+ // rename the worker thread to reflect the new socket it is handling
+ setName(getWorkerThreadName(socket));
+
+ this.socket = socket;
+ this.timeout = timeout;
+ this.invoker = invoker;
+
+ running = true;
+ handlingResponse = true;
+ notify();
+
+ if(trace) { log.trace(this + " has notified on mutex"); }
+ }
+
public long getLastRequestTimestamp()
{
return lastRequestHandledTimestamp;
@@ -140,14 +243,13 @@
{
shutdown = true;
running = false;
- // This is a race and there is a chance
- // that a invocation is going on at the time
- // of the interrupt. But I see no way right
- // now to protect for this.
-
- // NOTE ALSO!:
- // Shutdown should never be synchronized.
- // We don't want to hold up accept() thread! (via LRUpool)
+
+ // This is a race and there is a chance that a invocation is going on at the time of the
+ // interrupt. But I see no way right now to protect for this.
+
+ // NOTE ALSO!: Shutdown should never be synchronized. We don't want to hold up accept()
+ // thread! (via LRUpool)
+
if (!handlingResponse)
{
try
@@ -159,15 +261,12 @@
{
}
}
-
}
/**
- * Sets if server thread should check connection before continue to process on
- * next invocation request. If is set to true, will send an ACK to client to
- * verify client is still connected on same socket.
- *
- * @param checkConnection
+ * Sets if server thread should check connection before continue to process on next invocation
+ * request. If is set to true, will send an ACK to client to verify client is still connected
+ * on same socket.
*/
public void shouldCheckConnection(boolean checkConnection)
{
@@ -175,67 +274,24 @@
}
/**
- * Indicates if server will check with client (via an ACK) to
- * see if is still there.
+ * Indicates if server will check with client (via an ACK) to see if is still there.
*/
public boolean getCheckingConnection()
{
return this.shouldCheckConnection;
}
- private SocketWrapper createServerSocketWrapper(Socket socket, int timeout, Map metadata)
- throws Exception
- {
- if (serverSocketConstructor == null)
- {
- if(serverSocketClass == null)
- {
- serverSocketClass = ClassLoaderUtility.loadClass(serverSocketClassName, getClass());
- }
-
- try
- {
- serverSocketConstructor = serverSocketClass.
- getConstructor(new Class[]{Socket.class, Map.class, Integer.class});
- }
- catch (NoSuchMethodException e)
- {
- serverSocketConstructor = serverSocketClass.getConstructor(new Class[]{Socket.class});
- }
-
- }
-
- SocketWrapper serverSocketWrapper = null;
-
- if (serverSocketConstructor.getParameterTypes().length == 3)
- {
- serverSocketWrapper = (SocketWrapper)serverSocketConstructor.
- newInstance(new Object[]{socket, metadata, new Integer(timeout)});
- }
- else
+ public void evict()
{
- serverSocketWrapper =
- (SocketWrapper)serverSocketConstructor.newInstance(new Object[]{socket});
- serverSocketWrapper.setTimeout(timeout);
- }
+ running = false;
- return serverSocketWrapper;
- }
+ // This is a race and there is a chance that a invocation is going on at the time of the
+ // interrupt. But I see no way right now to protect for this. There may not be a problem
+ // because interrupt only effects threads blocking on IO.
+ // NOTE ALSO!: Shutdown should never be synchronized. We don't want to hold up accept()
+ // thread! (via LRUpool)
- public void evict()
- {
- running = false;
- // This is a race and there is a chance
- // that a invocation is going on at the time
- // of the interrupt. But I see no way right
- // now to protect for this.
- // There may not be a problem because interrupt only effects
- // threads blocking on IO.
-
- // NOTE ALSO!:
- // Shutdown should never be synchronized.
- // We don't want to hold up accept() thread! (via LRUpool)
if (!handlingResponse)
{
try
@@ -250,16 +306,14 @@
}
/**
- * This method is intended to be used when need to unblock
- * I/O read, which the thread will automatically loop back to
- * do after processing a request. Calling this method will cause
- * the underlying socket to be closed.
+ * This method is intended to be used when need to unblock I/O read, which the thread will
+ * automatically loop back to do after processing a request.
*/
public void unblock()
{
try
{
- socketWrapper.close();
+ socket.close();
}
catch (IOException e)
{
@@ -267,196 +321,159 @@
}
}
-
- public synchronized void wakeup(Socket socket, int timeout, Map metadata) throws Exception
+ public String toString()
{
- // rename the worker thread to reflect the new socket it is handling
- setName(getWorkerThreadName(socket));
+ return getName();
+ }
+
+ // Package protected ----------------------------------------------------------------------------
+
+ // Protected ------------------------------------------------------------------------------------
- socketWrapper = createServerSocketWrapper(socket, timeout, metadata);
+ /**
+ * This is needed because Object*Streams leak
+ */
+ protected void dorun()
+ {
+ if(trace) { log.trace("beginning dorun()"); }
running = true;
handlingResponse = true;
- notify();
- if(trace) { log.trace(this + " has woken up"); }
- }
+ // lazy initialize the socketWrapper on the worker thread itself. We do this to avoid to have
+ // it done on the acceptor thread (prone to lockup)
+ SocketWrapper socketWrapper = null;
- public void run()
- {
try
{
- while (true)
+ if(trace) { log.trace("creating the socket wrapper"); }
+
+ socketWrapper =
+ createServerSocketWrapper(socket, timeout, invoker.getLocator().getParameters());
+
+ // Always do first one without an ACK because its not needed
+ if(trace) { log.trace("processing first invocation without acknowledging"); }
+ processInvocation(socketWrapper);
+ }
+ catch (Exception ex)
{
- dorun();
+ log.error("Worker thread initialization failure", ex);
+ running = false;
+ }
-/*
-* The following code has been changed to eliminate a race condition with SocketServerInvoker.cleanup().
-* A ServerThread can shutdown for two reasons:
-*
-* 1. the client shuts down, and
-* 2. the server shuts down.
-*
-* If both occur around the same time, a problem arises. If a ServerThread starts to shut
-* down because the client shut down, it will test shutdown, and if it gets to the test
-* before SocketServerInvoker.cleanup() calls ServerThread.stop() to set shutdown to true, it
-* will return itself to threadpool. If it moves from clientpool to threadpool at just the
-* right time, SocketServerInvoker could miss it in both places and never call stop(), leaving
-* it alive, resulting in a memory leak. The solution is to synchronize parts of
-* ServerThread.run() and SocketServerInvoker.cleanup() so that they interact atomically.
-*/
- synchronized (this)
+ // Re-use loop
+ while (running)
{
- synchronized (clientpool)
+ try
{
- synchronized (threadpool)
+ acknowledge(socketWrapper);
+ processInvocation(socketWrapper);
+ }
+ catch (AcknowledgeFailure e)
{
- if (shutdown)
+ if (!shutdown && trace)
{
- invoker = null;
- return; // exit thread
+ log.trace("keep alive acknowledge failed!");
}
- else
+ running = false;
+ }
+ catch(SocketTimeoutException ste)
+ {
+ if(!shutdown)
{
if(trace)
{
- log.trace("removing from clientpool and adding to threadpool " + this);
+ log.trace(ste);
}
- clientpool.remove(this);
- threadpool.add(this);
- Thread.interrupted(); // clear any interruption so that we can be pooled.
- clientpool.notify();
}
+ running = false;
}
+ catch (InterruptedIOException e)
+ {
+ if (!shutdown)
+ {
+ log.error("Socket IO interrupted", e);
}
+ running = false;
- try
+ }
+ catch (InterruptedException e)
{
if(trace)
{
- log.trace("begin thread wait");
+ log.trace(e);
}
- this.wait();
- if(trace)
+ if (!shutdown)
{
- log.trace("WAKEUP in SERVER THREAD");
+ log.error("interrupted", e);
}
}
- catch (InterruptedException e)
+ catch (EOFException eof)
{
- if (shutdown)
+ if (!shutdown && trace)
{
- invoker = null;
- return; // exit thread
- }
-
- throw e;
- }
- }
+ log.trace("EOFException received. This is likely due to client finishing communication.");
}
+ running = false;
}
- catch (Exception ignored)
+ catch (SocketException sex)
{
- log.debug("Exiting run on exception", ignored);
- }
- }
-
- public String toString()
+ if (!shutdown && trace)
{
- return getName();
+ log.trace("SocketException received. This is likely due to client disconnecting and resetting connection.");
}
-
- private String getWorkerThreadName(Socket currentSocket)
+ running = false;
+ }
+ catch (Exception ex)
{
- if (id == Integer.MIN_VALUE)
+ if (!shutdown)
{
- id = nextID();
+ log.error("failed", ex);
+ running = false;
}
-
- StringBuffer sb = new StringBuffer("WorkerThread#");
- sb.append(id).append('[');
- sb.append(currentSocket.getInetAddress().getHostAddress());
- sb.append(':');
- sb.append(currentSocket.getPort());
- sb.append(']');
-
- return sb.toString();
+ }
+ // clear any interruption so that thread can be pooled.
+ handlingResponse = false;
+ Thread.interrupted();
}
- protected void acknowledge() throws Exception
- {
- if (shouldCheckConnection)
- {
-
- // HERE IS THE RACE between ACK received and handlingResponse = true
- // We can't synchronize because readByte blocks and client is expecting
- // a response and we don't want to hang client.
- // see shutdown and evict for more details
- // There may not be a problem because interrupt only effects
- // threads blocking on IO. and this thread will just continue.
- handlingResponse = true;
-
+ // Ok, we've been shutdown. Do appropriate cleanups.
try
{
- if(trace) { log.trace("checking connection"); }
- socketWrapper.checkConnection();
- }
- catch (EOFException e)
+ if (socketWrapper != null)
{
- throw new AcknowledgeFailure();
- }
- catch (SocketException se)
+ InputStream in = socketWrapper.getInputStream();
+ if (in != null)
{
- throw new AcknowledgeFailure();
+ in.close();
}
- catch (IOException ioe)
+ OutputStream out = socketWrapper.getOutputStream();
+ if (out != null)
{
- throw new AcknowledgeFailure();
- }
-
- handlingResponse = false;
+ out.close();
}
}
-
- protected Object versionedRead(InputStream inputStream, ServerInvoker invoker,
- ClassLoader classLoader, int version)
- throws IOException, ClassNotFoundException
- {
- //TODO: -TME - Should I even botther to check for version here? Only one way to do processing
- // at this point, regardless of version.
- switch (version)
- {
- case Version.VERSION_1:
- case Version.VERSION_2:
+ }
+ catch (Exception ex)
{
- //TODO: -TME Need better way to get the unmarshaller (via config)
- UnMarshaller unmarshaller = MarshalFactory.
- getUnMarshaller(invoker.getLocator(), classLoader);
+ log.debug("failed to close in/out", ex);
+ }
- if (unmarshaller == null)
+ try
{
- unmarshaller = MarshalFactory.
- getUnMarshaller(invoker.getDataType(), invoker.getSerializationType());
+ if (socketWrapper != null)
+ {
+ socketWrapper.close();
}
-
- if(trace) { log.trace("blocking to read invocation from the input stream"); }
-
- Object o = unmarshaller.read(inputStream, null);
-
- if(trace) { log.trace("read " + o + " from input stream"); }
-
- return o;
}
- default:
+ catch (Exception ex)
{
- throw new IOException("Can not read data for version " + version +
- ". Supported versions: " + Version.VERSION_1 + "," + Version.VERSION_2);
- }
+ log.error("failed to close socket wrapper", ex);
}
+ socketWrapper = null;
}
- protected void processInvocation() throws Exception
+ protected void processInvocation(SocketWrapper socketWrapper) throws Exception
{
if(trace) { log.trace("processing invocation"); }
@@ -558,6 +575,117 @@
lastRequestHandledTimestamp = System.currentTimeMillis();
}
+ protected void acknowledge(SocketWrapper socketWrapper) throws Exception
+ {
+ if (shouldCheckConnection)
+ {
+ // HERE IS THE RACE between ACK received and handlingResponse = true. We can't synchronize
+ // because readByte blocks and client is expecting a response and we don't want to hang
+ // client. See shutdown and evict for more details. There may not be a problem because
+ // interrupt only effects threads blocking on IO. and this thread will just continue.
+
+ handlingResponse = true;
+
+ try
+ {
+ if(trace) { log.trace("checking connection"); }
+ socketWrapper.checkConnection();
+ }
+ catch (EOFException e)
+ {
+ throw new AcknowledgeFailure();
+ }
+ catch (SocketException se)
+ {
+ throw new AcknowledgeFailure();
+ }
+ catch (IOException ioe)
+ {
+ throw new AcknowledgeFailure();
+ }
+
+ handlingResponse = false;
+ }
+ }
+
+ protected Object versionedRead(InputStream inputStream, ServerInvoker invoker,
+ ClassLoader classLoader, int version)
+ throws IOException, ClassNotFoundException
+ {
+ //TODO: -TME - Should I even botther to check for version here? Only one way to do processing
+ // at this point, regardless of version.
+ switch (version)
+ {
+ case Version.VERSION_1:
+ case Version.VERSION_2:
+ {
+ //TODO: -TME Need better way to get the unmarshaller (via config)
+ UnMarshaller unmarshaller = MarshalFactory.
+ getUnMarshaller(invoker.getLocator(), classLoader);
+
+ if (unmarshaller == null)
+ {
+ unmarshaller = MarshalFactory.
+ getUnMarshaller(invoker.getDataType(), invoker.getSerializationType());
+ }
+
+ if(trace) { log.trace("blocking to read invocation from the input stream"); }
+
+ Object o = unmarshaller.read(inputStream, null);
+
+ if(trace) { log.trace("read " + o + " from input stream"); }
+
+ return o;
+ }
+ default:
+ {
+ throw new IOException("Can not read data for version " + version +
+ ". Supported versions: " + Version.VERSION_1 + "," + Version.VERSION_2);
+ }
+ }
+ }
+
+ // Private --------------------------------------------------------------------------------------
+
+ private SocketWrapper createServerSocketWrapper(Socket socket, int timeout, Map metadata)
+ throws Exception
+ {
+ if (serverSocketConstructor == null)
+ {
+ if(serverSocketClass == null)
+ {
+ serverSocketClass = ClassLoaderUtility.loadClass(serverSocketClassName, getClass());
+ }
+
+ try
+ {
+ serverSocketConstructor = serverSocketClass.
+ getConstructor(new Class[]{Socket.class, Map.class, Integer.class});
+ }
+ catch (NoSuchMethodException e)
+ {
+ serverSocketConstructor = serverSocketClass.getConstructor(new Class[]{Socket.class});
+ }
+
+ }
+
+ SocketWrapper serverSocketWrapper = null;
+
+ if (serverSocketConstructor.getParameterTypes().length == 3)
+ {
+ serverSocketWrapper = (SocketWrapper)serverSocketConstructor.
+ newInstance(new Object[]{socket, metadata, new Integer(timeout)});
+ }
+ else
+ {
+ serverSocketWrapper =
+ (SocketWrapper)serverSocketConstructor.newInstance(new Object[]{socket});
+
+ serverSocketWrapper.setTimeout(timeout);
+ }
+ return serverSocketWrapper;
+ }
+
private boolean isOneway(Map metadata)
{
boolean isOneway = false;
@@ -592,7 +720,6 @@
invoker.getSupportedSubsystems()[0],
obj, null, null, null);
}
-
}
private void versionedWrite(OutputStream outputStream, SocketServerInvoker invoker,
@@ -608,7 +735,8 @@
if (marshaller == null)
{
- marshaller = MarshalFactory.getMarshaller(invoker.getDataType(), invoker.getSerializationType());
+ marshaller = MarshalFactory.
+ getMarshaller(invoker.getDataType(), invoker.getSerializationType());
}
marshaller.write(resp, outputStream);
@@ -616,10 +744,10 @@
}
default:
{
- throw new IOException("Can not write data for version " + version + ". Supported version: " + Version.VERSION_1 + ", " + Version.VERSION_2);
+ throw new IOException("Can not write data for version " + version +
+ ". Supported version: " + Version.VERSION_1 + ", " + Version.VERSION_2);
}
}
-
}
private int readVersion(InputStream inputStream) throws IOException
@@ -638,131 +766,24 @@
outputStream.write(version);
}
- /**
- * This is needed because Object*Streams leak
- */
- protected void dorun()
- {
- if(trace) { log.trace("beginning dorun()"); }
-
- running = true;
- handlingResponse = true;
-
- // Always do first one without an ACK because its not needed
- try
+ private String getWorkerThreadName(Socket currentSocket)
{
- if(trace) { log.trace("processing first invocation without acknowledging"); }
- processInvocation();
- }
- catch (Exception ex)
+ if (id == Integer.MIN_VALUE)
{
- log.error("failed to process invocation!", ex);
- running = false;
+ id = nextID();
}
- // Re-use loop
- while (running)
- {
- try
- {
- acknowledge();
- processInvocation();
- }
- catch (AcknowledgeFailure e)
- {
- if (!shutdown && trace)
- {
- log.trace("keep alive acknowledge failed!");
- }
- running = false;
- }
- catch(SocketTimeoutException ste)
- {
- if(!shutdown)
- {
- if(trace)
- {
- log.trace(ste);
- }
- }
- running = false;
- }
- catch (InterruptedIOException e)
- {
- if (!shutdown)
- {
- log.error("Socket IO interrupted", e);
- }
- running = false;
+ StringBuffer sb = new StringBuffer("WorkerThread#");
+ sb.append(id).append('[');
+ sb.append(currentSocket.getInetAddress().getHostAddress());
+ sb.append(':');
+ sb.append(currentSocket.getPort());
+ sb.append(']');
- }
- catch (InterruptedException e)
- {
- if(trace)
- {
- log.trace(e);
- }
- if (!shutdown)
- {
- log.error("interrupted", e);
- }
- }
- catch (EOFException eof)
- {
- if (!shutdown && trace)
- {
- log.trace("EOFException received. This is likely due to client finishing communication.");
- }
- running = false;
- }
- catch (SocketException sex)
- {
- if (!shutdown && trace)
- {
- log.trace("SocketException received. This is likely due to client disconnecting and resetting connection.");
- }
- running = false;
- }
- catch (Exception ex)
- {
- if (!shutdown)
- {
- log.error("failed", ex);
- running = false;
- }
- }
- // clear any interruption so that thread can be pooled.
- handlingResponse = false;
- Thread.interrupted();
+ return sb.toString();
}
- // Ok, we've been shutdown. Do appropriate cleanups.
- try
- {
- InputStream in = socketWrapper.getInputStream();
- if (in != null)
- {
- in.close();
- }
- OutputStream out = socketWrapper.getOutputStream();
- if (out != null)
- {
- out.close();
- }
- }
- catch (Exception ex)
- {
- }
- try
- {
- socketWrapper.close();
- }
- catch (Exception ex)
- {
- log.error("Failed cleanup", ex);
- }
- socketWrapper = null;
- }
+ // Inner classes --------------------------------------------------------------------------------
public static class AcknowledgeFailure extends Exception
{
1.30.2.8 +2 -2 JBossRemoting/src/main/org/jboss/remoting/transport/socket/SocketServerInvoker.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: SocketServerInvoker.java
===================================================================
RCS file: /cvsroot/jboss/JBossRemoting/src/main/org/jboss/remoting/transport/socket/SocketServerInvoker.java,v
retrieving revision 1.30.2.7
retrieving revision 1.30.2.8
diff -u -b -r1.30.2.7 -r1.30.2.8
--- SocketServerInvoker.java 16 Jan 2007 08:15:05 -0000 1.30.2.7
+++ SocketServerInvoker.java 16 Jan 2007 14:17:15 -0000 1.30.2.8
@@ -49,7 +49,7 @@
* @author <a href="mailto:tom.elrod at jboss.com">Tom Elrod</a>
* @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
*
- * @version $Revision: 1.30.2.7 $
+ * @version $Revision: 1.30.2.8 $
* @jmx:mbean
*/
public class SocketServerInvoker extends ServerInvoker implements Runnable, SocketServerInvokerMBean
@@ -603,7 +603,7 @@
else
{
if(trace) { log.trace(this + " reusing " + worker); }
- worker.wakeup(socket, getTimeout(), locator.getParameters());
+ worker.wakeup(socket, getTimeout(), this);
}
}
More information about the jboss-cvs-commits
mailing list