[jboss-cvs] JBossRemoting/src/main/org/jboss/remoting/transport/socket ...
Ovidiu Feodorov
ovidiu.feodorov at jboss.com
Tue Jan 16 03:15:05 EST 2007
User: ovidiu
Date: 07/01/16 03:15:05
Modified: src/main/org/jboss/remoting/transport/socket Tag:
remoting_2_x MicroSocketClientInvoker.java
ServerThread.java SocketServerInvoker.java
Log:
flushing minor changes (trivial refactoring, logging improvements) before attempting a fix for http://jira.jboss.org/jira/browse/JBREM-666
Revision Changes Path
No revision
No revision
1.16.2.6 +5 -5 JBossRemoting/src/main/org/jboss/remoting/transport/socket/MicroSocketClientInvoker.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: MicroSocketClientInvoker.java
===================================================================
RCS file: /cvsroot/jboss/JBossRemoting/src/main/org/jboss/remoting/transport/socket/MicroSocketClientInvoker.java,v
retrieving revision 1.16.2.5
retrieving revision 1.16.2.6
diff -u -b -r1.16.2.5 -r1.16.2.6
--- MicroSocketClientInvoker.java 13 Jan 2007 12:42:36 -0000 1.16.2.5
+++ MicroSocketClientInvoker.java 16 Jan 2007 08:15:05 -0000 1.16.2.6
@@ -32,7 +32,7 @@
*
* @author <a href="mailto:jhaynie at vocalocity.net">Jeff Haynie</a>
* @author <a href="mailto:telrod at e2technologies.net">Tom Elrod</a>
- * @version $Revision: 1.16.2.5 $
+ * @version $Revision: 1.16.2.6 $
*/
public class MicroSocketClientInvoker extends RemoteClientInvoker
{
@@ -190,7 +190,7 @@
try
{
enableTcpNoDelay = Boolean.valueOf((String) val).booleanValue();
- log.debug("Setting SocketClientInvoker::enableTcpNoDelay to: " + enableTcpNoDelay);
+ log.debug(this + " setting SocketClientInvoker::enableTcpNoDelay to " + enableTcpNoDelay);
}
catch (Exception e)
{
@@ -204,11 +204,11 @@
try
{
maxPoolSize = Integer.valueOf((String)val).intValue();
- log.debug("Setting SocketClientInvoker::maxPoolSize to: " + maxPoolSize);
+ log.debug(this + " setting SocketClientInvoker::maxPoolSize to: " + maxPoolSize);
}
catch (Exception e)
{
- log.warn("Could not convert " + MAX_POOL_SIZE_FLAG + " value of " + val + " to a int value.");
+ log.warn("Could not convert " + MAX_POOL_SIZE_FLAG + " value of " + val + " to a int value");
}
}
// look for client socket class name
@@ -219,7 +219,7 @@
if (value.length() > 0)
{
clientSocketClassName = value;
- log.debug("Setting ClientSocket class name to: " + clientSocketClassName);
+ log.debug(this + " setting ClientSocket class name to " + clientSocketClassName);
}
}
1.29.2.8 +14 -14 JBossRemoting/src/main/org/jboss/remoting/transport/socket/ServerThread.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: ServerThread.java
===================================================================
RCS file: /cvsroot/jboss/JBossRemoting/src/main/org/jboss/remoting/transport/socket/ServerThread.java,v
retrieving revision 1.29.2.7
retrieving revision 1.29.2.8
diff -u -b -r1.29.2.7 -r1.29.2.8
--- ServerThread.java 16 Jan 2007 00:29:29 -0000 1.29.2.7
+++ ServerThread.java 16 Jan 2007 08:15:05 -0000 1.29.2.8
@@ -61,7 +61,7 @@
* @author <a href="mailto:tom at jboss.org">Tom Elrod</a>
* @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
*
- * @version $Revision: 1.29.2.7 $
+ * @version $Revision: 1.29.2.8 $
*/
public class ServerThread extends Thread
{
@@ -115,20 +115,15 @@
setName(getWorkerThreadName(socket));
this.serverSocketClassName = serverSocketClassName;
- this.socketWrapper = createServerSocket(socket, timeout, invoker.getLocator().getParameters());
+ this.socketWrapper = createServerSocketWrapper(socket, timeout, invoker.getLocator().getParameters());
this.invoker = invoker;
this.clientpool = clientpool;
this.threadpool = threadpool;
- init();
- }
-
- private void init()
- {
if (invoker != null)
{
Map configMap = invoker.getConfiguration();
- String checkValue = (String) configMap.get(SocketServerInvoker.CHECK_CONNECTION_KEY);
+ String checkValue = (String)configMap.get(SocketServerInvoker.CHECK_CONNECTION_KEY);
if (checkValue != null && checkValue.length() > 0)
{
shouldCheckConnection = Boolean.valueOf(checkValue).booleanValue();
@@ -188,7 +183,8 @@
return this.shouldCheckConnection;
}
- private SocketWrapper createServerSocket(Socket socket, int timeout, Map metadata) throws Exception
+ private SocketWrapper createServerSocketWrapper(Socket socket, int timeout, Map metadata)
+ throws Exception
{
if (serverSocketConstructor == null)
{
@@ -199,7 +195,8 @@
try
{
- serverSocketConstructor = serverSocketClass.getConstructor(new Class[]{Socket.class, Map.class, Integer.class});
+ serverSocketConstructor = serverSocketClass.
+ getConstructor(new Class[]{Socket.class, Map.class, Integer.class});
}
catch (NoSuchMethodException e)
{
@@ -209,13 +206,16 @@
}
SocketWrapper serverSocketWrapper = null;
+
if (serverSocketConstructor.getParameterTypes().length == 3)
{
- serverSocketWrapper = (SocketWrapper) serverSocketConstructor.newInstance(new Object[]{socket, metadata, new Integer(timeout)});
+ serverSocketWrapper = (SocketWrapper)serverSocketConstructor.
+ newInstance(new Object[]{socket, metadata, new Integer(timeout)});
}
else
{
- serverSocketWrapper = (SocketWrapper) serverSocketConstructor.newInstance(new Object[]{socket});
+ serverSocketWrapper =
+ (SocketWrapper)serverSocketConstructor.newInstance(new Object[]{socket});
serverSocketWrapper.setTimeout(timeout);
}
@@ -273,11 +273,11 @@
// rename the worker thread to reflect the new socket it is handling
setName(getWorkerThreadName(socket));
- this.socketWrapper = createServerSocket(socket, timeout, metadata);
+ socketWrapper = createServerSocketWrapper(socket, timeout, metadata);
running = true;
handlingResponse = true;
- this.notify();
+ notify();
if(trace) { log.trace(this + " has woken up"); }
}
1.30.2.7 +110 -84 JBossRemoting/src/main/org/jboss/remoting/transport/socket/SocketServerInvoker.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: SocketServerInvoker.java
===================================================================
RCS file: /cvsroot/jboss/JBossRemoting/src/main/org/jboss/remoting/transport/socket/SocketServerInvoker.java,v
retrieving revision 1.30.2.6
retrieving revision 1.30.2.7
diff -u -b -r1.30.2.6 -r1.30.2.7
--- SocketServerInvoker.java 16 Jan 2007 00:29:29 -0000 1.30.2.6
+++ SocketServerInvoker.java 16 Jan 2007 08:15:05 -0000 1.30.2.7
@@ -49,13 +49,15 @@
* @author <a href="mailto:tom.elrod at jboss.com">Tom Elrod</a>
* @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
*
- * @version $Revision: 1.30.2.6 $
+ * @version $Revision: 1.30.2.7 $
* @jmx:mbean
*/
public class SocketServerInvoker extends ServerInvoker implements Runnable, SocketServerInvokerMBean
{
private static final Logger log = Logger.getLogger(SocketServerInvoker.class);
+ private static boolean trace = log.isTraceEnabled();
+
static int clientCount = 0;
private Properties props = new Properties();
@@ -93,12 +95,6 @@
protected int idleTimeout = -1;
protected IdleTimerTask idleTimerTask = null;
- /**
- * The logging trace level flag
- */
- protected boolean trace = false;
-
-
public SocketServerInvoker(InvokerLocator locator)
{
super(locator);
@@ -115,7 +111,8 @@
* then a new ServerSocket is created that accepts the new connections
* @param serverSocketFactory
*/
- public void setNewServerSocketFactory(ServerSocketFactory serverSocketFactory){
+ public void setNewServerSocketFactory(ServerSocketFactory serverSocketFactory)
+ {
newServerSocketFactory=true;
setServerSocketFactory(serverSocketFactory);
}
@@ -135,8 +132,7 @@
serverSocket.setReuseAddress(reuseAddress);
}
- protected void setup()
- throws Exception
+ protected void setup() throws Exception
{
props.putAll(getConfiguration());
PropertyEditors.mapJavaBeanProperties(this, props, false);
@@ -164,11 +160,10 @@
*/
public synchronized void start() throws IOException
{
-
- trace = log.isTraceEnabled();
-
if(!running)
{
+ log.debug(this + " starting");
+
InetAddress bindAddress = InetAddress.getByName(getServerBindAddress());
if(maxPoolSize <= 0)
@@ -183,7 +178,8 @@
}
catch(IOException e)
{
- log.error("Error starting ServerSocket. Bind port: " + getServerBindPort() + ", bind address: " + bindAddress);
+ log.error("Error starting ServerSocket. Bind port: " + getServerBindPort() +
+ ", bind address: " + bindAddress);
throw e;
}
@@ -192,21 +188,18 @@
threadpool = new LinkedList();
acceptThreads = new Thread[numAcceptThreads];
+
for(int i = 0; i < numAcceptThreads; i++)
{
- if(trace)
- {
- log.trace("Creating another AcceptThread");
- }
+ if(trace) { log.trace(this + " creating another AcceptThread"); }
+
String name = getThreadName(i);
acceptThreads[i] = new Thread(this, name);
- //acceptThreads[i].start();
- if(trace)
- {
- log.trace("Thread accepted");
- }
+
+ if(trace) { log.trace(this + " created and registered " + acceptThreads[i]); }
}
}
+
try
{
super.start();
@@ -243,9 +236,13 @@
}
}
+ log.debug(this + " started");
+
}
- protected ServerSocket createServerSocket(int serverBindPort, int backlog, InetAddress bindAddress) throws IOException
+ protected ServerSocket createServerSocket(int serverBindPort,
+ int backlog,
+ InetAddress bindAddress) throws IOException
{
return getServerSocketFactory().createServerSocket(serverBindPort, backlog, bindAddress);
}
@@ -272,7 +269,6 @@
*/
public synchronized void stop()
{
-
if(running)
{
cleanup();
@@ -299,10 +295,8 @@
}
}
-/*
- * The following code has been changed to avoid a race condition with ServerThread.run() which
- * can result in leaving ServerThreads alive, which causes a memory leak.
- */
+ // The following code has been changed to avoid a race condition with ServerThread.run() which
+ // can result in leaving ServerThreads alive, which causes a memory leak.
if (clientpool != null)
{
synchronized (clientpool)
@@ -450,7 +444,6 @@
}
}
-
public int getIdleTimeout()
{
return idleTimeout;
@@ -510,6 +503,12 @@
if(trace) { log.trace(this + " accepted " + socket); }
+ // the acceptor thread should spend as little time as possible doing any kind of
+ // operation, and under no circumstances should perform IO on the new socket, which
+ // can potentially block and lock up the server. For this reason, the acceptor thread
+ // should grab a worker thread and delegate all subsequent work to it. This is what
+ // processInvocation() does.
+
processInvocation(socket);
}
catch (SSLException e)
@@ -532,27 +531,34 @@
}
}
+
+ /**
+ * The acceptor thread should spend as little time as possible doing any kind of operation, and
+ * under no circumstances should perform IO on the new socket, which can potentially block and
+ * lock up the server. For this reason, the acceptor thread should grab a worker thread and
+ * delegate all subsequent work to it.
+ */
protected void processInvocation(Socket socket) throws Exception
{
- ServerThread thread = null;
+ ServerThread worker = null;
boolean newThread = false;
- while(thread == null)
+ while(worker == null)
{
- if(trace) { log.trace(this + " trying to get a worker thread for processing"); }
+ if(trace) { log.trace(this + " trying to get a worker thread from threadpool for processing"); }
synchronized(threadpool)
{
if(threadpool.size() > 0)
{
- thread = (ServerThread)threadpool.removeFirst();
+ worker = (ServerThread)threadpool.removeFirst();
- if(trace) { log.trace(this + (thread == null ? " found NO threads in pool" : " got " + thread + " from pool")); }
+ if(trace) { log.trace(this + (worker == null ? " found NO threads in threadpool" : " got " + worker + " from threadpool")); }
}
- else if (trace) { { log.trace(this + " has an empty thread pool"); } }
+ else if (trace) { { log.trace(this + " has an empty threadpool"); } }
}
- if(thread == null)
+ if(worker == null)
{
synchronized(clientpool)
{
@@ -560,16 +566,18 @@
{
if(trace) { log.trace(this + " creating new worker thread"); }
- thread = new ServerThread(socket, this, clientpool, threadpool,
+ worker = new ServerThread(socket, this, clientpool, threadpool,
getTimeout(), serverSocketClass);
- if(trace) { log.trace(this + " created " + thread); }
+ if(trace) { log.trace(this + " created " + worker); }
newThread = true;
}
- if(thread == null)
+ if(worker == null)
{
+ if(trace) {log.trace(this + " trying to evict a thread from clientpool"); }
+
clientpool.evict();
if(trace) {log.trace(this + " waiting for a thread from clientpool"); }
@@ -584,18 +592,18 @@
synchronized(clientpool)
{
- clientpool.insert(thread, thread);
+ clientpool.insert(worker, worker);
}
if(newThread)
{
- if(trace) {log.trace(this + " starting " + thread); }
- thread.start();
+ if(trace) {log.trace(this + " starting " + worker); }
+ worker.start();
}
else
{
- if(trace) { log.trace(this + " reusing " + thread); }
- thread.wakeup(socket, getTimeout(), locator.getParameters());
+ if(trace) { log.trace(this + " reusing " + worker); }
+ worker.wakeup(socket, getTimeout(), locator.getParameters());
}
}
@@ -634,33 +642,42 @@
* @author Michael Voss
*
*/
- public class ServerSocketRefresh extends Thread{
+ public class ServerSocketRefresh extends Thread
+ {
boolean serverSocketLocked=false;
public ServerSocketRefresh()
{
super("ServerSocketRefresh");
}
- public void run(){
- while(true){
+
+ public void run()
+ {
+ while(true)
+ {
if(isInterrupted())break;
- if(newServerSocketFactory){
+ if(newServerSocketFactory)
+ {
log.debug("got notice about new ServerSocketFactory");
serverSocketLocked=true;
- try {
+ try
+ {
log.debug("refreshing server socket");
refreshServerSocket();
- } catch (IOException e) {
+ } catch (IOException e)
+ {
log.debug("could not refresh server socket");
log.debug("message is: "+e.getMessage());
}
log.debug("server socket refreshed");
serverSocketLocked=false;
-
}
- try {
+ try
+ {
Thread.sleep(10000);
- } catch (InterruptedException e) {
+ }
+ catch (InterruptedException e)
+ {
interrupt();
}
}
@@ -669,9 +686,18 @@
/**
* let the run() method resume when refresh is completed
*/
- public void release(){
- while (serverSocketLocked){
- try{Thread.sleep(1000);}catch(InterruptedException ignored){}
+ public void release()
+ {
+ while (serverSocketLocked)
+ {
+ try
+ {
+ Thread.sleep(1000);
+ }
+ catch(InterruptedException ignored)
+ {
+
+ }
}
}
}
More information about the jboss-cvs-commits
mailing list