[jboss-cvs] JBossRemoting/src/main/org/jboss/remoting/transport/bisocket ...
Ron Sigal
ron_sigal at yahoo.com
Wed Mar 14 01:06:25 EDT 2007
User: rsigal
Date: 07/03/14 01:06:25
Modified: src/main/org/jboss/remoting/transport/bisocket Tag:
remoting_2_x BisocketServerInvoker.java
Log:
JBREM-725, JBREM-726: (1) Creates control connection socket in loop; (2) fixed NPE in createControlConnection(); (3) added control connection teardown to handleInternalInvocation(); (4) added some synchronization;
Revision Changes Path
No revision
No revision
1.1.2.15 +127 -20 JBossRemoting/src/main/org/jboss/remoting/transport/bisocket/BisocketServerInvoker.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: BisocketServerInvoker.java
===================================================================
RCS file: /cvsroot/jboss/JBossRemoting/src/main/org/jboss/remoting/transport/bisocket/BisocketServerInvoker.java,v
retrieving revision 1.1.2.14
retrieving revision 1.1.2.15
diff -u -b -r1.1.2.14 -r1.1.2.15
--- BisocketServerInvoker.java 12 Mar 2007 18:59:26 -0000 1.1.2.14
+++ BisocketServerInvoker.java 14 Mar 2007 05:06:25 -0000 1.1.2.15
@@ -25,6 +25,7 @@
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
+import java.io.InterruptedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
@@ -57,7 +58,7 @@
/**
*
* @author <a href="ron.sigal at jboss.com">Ron Sigal</a>
- * @version $Revision: 1.1.2.14 $
+ * @version $Revision: 1.1.2.15 $
* <p>
* Copyright Nov 23, 2006
* </p>
@@ -69,7 +70,7 @@
private static Map listenerIdToServerInvokerMap = Collections.synchronizedMap(new HashMap());
private static Timer timer;
- private Map listenerIdToInvokerLocatorMap = new HashMap();
+ private Map listenerIdToInvokerLocatorMap = Collections.synchronizedMap(new HashMap());
private ServerSocket secondaryServerSocket;
private InvokerLocator secondaryLocator;
private SecondaryServerSocketThread secondaryServerSocketThread;
@@ -77,6 +78,7 @@
private int pingFrequency = Bisocket.PING_FREQUENCY_DEFAULT;
private int pingWindowFactor = Bisocket.PING_WINDOW_FACTOR_DEFAULT;
private int pingWindow = pingWindowFactor * pingFrequency;
+ private int socketCreationRetries = Bisocket.MAX_RETRIES_DEFAULT;
private ControlMonitorTimerTask controlMonitorTimerTask;
protected boolean isCallbackServer = false;
@@ -103,6 +105,22 @@
{
if (isCallbackServer)
{
+ Object val = configuration.get(Bisocket.MAX_RETRIES);
+ if (val != null)
+ {
+ try
+ {
+ int nVal = Integer.valueOf((String) val).intValue();
+ socketCreationRetries = nVal;
+ log.debug("Setting socket creation retry limit: " + socketCreationRetries);
+ }
+ catch (Exception e)
+ {
+ log.warn("Could not convert " + Bisocket.MAX_RETRIES +
+ " value of " + val + " to an int value.");
+ }
+ }
+
if(maxPoolSize <= 0)
{
maxPoolSize = MAX_POOL_SIZE_DEFAULT;
@@ -155,6 +173,12 @@
firstConnection = false;
BisocketClientInvoker clientInvoker = BisocketClientInvoker.getBisocketClientInvoker(listenerId);
+ if (clientInvoker == null)
+ {
+ log.warn("Unable to retrieve client invoker: must have disconnected");
+ throw new ClientUnavailableException();
+ }
+
try
{
locator = clientInvoker.getSecondaryLocator();
@@ -162,7 +186,7 @@
}
catch (Throwable t)
{
- log.error("unable to get secondary locator");
+ log.error("unable to get secondary locator", t);
throw new IOException("unable to get secondary locator: " + t.getMessage());
}
}
@@ -176,10 +200,43 @@
log.debug("creating control connection: " + locator);
Socket socket = null;
+ IOException savedException = null;
+
+ for (int i = 0; i < socketCreationRetries; i++)
+ {
+ try
+ {
if (socketFactory != null)
socket = socketFactory.createSocket(locator.getHost(), locator.getPort());
else
socket = new Socket(locator.getHost(), locator.getPort());
+ }
+ catch (IOException e)
+ {
+ log.debug("Error creating a control socket", e);
+ savedException = e;
+ }
+
+ if (socket != null)
+ break;
+
+ try
+ {
+ Thread.sleep(1000);
+ }
+ catch (InterruptedException e)
+ {
+ log.debug("received interrupt");
+ throw new InterruptedIOException("interrupt in createControlConnection()");
+ }
+ }
+
+ if (socket == null)
+ {
+ log.error("unable to create control connection after "
+ + socketCreationRetries + " retries", savedException);
+ throw savedException;
+ }
DataOutputStream dos = new DataOutputStream(socket.getOutputStream());
if (firstConnection)
@@ -218,10 +275,21 @@
public void destroyControlConnection(String listenerId) throws IOException
{
listenerIdToInvokerLocatorMap.remove(listenerId);
- Thread t = (Thread) controlConnectionThreadMap.remove(listenerId);
+ Thread t = null;
+
+ synchronized (controlConnectionThreadMap)
+ {
+ t = (Thread) controlConnectionThreadMap.remove(listenerId);
+ }
+
+ if (t != null)
+ {
((ControlConnectionThread)t).shutdown();
- controlMonitorTimerTask.shutdown();
- controlMonitorTimerTask = null;
+ }
+ else
+ {
+ log.warn("unrecognized listener ID: " + listenerId);
+ }
}
@@ -312,12 +380,17 @@
if (controlMonitorTimerTask != null)
controlMonitorTimerTask.shutdown();
- Iterator it = controlConnectionThreadMap.values().iterator();
+ Iterator it = null;
+ synchronized (controlConnectionThreadMap)
+ {
+ it = controlConnectionThreadMap.values().iterator();
+ }
+
while (it.hasNext())
{
ControlConnectionThread t = (ControlConnectionThread) it.next();
- t.shutdown();
it.remove();
+ t.shutdown();
}
if (secondaryServerSocketThread != null)
@@ -452,6 +525,8 @@
if (listenerId != null)
{
listenerIdToServerInvokerMap.remove(listenerId);
+ BisocketClientInvoker.removeBisocketClientInvoker(listenerId);
+ destroyControlConnection(listenerId);
}
}
}
@@ -538,7 +613,9 @@
case Bisocket.CREATE_ORDINARY_SOCKET:
InvokerLocator locator = (InvokerLocator) listenerIdToInvokerLocatorMap.get(listenerId);
- for (int i = 0; i < 10; i++)
+ IOException savedException = null;
+
+ for (int i = 0; i < socketCreationRetries; i++)
{
try
{
@@ -547,9 +624,10 @@
else
socket = new Socket(locator.getHost(), locator.getPort());
}
- catch (Exception e)
+ catch (IOException e)
{
log.debug("Error creating a socket", e);
+ savedException = e;
}
if (socket != null)
@@ -561,9 +639,24 @@
}
catch (InterruptedException e)
{
- log.debug("unexpected interruption");
+ if (running)
+ {
+ log.warn("received unexpected interrupt");
+ continue;
+ }
+ else
+ {
+ return;
}
}
+ }
+
+ if (socket == null)
+ {
+ log.error("Unable to create socket after " + socketCreationRetries
+ + " retries", savedException);
+ continue;
+ }
DataOutputStream dos = new DataOutputStream(socket.getOutputStream());
dos.write(Bisocket.CREATE_ORDINARY_SOCKET);
@@ -747,7 +840,10 @@
}
if (controlConnectionThreads.isEmpty())
+ {
shutdown();
+ return;
+ }
Iterator it = controlConnectionThreads.iterator();
while (it.hasNext())
@@ -763,7 +859,7 @@
controlConnectionThreadMap.remove(t.getListenerId());
}
- new Thread()
+ Thread t2 = new Thread()
{
public void run()
{
@@ -776,17 +872,28 @@
{
invoker.createControlConnection(t.getListenerId(), null);
}
+ catch (ClientUnavailableException e)
+ {
+ Object locator = listenerIdToInvokerLocatorMap.get(t.getListenerId());
+ log.warn("Unable to recreate control connection: " + locator);
+ }
catch (IOException e)
{
- InvokerLocator locator = (InvokerLocator) listenerIdToInvokerLocatorMap.get(t.getListenerId());
+ Object locator = listenerIdToInvokerLocatorMap.get(t.getListenerId());
log.error("Unable to recreate control connection: " + locator, e);
}
}
}
- }.start();
+ };
+ t2.setName("controlConnectionRecreate:" + t.getName());
+ t2.start();
}
}
}
}
+
+ static class ClientUnavailableException extends IOException
+ {
+ }
}
\ No newline at end of file
More information about the jboss-cvs-commits
mailing list