[jboss-cvs] JBossRemoting/src/main/org/jboss/remoting/transport/bisocket ...
Ron Sigal
ron_sigal at yahoo.com
Sat Dec 30 02:47:51 EST 2006
User: rsigal
Date: 06/12/30 02:47:51
Modified: src/main/org/jboss/remoting/transport/bisocket Tag:
remoting_2_x BisocketClientInvoker.java
Log:
JBREM-650: (1) Distinguised behavior on client and server; (2) reorganized createSocket(); (3) added controlLock.
Revision Changes Path
No revision
No revision
1.1.2.3 +129 -99 JBossRemoting/src/main/org/jboss/remoting/transport/bisocket/BisocketClientInvoker.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: BisocketClientInvoker.java
===================================================================
RCS file: /cvsroot/jboss/JBossRemoting/src/main/org/jboss/remoting/transport/bisocket/BisocketClientInvoker.java,v
retrieving revision 1.1.2.2
retrieving revision 1.1.2.3
diff -u -b -r1.1.2.2 -r1.1.2.3
--- BisocketClientInvoker.java 19 Dec 2006 06:06:06 -0000 1.1.2.2
+++ BisocketClientInvoker.java 30 Dec 2006 07:47:51 -0000 1.1.2.3
@@ -22,7 +22,6 @@
package org.jboss.remoting.transport.bisocket;
-import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
@@ -32,6 +31,7 @@
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
+import java.util.Timer;
import java.util.TimerTask;
import org.jboss.logging.Logger;
@@ -44,11 +44,20 @@
import org.jboss.remoting.marshal.UnMarshaller;
import org.jboss.remoting.transport.BidirectionalClientInvoker;
import org.jboss.remoting.transport.socket.SocketClientInvoker;
-import org.jboss.remoting.util.TimerUtil;
/**
- * BisocketClientInvoker uses Sockets to remotely connect to the a remote ServerInvoker,
- * which must be a BisocketServerInvoker.
+ * The bisocket transport, an extension of the socket transport, is designed to allow
+ * a callback server to function behind a firewall. All connections are created by
+ * a Socket constructor or factory on the client side connecting to a ServerSocket on
+ * the server side. When a callback client invoker on the server side needs to
+ * open a connection to the callback server, it requests a connection by sending a
+ * request message over a control connection to the client side.
+ *
+ * Because all connections are created in one direction, the bisocket transport is
+ * asymmetric, in the sense that client invokers and server invokers behave differently
+ * on the client side and on the server side.
+ *
+ *
*
* @author <a href="mailto:ron.sigal at jboss.com">Ron Sigal</a>
*/
@@ -58,22 +67,40 @@
{
private static final Logger log = Logger.getLogger(BisocketClientInvoker.class);
private static Map listenerIdToClientInvokerMap = Collections.synchronizedMap(new HashMap());
+ private static Map listenerIdToCallbackClientInvokerMap = Collections.synchronizedMap(new HashMap());
private static Map listenerIdToSocketsMap = new HashMap();
+ private static Timer timer;
- private int pingFrequency = Bisocket.PING_FREQUENCY_DEFAULT;;
protected String listenerId;
+
+ private int pingFrequency = Bisocket.PING_FREQUENCY_DEFAULT;;
private InvokerLocator secondaryLocator;
private Socket controlSocket;
private OutputStream controlOutputStream;
+ private Object controlLock = new Object();
private PingTimerTask pingTimerTask;
+ private boolean isCallbackInvoker;
+ /**
+ *
+ * FIXME Comment this
+ *
+ * @param listenerId
+ * @return
+ */
static BisocketClientInvoker getBisocketClientInvoker(String listenerId)
{
return (BisocketClientInvoker) listenerIdToClientInvokerMap.get(listenerId);
}
+ static BisocketClientInvoker getBisocketCallbackClientInvoker(String listenerId)
+ {
+ return (BisocketClientInvoker) listenerIdToCallbackClientInvokerMap.get(listenerId);
+ }
+
+
static void transferSocket(String listenerId, Socket socket)
{
Set sockets = null;
@@ -111,7 +138,8 @@
listenerId = (String) config.get(Client.LISTENER_ID_KEY);
if (listenerId != null)
{
- listenerIdToClientInvokerMap.put(listenerId, this);
+ isCallbackInvoker = true;
+ listenerIdToCallbackClientInvokerMap.put(listenerId, this);
synchronized (listenerIdToSocketsMap)
{
@@ -119,7 +147,7 @@
listenerIdToSocketsMap.put(listenerId, new HashSet());
}
- log.info("registered " + listenerId + " -> " + this);
+ log.debug("registered " + listenerId + " -> " + this);
}
// look for socketTimeout param
@@ -151,7 +179,6 @@
public void setPingFrequency(int pingFrequency)
{
this.pingFrequency = pingFrequency;
- log.info("set ping frequency: " + pingFrequency);
}
@@ -162,22 +189,13 @@
// Callback client on server side.
if (listenerId != null)
{
-// pingTimerTask = new PingTimerTask();
-// TimerUtil.schedule(pingTimerTask, pingFrequency);
-// log.info("scheduled PingTimerTask");
return;
}
// Client on client side.
try
{
- InternalInvocation ii = new InternalInvocation(Bisocket.GET_SECONDARY_INVOKER_LOCATOR, null);
- InvocationRequest r = new InvocationRequest(null, null, ii, null, null, null);
-// secondaryLocator = (InvokerLocator) invoke(r);
- Object o = invoke(r);
- log.info("secondary locator: " + o);
- secondaryLocator = (InvokerLocator) o;
- log.info("got secondary locator: " + secondaryLocator);
+ secondaryLocator = getSecondaryLocator();
}
catch (Throwable e)
{
@@ -192,7 +210,11 @@
super.handleDisconnect();
if (listenerId != null)
{
+ if (isCallbackInvoker)
+ listenerIdToCallbackClientInvokerMap.remove(listenerId);
+ else
listenerIdToClientInvokerMap.remove(listenerId);
+
listenerIdToSocketsMap.remove(listenerId);
if (pingTimerTask != null)
pingTimerTask.shutDown();
@@ -215,6 +237,7 @@
{
Map requestPayload = ir.getRequestPayload();
String listenerId = (String) requestPayload.get(Client.LISTENER_ID_KEY);
+ listenerIdToClientInvokerMap.put(listenerId, this);
BisocketServerInvoker callbackServerInvoker;
callbackServerInvoker = BisocketServerInvoker.getBisocketServerInvoker(listenerId);
callbackServerInvoker.createControlConnection(listenerId, secondaryLocator);
@@ -231,91 +254,85 @@
if (listenerId == null)
return super.createSocket(address, port);
- Socket socket = null;
Set sockets = null;
synchronized (listenerIdToSocketsMap)
{
sockets = (Set) listenerIdToSocketsMap.get(listenerId);
- }
-
- synchronized (sockets)
- {
- Iterator it = sockets.iterator();
- if (it.hasNext())
- {
- socket = (Socket) it.next();
- it.remove();
-
- if (controlSocket != null)
- return socket;
- else
- {
- controlSocket = socket;
- controlOutputStream = controlSocket.getOutputStream();
- log.info("got control socket");
- pingTimerTask = new PingTimerTask();
- TimerUtil.schedule(pingTimerTask, pingFrequency);
- log.info("scheduled PingTimerTask");
-
- if (it.hasNext())
+ if (sockets == null)
{
- socket = (Socket) it.next();
- it.remove();
- return socket;
- }
+ sockets = new HashSet();
+ listenerIdToSocketsMap.put(listenerId, sockets);
}
}
if (controlSocket == null)
{
+ synchronized (sockets)
+ {
+ if (sockets.isEmpty())
+ {
while (true)
{
try
{
- sockets.wait(getTimeout());
- if (sockets.isEmpty())
- throw new IOException("Timed out trying to create control socket");
+ sockets.wait(1000);
break;
}
- catch (InterruptedException ignored) {}
+ catch (InterruptedException ignored)
+ {
+ log.warn("unexpected interrupt");
+ }
+ }
}
- it = sockets.iterator();
+ if (sockets.isEmpty())
+ throw new IOException("Timed out trying to create control socket");
+
+ Iterator it = sockets.iterator();
controlSocket = (Socket) it.next();
- controlOutputStream = controlSocket.getOutputStream();
it.remove();
- log.info("got control socket");
+ controlOutputStream = controlSocket.getOutputStream();
+ log.debug("got control socket");
pingTimerTask = new PingTimerTask();
- TimerUtil.schedule(pingTimerTask, pingFrequency);
- log.info("scheduled PingTimerTask");
+ if (timer == null)
+ {
+ timer = new Timer();
+ }
+ timer.schedule(pingTimerTask, pingFrequency, pingFrequency);
+ }
}
- log.info("requesting socket");
-
- synchronized (controlOutputStream)
+ synchronized (controlLock)
{
controlOutputStream.write(Bisocket.CREATE_ORDINARY_SOCKET);
}
-// controlOutputStream.writeUTF(listenerId);
+ synchronized (sockets)
+ {
+ if (sockets.isEmpty())
+ {
while (true)
{
try
{
-// sockets.wait(getTimeout());
sockets.wait(1000);
- if (sockets.isEmpty())
- throw new IOException("Timed out trying to create socket");
break;
}
- catch (InterruptedException ignored) {}
+ catch (InterruptedException ignored)
+ {
+ log.warn("unexpected interrupt");
+ }
+ }
}
- it = sockets.iterator();
- socket = (Socket) it.next();
+ if (sockets.isEmpty())
+ throw new IOException("Timed out trying to create socket");
+
+ Iterator it = sockets.iterator();
+ Socket socket = (Socket) it.next();
it.remove();
- log.info("got socket: " + socket);
+ log.debug("socket found");
return socket;
}
}
@@ -323,21 +340,35 @@
void replaceControlSocket(Socket socket) throws IOException
{
- synchronized (controlSocket)
+ synchronized (controlLock)
{
controlSocket = socket;
- }
controlOutputStream = controlSocket.getOutputStream();
+ log.debug("replaced control socket");
+ }
if (pingTimerTask != null)
pingTimerTask.cancel();
pingTimerTask = new PingTimerTask();
- TimerUtil.schedule(pingTimerTask, pingFrequency);
- log.info("replaced PingTimerTask");
+ if (timer == null)
+ {
+ timer = new Timer();
+ }
+ timer.schedule(pingTimerTask, pingFrequency, pingFrequency);
}
+ InvokerLocator getSecondaryLocator() throws Throwable
+ {
+ InternalInvocation ii = new InternalInvocation(Bisocket.GET_SECONDARY_INVOKER_LOCATOR, null);
+ InvocationRequest r = new InvocationRequest(null, null, ii, null, null, null);
+ Object o = invoke(r);
+ log.debug("secondary locator: " + o);
+ secondaryLocator = (InvokerLocator) o;
+ return secondaryLocator;
+ }
+
protected Object checkType(Object o, Class c) throws IOException
{
if (c.isInstance(o))
@@ -386,11 +417,10 @@
public void run()
{
- synchronized (controlOutputStream)
+ synchronized (controlLock)
{
try
{
- log.info("sending ping");
controlOutputStream.write(Bisocket.PING);
}
catch (IOException e)
More information about the jboss-cvs-commits
mailing list