[jboss-cvs] JBossRemoting/src/main/org/jboss/remoting/transport/socket ...
Ron Sigal
ron_sigal at yahoo.com
Sat Feb 3 00:17:21 EST 2007
User: rsigal
Date: 07/02/03 00:17:21
Modified: src/main/org/jboss/remoting/transport/socket
ServerThread.java
Log:
JBREM-597, JBREM-690, JBREM-692: sync with remoting_2_x.
Revision Changes Path
1.33 +508 -404 JBossRemoting/src/main/org/jboss/remoting/transport/socket/ServerThread.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: ServerThread.java
===================================================================
RCS file: /cvsroot/jboss/JBossRemoting/src/main/org/jboss/remoting/transport/socket/ServerThread.java,v
retrieving revision 1.32
retrieving revision 1.33
diff -u -b -r1.32 -r1.33
--- ServerThread.java 12 Jan 2007 18:56:22 -0000 1.32
+++ ServerThread.java 3 Feb 2007 05:17:21 -0000 1.33
@@ -26,12 +26,14 @@
import org.jboss.logging.Logger;
import org.jboss.remoting.InvocationRequest;
import org.jboss.remoting.InvocationResponse;
+import org.jboss.remoting.InvokerLocator;
import org.jboss.remoting.ServerInvoker;
import org.jboss.remoting.Version;
+import org.jboss.remoting.Client;
+import org.jboss.remoting.serialization.ClassLoaderUtility;
import org.jboss.remoting.marshal.MarshalFactory;
import org.jboss.remoting.marshal.Marshaller;
import org.jboss.remoting.marshal.UnMarshaller;
-import org.jboss.remoting.serialization.ClassLoaderUtility;
import java.io.EOFException;
import java.io.IOException;
@@ -43,6 +45,7 @@
import java.net.SocketAddress;
import java.net.SocketException;
import java.net.SocketTimeoutException;
+import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
@@ -59,65 +62,84 @@
*
* @author <a href="mailto:bill at jboss.org">Bill Burke</a>
* @author <a href="mailto:tom at jboss.org">Tom Elrod</a>
- * @version $Revision: 1.32 $
+ * @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
+ *
+ * @version $Revision: 1.33 $
*/
public class ServerThread extends Thread
{
+ // Constants ------------------------------------------------------------------------------------
+
final static private Logger log = Logger.getLogger(ServerThread.class);
- protected SocketServerInvoker invoker;
+ // Static ---------------------------------------------------------------------------------------
+
+ private static boolean trace = log.isTraceEnabled();
+
+ private static int idGenerator = 0;
+
+ public static synchronized int nextID()
+ {
+ return idGenerator++;
+ }
+
+ // Attributes -----------------------------------------------------------------------------------
+
+ protected volatile boolean running;
+ protected volatile boolean handlingResponse;
+ protected volatile boolean shutdown;
+
protected LRUPool clientpool;
protected LinkedList threadpool;
- protected volatile boolean running = true;
- protected volatile boolean handlingResponse = true; // start off as true so that nobody can interrupt us
- protected volatile boolean shutdown = false;
- protected static int id = 0;
-
- private SocketWrapper socketWrapper = null;
- protected String serverSocketClassName = ServerSocketWrapper.class.getName();
- protected Class serverSocketClass = null;
- private Constructor serverSocketConstructor = null;
- private static final boolean isTrace = log.isTraceEnabled();
- private static final boolean isDebug = log.isDebugEnabled();
+ protected String serverSocketClassName;
+ protected Class serverSocketClass;
- /**
- * Indicates if will check the socket connection when
- * getting from pool by sending byte over the connection
- * to validate is still good.
- */
- private boolean shouldCheckConnection = false;
+ private Socket socket;
+ private int timeout;
+ protected SocketServerInvoker invoker;
+ private Constructor serverSocketConstructor;
- /**
- * Will indicate when the last request has been processed (used in determining
- * idle connection/thread timeout)
- */
+ protected Marshaller marshaller;
+ protected UnMarshaller unmarshaller;
+
+ // the unique identity of the thread, which won't change during the life of the thread. The
+ // thread may get associated with different IP addresses though.
+ private int id = Integer.MIN_VALUE;
+
+ // Indicates if will check the socket connection when getting from pool by sending byte over the
+ // connection to validate is still good.
+ private boolean shouldCheckConnection;
+
+ // Will indicate when the last request has been processed (used in determining idle
+ // connection/thread timeout)
private long lastRequestHandledTimestamp = System.currentTimeMillis();
- public static synchronized int nextID()
- {
- int nextID = id++;
- return nextID;
- }
+ // Constructors ---------------------------------------------------------------------------------
public ServerThread(Socket socket, SocketServerInvoker invoker, LRUPool clientpool,
- LinkedList threadpool, int timeout, String serverSocketClassName) throws Exception
+ LinkedList threadpool, int timeout, String serverSocketClassName)
+ throws Exception
{
- super("SocketServerInvokerThread-" + socket.getInetAddress().getHostAddress() + "-" + nextID());
+ super();
+
+ running = true;
+ handlingResponse = true; // start off as true so that nobody can interrupt us
+
+ setName(getWorkerThreadName(socket));
+
+ this.socket = socket;
+ this.timeout = timeout;
this.serverSocketClassName = serverSocketClassName;
- this.socketWrapper = createServerSocket(socket, timeout, invoker.getLocator().getParameters());
this.invoker = invoker;
this.clientpool = clientpool;
this.threadpool = threadpool;
- init();
- }
+ processNewSocket();
- private void init()
- {
if (invoker != null)
{
Map configMap = invoker.getConfiguration();
- String checkValue = (String) configMap.get(SocketServerInvoker.CHECK_CONNECTION_KEY);
+ String checkValue = (String)configMap.get(SocketServerInvoker.CHECK_CONNECTION_KEY);
if (checkValue != null && checkValue.length() > 0)
{
shouldCheckConnection = Boolean.valueOf(checkValue).booleanValue();
@@ -125,6 +147,101 @@
}
}
+ // Thread overrides -----------------------------------------------------------------------------
+
+ public void run()
+ {
+ try
+ {
+ while (true)
+ {
+ dorun();
+
+ // The following code has been changed to eliminate a race condition with
+ // SocketServerInvoker.cleanup().
+ //
+ // A ServerThread can shutdown for two reasons:
+ // 1. the client shuts down, and
+ // 2. the server shuts down.
+ //
+ // If both occur around the same time, a problem arises. If a ServerThread starts to
+ // shut down because the client shut down, it will test shutdown, and if it gets to the
+ // test before SocketServerInvoker.cleanup() calls ServerThread.stop() to set shutdown
+ // to true, it will return itself to threadpool. If it moves from clientpool to
+ // threadpool at just the right time, SocketServerInvoker could miss it in both places
+ // and never call stop(), leaving it alive, resulting in a memory leak. The solution is
+ // to synchronize parts of ServerThread.run() and SocketServerInvoker.cleanup() so that
+ // they interact atomically.
+
+ synchronized (this)
+ {
+ synchronized (clientpool)
+ {
+ synchronized (threadpool)
+ {
+ if (shutdown)
+ {
+ invoker = null;
+ return; // exit thread
+ }
+ else
+ {
+ if(trace) { log.trace(this + " removing itself from clientpool and going to threadpool"); }
+ clientpool.remove(this);
+ threadpool.add(this);
+ Thread.interrupted(); // clear any interruption so that we can be pooled.
+ clientpool.notify();
+ }
+ }
+ }
+
+ try
+ {
+ if(trace) { log.trace(this + " begins to wait"); }
+
+ wait();
+
+ if(trace) { log.trace(this + " woke up after wait"); }
+ }
+ catch (InterruptedException e)
+ {
+ if (shutdown)
+ {
+ invoker = null;
+ return; // exit thread
+ }
+
+ throw e;
+ }
+ }
+ }
+ }
+ catch (Exception e)
+ {
+ log.debug(this + " exiting run on exception, definitively thrown out of the threadpool", e);
+ }
+ }
+
+ // Public ---------------------------------------------------------------------------------------
+
+ public synchronized void wakeup(Socket socket, int timeout, SocketServerInvoker invoker)
+ throws Exception
+ {
+ // rename the worker thread to reflect the new socket it is handling
+ setName(getWorkerThreadName(socket));
+
+ this.socket = socket;
+ this.timeout = timeout;
+ this.invoker = invoker;
+
+ running = true;
+ handlingResponse = true;
+ processNewSocket();
+ notify();
+
+ if(trace) { log.trace(this + " has notified on mutex"); }
+ }
+
public long getLastRequestTimestamp()
{
return lastRequestHandledTimestamp;
@@ -134,14 +251,13 @@
{
shutdown = true;
running = false;
- // This is a race and there is a chance
- // that a invocation is going on at the time
- // of the interrupt. But I see no way right
- // now to protect for this.
-
- // NOTE ALSO!:
- // Shutdown should never be synchronized.
- // We don't want to hold up accept() thread! (via LRUpool)
+
+ // This is a race and there is a chance that a invocation is going on at the time of the
+ // interrupt. But I see no way right now to protect for this.
+
+ // NOTE ALSO!: Shutdown should never be synchronized. We don't want to hold up accept()
+ // thread! (via LRUpool)
+
if (!handlingResponse)
{
try
@@ -153,15 +269,12 @@
{
}
}
-
}
/**
- * Sets if server thread should check connection before continue to process on
- * next invocation request. If is set to true, will send an ACK to client to
- * verify client is still connected on same socket.
- *
- * @param checkConnection
+ * Sets if server thread should check connection before continue to process on next invocation
+ * request. If is set to true, will send an ACK to client to verify client is still connected
+ * on same socket.
*/
public void shouldCheckConnection(boolean checkConnection)
{
@@ -169,64 +282,24 @@
}
/**
- * Indicates if server will check with client (via an ACK) to
- * see if is still there.
- *
- * @return
+ * Indicates if server will check with client (via an ACK) to see if is still there.
*/
public boolean getCheckingConnection()
{
return this.shouldCheckConnection;
}
- private SocketWrapper createServerSocket(Socket socket, int timeout, Map metadata) throws Exception
- {
- if (serverSocketConstructor == null)
- {
- if(serverSocketClass == null)
- {
- serverSocketClass = ClassLoaderUtility.loadClass(serverSocketClassName, getClass());
- }
-
- try
- {
- serverSocketConstructor = serverSocketClass.getConstructor(new Class[]{Socket.class, Map.class, Integer.class});
- }
- catch (NoSuchMethodException e)
- {
- serverSocketConstructor = serverSocketClass.getConstructor(new Class[]{Socket.class});
- }
-
- }
-
- SocketWrapper serverSocketWrapper = null;
- if (serverSocketConstructor.getParameterTypes().length == 3)
- {
- serverSocketWrapper = (SocketWrapper) serverSocketConstructor.newInstance(new Object[]{socket, metadata, new Integer(timeout)});
- }
- else
+ public void evict()
{
- serverSocketWrapper = (SocketWrapper) serverSocketConstructor.newInstance(new Object[]{socket});
- serverSocketWrapper.setTimeout(timeout);
- }
+ running = false;
- return serverSocketWrapper;
- }
+ // This is a race and there is a chance that a invocation is going on at the time of the
+ // interrupt. But I see no way right now to protect for this. There may not be a problem
+ // because interrupt only effects threads blocking on IO.
+ // NOTE ALSO!: Shutdown should never be synchronized. We don't want to hold up accept()
+ // thread! (via LRUpool)
- public void evict()
- {
- running = false;
- // This is a race and there is a chance
- // that a invocation is going on at the time
- // of the interrupt. But I see no way right
- // now to protect for this.
- // There may not be a problem because interrupt only effects
- // threads blocking on IO.
-
- // NOTE ALSO!:
- // Shutdown should never be synchronized.
- // We don't want to hold up accept() thread! (via LRUpool)
if (!handlingResponse)
{
try
@@ -241,16 +314,14 @@
}
/**
- * This method is intended to be used when need to unblock
- * I/O read, which the thread will automatically loop back to
- * do after processing a request. Calling this method will cause
- * the underlying socket to be closed.
+ * This method is intended to be used when need to unblock I/O read, which the thread will
+ * automatically loop back to do after processing a request.
*/
public void unblock()
{
try
{
- socketWrapper.close();
+ socket.close();
}
catch (IOException e)
{
@@ -258,216 +329,206 @@
}
}
- public synchronized void wakeup(Socket socket, int timeout, Map metadata) throws Exception
- {
- this.socketWrapper = createServerSocket(socket, timeout, metadata);
- String name = "SocketServerInvokerThread-" + socket.getInetAddress().getHostAddress() + "-" + nextID();
- super.setName(name);
- running = true;
- handlingResponse = true;
- this.notify();
- if(isTrace)
+ public String toString()
{
- log.trace("Have woken up " + getName());
- }
+ return getName();
}
- public void run()
+ // Package protected ----------------------------------------------------------------------------
+
+ // Protected ------------------------------------------------------------------------------------
+
+ /**
+ * This is needed because Object*Streams leak
+ */
+ protected void dorun()
{
+ if(trace) { log.trace("beginning dorun()"); }
+
+ running = true;
+ handlingResponse = true;
+
+ // lazy initialize the socketWrapper on the worker thread itself. We do this to avoid to have
+ // it done on the acceptor thread (prone to lockup)
+ SocketWrapper socketWrapper = null;
+
try
{
- while (true)
- {
- dorun();
+ if(trace) { log.trace("creating the socket wrapper"); }
-/*
-* The following code has been changed to eliminate a race condition with SocketServerInvoker.cleanup().
-* A ServerThread can shutdown for two reasons:
-*
-* 1. the client shuts down, and
-* 2. the server shuts down.
-*
-* If both occur around the same time, a problem arises. If a ServerThread starts to shut
-* down because the client shut down, it will test shutdown, and if it gets to the test
-* before SocketServerInvoker.cleanup() calls ServerThread.stop() to set shutdown to true, it
-* will return itself to threadpool. If it moves from clientpool to threadpool at just the
-* right time, SocketServerInvoker could miss it in both places and never call stop(), leaving
-* it alive, resulting in a memory leak. The solution is to synchronize parts of
-* ServerThread.run() and SocketServerInvoker.cleanup() so that they interact atomically.
-*/
- synchronized (this)
- {
- synchronized (clientpool)
+ socketWrapper =
+ createServerSocketWrapper(socket, timeout, invoker.getLocator().getParameters());
+
+ // Always do first one without an ACK because its not needed
+ if(trace) { log.trace("processing first invocation without acknowledging"); }
+ processInvocation(socketWrapper);
+ }
+ catch (Exception ex)
{
- synchronized (threadpool)
+ log.error("Worker thread initialization failure", ex);
+ running = false;
+ }
+
+ // Re-use loop
+ while (running)
{
- if (shutdown)
+ try
{
- invoker = null;
- return; // exit thread
+ acknowledge(socketWrapper);
+ processInvocation(socketWrapper);
}
- else
+ catch (AcknowledgeFailure e)
{
- if(isTrace)
+ if (!shutdown && trace)
{
- log.trace("removing from clientpool and adding to threadpool " + this);
- }
- clientpool.remove(this);
- threadpool.add(this);
- Thread.interrupted(); // clear any interruption so that we can be pooled.
- clientpool.notify();
- }
+ log.trace("keep alive acknowledge failed!");
}
+ running = false;
}
-
- try
+ catch(SocketTimeoutException ste)
{
- if(isTrace)
+ if(!shutdown)
{
- log.trace("begin thread wait");
- }
- this.wait();
- if(isTrace)
+ if(trace)
{
- log.trace("WAKEUP in SERVER THREAD");
+ log.trace(ste);
}
}
- catch (InterruptedException e)
+ running = false;
+ }
+ catch (InterruptedIOException e)
{
- if (shutdown)
+ if (!shutdown)
{
- invoker = null;
- return; // exit thread
+ log.error("Socket IO interrupted", e);
}
+ running = false;
- throw e;
}
+ catch (InterruptedException e)
+ {
+ if(trace)
+ {
+ log.trace(e);
}
+ if (!shutdown)
+ {
+ log.error("interrupted", e);
}
}
- catch (Exception ignored)
+ catch (EOFException eof)
{
- if(isDebug)
+ if (!shutdown && trace)
{
- log.debug("Exiting run on exception", ignored);
- }
+ log.trace("EOFException received. This is likely due to client finishing communication.", eof);
}
+ running = false;
}
-
-
- protected void acknowledge() throws Exception
+ catch (SocketException sex)
{
- if (shouldCheckConnection)
+ if (!shutdown && trace)
{
-
- // HERE IS THE RACE between ACK received and handlingResponse = true
- // We can't synchronize because readByte blocks and client is expecting
- // a response and we don't want to hang client.
- // see shutdown and evict for more details
- // There may not be a problem because interrupt only effects
- // threads blocking on IO. and this thread will just continue.
- handlingResponse = true;
-
- try
- {
- socketWrapper.checkConnection();
+ log.trace("SocketException received. This is likely due to client disconnecting and resetting connection.", sex);
}
- catch (EOFException e)
- {
- throw new AcknowledgeFailure();
+ running = false;
}
- catch (SocketException se)
+ catch (Exception ex)
{
- throw new AcknowledgeFailure();
- }
- catch (IOException ioe)
+ if (!shutdown)
{
- throw new AcknowledgeFailure();
+ log.error("failed", ex);
+ running = false;
}
-
- handlingResponse = false;
}
+ // clear any interruption so that thread can be pooled.
+ handlingResponse = false;
+ Thread.interrupted();
}
- protected Object versionedRead(InputStream inputStream, ServerInvoker invoker,
- ClassLoader classLoader, int version) throws IOException, ClassNotFoundException
- {
- //TODO: -TME - Should I even botther to check for version here? Only one way to do processing at this
- // point, regardless of version.
- switch (version)
- {
- case Version.VERSION_1:
- case Version.VERSION_2:
- {
+ // Ok, we've been shutdown. Do appropriate cleanups.
+ // The stream close code has been moved to SocketWrapper.close().
+// try
+// {
+// if (socketWrapper != null)
+// {
+// InputStream in = socketWrapper.getInputStream();
+// if (in != null)
+// {
+// in.close();
+// }
+// OutputStream out = socketWrapper.getOutputStream();
+// if (out != null)
+// {
+// out.close();
+// }
+// }
+// }
+// catch (Exception ex)
+// {
+// log.debug("failed to close in/out", ex);
+// }
- //TODO: -TME Need better way to get the unmarshaller (via config)
- UnMarshaller unmarshaller = MarshalFactory.getUnMarshaller(invoker.getLocator(), classLoader);
- if (unmarshaller == null)
+ try
{
- unmarshaller = MarshalFactory.getUnMarshaller(invoker.getDataType(), invoker.getSerializationType());
+ if (socketWrapper != null)
+ {
+ socketWrapper.close();
}
- return unmarshaller.read(inputStream, null);
}
- default:
+ catch (Exception ex)
{
- throw new IOException("Can not read data for version " + version + ". Supported versions: " + Version.VERSION_1 + "," + Version.VERSION_2);
- }
+ log.error("failed to close socket wrapper", ex);
}
+ socketWrapper = null;
}
- protected void processInvocation() throws Exception
- {
- if(isTrace)
+ protected void processInvocation(SocketWrapper socketWrapper) throws Exception
{
- log.trace("processing invocation in server thread.");
- }
+ if(trace) { log.trace("preparing to process next invocation invocation"); }
+
handlingResponse = true;
+
// Ok, now read invocation and invoke
//TODO: -TME This needs to be done by ServerInvoker
int version = Version.getDefaultVersion();
boolean performVersioning = Version.performVersioning();
InputStream inputStream = socketWrapper.getInputStream();
+
if (performVersioning)
{
-// if(!inputStream.markSupported())
-// {
-// inputStream = new BufferedInputStream(inputStream);
-// }
-// // marking the stream in case the version is invalid
-// inputStream.mark(2);
-
version = readVersion(inputStream);
+
//TODO: -TME Should I be checking for -1?
- /**
- * This is a best attempt to determine if is old version. Typically, the first
- * byte will be -1, so if is, will reset stream and process as though is older version
- */
- /**
- * Originally this code (now uncommented) and the other commented code was to try to
- * make so could automatically detect older version that would not be sending a byte
- * for the version. However, due to the way the serialization stream manager handles
- * the stream, resetting it does not work, so will probably have to throw away that idea.
- * However, for now, am uncommenting this section because if are using the flag to turn
- * off connection checking (ack back to client), then will get a -1 when the client
- * closes connection. Then when stream passed onto the versionedRead, will get EOFException
- * thrown and will process normally (as though came from the acknowledge, as would have happened
- * if connection checking was turned on). Am hoping this is not a mistake...
- */
+ // This is a best attempt to determine if is old version. Typically, the first byte will
+ // be -1, so if is, will reset stream and process as though is older version.
+
+ // Originally this code (now uncommented) and the other commented code was to try to make
+ // so could automatically detect older version that would not be sending a byte for the
+ // version. However, due to the way the serialization stream manager handles the stream,
+ // resetting it does not work, so will probably have to throw away that idea. However, for
+ // now, am uncommenting this section because if are using the flag to turn off connection
+ // checking (ack back to client), then will get a -1 when the client closes connection.
+ // Then when stream passed onto the versionedRead, will get EOFException thrown and will
+ // process normally (as though came from the acknowledge, as would have happened if
+ // connection checking was turned on). Am hoping this is not a mistake...
+
if(version == -1)
{
version = Version.VERSION_1;
-// inputStream.reset();
}
}
- Object obj = versionedRead(inputStream, invoker, this.getClass().getClassLoader(), version);
+ Object obj = versionedRead(inputStream, invoker, getClass().getClassLoader(), version);
+
+ // setting timestamp since about to start processing
+ lastRequestHandledTimestamp = System.currentTimeMillis();
InvocationRequest req = null;
boolean createdInvocationRequest = false;
boolean isError = false;
+
if(obj instanceof InvocationRequest)
{
req = (InvocationRequest)obj;
@@ -479,28 +540,40 @@
performVersioning = false;
}
-
Object resp = null;
+
try
{
// Make absolutely sure thread interrupted is cleared.
- boolean interrupted = Thread.interrupted();
+ Thread.interrupted();
+
+ if(trace) { log.trace("about to call " + invoker + ".invoke()"); }
+
// call transport on the subclass, get the result to handback
resp = invoker.invoke(req);
+
+ if(trace) { log.trace(invoker + ".invoke() returned " + resp); }
}
catch (Throwable ex)
{
resp = ex;
isError = true;
+ log.error(invoker + ".invoke() call failed: " + ex.getMessage());
+ log.debug(invoker + ".invoke() call failed", ex);
}
Thread.interrupted(); // clear interrupted state so we don't fail on socket writes
- if(!isOneway(req.getRequestPayload()))
+ if(isOneway(req.getRequestPayload()))
+ {
+ if(trace) { log.trace("oneway request, writing no reply on the wire"); }
+ }
+ else
{
if(!createdInvocationRequest)
{
// need to return invocation response
+ if(trace) { log.trace("creating response instance"); }
resp = new InvocationResponse(req.getSessionId(), resp, isError, req.getReturnPayload());
}
@@ -512,214 +585,245 @@
versionedWrite(outputStream, invoker, this.getClass().getClassLoader(), resp, version);
}
+
handlingResponse = false;
+
// set the timestamp for last successful processed request
lastRequestHandledTimestamp = System.currentTimeMillis();
}
- private boolean isOneway(Map metadata)
- {
- boolean isOneway = false;
-
- if (metadata != null)
+ protected void acknowledge(SocketWrapper socketWrapper) throws Exception
{
- Object val = metadata.get(org.jboss.remoting.Client.ONEWAY_FLAG);
- if (val != null && val instanceof String && Boolean.valueOf((String) val).booleanValue())
+ if (shouldCheckConnection)
{
- if (isTrace)
+ // HERE IS THE RACE between ACK received and handlingResponse = true. We can't synchronize
+ // because readByte blocks and client is expecting a response and we don't want to hang
+ // client. See shutdown and evict for more details. There may not be a problem because
+ // interrupt only effects threads blocking on IO. and this thread will just continue.
+
+ handlingResponse = true;
+
+ try
{
- log.trace("Oneway invocation, so not waiting for response. Returning null.");
- }
- isOneway = true;
- }
- }
- return isOneway;
+ if(trace) { log.trace("checking connection"); }
+ socketWrapper.checkConnection();
}
-
- private InvocationRequest createInvocationRequest(Object obj, SocketWrapper socketWrapper)
+ catch (EOFException e)
{
- if(obj instanceof InvocationRequest)
+ throw new AcknowledgeFailure();
+ }
+ catch (SocketException se)
{
- return (InvocationRequest)obj;
+ throw new AcknowledgeFailure();
}
- else
+ catch (IOException ioe)
{
- // need to wrap request with invocation request
- SocketAddress remoteAddress = socketWrapper.getSocket().getRemoteSocketAddress();
-
- InvocationRequest request = new InvocationRequest(remoteAddress.toString(), invoker.getSupportedSubsystems()[0], obj, null, null, null);
- return request;
+ throw new AcknowledgeFailure();
}
+ handlingResponse = false;
+ }
}
- private void versionedWrite(OutputStream outputStream, SocketServerInvoker invoker,
- ClassLoader classLoader, Object resp, int version) throws IOException
+ protected Object versionedRead(InputStream inputStream, ServerInvoker invoker,
+ ClassLoader classLoader, int version)
+ throws IOException, ClassNotFoundException
{
- //TODO: -TME - Should I ever worry about checking version here? Only one way to send data at this point.
+ //TODO: -TME - Should I even botther to check for version here? Only one way to do processing
+ // at this point, regardless of version.
switch (version)
{
case Version.VERSION_1:
case Version.VERSION_2:
{
- Marshaller marshaller = MarshalFactory.getMarshaller(invoker.getLocator(), classLoader);
+ if(trace) { log.trace("blocking to read invocation from unmarshaller"); }
- if (marshaller == null)
- {
- marshaller = MarshalFactory.getMarshaller(invoker.getDataType(), invoker.getSerializationType());
- }
+ Object o = unmarshaller.read(inputStream, null);
- marshaller.write(resp, outputStream);
- return;
+ if(trace) { log.trace("read " + o + " from unmarshaller"); }
+
+ return o;
}
default:
{
- throw new IOException("Can not write data for version " + version + ". Supported version: " + Version.VERSION_1 + ", " + Version.VERSION_2);
+ throw new IOException("Can not read data for version " + version +
+ ". Supported versions: " + Version.VERSION_1 + "," + Version.VERSION_2);
}
}
-
}
- private int readVersion(InputStream inputStream) throws IOException
- {
- return inputStream.read();
- }
+ // Private --------------------------------------------------------------------------------------
- private void writeVersion(OutputStream outputStream, int version) throws IOException
+ private SocketWrapper createServerSocketWrapper(Socket socket, int timeout, Map metadata)
+ throws Exception
{
- outputStream.write(version);
- }
-
- /**
- * This is needed because Object*Streams leak
- */
- protected void dorun()
+ if (serverSocketConstructor == null)
{
- if(isTrace)
+ if(serverSocketClass == null)
{
- log.trace("beginning dorun");
+ serverSocketClass = ClassLoaderUtility.loadClass(serverSocketClassName, getClass());
}
- running = true;
- handlingResponse = true;
- // Always do first one without an ACK because its not needed
try
{
- processInvocation();
+ serverSocketConstructor = serverSocketClass.
+ getConstructor(new Class[]{Socket.class, Map.class, Integer.class});
}
- catch (Exception ex)
+ catch (NoSuchMethodException e)
{
- log.error("failed to process invocation.", ex);
- running = false;
+ serverSocketConstructor = serverSocketClass.getConstructor(new Class[]{Socket.class});
}
- // Re-use loop
- while (running)
+ }
+
+ SocketWrapper serverSocketWrapper = null;
+
+ if (serverSocketConstructor.getParameterTypes().length == 3)
{
- try
+ Map localMetadata = null;
+ if (metadata == null)
{
- acknowledge();
- processInvocation();
+ localMetadata = new HashMap(2);
}
- catch (AcknowledgeFailure e)
- {
- if (!shutdown && isTrace)
+ else
{
- log.trace("Keep alive acknowledge failed.");
+ localMetadata = new HashMap(metadata);
}
- running = false;
+ localMetadata.put(SocketWrapper.MARSHALLER, marshaller);
+ localMetadata.put(SocketWrapper.UNMARSHALLER, unmarshaller);
+ serverSocketWrapper = (SocketWrapper)serverSocketConstructor.
+ newInstance(new Object[]{socket, localMetadata, new Integer(timeout)});
}
- catch(SocketTimeoutException ste)
- {
- if(!shutdown)
- {
- if(isTrace)
+ else
{
- log.trace(ste);
- }
+ serverSocketWrapper =
+ (SocketWrapper)serverSocketConstructor.newInstance(new Object[]{socket});
+
+ serverSocketWrapper.setTimeout(timeout);
}
- running = false;
+ return serverSocketWrapper;
}
- catch (InterruptedIOException e)
- {
- if (!shutdown)
+
+ private boolean isOneway(Map metadata)
{
- log.error("Socket IO interrupted", e);
- }
- running = false;
+ boolean isOneway = false;
- }
- catch (InterruptedException e)
+ if (metadata != null)
{
- if(isTrace)
+ Object val = metadata.get(Client.ONEWAY_FLAG);
+ if (val != null && val instanceof String && Boolean.valueOf((String) val).booleanValue())
{
- log.trace(e);
+ isOneway = true;
}
- if (!shutdown)
- {
- log.error("interrupted", e);
}
+ return isOneway;
}
- catch (EOFException eof)
+
+ private InvocationRequest createInvocationRequest(Object obj, SocketWrapper socketWrapper)
{
- if (!shutdown && isTrace)
+ if(obj instanceof InvocationRequest)
{
- log.trace("EOF received. This is likely due to client finishing comminication.");
+ return (InvocationRequest)obj;
}
- running = false;
+ else
+ {
+ // need to wrap request with invocation request
+ SocketAddress remoteAddress = socketWrapper.getSocket().getRemoteSocketAddress();
+ return new InvocationRequest(remoteAddress.toString(),
+ invoker.getSupportedSubsystems()[0],
+ obj, null, null, null);
}
- catch (SocketException sex)
+ }
+
+ private void processNewSocket()
{
- if (!shutdown && isTrace)
+ InvokerLocator locator = invoker.getLocator();
+ ClassLoader classLoader = getClass().getClassLoader();
+ String dataType = invoker.getDataType();
+ String serializationType = invoker.getSerializationType();
+
+ //TODO: -TME Need better way to get the unmarshaller (via config)
+
+ if (unmarshaller == null)
{
- log.trace("SocketException received. This is likely due to client disconnecting and resetting connection.");
+ unmarshaller = MarshalFactory.getUnMarshaller(locator, classLoader);
}
- running = false;
- }
- catch (Exception ex)
+ if (unmarshaller == null)
{
- if (!shutdown)
+ unmarshaller = MarshalFactory.getUnMarshaller(dataType, serializationType);
+ }
+
+ if (marshaller == null)
{
- log.error("failed", ex);
- running = false;
+ marshaller = MarshalFactory.getMarshaller(locator, classLoader);
}
+ if (marshaller == null)
+ {
+ marshaller = MarshalFactory.getMarshaller(dataType, serializationType);
}
- // clear any interruption so that thread can be pooled.
- handlingResponse = false;
- Thread.interrupted();
+
+
}
- // Ok, we've been shutdown. Do appropriate cleanups.
- try
+
+ private void versionedWrite(OutputStream outputStream, SocketServerInvoker invoker,
+ ClassLoader classLoader, Object resp, int version) throws IOException
+ {
+ //TODO: -TME - Should I ever worry about checking version here? Only one way to send data at this point.
+ switch (version)
{
- InputStream in = socketWrapper.getInputStream();
- if (in != null)
+ case Version.VERSION_1:
+ case Version.VERSION_2:
{
- in.close();
+ marshaller.write(resp, outputStream);
+ if (trace) { log.trace("wrote response to the output stream"); }
+ return;
}
- OutputStream out = socketWrapper.getOutputStream();
- if (out != null)
+ default:
{
- out.close();
+ throw new IOException("Can not write data for version " + version +
+ ". Supported version: " + Version.VERSION_1 + ", " + Version.VERSION_2);
}
}
- catch (Exception ex)
+ }
+
+ private int readVersion(InputStream inputStream) throws IOException
{
+ if(trace) { log.trace("blocking to read version from input stream"); }
+
+ int version = inputStream.read();
+
+ if(trace) { log.trace("read version " + version + " from input stream"); }
+
+ return version;
}
- try
+
+ private void writeVersion(OutputStream outputStream, int version) throws IOException
{
- socketWrapper.close();
+ outputStream.write(version);
}
- catch (Exception ex)
+
+ private String getWorkerThreadName(Socket currentSocket)
+ {
+ if (id == Integer.MIN_VALUE)
{
- log.error("Failed cleanup", ex);
+ id = nextID();
}
- socketWrapper = null;
+
+ StringBuffer sb = new StringBuffer("WorkerThread#");
+ sb.append(id).append('[');
+ sb.append(currentSocket.getInetAddress().getHostAddress());
+ sb.append(':');
+ sb.append(currentSocket.getPort());
+ sb.append(']');
+
+ return sb.toString();
}
+ // Inner classes --------------------------------------------------------------------------------
+
public static class AcknowledgeFailure extends Exception
{
-
}
}
More information about the jboss-cvs-commits
mailing list