[jboss-cvs] JBossRemoting/src/main/org/jboss/remoting/transport/socket ...
Ron Sigal
ron_sigal at yahoo.com
Sat Jun 30 03:31:09 EDT 2007
User: rsigal
Date: 07/06/30 03:31:09
Modified: src/main/org/jboss/remoting/transport/socket Tag:
remoting_2_x MicroSocketClientInvoker.java
Log:
JBREM-641: Added management of connections used in client side oneway invocations.
Revision Changes Path
No revision
No revision
1.16.2.28 +247 -13 JBossRemoting/src/main/org/jboss/remoting/transport/socket/MicroSocketClientInvoker.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: MicroSocketClientInvoker.java
===================================================================
RCS file: /cvsroot/jboss/JBossRemoting/src/main/org/jboss/remoting/transport/socket/MicroSocketClientInvoker.java,v
retrieving revision 1.16.2.27
retrieving revision 1.16.2.28
diff -u -b -r1.16.2.27 -r1.16.2.28
--- MicroSocketClientInvoker.java 19 May 2007 02:41:08 -0000 1.16.2.27
+++ MicroSocketClientInvoker.java 30 Jun 2007 07:31:09 -0000 1.16.2.28
@@ -3,11 +3,15 @@
import org.jboss.logging.Logger;
import org.jboss.remoting.CannotConnectException;
import org.jboss.remoting.ConnectionFailedException;
+import org.jboss.remoting.InvocationRequest;
import org.jboss.remoting.InvokerLocator;
import org.jboss.remoting.RemoteClientInvoker;
import org.jboss.remoting.ServerInvoker;
import org.jboss.remoting.Version;
import org.jboss.remoting.serialization.ClassLoaderUtility;
+import org.jboss.remoting.callback.CallbackPoller;
+import org.jboss.remoting.invocation.InternalInvocation;
+import org.jboss.remoting.invocation.OnewayInvocation;
import org.jboss.remoting.marshal.Marshaller;
import org.jboss.remoting.marshal.UnMarshaller;
import org.jboss.remoting.marshal.VersionedMarshaller;
@@ -23,12 +27,17 @@
import java.net.Socket;
import java.net.SocketException;
import java.net.InetSocketAddress;
+import java.net.SocketTimeoutException;
import java.rmi.MarshalException;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Properties;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
/**
* SocketClientInvoker uses Sockets to remotely connect to the a remote ServerInvoker, which must be
@@ -38,7 +47,7 @@
* @author <a href="mailto:telrod at e2technologies.net">Tom Elrod</a>
* @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
*
- * @version $Revision: 1.16.2.27 $
+ * @version $Revision: 1.16.2.28 $
*/
public class MicroSocketClientInvoker extends RemoteClientInvoker
{
@@ -68,6 +77,12 @@
*/
public static final String CLIENT_SOCKET_CLASS_FLAG = "clientSocketClass";
+ /** Key for setting delay used by OnewayConnectionTask */
+ public static final String ONEWAY_CONNECTION_DELAY = "onewayConnectionDelay";
+
+ /** Key for setting timeout used by OnewayConnectionTask */
+ public static final String ONEWAY_CONNECTION_TIMEOUT = "onewayConnectionTimeout";
+
/**
* Default value for enable TCP nodelay. Value is false.
*/
@@ -91,6 +106,11 @@
*/
public static final int MAX_POOL_SIZE = 50;
+ /** Default delay value used by OnewayConnectionTask. Value is 5 seconds. */
+ public static final int ONEWAY_CONNECTION_DELAY_DEFAULT = 5000;
+
+ /** Default timeout value used by OnewayConnectionTask. Value is 1 second. */
+ public static final int ONEWAY_CONNECTION_TIMEOUT_DEFAULT = 1000;
// Static ---------------------------------------------------------------------------------------
@@ -103,6 +123,8 @@
protected static final Map connectionPools = new HashMap();
+ protected Timer onewayConnectionTimer;
+
// Performance measurements
public static long getSocketTime = 0;
public static long readTime = 0;
@@ -192,6 +214,8 @@
protected int numberOfRetries;
protected int numberOfCallRetries;
protected int maxPoolSize;
+ protected int onewayConnectionDelay;
+ protected int onewayConnectionTimeout;
/**
* Pool for this invoker. This is shared between all instances of proxies attached to a specific
@@ -228,6 +252,8 @@
numberOfCallRetries = MAX_CALL_RETRIES;
pool = null;
maxPoolSize = MAX_POOL_SIZE;
+ onewayConnectionDelay = ONEWAY_CONNECTION_DELAY_DEFAULT;
+ onewayConnectionTimeout = ONEWAY_CONNECTION_TIMEOUT_DEFAULT;
usedPooled = 0;
usedPoolLock = new Object();
@@ -431,6 +457,38 @@
shouldCheckConnection = true;
log.debug(this + " setting shouldCheckConnection to " + shouldCheckConnection);
}
+
+ // look for onewayConnectionDelay param
+ val = params.get(ONEWAY_CONNECTION_DELAY);
+ if (val != null)
+ {
+ try
+ {
+ onewayConnectionDelay = Integer.valueOf((String)val).intValue();
+ log.debug(this + " setting onewayConnectionDelay to " + onewayConnectionDelay);
+ }
+ catch (Exception e)
+ {
+ log.warn(this + " could not convert " + ONEWAY_CONNECTION_DELAY + " value of " +
+ val + " to an int value");
+ }
+ }
+
+ // look for onewayConnectionTimeout param
+ val = params.get(ONEWAY_CONNECTION_TIMEOUT);
+ if (val != null)
+ {
+ try
+ {
+ onewayConnectionTimeout = Integer.valueOf((String)val).intValue();
+ log.debug(this + " setting onewayConnectionTimeout to " + onewayConnectionTimeout);
+ }
+ catch (Exception e)
+ {
+ log.warn(this + " could not convert " + ONEWAY_CONNECTION_TIMEOUT + " value of " +
+ val + " to an int value");
+ }
+ }
}
protected ServerAddress createServerAddress()
@@ -653,6 +711,33 @@
handleException(sockEx, socketWrapper);
}
+ boolean clientSideOneway = true;
+ if (invocation instanceof InvocationRequest)
+ {
+ InvocationRequest ir = (InvocationRequest) invocation;
+ if (ir.getParameter() instanceof OnewayInvocation)
+ clientSideOneway = false;
+ }
+
+ if (clientSideOneway)
+ {
+ // Hand over to oneway connection task to test if connection may be reused.
+ synchronized (OnewayConnectionTask.class)
+ {
+ if (onewayConnectionTimer == null)
+ {
+ onewayConnectionTimer = new Timer(true);
+ log.debug(this + " created oneway connection timer: " + onewayConnectionTimer);
+ }
+ }
+
+ TimerTask onewayConnectionTask
+ = new OnewayConnectionTask(this, socketWrapper, marshaller, unmarshaller);
+ onewayConnectionTimer.schedule(onewayConnectionTask, onewayConnectionDelay);
+ if (trace) log.trace(this + " scheduled OnewayConnectionTask for: " + socketWrapper);
+ }
+ else
+ {
// Put socket back in pool for reuse
synchronized (pool)
{
@@ -677,6 +762,7 @@
}
}
}
+ }
if (trace && !oneway) { log.trace(this + " received response " + response); }
return response;
@@ -996,4 +1082,152 @@
// Inner classes --------------------------------------------------------------------------------
+ /**
+  * OnewayConnectionTask takes sockets used for client side oneway invocations and,
+  * once the oneway invocation has concluded, returns the socket to the socket pool.
+  * <p>
+  * When run, the task writes a shared internal ECHO invocation on the connection and
+  * reads the reply.  If the exchange succeeds the connection is handed back to the
+  * owning invoker's pool; on any failure, end of file, or a CLOSING version byte the
+  * connection is closed and the invoker's <code>usedPooled</code> count is decremented
+  * exactly once.
+  */
+ protected static class OnewayConnectionTask extends TimerTask
+ {
+    /** Echo request shared by all tasks; built once in the static initializer. */
+    private static final InvocationRequest invocation;
+
+    static
+    {
+       Object[] params = new Object[1];
+       params[0] = "onewayEcho";
+       String echoMethod = InternalInvocation.ECHO;
+       InternalInvocation ii = new InternalInvocation(echoMethod, params);
+       invocation = new InvocationRequest(null, null, ii, null, null, null);
+    }
+
+    private final MicroSocketClientInvoker invoker;
+    private final SocketWrapper wrapper;
+    private final Marshaller marshaller;
+    private final UnMarshaller unmarshaller;
+    /** Timeout applied to the wrapper for the echo exchange; copied from the invoker. */
+    private final int onewayConnectionTimeout;
+
+
+    /**
+     * @param invoker      invoker that owns the pool and the usedPooled counter
+     * @param wrapper      connection that was used for the oneway invocation
+     * @param marshaller   marshaller used to write the echo request
+     * @param unmarshaller unmarshaller used to read the echo reply
+     */
+    public OnewayConnectionTask(MicroSocketClientInvoker invoker,
+                                SocketWrapper wrapper,
+                                Marshaller marshaller,
+                                UnMarshaller unmarshaller)
+    {
+       this.invoker = invoker;
+       this.wrapper = wrapper;
+       this.marshaller = marshaller;
+       this.unmarshaller = unmarshaller;
+       this.onewayConnectionTimeout = invoker.onewayConnectionTimeout;
+    }
+
+    /**
+     * Performs the echo exchange and then either returns the connection to the pool
+     * or destroys it.  Never throws: every exception results in the connection being
+     * destroyed.
+     */
+    public void run()
+    {
+       try
+       {
+          // Remember the wrapper's timeout so it can be restored before pooling.
+          int originalTimeout = wrapper.getTimeout();
+          wrapper.setTimeout(onewayConnectionTimeout);
+          OutputStream outputStream = wrapper.getOutputStream();
+          InputStream inputStream = wrapper.getInputStream();
+          int version = Version.getDefaultVersion();
+          boolean performVersioning = Version.performVersioning();
+
+          if (performVersioning)
+          {
+             invoker.writeVersion(outputStream, version);
+          }
+
+          invoker.versionedWrite(outputStream, marshaller, invocation, version);
+
+          if (performVersioning)
+          {
+             int readVersion = invoker.readVersion(inputStream);
+
+             if (readVersion == -1)
+             {
+                if (trace) log.trace(invoker + "end of file: closing connection");
+                destroyConnection(wrapper, true);
+                // Stop here: the connection is closed and already accounted for.
+                // Falling through would read from a closed socket and decrement
+                // usedPooled a second time in the catch block below.
+                return;
+             }
+             if (readVersion == SocketWrapper.CLOSING)
+             {
+                log.info(invoker + " Received version 254: treating as end of file");
+                destroyConnection(wrapper, true);
+                return;
+             }
+          }
+
+          invoker.versionedRead(inputStream, unmarshaller, version);
+          returnConnection(wrapper, originalTimeout);
+       }
+       catch (Exception e)
+       {
+          if (trace) log.trace(invoker + " oneway connection task error: ", e);
+          destroyConnection(wrapper, true);
+       }
+    }
+
+    /**
+     * Restores the wrapper's original timeout and adds it to the invoker's pool,
+     * decrementing usedPooled.  If the pool is already at maxPoolSize the connection
+     * is closed instead.  If anything fails, the connection is closed and usedPooled
+     * is decremented once.
+     *
+     * @param wrapper         connection to return
+     * @param originalTimeout timeout value to restore on the wrapper before pooling
+     */
+    private void returnConnection(SocketWrapper wrapper, int originalTimeout)
+    {
+       if (trace) log.trace(invoker + " returning connection to pool: " + wrapper);
+
+       // Put socket back in pool for reuse
+       try
+       {
+          synchronized (invoker.pool)
+          {
+             if (invoker.pool.size() < invoker.maxPoolSize)
+             {
+                wrapper.setTimeout(originalTimeout);
+                invoker.pool.add(wrapper);
+                synchronized(invoker.usedPoolLock)
+                {
+                   invoker.usedPooled--;
+                }
+                if (trace) { log.trace(invoker + " returned " + wrapper + " to pool"); }
+             }
+             else
+             {
+                if (trace) { log.trace(invoker + "'pool is full, will close connection: " + wrapper); }
+                // NOTE(review): usedPooled is not decremented on this path; confirm
+                // that this matches the accounting of the normal pool-return path.
+                destroyConnection(wrapper, false);
+             }
+          }
+       }
+       catch (Exception e)
+       {
+          if (trace)
+          {
+             log.trace(invoker + " error: " + e.getMessage(), e);
+             log.trace(invoker + " closing connection: " + wrapper);
+          }
+          // destroyConnection(..., true) already decrements usedPooled; a further
+          // decrement here would count this connection twice.
+          destroyConnection(wrapper, true);
+       }
+    }
+
+    /**
+     * Closes the connection, ignoring close failures, and optionally decrements
+     * the invoker's usedPooled count.
+     *
+     * @param wrapper   connection to close
+     * @param decrement if true, usedPooled is decremented under usedPoolLock
+     */
+    private void destroyConnection(SocketWrapper wrapper, boolean decrement)
+    {
+       if (trace) log.trace(invoker + " destroying connection: " + wrapper);
+
+       try
+       {
+          wrapper.close();
+       }
+       catch (Exception ignored)
+       {
+          // Best effort: the connection is being discarded, so a failure to close
+          // it cleanly is not actionable here.
+       }
+
+       if (decrement)
+       {
+          synchronized(invoker.usedPoolLock)
+          {
+             invoker.usedPooled--;
+          }
+       }
+    }
+ }
}
More information about the jboss-cvs-commits
mailing list