Author: ron.sigal(a)jboss.com
Date: 2009-08-14 17:01:47 -0400 (Fri, 14 Aug 2009)
New Revision: 5333
Modified:
remoting2/branches/2.x/src/main/org/jboss/remoting/transport/bisocket/BisocketClientInvoker.java
Log:
JBREM-1147: createSocket() periodically checks for updated controlOutputStream.
Modified:
remoting2/branches/2.x/src/main/org/jboss/remoting/transport/bisocket/BisocketClientInvoker.java
===================================================================
--- remoting2/branches/2.x/src/main/org/jboss/remoting/transport/bisocket/BisocketClientInvoker.java 2009-08-14 21:00:16 UTC (rev 5332)
+++ remoting2/branches/2.x/src/main/org/jboss/remoting/transport/bisocket/BisocketClientInvoker.java 2009-08-14 21:01:47 UTC (rev 5333)
@@ -85,6 +85,8 @@
protected String listenerId;
private int pingFrequency = Bisocket.PING_FREQUENCY_DEFAULT;
+ private int pingWindowFactor = Bisocket.PING_WINDOW_FACTOR_DEFAULT;
+ private int pingWindow = pingWindowFactor * pingFrequency;
private int maxRetries = Bisocket.MAX_RETRIES_DEFAULT;
private Socket controlSocket;
private OutputStream controlOutputStream;
@@ -190,6 +192,26 @@
}
}
+ val = configuration.get(Bisocket.PING_WINDOW_FACTOR);
+ if (val != null && val instanceof String && ((String)
val).length() > 0)
+ {
+ try
+ {
+ pingWindowFactor = Integer.valueOf(((String) val)).intValue();
+ log.debug(this + " setting pingWindowFactor to " +
pingWindowFactor);
+ }
+ catch (NumberFormatException e)
+ {
+ log.warn("Invalid format for " + "\"" +
Bisocket.PING_WINDOW_FACTOR + "\": " + val);
+ }
+ }
+ else if (val != null)
+ {
+ log.warn("\"" + Bisocket.PING_WINDOW_FACTOR + "\"
must be specified as a String");
+ }
+
+ pingWindow = pingWindowFactor * pingFrequency;
+
val = configuration.get(Bisocket.MAX_RETRIES);
if (val != null)
{
@@ -231,7 +253,20 @@
this.pingFrequency = pingFrequency;
}
-
+
+ public int getPingWindowFactor()
+ {
+ return pingWindowFactor;
+ }
+
+
+ public void setPingWindowFactor(int pingWindowFactor)
+ {
+ this.pingWindowFactor = pingWindowFactor;
+ pingWindow = pingWindowFactor * pingFrequency;
+ }
+
+
protected void handleConnect() throws ConnectionFailedException
{
// Callback client on server side.
@@ -454,35 +489,47 @@
synchronized (controlLock)
{
- controlOutputStream.write(Bisocket.CREATE_ORDINARY_SOCKET);
- }
-
- synchronized (sockets)
- {
- if (!sockets.isEmpty())
+ if (log.isTraceEnabled()) log.trace(this + " writing
Bisocket.CREATE_ORDINARY_SOCKET on " + controlOutputStream);
+ try
{
- Iterator it = sockets.iterator();
- Socket socket = (Socket) it.next();
- it.remove();
- log.debug(this + " found socket (" + listenerId + "): " +
socket);
- return socket;
+ controlOutputStream.write(Bisocket.CREATE_ORDINARY_SOCKET);
+ if (log.isTraceEnabled()) log.trace(this + " wrote
Bisocket.CREATE_ORDINARY_SOCKET");
+
+ synchronized (sockets)
+ {
+ if (!sockets.isEmpty())
+ {
+ Iterator it = sockets.iterator();
+ Socket socket = (Socket) it.next();
+ it.remove();
+ log.debug(this + " found socket (" + listenerId + "):
" + socket);
+ return socket;
+ }
+ }
}
+ catch (IOException e)
+ {
+ log.debug(this + " unable to write
Bisocket.CREATE_ORDINARY_SOCKET", e);
+ }
}
long timeRemaining = timeout;
- long start = System.currentTimeMillis();
+ long pingFailedWindow = 2 * pingWindow;
+ long pingFailedTimeRemaining = pingFailedWindow;
+ long start = System.currentTimeMillis();
+ OutputStream savedControlOutputStream = controlOutputStream;
- while (isConnected() && !pingFailed.flag && (timeout == 0 ||
timeRemaining > 0))
+ while (isConnected() && (!pingFailed.flag || pingFailedTimeRemaining >
0) && (timeout == 0 || timeRemaining > 0))
{
synchronized (sockets)
- {
+ {
try
{
sockets.wait(1000);
}
catch (InterruptedException e)
{
- log.debug("unexpected interrupt");
+ log.debug(this + " unexpected interrupt");
}
if (!sockets.isEmpty())
@@ -494,9 +541,26 @@
return socket;
}
}
-
+
+ if (savedControlOutputStream != controlOutputStream)
+ {
+ savedControlOutputStream = controlOutputStream;
+ log.debug(this + " rewriting Bisocket.CREATE_ORDINARY_SOCKET on " +
controlOutputStream);
+ try
+ {
+ controlOutputStream.write(Bisocket.CREATE_ORDINARY_SOCKET);
+ log.debug(this + " rewrote Bisocket.CREATE_ORDINARY_SOCKET");
+ }
+ catch (IOException e)
+ {
+ log.debug(this + " unable to rewrite
Bisocket.CREATE_ORDINARY_SOCKET" + e.getMessage());
+ }
+ }
+
+ long elapsed = System.currentTimeMillis() - start;
if (timeout > 0)
- timeRemaining = timeout - (System.currentTimeMillis() - start);
+ timeRemaining = timeout - elapsed;
+ pingFailedTimeRemaining = pingFailedWindow - elapsed;
}
if (!isConnected())
@@ -526,6 +590,7 @@
controlSocket = socket;
log.debug(this + " control socket replaced by: " + socket);
controlOutputStream = controlSocket.getOutputStream();
+ log.debug("controlOutputStream replaced by: " + controlOutputStream);
}
if (pingTimerTask != null)