[jboss-cvs] JBossRemoting/src/main/org/jboss/remoting/transport/socket ...
Ron Sigal
ron_sigal at yahoo.com
Mon Nov 12 01:48:39 EST 2007
User: rsigal
Date: 07/11/12 01:48:39
Modified: src/main/org/jboss/remoting/transport/socket (Tag: remoting_2_x)
SocketServerInvoker.java
ServerThread.java
Log:
JBREM-807, JBREM-821: Reorganized synchronization during wakeup and shutdown.
Revision Changes Path
No revision
No revision
1.30.2.14 +22 -34 JBossRemoting/src/main/org/jboss/remoting/transport/socket/SocketServerInvoker.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: SocketServerInvoker.java
===================================================================
RCS file: /cvsroot/jboss/JBossRemoting/src/main/org/jboss/remoting/transport/socket/SocketServerInvoker.java,v
retrieving revision 1.30.2.13
retrieving revision 1.30.2.14
diff -u -b -r1.30.2.13 -r1.30.2.14
--- SocketServerInvoker.java 30 Aug 2007 17:48:38 -0000 1.30.2.13
+++ SocketServerInvoker.java 12 Nov 2007 06:48:39 -0000 1.30.2.14
@@ -49,7 +49,7 @@
* @author <a href="mailto:tom.elrod at jboss.com">Tom Elrod</a>
* @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
*
- * @version $Revision: 1.30.2.13 $
+ * @version $Revision: 1.30.2.14 $
* @jmx:mbean
*/
public class SocketServerInvoker extends ServerInvoker implements Runnable, SocketServerInvokerMBean
@@ -297,6 +297,14 @@
{
running = false;
+ try
+ {
+ serverSocket.close();
+ }
+ catch(Exception e)
+ {
+ }
+
maxPoolSize = 0; // so ServerThreads don't reinsert themselves
if(acceptThreads != null)
{
@@ -333,8 +341,6 @@
if (threadpool != null)
{
- synchronized(threadpool)
- {
int threadsToShutdown = threadpool.size();
for(int i = 0; i < threadsToShutdown; i++)
{
@@ -344,15 +350,8 @@
}
}
}
- }
- try
- {
- serverSocket.close();
- }
- catch(Exception e)
- {
- }
+ log.debug(this + " exiting");
}
/**
@@ -555,6 +554,7 @@
finally
{
thread.shutdown();
+ log.debug(this + "shut down");
}
}
@@ -570,55 +570,42 @@
ServerThread worker = null;
boolean newThread = false;
- while(worker == null)
+ synchronized(clientpool)
+ {
+ while(worker == null && running)
{
if(trace) { log.trace(this + " trying to get a worker thread from threadpool for processing"); }
- synchronized(threadpool)
- {
if(threadpool.size() > 0)
{
worker = (ServerThread)threadpool.removeFirst();
-
if(trace) { log.trace(this + (worker == null ? " found NO threads in threadpool" : " got " + worker + " from threadpool")); }
}
else if (trace) { { log.trace(this + " has an empty threadpool"); } }
- }
if(worker == null)
{
- synchronized(clientpool)
- {
if(clientpool.size() < maxPoolSize)
{
if(trace) { log.trace(this + " creating new worker thread"); }
worker = new ServerThread(socket, this, clientpool, threadpool,
getTimeout(), serverSocketClass);
-
if(trace) { log.trace(this + " created " + worker); }
-
newThread = true;
}
if(worker == null)
{
if(trace) {log.trace(this + " trying to evict a thread from clientpool"); }
-
clientpool.evict();
-
if(trace) {log.trace(this + " waiting for a thread from clientpool"); }
-
clientpool.wait();
-
if(trace) { log.trace(this + " notified of clientpool thread availability"); }
}
}
}
- }
- synchronized(clientpool)
- {
clientpool.insert(worker, worker);
}
@@ -709,6 +696,7 @@
}
}
}
+ log.debug("ServerSocketRefresh shutting down");
}
/**
1.29.2.25 +70 -43 JBossRemoting/src/main/org/jboss/remoting/transport/socket/ServerThread.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: ServerThread.java
===================================================================
RCS file: /cvsroot/jboss/JBossRemoting/src/main/org/jboss/remoting/transport/socket/ServerThread.java,v
retrieving revision 1.29.2.24
retrieving revision 1.29.2.25
diff -u -b -r1.29.2.24 -r1.29.2.25
--- ServerThread.java 30 Aug 2007 17:35:14 -0000 1.29.2.24
+++ ServerThread.java 12 Nov 2007 06:48:39 -0000 1.29.2.25
@@ -68,7 +68,7 @@
* @author <a href="mailto:tom at jboss.org">Tom Elrod</a>
* @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
*
- * @version $Revision: 1.29.2.24 $
+ * @version $Revision: 1.29.2.25 $
*/
public class ServerThread extends Thread
{
@@ -175,35 +175,41 @@
//
// If both occur around the same time, a problem arises. If a ServerThread starts to
// shut down because the client shut down, it will test shutdown, and if it gets to the
- // test before SocketServerInvoker.cleanup() calls ServerThread.stop() to set shutdown
+ // test before SocketServerInvoker.cleanup() calls ServerThread.shutdown() to set shutdown
// to true, it will return itself to threadpool. If it moves from clientpool to
// threadpool at just the right time, SocketServerInvoker could miss it in both places
- // and never call stop(), leaving it alive, resulting in a memory leak. The solution is
+ // and never call shutdown(), leaving it alive, resulting in a memory leak. The solution is
// to synchronize parts of ServerThread.run() and SocketServerInvoker.cleanup() so that
// they interact atomically.
- synchronized (this)
- {
synchronized (clientpool)
{
- synchronized (threadpool)
- {
+ if(trace) { log.trace(this + " removing itself from clientpool"); }
+ clientpool.remove(this);
+ clientpool.notifyAll();
+
if (shutdown)
{
+ if (trace) log.trace(this + " exiting");
invoker = null;
return; // exit thread
}
else
{
- if(trace) { log.trace(this + " removing itself from clientpool and going to threadpool"); }
- clientpool.remove(this);
+ if(trace) { log.trace(this + " returning itself to threadpool"); }
threadpool.add(this);
Thread.interrupted(); // clear any interruption so that we can be pooled.
- clientpool.notify();
- }
}
}
+ synchronized (this)
+ {
+ // If running == true, then SocketServerInvoker has already removed this
+ // ServerThread from threadpool and called wakeup(), in which case run()
+ // should continue immediately.
+ if (running)
+ continue;
+
while (true)
{
try
@@ -214,15 +220,17 @@
if(trace) { log.trace(this + " woke up after wait"); }
- break;
- }
- catch (InterruptedException e)
- {
if (shutdown)
{
invoker = null;
+ if (trace) log.trace(this + " exiting");
return; // exit thread
}
+
+ break;
+ }
+ catch (InterruptedException e)
+ {
}
}
}
@@ -259,7 +267,7 @@
return lastRequestHandledTimestamp;
}
- public void shutdown()
+ public synchronized void shutdown()
{
shutdown = true;
running = false;
@@ -270,17 +278,25 @@
// NOTE ALSO!: Shutdown should never be synchronized. We don't want to hold up accept()
// thread! (via LRUpool)
- if (!handlingResponse)
+ synchronized (this)
{
try
{
- this.interrupt();
- Thread.interrupted(); // clear
+ if (socketWrapper != null)
+ {
+ log.debug(this + " closing socketWrapper: " + socketWrapper);
+ socketWrapper.close();
}
- catch (Exception ignored)
+ }
+ catch (Exception ex)
{
+ log.debug("failed to close socket wrapper", ex);
}
+ socketWrapper = null;
}
+
+ if (trace) log.trace(this + " shutting down");
+ notifyAll();
}
/**
@@ -478,6 +494,8 @@
// log.debug("failed to close in/out", ex);
// }
+ synchronized (this)
+ {
try
{
if (socketWrapper != null)
@@ -492,6 +510,7 @@
}
socketWrapper = null;
}
+ }
protected void processInvocation(SocketWrapper socketWrapper) throws Exception
{
@@ -532,6 +551,14 @@
}
}
+ completeProcessInvocation(inputStream, performVersioning, version);
+ }
+
+ protected synchronized void completeProcessInvocation(InputStream inputStream,
+ boolean performVersioning,
+ int version)
+ throws Exception
+ {
Object obj = versionedRead(inputStream, invoker, getClass().getClassLoader(), version);
// setting timestamp since about to start processing
More information about the jboss-cvs-commits mailing list