[jboss-cvs] JBossRemoting/src/main/org/jboss/remoting/transport/socket ...
Ron Sigal
ron_sigal at yahoo.com
Sat Feb 3 00:20:19 EST 2007
User: rsigal
Date: 07/02/03 00:20:19
Modified: src/main/org/jboss/remoting/transport/socket
SocketServerInvoker.java
Log:
Ovidiu's logging changes and reorganization: sync with remoting_2_x.
Revision Changes Path
1.34 +156 -131 JBossRemoting/src/main/org/jboss/remoting/transport/socket/SocketServerInvoker.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: SocketServerInvoker.java
===================================================================
RCS file: /cvsroot/jboss/JBossRemoting/src/main/org/jboss/remoting/transport/socket/SocketServerInvoker.java,v
retrieving revision 1.33
retrieving revision 1.34
diff -u -b -r1.33 -r1.34
--- SocketServerInvoker.java 27 Dec 2006 05:56:48 -0000 1.33
+++ SocketServerInvoker.java 3 Feb 2007 05:20:19 -0000 1.34
@@ -24,11 +24,13 @@
import org.jboss.remoting.InvokerLocator;
import org.jboss.remoting.ServerInvoker;
-import org.jboss.remoting.marshal.serializable.SerializableMarshaller;
import org.jboss.remoting.util.TimerUtil;
+import org.jboss.remoting.marshal.serializable.SerializableMarshaller;
import org.jboss.util.propertyeditor.PropertyEditors;
+import org.jboss.logging.Logger;
import javax.net.ServerSocketFactory;
+import javax.net.ssl.SSLException;
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
@@ -45,13 +47,17 @@
*
* @author <a href="mailto:jhaynie at vocalocity.net">Jeff Haynie</a>
* @author <a href="mailto:tom.elrod at jboss.com">Tom Elrod</a>
- * @version $Revision: 1.33 $
+ * @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
+ *
+ * @version $Revision: 1.34 $
* @jmx:mbean
*/
public class SocketServerInvoker extends ServerInvoker implements Runnable, SocketServerInvokerMBean
{
- private InetAddress addr;
- private int port;
+ private static final Logger log = Logger.getLogger(SocketServerInvoker.class);
+
+ private static boolean trace = log.isTraceEnabled();
+
static int clientCount = 0;
private Properties props = new Properties();
@@ -89,12 +95,6 @@
protected int idleTimeout = -1;
protected IdleTimerTask idleTimerTask = null;
- /**
- * The logging trace level flag
- */
- protected boolean trace = false;
-
-
public SocketServerInvoker(InvokerLocator locator)
{
super(locator);
@@ -111,7 +111,8 @@
* then a new ServerSocket is created that accepts the new connections
* @param serverSocketFactory
*/
- public void setNewServerSocketFactory(ServerSocketFactory serverSocketFactory){
+ public void setNewServerSocketFactory(ServerSocketFactory serverSocketFactory)
+ {
newServerSocketFactory=true;
setServerSocketFactory(serverSocketFactory);
}
@@ -121,17 +122,17 @@
* creating a new ServerSocket from new ServerSocketFactory
* @throws IOException
*/
- protected void refreshServerSocket() throws IOException{
+ protected void refreshServerSocket() throws IOException
+ {
newServerSocketFactory=false;
serverSocket.close();
- serverSocket=null;
+ serverSocket = null;
InetAddress bindAddress = InetAddress.getByName(getServerBindAddress());
serverSocket = createServerSocket(getServerBindPort(), backlog, bindAddress);
serverSocket.setReuseAddress(reuseAddress);
}
- protected void setup()
- throws Exception
+ protected void setup() throws Exception
{
props.putAll(getConfiguration());
PropertyEditors.mapJavaBeanProperties(this, props, false);
@@ -159,11 +160,10 @@
*/
public synchronized void start() throws IOException
{
-
- trace = log.isTraceEnabled();
-
if(!running)
{
+ log.debug(this + " starting");
+
InetAddress bindAddress = InetAddress.getByName(getServerBindAddress());
if(maxPoolSize <= 0)
@@ -178,7 +178,8 @@
}
catch(IOException e)
{
- log.error("Error starting ServerSocket. Bind port: " + getServerBindPort() + ", bind address: " + bindAddress);
+ log.error("Error starting ServerSocket. Bind port: " + getServerBindPort() +
+ ", bind address: " + bindAddress);
throw e;
}
@@ -187,21 +188,18 @@
threadpool = new LinkedList();
acceptThreads = new Thread[numAcceptThreads];
+
for(int i = 0; i < numAcceptThreads; i++)
{
- if(trace)
- {
- log.trace("Creating another AcceptThread");
- }
+ if(trace) { log.trace(this + " creating another AcceptThread"); }
+
String name = getThreadName(i);
acceptThreads[i] = new Thread(this, name);
- //acceptThreads[i].start();
- if(trace)
- {
- log.trace("Thread accepted");
- }
+
+ if(trace) { log.trace(this + " created and registered " + acceptThreads[i]); }
}
}
+
try
{
super.start();
@@ -238,16 +236,20 @@
}
}
+ log.debug(this + " started");
+
}
- protected ServerSocket createServerSocket(int serverBindPort, int backlog, InetAddress bindAddress) throws IOException
+ protected ServerSocket createServerSocket(int serverBindPort,
+ int backlog,
+ InetAddress bindAddress) throws IOException
{
return getServerSocketFactory().createServerSocket(serverBindPort, backlog, bindAddress);
}
protected String getThreadName(int i)
{
- return "SocketServerInvoker#" + i + "-" + getServerBindPort();
+ return "AcceptorThread#" + i + ":" + getServerBindPort();
}
public void destroy()
@@ -267,7 +269,6 @@
*/
public synchronized void stop()
{
-
if(running)
{
cleanup();
@@ -294,10 +295,8 @@
}
}
-/*
- * The following code has been changed to avoid a race condition with ServerThread.run() which
- * can result in leaving ServerThreads alive, which causes a memory leak.
- */
+ // The following code has been changed to avoid a race condition with ServerThread.run() which
+ // can result in leaving ServerThreads alive, which causes a memory leak.
if (clientpool != null)
{
synchronized (clientpool)
@@ -342,7 +341,6 @@
/**
* Indicates if SO_REUSEADDR is enabled on server sockets
* Default is true.
- * @return
*/
public boolean getReuseAddress()
{
@@ -446,7 +444,6 @@
}
}
-
public int getIdleTimeout()
{
return idleTimeout;
@@ -487,12 +484,9 @@
public void run()
{
- if(trace)
- {
- log.trace("Started execution of method run");
- }
+ if(trace) { log.trace(this + " started execution of method run()"); }
- ServerSocketRefresh thread=new ServerSocketRefresh();
+ ServerSocketRefresh thread = new ServerSocketRefresh();
thread.start();
try
@@ -501,20 +495,23 @@
{
try
{
- if(trace)
- {
- log.trace("Socket is going to be accepted");
- }
-
thread.release(); //goes on if serversocket refresh is completed
+
+ if(trace) { log.trace(this + " is going to wait on serverSocket.accept()"); }
+
Socket socket = serverSocket.accept();
- if(trace)
- {
- log.trace("Accepted: " + socket);
- }
+
+ if(trace) { log.trace(this + " accepted " + socket); }
+
+ // the acceptor thread should spend as little time as possible doing any kind of
+ // operation, and under no circumstances should perform IO on the new socket, which
+ // can potentially block and lock up the server. For this reason, the acceptor thread
+ // should grab a worker thread and delegate all subsequent work to it. This is what
+ // processInvocation() does.
+
processInvocation(socket);
}
- catch (javax.net.ssl.SSLException e)
+ catch (SSLException e)
{
log.error("SSLServerSocket error", e);
return;
@@ -523,7 +520,7 @@
{
if(running)
{
- log.error("Failed to accept socket connection", ex);
+ log.error(this + " failed to handle socket", ex);
}
}
}
@@ -534,74 +531,79 @@
}
}
- protected void processInvocation(Socket socket)
- throws Exception
+
+ /**
+ * The acceptor thread should spend as little time as possible doing any kind of operation, and
+ * under no circumstances should perform IO on the new socket, which can potentially block and
+ * lock up the server. For this reason, the acceptor thread should grab a worker thread and
+ * delegate all subsequent work to it.
+ */
+ protected void processInvocation(Socket socket) throws Exception
{
- ServerThread thread = null;
+ ServerThread worker = null;
boolean newThread = false;
- while(thread == null)
+ while(worker == null)
{
- if(log.isTraceEnabled())
- {
- log.trace("try to get a thread for processing");
- }
+ if(trace) { log.trace(this + " trying to get a worker thread from threadpool for processing"); }
+
synchronized(threadpool)
{
if(threadpool.size() > 0)
{
- thread = (ServerThread) threadpool.removeFirst();
- if(log.isTraceEnabled())
- {
- log.trace("Got thread for processing - " + thread);
- }
+ worker = (ServerThread)threadpool.removeFirst();
+
+ if(trace) { log.trace(this + (worker == null ? " found NO threads in threadpool" : " got " + worker + " from threadpool")); }
}
+ else if (trace) { { log.trace(this + " has an empty threadpool"); } }
}
- if(thread == null)
+
+ if(worker == null)
{
synchronized(clientpool)
{
if(clientpool.size() < maxPoolSize)
{
- thread = new ServerThread(socket, this, clientpool, threadpool, getTimeout(), serverSocketClass);
+ if(trace) { log.trace(this + " creating new worker thread"); }
+
+ worker = new ServerThread(socket, this, clientpool, threadpool,
+ getTimeout(), serverSocketClass);
+
+ if(trace) { log.trace(this + " created " + worker); }
+
newThread = true;
}
- if(thread == null)
+
+ if(worker == null)
{
+ if(trace) {log.trace(this + " trying to evict a thread from clientpool"); }
+
clientpool.evict();
- if(trace)
- {
- log.trace("Waiting for a thread...");
- }
+
+ if(trace) {log.trace(this + " waiting for a thread from clientpool"); }
+
clientpool.wait();
- if(trace)
- {
- log.trace("Notified of available thread");
- }
+
+ if(trace) { log.trace(this + " notified of clientpool thread availability"); }
}
}
}
}
+
synchronized(clientpool)
{
- clientpool.insert(thread, thread);
+ clientpool.insert(worker, worker);
}
if(newThread)
{
- if(trace)
- {
- log.trace("Created a new thread, t=" + thread);
- }
- thread.start();
+ if(trace) {log.trace(this + " starting " + worker); }
+ worker.start();
}
else
{
- if(trace)
- {
- log.trace("Reusing thread t=" + thread);
- }
- thread.wakeup(socket, getTimeout(), locator.getParameters());
+ if(trace) { log.trace(this + " reusing " + worker); }
+ worker.wakeup(socket, getTimeout(), this);
}
}
@@ -609,20 +611,25 @@
* returns true if the transport is bi-directional in nature, for example,
* SOAP in unidirectional and SOCKETs are bi-directional (unless behind a firewall
* for example).
- *
- * @return
*/
public boolean isTransportBiDirectional()
{
return true;
}
+ public String toString()
+ {
+ return "SocketServerInvoker[" +
+ (serverSocket == null ?
+ "UNINITIALIZED" :
+ serverSocket.getInetAddress().getHostAddress() + ":" + serverSocket.getLocalPort()) +
+ "]";
+ }
+
/**
* Each implementation of the remote client invoker should have
* a default data type that is uses in the case it is not specified
* in the invoker locator uri.
- *
- * @return
*/
protected String getDefaultDataType()
{
@@ -635,33 +642,42 @@
* @author Michael Voss
*
*/
- public class ServerSocketRefresh extends Thread{
+ public class ServerSocketRefresh extends Thread
+ {
boolean serverSocketLocked=false;
public ServerSocketRefresh()
{
super("ServerSocketRefresh");
}
- public void run(){
- while(true){
+
+ public void run()
+ {
+ while(true)
+ {
if(isInterrupted())break;
- if(newServerSocketFactory){
+ if(newServerSocketFactory)
+ {
log.debug("got notice about new ServerSocketFactory");
serverSocketLocked=true;
- try {
+ try
+ {
log.debug("refreshing server socket");
refreshServerSocket();
- } catch (IOException e) {
+ } catch (IOException e)
+ {
log.debug("could not refresh server socket");
log.debug("message is: "+e.getMessage());
}
log.debug("server socket refreshed");
serverSocketLocked=false;
-
}
- try {
+ try
+ {
Thread.sleep(10000);
- } catch (InterruptedException e) {
+ }
+ catch (InterruptedException e)
+ {
interrupt();
}
}
@@ -670,9 +686,18 @@
/**
* let the run() method resume when refresh is completed
*/
- public void release(){
- while (serverSocketLocked){
- try{Thread.sleep(1000);}catch(InterruptedException ignored){}
+ public void release()
+ {
+ while (serverSocketLocked)
+ {
+ try
+ {
+ Thread.sleep(1000);
+ }
+ catch(InterruptedException ignored)
+ {
+
+ }
}
}
}
More information about the jboss-cvs-commits
mailing list