[jboss-cvs] JBossRemoting/src/main/org/jboss/remoting/transport/socket ...
Ron Sigal
ron_sigal at yahoo.com
Mon Jan 22 20:02:29 EST 2007
User: rsigal
Date: 07/01/22 20:02:29
Modified: src/main/org/jboss/remoting/transport/socket
MicroSocketClientInvoker.java
Log:
JBREM-684: In transport(), oneway invocations now return the socket to the pool.
JBREM-662: Added Ovidiu's bailout flag from remoting_2_x branch.
Also, brought in Ovidiu's logging changes from remoting_2_x branch.
Revision Changes Path
1.20 +74 -46 JBossRemoting/src/main/org/jboss/remoting/transport/socket/MicroSocketClientInvoker.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: MicroSocketClientInvoker.java
===================================================================
RCS file: /cvsroot/jboss/JBossRemoting/src/main/org/jboss/remoting/transport/socket/MicroSocketClientInvoker.java,v
retrieving revision 1.19
retrieving revision 1.20
diff -u -b -r1.19 -r1.20
--- MicroSocketClientInvoker.java 12 Jan 2007 18:56:22 -0000 1.19
+++ MicroSocketClientInvoker.java 23 Jan 2007 01:02:29 -0000 1.20
@@ -6,10 +6,10 @@
import org.jboss.remoting.InvokerLocator;
import org.jboss.remoting.RemoteClientInvoker;
import org.jboss.remoting.Version;
+import org.jboss.remoting.serialization.ClassLoaderUtility;
import org.jboss.remoting.marshal.Marshaller;
import org.jboss.remoting.marshal.UnMarshaller;
import org.jboss.remoting.marshal.serializable.SerializableMarshaller;
-import org.jboss.remoting.serialization.ClassLoaderUtility;
import org.jboss.util.propertyeditor.PropertyEditors;
import java.io.IOException;
@@ -32,12 +32,12 @@
*
* @author <a href="mailto:jhaynie at vocalocity.net">Jeff Haynie</a>
* @author <a href="mailto:telrod at e2technologies.net">Tom Elrod</a>
- * @version $Revision: 1.19 $
+ * @version $Revision: 1.20 $
*/
public class MicroSocketClientInvoker extends RemoteClientInvoker
{
private static final Logger log = Logger.getLogger(MicroSocketClientInvoker.class);
- private static final boolean isTraceEnabled = log.isTraceEnabled();
+ private static final boolean trace = log.isTraceEnabled();
protected InetAddress addr;
protected int port;
@@ -134,14 +134,17 @@
private boolean reuseAddress = true;
+ // Flag set to true by a disconnect request. If an attempt to create a connection is looping
+ // when a disconnect request arrives, this flag is used to pass that information into
+ // the connect loop.
+ private volatile boolean bailOut;
+
public MicroSocketClientInvoker(InvokerLocator locator)
- throws IOException
{
this(locator, null);
}
public MicroSocketClientInvoker(InvokerLocator locator, Map configuration)
- throws IOException
{
super(locator, configuration);
try
@@ -201,13 +204,12 @@
{
try
{
- int nVal = Integer.valueOf((String) val).intValue();
- maxPoolSize = nVal;
- log.debug("Setting SocketClientInvoker::maxPoolSize to: " + maxPoolSize);
+ maxPoolSize = Integer.valueOf((String)val).intValue();
+ log.debug(this + " setting SocketClientInvoker::maxPoolSize to: " + maxPoolSize);
}
catch (Exception e)
{
- log.warn("Could not convert " + MAX_POOL_SIZE_FLAG + " value of " + val + " to a int value.");
+ log.warn("Could not convert " + MAX_POOL_SIZE_FLAG + " value of " + val + " to a int value");
}
}
// look for client socket class name
@@ -218,7 +220,7 @@
if (value.length() > 0)
{
clientSocketClassName = value;
- log.debug("Setting ClientSocket class name to: " + clientSocketClassName);
+ log.debug(this + " setting ClientSocket class name to " + clientSocketClassName);
}
}
@@ -237,7 +239,6 @@
/**
* Indicates if will check socket connection when returning from
* pool by sending byte to the server. Default value will be false.
- * @return
*/
public boolean checkingConnection()
{
@@ -247,7 +248,6 @@
/**
* Returns if newly created sockets will have SO_REUSEADDR enabled.
* Default is for this to be true.
- * @return
*/
public boolean getReuseAddress()
{
@@ -264,6 +264,13 @@
reuseAddress = reuse;
}
+ public synchronized void disconnect()
+ {
+ if (trace) { log.trace(this + " disconnecting ..."); }
+ bailOut = true;
+ super.disconnect();
+ }
+
protected void finalize() throws Throwable
{
disconnect();
@@ -285,8 +292,6 @@
* Each implementation of the remote client invoker should have
* a default data type that is uses in the case it is not specified
* in the invoker locator uri.
- *
- * @return
*/
protected String getDefaultDataType()
{
@@ -323,15 +328,17 @@
{
throw new CannotConnectException("Can not get connection to server. Problem establishing socket connection for locator - " + locator, e);
}
+
long end = System.currentTimeMillis() - start;
getSocketTime += end;
+
try
{
-
int version = Version.getDefaultVersion();
boolean performVersioning = Version.performVersioning();
OutputStream outputStream = socketWrapper.getOutputStream();
+
if (performVersioning)
{
writeVersion(outputStream, version);
@@ -345,37 +352,40 @@
start = System.currentTimeMillis();
// check to see if is one way invocation and return if is
+ boolean oneway = false;
if(metadata != null)
{
Object val = metadata.get(org.jboss.remoting.Client.ONEWAY_FLAG);
if(val != null && val instanceof String && Boolean.valueOf((String)val).booleanValue())
{
- if(isTraceEnabled)
- {
- log.trace("Oneway invocation, so not waiting for response. Returning null.");
- }
- return null;
+ if(trace) { log.trace(this + " sent oneway invocation, so not waiting for response, returning null"); }
+// return null;
+ oneway = true;
}
}
+ if (!oneway)
+ {
InputStream inputStream = socketWrapper.getInputStream();
if (performVersioning)
{
version = readVersion(inputStream);
if (version == -1)
{
- throw new SocketException();
+ throw new SocketException("end of file");
}
}
response = versionedRead(inputStream, unmarshaller, version);
+ }
end = System.currentTimeMillis() - start;
readTime += end;
}
catch (SocketException sex)
{
- log.debug(sex);
+ log.debug(this + " got SocketException " + sex);
+
try
{
socketWrapper.close();
@@ -383,6 +393,7 @@
}
catch (Exception ex)
{
+ if (trace) { log.trace(this + " couldn't successfully close its socketWrapper", ex); }
}
/**
@@ -400,6 +411,8 @@
}
catch (Exception ex)
{
+ log.debug(this + " got exception " + ex);
+
try
{
socketWrapper.close();
@@ -434,7 +447,7 @@
}
else
{
- if (isTraceEnabled)
+ if (trace)
{
log.trace("Pool was already full, will close the connection");
}
@@ -448,12 +461,7 @@
}
}
- // Return response
- if (isTraceEnabled)
- {
- log.trace("Response: " + response);
- }
-
+ if (trace) { log.trace(this + " received " + response); }
return response;
}
@@ -480,7 +488,7 @@
protected Object handleException(Exception ex, SocketWrapper socketWrapper)
throws ClassNotFoundException, MarshalException
{
- log.error("Got marshalling exception, exiting", ex);
+ log.error(this + " got marshalling exception, exiting ...", ex);
if (ex instanceof ClassNotFoundException)
{
//TODO: -TME Add better exception handling for class not found exception
@@ -488,7 +496,7 @@
throw (ClassNotFoundException) ex;
}
- throw new MarshalException("Failed to communicate. Problem during marshalling/unmarshalling", ex);
+ throw new MarshalException("Failed to communicate. Problem during marshalling/unmarshalling.", ex);
}
private Object versionedRead(InputStream inputStream, UnMarshaller unmarshaller, int version) throws IOException, ClassNotFoundException
@@ -499,6 +507,7 @@
case Version.VERSION_1:
case Version.VERSION_2:
{
+ if (trace) { log.trace(this + " reading response from input stream"); }
return unmarshaller.read(inputStream, null);
}
default:
@@ -516,7 +525,10 @@
case Version.VERSION_1:
case Version.VERSION_2:
{
+ if (trace) { log.trace(this + " writing invocation on output stream"); }
marshaller.write(invocation, outputStream);
+ if (trace) { log.trace(this + " done writing invocation on output stream"); }
+
return;
}
default:
@@ -529,16 +541,19 @@
//TODO: -TME Exact same method in ServerThread
private int readVersion(InputStream inputStream) throws IOException
{
- return inputStream.read();
+ if (trace) { log.trace(this + " reading version from input stream"); }
+ int version = inputStream.read();
+ if (trace) { log.trace(this + " read version " + version + " from input stream"); }
+ return version;
}
//TODO: -TME Exact same method in ServerThread
private void writeVersion(OutputStream outputStream, int version) throws IOException
{
+ if (trace) { log.trace(this + " writing version " + version + " on output stream"); }
outputStream.write(version);
}
-
/**
* Close all sockets in a specific pool.
*/
@@ -585,6 +600,9 @@
while (it.hasNext())
{
ServerAddress sa = (ServerAddress) it.next();
+
+ if (trace) { log.trace("clearing pool for " + sa); }
+
clearPool(sa);
}
}
@@ -671,6 +689,13 @@
//
for (int i = 0; i < numberOfRetries; i++)
{
+ if (bailOut)
+ {
+ log.debug(this + " has been concurrently disconnected, " +
+ "bailing out from trying to create a new connection");
+ break;
+ }
+
synchronized (pool)
{
// if connection within pool, use it
@@ -679,6 +704,7 @@
pooled = getPooledConnection();
}
}
+
synchronized(usedPoolLock)
{
if (pooled != null)
@@ -692,18 +718,21 @@
// not in use, then need create a new connection which
// will be latered returned to the pool (thus filling out
// the pool, since starts out empty).
+
Socket socket = null;
+ long timestamp = System.currentTimeMillis();
try
{
- if (isTraceEnabled)
+ if (trace)
{
- log.trace("Creating socket number " + (counter++));
+ log.trace(this + " creating socket " + (counter++) + ", attempt " + (i + 1));
}
socket = createSocket(address.address, address.port);
}
catch (Exception ex)
{
- log.debug(ex);
+ log.debug(this + " got Exception " + ex + ", creation attempt took " + (System.currentTimeMillis() - timestamp) + " ms");
+
if (i + 1 < numberOfRetries)
{
Thread.sleep(1);
@@ -781,7 +810,7 @@
}
catch (Exception ex)
{
- if (isTraceEnabled)
+ if (trace)
{
log.trace("Couldn't reuse connection from pool");
}
@@ -806,5 +835,4 @@
return address.address;
}
-
}
More information about the jboss-cvs-commits
mailing list