[jboss-cvs] JBossRemoting/src/main/org/jboss/remoting/transport/bisocket ...
Ron Sigal
ron_sigal at yahoo.com
Sat Mar 10 17:11:12 EST 2007
User: rsigal
Date: 07/03/10 17:11:12
Modified: src/main/org/jboss/remoting/transport/bisocket (Tag: remoting_2_x)
BisocketClientInvoker.java
Log:
JBREM-721, JBREM-722, JBREM-723: (1) transport() now looks for REMOVELISTENER invocations; (2) PingTimerTask is now a static class; (3) the control connection is now created in the constructor; (4) transport() now checks for pull callback connections.
Revision Changes Path
No revision
No revision
1.1.2.11 +87 -61 JBossRemoting/src/main/org/jboss/remoting/transport/bisocket/BisocketClientInvoker.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: BisocketClientInvoker.java
===================================================================
RCS file: /cvsroot/jboss/JBossRemoting/src/main/org/jboss/remoting/transport/bisocket/BisocketClientInvoker.java,v
retrieving revision 1.1.2.10
retrieving revision 1.1.2.11
diff -u -b -r1.1.2.10 -r1.1.2.11
--- BisocketClientInvoker.java 23 Feb 2007 06:56:35 -0000 1.1.2.10
+++ BisocketClientInvoker.java 10 Mar 2007 22:11:12 -0000 1.1.2.11
@@ -23,6 +23,7 @@
package org.jboss.remoting.transport.bisocket;
import java.io.IOException;
+import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.net.Socket;
import java.util.Collections;
@@ -139,17 +140,10 @@
{
isCallbackInvoker = true;
listenerIdToCallbackClientInvokerMap.put(listenerId, this);
-
- synchronized (listenerIdToSocketsMap)
- {
- if (listenerIdToSocketsMap.get(listenerId) == null)
- listenerIdToSocketsMap.put(listenerId, new HashSet());
- }
-
log.debug("registered " + listenerId + " -> " + this);
}
- // look for socketTimeout param
+ // look for pingFrequency param
Object val = config.get(Bisocket.PING_FREQUENCY);
if (val != null)
{
@@ -166,6 +160,52 @@
}
}
}
+
+ if (isCallbackInvoker)
+ {
+ Set sockets = null;
+
+ synchronized (listenerIdToSocketsMap)
+ {
+ sockets = (Set) listenerIdToSocketsMap.get(listenerId);
+ if (sockets == null)
+ {
+ sockets = new HashSet();
+ listenerIdToSocketsMap.put(listenerId, sockets);
+ }
+ }
+
+ synchronized (sockets)
+ {
+ if (sockets.isEmpty())
+ {
+ try
+ {
+ sockets.wait(timeout);
+ }
+ catch (InterruptedException ignored)
+ {
+ log.warn("unexpected interrupt");
+ throw new InterruptedIOException("Attempt to create control socket interrupted");
+ }
+ }
+
+ if (sockets.isEmpty())
+ throw new IOException("Timed out trying to create control socket");
+
+ Iterator it = sockets.iterator();
+ controlSocket = (Socket) it.next();
+ it.remove();
+ controlOutputStream = controlSocket.getOutputStream();
+ log.debug("got control socket: " + controlSocket);
+ pingTimerTask = new PingTimerTask(this);
+ if (timer == null)
+ {
+ timer = new Timer(true);
+ }
+ timer.schedule(pingTimerTask, pingFrequency, pingFrequency);
+ }
+ }
}
@@ -254,7 +294,8 @@
if (o instanceof InternalInvocation)
{
InternalInvocation ii = (InternalInvocation) o;
- if (InternalInvocation.ADDLISTENER.equals(ii.getMethodName()))
+ if (InternalInvocation.ADDLISTENER.equals(ii.getMethodName())
+ && ir.getLocator() != null) // getLocator() == null for pull callbacks
{
Map requestPayload = ir.getRequestPayload();
listenerId = (String) requestPayload.get(Client.LISTENER_ID_KEY);
@@ -263,6 +304,17 @@
callbackServerInvoker = BisocketServerInvoker.getBisocketServerInvoker(listenerId);
callbackServerInvoker.createControlConnection(listenerId, secondaryLocator);
}
+ else if (InternalInvocation.REMOVELISTENER.equals(ii.getMethodName()))
+ {
+ Map requestPayload = ir.getRequestPayload();
+ listenerId = (String) requestPayload.get(Client.LISTENER_ID_KEY);
+ listenerIdToClientInvokerMap.remove(listenerId);
+ BisocketServerInvoker callbackServerInvoker;
+ callbackServerInvoker = BisocketServerInvoker.getBisocketServerInvoker(listenerId);
+
+ if (callbackServerInvoker != null)
+ callbackServerInvoker.destroyControlConnection(listenerId);
+ }
}
}
@@ -275,53 +327,18 @@
if (!isCallbackInvoker)
return super.createSocket(address, port, timeout);
- Set sockets = null;
-
- synchronized (listenerIdToSocketsMap)
- {
- sockets = (Set) listenerIdToSocketsMap.get(listenerId);
- if (sockets == null)
- {
- sockets = new HashSet();
- listenerIdToSocketsMap.put(listenerId, sockets);
- }
- }
-
- if (controlSocket == null)
+ if (timeout < 0)
{
- synchronized (sockets)
- {
- if (sockets.isEmpty())
- {
- while (true)
- {
- try
- {
- sockets.wait(1000);
- break;
- }
- catch (InterruptedException ignored)
- {
- log.warn("unexpected interrupt");
- }
- }
+ timeout = getTimeout();
+ if (timeout < 0)
+ timeout = 0;
}
- if (sockets.isEmpty())
- throw new IOException("Timed out trying to create control socket");
+ Set sockets = null;
- Iterator it = sockets.iterator();
- controlSocket = (Socket) it.next();
- it.remove();
- controlOutputStream = controlSocket.getOutputStream();
- log.debug("got control socket: " + controlSocket);
- pingTimerTask = new PingTimerTask();
- if (timer == null)
+ synchronized (listenerIdToSocketsMap)
{
- timer = new Timer(true);
- }
- timer.schedule(pingTimerTask, pingFrequency, pingFrequency);
- }
+ sockets = (Set) listenerIdToSocketsMap.get(listenerId);
}
synchronized (controlLock)
@@ -333,17 +350,14 @@
{
if (sockets.isEmpty())
{
- while (true)
- {
try
{
- sockets.wait(1000);
- break;
+ sockets.wait(timeout);
}
- catch (InterruptedException ignored)
+ catch (InterruptedException e)
{
log.warn("unexpected interrupt");
- }
+ throw new InterruptedIOException("Attempt to create callback socket interrupted");
}
}
@@ -371,7 +385,7 @@
if (pingTimerTask != null)
pingTimerTask.cancel();
- pingTimerTask = new PingTimerTask();
+ pingTimerTask = new PingTimerTask(this);
if (timer == null)
{
timer = new Timer(true);
@@ -427,10 +441,23 @@
}
- class PingTimerTask extends TimerTask
+ static class PingTimerTask extends TimerTask
+ {
+ private Object controlLock;
+ private OutputStream controlOutputStream;
+
+ PingTimerTask(BisocketClientInvoker invoker)
{
+ controlLock = invoker.controlLock;
+ controlOutputStream = invoker.controlOutputStream;
+ }
+
public void shutDown()
{
+ synchronized (controlLock)
+ {
+ controlOutputStream = null;
+ }
cancel();
}
@@ -445,8 +472,7 @@
catch (IOException e)
{
log.warn("Unable to send ping: shutting down PingTimerTask");
- pingTimerTask = null;
- cancel();
+ shutDown();
}
}
}
More information about the jboss-cvs-commits
mailing list