Author: ron.sigal(a)jboss.com
Date: 2009-08-14 16:52:58 -0400 (Fri, 14 Aug 2009)
New Revision: 5329
Modified:
remoting2/branches/2.2/src/main/org/jboss/remoting/transport/bisocket/BisocketClientInvoker.java
Log:
JBREM-1147: createSocket() periodically checks for updated controlOutputStream; JBREM-1140:
removed changes.
Modified:
remoting2/branches/2.2/src/main/org/jboss/remoting/transport/bisocket/BisocketClientInvoker.java
===================================================================
---
remoting2/branches/2.2/src/main/org/jboss/remoting/transport/bisocket/BisocketClientInvoker.java 2009-08-13
00:37:00 UTC (rev 5328)
+++
remoting2/branches/2.2/src/main/org/jboss/remoting/transport/bisocket/BisocketClientInvoker.java 2009-08-14
20:52:58 UTC (rev 5329)
@@ -24,7 +24,6 @@
import java.io.IOException;
import java.io.OutputStream;
-import java.io.InputStream;
import java.lang.reflect.Method;
import java.net.Socket;
import java.util.Collections;
@@ -82,16 +81,17 @@
protected String listenerId;
private int pingFrequency = Bisocket.PING_FREQUENCY_DEFAULT;
+ private int pingWindowFactor = Bisocket.PING_WINDOW_FACTOR_DEFAULT;
+ private int pingWindow = pingWindowFactor * pingFrequency;
private int maxRetries = Bisocket.MAX_RETRIES_DEFAULT;
private Socket controlSocket;
private OutputStream controlOutputStream;
- private InputStream controlInputStream;
private Object controlLock = new Object();
private PingTimerTask pingTimerTask;
protected boolean isCallbackInvoker;
protected BooleanHolder pingFailed = new BooleanHolder(false);
- private boolean enablePingReplies = false;
+
/**
* @param listenerId
* @return
@@ -188,6 +188,26 @@
}
}
+ val = configuration.get(Bisocket.PING_WINDOW_FACTOR);
+ if (val != null && val instanceof String && ((String)
val).length() > 0)
+ {
+ try
+ {
+ pingWindowFactor = Integer.valueOf(((String) val)).intValue();
+ log.debug(this + " setting pingWindowFactor to " +
pingWindowFactor);
+ }
+ catch (NumberFormatException e)
+ {
+ log.warn("Invalid format for " + "\"" +
Bisocket.PING_WINDOW_FACTOR + "\": " + val);
+ }
+ }
+ else if (val != null)
+ {
+ log.warn("\"" + Bisocket.PING_WINDOW_FACTOR + "\"
must be specified as a String");
+ }
+
+ pingWindow = pingWindowFactor * pingFrequency;
+
val = configuration.get(Bisocket.MAX_RETRIES);
if (val != null)
{
@@ -203,14 +223,6 @@
" value of " + val + " to an int value.");
}
}
-
- val = configuration.get(Bisocket.ENABLE_PING_REPLIES);
- if (val != null)
- {
- // Boolean.valueOf doesn't throw exceptions :-)
- boolean bVal = Boolean.valueOf((String) val).booleanValue();
- enablePingReplies = bVal;
- }
}
}
@@ -237,7 +249,20 @@
this.pingFrequency = pingFrequency;
}
-
+
+ public int getPingWindowFactor()
+ {
+ return pingWindowFactor;
+ }
+
+
+ public void setPingWindowFactor(int pingWindowFactor)
+ {
+ this.pingWindowFactor = pingWindowFactor;
+ pingWindow = pingWindowFactor * pingFrequency;
+ }
+
+
protected void handleConnect() throws ConnectionFailedException
{
// Callback client on server side.
@@ -287,7 +312,6 @@
try
{
controlOutputStream = controlSocket.getOutputStream();
- controlInputStream = controlSocket.getInputStream();
}
catch (IOException e1)
{
@@ -461,35 +485,47 @@
synchronized (controlLock)
{
- controlOutputStream.write(Bisocket.CREATE_ORDINARY_SOCKET);
- }
-
- synchronized (sockets)
- {
- if (!sockets.isEmpty())
+ if (log.isTraceEnabled()) log.trace(this + " writing
Bisocket.CREATE_ORDINARY_SOCKET on " + controlOutputStream);
+ try
{
- Iterator it = sockets.iterator();
- Socket socket = (Socket) it.next();
- it.remove();
- log.debug(this + " found socket (" + listenerId + "): " +
socket);
- return socket;
+ controlOutputStream.write(Bisocket.CREATE_ORDINARY_SOCKET);
+ if (log.isTraceEnabled()) log.trace(this + " wrote
Bisocket.CREATE_ORDINARY_SOCKET");
+
+ synchronized (sockets)
+ {
+ if (!sockets.isEmpty())
+ {
+ Iterator it = sockets.iterator();
+ Socket socket = (Socket) it.next();
+ it.remove();
+ log.debug(this + " found socket (" + listenerId + "):
" + socket);
+ return socket;
+ }
+ }
}
+ catch (IOException e)
+ {
+ log.debug(this + " unable to write
Bisocket.CREATE_ORDINARY_SOCKET", e);
+ }
}
long timeRemaining = timeout;
- long start = System.currentTimeMillis();
+ long pingFailedWindow = 2 * pingWindow;
+ long pingFailedTimeRemaining = pingFailedWindow;
+ long start = System.currentTimeMillis();
+ OutputStream savedControlOutputStream = controlOutputStream;
- while (isConnected() && !pingFailed.flag && (timeout == 0 ||
timeRemaining > 0))
+ while (isConnected() && (!pingFailed.flag || pingFailedTimeRemaining >
0) && (timeout == 0 || timeRemaining > 0))
{
synchronized (sockets)
- {
+ {
try
{
sockets.wait(1000);
}
catch (InterruptedException e)
{
- log.debug("unexpected interrupt");
+ log.debug(this + " unexpected interrupt");
}
if (!sockets.isEmpty())
@@ -501,9 +537,26 @@
return socket;
}
}
-
+
+ if (savedControlOutputStream != controlOutputStream)
+ {
+ savedControlOutputStream = controlOutputStream;
+ log.debug(this + " rewriting Bisocket.CREATE_ORDINARY_SOCKET on " +
controlOutputStream);
+ try
+ {
+ controlOutputStream.write(Bisocket.CREATE_ORDINARY_SOCKET);
+ log.debug(this + " rewrote Bisocket.CREATE_ORDINARY_SOCKET");
+ }
+ catch (IOException e)
+ {
+ log.debug(this + " unable to rewrite
Bisocket.CREATE_ORDINARY_SOCKET" + e.getMessage());
+ }
+ }
+
+ long elapsed = System.currentTimeMillis() - start;
if (timeout > 0)
- timeRemaining = timeout - (System.currentTimeMillis() - start);
+ timeRemaining = timeout - elapsed;
+ pingFailedTimeRemaining = pingFailedWindow - elapsed;
}
if (!isConnected())
@@ -533,7 +586,7 @@
controlSocket = socket;
log.debug(this + " control socket replaced by: " + socket);
controlOutputStream = controlSocket.getOutputStream();
- controlInputStream = controlSocket.getInputStream();
+ log.debug("controlOutputStream replaced by: " + controlOutputStream);
}
if (pingTimerTask != null)
@@ -616,16 +669,18 @@
{
private Object controlLock;
private OutputStream controlOutputStream;
- private InputStream controlInputStream;
+ private int maxRetries;
+ private Exception savedException;
+ private boolean pingSent;
private BooleanHolder pingFailed;
- private boolean enablePingReplies;
-
+
PingTimerTask(BisocketClientInvoker invoker)
{
controlLock = invoker.controlLock;
controlOutputStream = invoker.controlOutputStream;
- controlInputStream = invoker.controlInputStream;
- enablePingReplies = invoker.enablePingReplies;
+ maxRetries = invoker.getMaxRetries();
+ pingFailed = invoker.pingFailed;
+ pingFailed.flag = false;
}
public void shutDown()
@@ -633,7 +688,6 @@
synchronized (controlLock)
{
controlOutputStream = null;
- controlInputStream = null;
}
cancel();
try
@@ -648,54 +702,42 @@
}
public void run()
- {
- boolean ok = false;
- try
+ {
+ pingSent = false;
+
+ for (int i = 0; i < maxRetries; i++)
{
- synchronized (controlLock)
+ try
{
- if (controlOutputStream == null || controlInputStream == null)
- return;
-
- controlOutputStream.write(Bisocket.PING);
- if (enablePingReplies)
+ synchronized (controlLock)
{
- int rep = controlInputStream.read();
- if (rep == -1)
- {
- // socket was closed!
- shutDown();
+ if (controlOutputStream == null)
return;
- }
- if (rep != Bisocket.PING)
- {
- // um - protocol error
- log.warn("Protocol error: received unexpected reply to ping on
control socket (are ping replies enabled on the server?); shutting down
PingTimerTask");
- shutDown();
- return;
- }
+
+ controlOutputStream.write(Bisocket.PING);
}
- ok = true;
+ pingSent = true;
+ break;
}
+ catch (Exception e)
+ {
+ savedException = e;
+ log.debug("Unable to send ping: trying again");
+ }
}
- catch (Exception e)
+
+ if (!pingSent)
{
- log.warn("Unable to send ping: shutting down PingTimerTask", e);
+ log.warn("Unable to send ping: shutting down PingTimerTask",
savedException);
+ pingFailed.flag = true;
+ shutDown();
}
- finally
- {
- if (! ok)
- {
- pingFailed.flag = true;
- shutDown();
- }
- }
}
}
static class BooleanHolder
{
- public volatile boolean flag;
+ public boolean flag;
public BooleanHolder(boolean flag)
{
Show replies by date