[jboss-cvs] JBossRemoting/src/main/org/jboss/remoting/transport/bisocket ...
Ron Sigal
ron_sigal at yahoo.com
Wed Aug 29 03:16:01 EDT 2007
User: rsigal
Date: 07/08/29 03:16:01
Modified: src/main/org/jboss/remoting/transport/bisocket Tag:
remoting_2_x BisocketClientInvoker.java
Log:
JBREM-797: Added "running" variable to PingTimerTask. Also, synchronized with branch remoting_2_2_0_GA: JBREM-774: replaceControlSocket() and handleDisconnect() close control socket; JBREM-785: Declares a stack variable listenerId in transport(); JBREM-767: Network i/o moved from constructor to handleConnect(); JBREM-784: Added listenerIdToControlSocketsMap; JBREM-788: Synchronized access to static maps in handleDisconnect(); JBREM-767: handleDisconnect() wakes up any threads blocked in createSocket(); JBREM-766: Put wait() in a loop in handleConnect() and createSocket().
Revision Changes Path
No revision
No revision
1.1.2.20 +155 -62 JBossRemoting/src/main/org/jboss/remoting/transport/bisocket/BisocketClientInvoker.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: BisocketClientInvoker.java
===================================================================
RCS file: /cvsroot/jboss/JBossRemoting/src/main/org/jboss/remoting/transport/bisocket/BisocketClientInvoker.java,v
retrieving revision 1.1.2.19
retrieving revision 1.1.2.20
diff -u -b -r1.1.2.19 -r1.1.2.20
--- BisocketClientInvoker.java 2 Jun 2007 05:39:35 -0000 1.1.2.19
+++ BisocketClientInvoker.java 29 Aug 2007 07:16:01 -0000 1.1.2.20
@@ -72,6 +72,7 @@
private static Map listenerIdToClientInvokerMap = Collections.synchronizedMap(new HashMap());
private static Map listenerIdToCallbackClientInvokerMap = Collections.synchronizedMap(new HashMap());
private static Map listenerIdToSocketsMap = new HashMap();
+ private static Map listenerIdToControlSocketsMap = new HashMap();
private static Timer timer;
private static Object timerLock = new Object();
@@ -108,10 +109,24 @@
}
- static void transferSocket(String listenerId, Socket socket)
+ static void transferSocket(String listenerId, Socket socket, boolean isControlSocket)
{
Set sockets = null;
+ if (isControlSocket)
+ {
+ synchronized (listenerIdToControlSocketsMap)
+ {
+ sockets = (Set) listenerIdToControlSocketsMap.get(listenerId);
+ if (sockets == null)
+ {
+ sockets = new HashSet();
+ listenerIdToControlSocketsMap.put(listenerId, sockets);
+ }
+ }
+ }
+ else
+ {
synchronized (listenerIdToSocketsMap)
{
sockets = (Set) listenerIdToSocketsMap.get(listenerId);
@@ -121,6 +136,7 @@
listenerIdToSocketsMap.put(listenerId, sockets);
}
}
+ }
synchronized (sockets)
{
@@ -147,7 +163,7 @@
{
isCallbackInvoker = true;
listenerIdToCallbackClientInvokerMap.put(listenerId, this);
- log.debug("registered " + listenerId + " -> " + this);
+ log.debug(this + " :registered " + listenerId + " -> " + this);
}
// look for pingFrequency param
@@ -183,18 +199,46 @@
}
}
}
+ }
+
+ public int getMaxRetries()
+ {
+ return maxRetries;
+ }
+
+
+ public void setMaxRetries(int maxRetries)
+ {
+ this.maxRetries = maxRetries;
+ }
+
+
+ public int getPingFrequency()
+ {
+ return pingFrequency;
+ }
+
+ public void setPingFrequency(int pingFrequency)
+ {
+ this.pingFrequency = pingFrequency;
+ }
+
+
+ protected void handleConnect() throws ConnectionFailedException
+ {
+ // Callback client on server side.
if (isCallbackInvoker)
{
Set sockets = null;
- synchronized (listenerIdToSocketsMap)
+ synchronized (listenerIdToControlSocketsMap)
{
- sockets = (Set) listenerIdToSocketsMap.get(listenerId);
+ sockets = (Set) listenerIdToControlSocketsMap.get(listenerId);
if (sockets == null)
{
sockets = new HashSet();
- listenerIdToSocketsMap.put(listenerId, sockets);
+ listenerIdToControlSocketsMap.put(listenerId, sockets);
}
}
@@ -202,25 +246,40 @@
{
if (sockets.isEmpty())
{
+ long wait = timeout;
+ long start = System.currentTimeMillis();
+
+ while (timeout == 0 || wait > 0)
+ {
try
{
- sockets.wait(timeout);
+ sockets.wait(wait);
+ break;
}
- catch (InterruptedException ignored)
+ catch (InterruptedException e)
{
- log.warn("unexpected interrupt");
- throw new InterruptedIOException("Attempt to create control socket interrupted");
+ log.debug("unexpected interrupt");
+ if (timeout > 0)
+ wait = timeout - (System.currentTimeMillis() - start);
+ }
}
}
if (sockets.isEmpty())
- throw new IOException("Timed out trying to create control socket");
+ throw new ConnectionFailedException("Timed out trying to create control socket");
Iterator it = sockets.iterator();
controlSocket = (Socket) it.next();
it.remove();
+ try
+ {
controlOutputStream = controlSocket.getOutputStream();
- log.debug("got control socket: " + controlSocket);
+ }
+ catch (IOException e1)
+ {
+ throw new ConnectionFailedException("Unable to get control socket output stream");
+ }
+ log.debug("got control socket( " + listenerId + "): " + controlSocket);
pingTimerTask = new PingTimerTask(this);
synchronized (timerLock)
@@ -241,38 +300,7 @@
}
}
}
- }
- }
- public int getMaxRetries()
- {
- return maxRetries;
- }
-
-
- public void setMaxRetries(int maxRetries)
- {
- this.maxRetries = maxRetries;
- }
-
-
- public int getPingFrequency()
- {
- return pingFrequency;
- }
-
-
- public void setPingFrequency(int pingFrequency)
- {
- this.pingFrequency = pingFrequency;
- }
-
-
- protected void handleConnect() throws ConnectionFailedException
- {
- // Callback client on server side.
- if (isCallbackInvoker)
- {
// Bisocket callback client invoker doesn't share socket pools because of the danger
// that two distinct callback servers could have the same "artifical" port.
pool = new LinkedList();
@@ -290,6 +318,18 @@
{
if (isCallbackInvoker)
{
+ if (controlSocket != null)
+ {
+ try
+ {
+ controlSocket.close();
+ }
+ catch (IOException e)
+ {
+ log.debug("unable to close control socket: " + controlSocket);
+ }
+ }
+
listenerIdToCallbackClientInvokerMap.remove(listenerId);
for (Iterator it = pool.iterator(); it.hasNext();)
{
@@ -309,7 +349,26 @@
super.handleDisconnect();
}
- listenerIdToSocketsMap.remove(listenerId);
+ synchronized (listenerIdToControlSocketsMap)
+ {
+ listenerIdToControlSocketsMap.remove(listenerId);
+ }
+
+ Set sockets = null;
+ synchronized (listenerIdToSocketsMap)
+ {
+ sockets = (Set) listenerIdToSocketsMap.remove(listenerId);
+ }
+
+ // Wake up any threads blocked in createSocket().
+ if (sockets != null)
+ {
+ synchronized (sockets)
+ {
+ sockets.notifyAll();
+ }
+ }
+
if (pingTimerTask != null)
pingTimerTask.shutDown();
}
@@ -324,6 +383,7 @@
Marshaller marshaller, UnMarshaller unmarshaller)
throws IOException, ConnectionFailedException, ClassNotFoundException
{
+ String listenerId = null;
if (invocation instanceof InvocationRequest)
{
InvocationRequest ir = (InvocationRequest) invocation;
@@ -371,6 +431,12 @@
synchronized (listenerIdToSocketsMap)
{
sockets = (Set) listenerIdToSocketsMap.get(listenerId);
+
+ if (sockets == null)
+ {
+ sockets = new HashSet();
+ listenerIdToSocketsMap.put(listenerId, sockets);
+ }
}
synchronized (controlLock)
@@ -382,24 +448,37 @@
{
if (sockets.isEmpty())
{
+ long wait = timeout;
+ long start = System.currentTimeMillis();
+
+ while (timeout == 0 || wait > 0)
+ {
try
{
- sockets.wait(timeout);
+ sockets.wait(wait);
+ break;
}
catch (InterruptedException e)
{
- log.warn("unexpected interrupt");
- throw new InterruptedIOException("Attempt to create callback socket interrupted");
+ log.debug("unexpected interrupt");
+ if (timeout > 0)
+ wait = timeout - (System.currentTimeMillis() - start);
+ }
}
}
if (sockets.isEmpty())
+ {
+ if (!isConnected())
+ throw new IOException("Unable to create socket: invoker is disconnected");
+ else
throw new IOException("Timed out trying to create socket");
+ }
Iterator it = sockets.iterator();
Socket socket = (Socket) it.next();
it.remove();
- log.debug("found socket: " + socket);
+ log.debug(this + " found socket (" + listenerId + "): " + socket);
return socket;
}
}
@@ -409,9 +488,15 @@
{
synchronized (controlLock)
{
+ if (controlSocket != null)
+ {
+ controlSocket.close();
+ }
+
+ log.debug(this + " replacing control socket: " + controlSocket);
controlSocket = socket;
+ log.debug(this + " control socket replaced by: " + socket);
controlOutputStream = controlSocket.getOutputStream();
- log.debug("replaced control socket");
}
if (pingTimerTask != null)
@@ -494,6 +579,7 @@
private int maxRetries;
private Exception savedException;
private boolean pingSent;
+ private boolean running = true;
PingTimerTask(BisocketClientInvoker invoker)
{
@@ -506,6 +592,7 @@
{
synchronized (controlLock)
{
+ running = false;
controlOutputStream = null;
}
cancel();
@@ -521,6 +608,9 @@
{
synchronized (controlLock)
{
+ if (!running)
+ return;
+
controlOutputStream.write(Bisocket.PING);
}
pingSent = true;
@@ -533,6 +623,9 @@
}
}
+ if (!running)
+ return;
+
if (!pingSent)
{
log.warn("Unable to send ping: shutting down PingTimerTask", savedException);
More information about the jboss-cvs-commits
mailing list