[jboss-cvs] JBossRemoting/src/main/org/jboss/remoting/transport/bisocket ...
Ron Sigal
ron_sigal at yahoo.com
Tue Jan 16 03:16:43 EST 2007
User: rsigal
Date: 07/01/16 03:16:43
Modified: src/main/org/jboss/remoting/transport/bisocket
BisocketServerInvoker.java
BisocketClientInvoker.java
Log:
JBREM-650: Updated to conform to new version on remoting_2_x branch.
Revision Changes Path
1.3 +155 -182 JBossRemoting/src/main/org/jboss/remoting/transport/bisocket/BisocketServerInvoker.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: BisocketServerInvoker.java
===================================================================
RCS file: /cvsroot/jboss/JBossRemoting/src/main/org/jboss/remoting/transport/bisocket/BisocketServerInvoker.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -b -r1.2 -r1.3
--- BisocketServerInvoker.java 27 Dec 2006 05:58:10 -0000 1.2
+++ BisocketServerInvoker.java 16 Jan 2007 08:16:43 -0000 1.3
@@ -35,6 +35,7 @@
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
+import java.util.Timer;
import java.util.TimerTask;
import org.jboss.remoting.Client;
@@ -46,20 +47,23 @@
import org.jboss.remoting.transport.socket.LRUPool;
import org.jboss.remoting.transport.socket.ServerThread;
import org.jboss.remoting.transport.socket.SocketServerInvoker;
-import org.jboss.remoting.util.TimerUtil;
+import org.jboss.logging.Logger;
/**
*
* @author <a href="ron.sigal at jboss.com">Ron Sigal</a>
- * @version $Revision: 1.2 $
+ * @version $Revision: 1.3 $
* <p>
* Copyright Nov 23, 2006
* </p>
*/
public class BisocketServerInvoker extends SocketServerInvoker
{
+ private static final Logger log = Logger.getLogger(BisocketServerInvoker.class);
+
private static Map listenerIdToServerInvokerMap = Collections.synchronizedMap(new HashMap());
+ private static Timer timer;
private Map listenerIdToInvokerLocatorMap = new HashMap();
private ServerSocket secondaryServerSocket;
@@ -78,12 +82,6 @@
}
-// public static void addBisocketServerInvoker(String listenerId, BisocketServerInvoker invoker)
-// {
-// listenerIdToServerInvokerMap.put(listenerId, invoker);
-// }
-
-
public BisocketServerInvoker(InvokerLocator locator)
{
super(locator);
@@ -128,11 +126,8 @@
secondaryServerSocketThread.setName("secondaryServerSocketThread");
secondaryServerSocketThread.setDaemon(true);
secondaryServerSocketThread.start();
- log.info("started secondary port: " + host + ":" + freePort);
+ log.debug("started secondary port: " + host + ":" + freePort);
}
- log.info("started: " + this);
-// ServerInvocationHandler handler = new InternalInvocationHandler(secondaryServerSocket);
-// addInvocationHandler(Bisocket.BISOCKET_INTERNAL_SUBSYSTEM, handler);
}
@@ -147,44 +142,47 @@
{
boolean firstConnection;
- // first connection
+ // restarting connection
if (locator == null)
{
firstConnection = false;
- locator = (InvokerLocator) listenerIdToInvokerLocatorMap.get(listenerId);
+ BisocketClientInvoker clientInvoker = BisocketClientInvoker.getBisocketClientInvoker(listenerId);
+
+ try
+ {
+ locator = clientInvoker.getSecondaryLocator();
+ }
+ catch (Throwable t)
+ {
+ log.error("unable to get secondary locator");
+ throw new IOException("unable to get secondary locator: " + t.getMessage());
}
- // restarted connection
+ }
+ // first connection
else
{
firstConnection = true;
listenerIdToInvokerLocatorMap.put(listenerId, locator);
}
- log.info(this + ": creating control connection: " + locator);
+ log.debug("creating control connection: " + locator);
Socket socket = null;
if (socketFactory != null)
socket = socketFactory.createSocket(locator.getHost(), locator.getPort());
else
socket = new Socket(locator.getHost(), locator.getPort());
- log.info(this + ": created socket: " + socket);
DataOutputStream dos = new DataOutputStream(socket.getOutputStream());
- log.info("got DataOutputStream: " + dos);
if (firstConnection)
{
- log.info("writing: " + Bisocket.CREATE_ORDINARY_SOCKET);
dos.write(Bisocket.CREATE_ORDINARY_SOCKET);
- log.info("wrote: " + Bisocket.CREATE_ORDINARY_SOCKET);
}
else
{
- log.info("writing: " + Bisocket.CREATE_CONTROL_SOCKET);
dos.write(Bisocket.CREATE_CONTROL_SOCKET);
- log.info("wrote: " + Bisocket.CREATE_CONTROL_SOCKET);
}
dos.writeUTF(listenerId);
- log.info(this + ": wrote listenerId: " + listenerId);
Thread thread = new ControlConnectionThread(socket, listenerId);
thread.setName("control: " + socket.toString());
thread.setDaemon(true);
@@ -195,14 +193,16 @@
}
thread.start();
- log.info("created control connection: " + locator);
+ log.debug("created control connection: " + socket.toString());
if (controlMonitorTimerTask == null)
{
+ if (timer == null)
+ {
+ timer = new Timer();
+ }
controlMonitorTimerTask = new ControlMonitorTimerTask();
- TimerUtil.schedule(controlMonitorTimerTask, pingFrequency);
- log.info(this + ": scheduled ControlMonitorTimerTask: " + controlMonitorTimerTask);
- log.info("pingFrequency: " + pingFrequency);
+ timer.schedule(controlMonitorTimerTask, pingFrequency, pingFrequency);
}
}
@@ -217,7 +217,6 @@
{
this.pingFrequency = pingFrequency;
pingWindow = 2 * pingFrequency;
- log.info("set ping frequency: " + pingFrequency);
}
@@ -243,13 +242,10 @@
protected void cleanup()
{
super.cleanup();
- log.info(this + ": $$$entering cleanup()");
- log.info("controlMonitorTimerTask: " + this.controlMonitorTimerTask);
if (controlMonitorTimerTask != null)
- {
controlMonitorTimerTask.shutdown();
- }
+
Iterator it = controlConnectionThreadMap.values().iterator();
while (it.hasNext())
{
@@ -257,19 +253,15 @@
t.shutdown();
it.remove();
}
- log.info("secondaryServerSocketThread: " + secondaryServerSocketThread);
+
if (secondaryServerSocketThread != null)
- {
secondaryServerSocketThread.shutdown();
- }
+
if (secondaryServerSocket != null)
{
try
{
secondaryServerSocket.close();
- log.info("closed secondary port: " +
- secondaryServerSocket.getInetAddress() + ":" +
- secondaryServerSocket.getLocalPort());
}
catch (IOException e)
{
@@ -297,7 +289,6 @@
{
if(Bisocket.GET_SECONDARY_INVOKER_LOCATOR.equals(ii.getMethodName()))
{
- log.info("returning secondaryLocator: " + secondaryLocator);
return secondaryLocator;
}
@@ -312,8 +303,6 @@
if (listenerId != null)
{
listenerIdToServerInvokerMap.put(listenerId, this);
- log.info("registered " + listenerId + " -> " + this);
-// new Exception().printStackTrace();
}
}
}
@@ -360,12 +349,14 @@
{
if (lastPing < 0)
{
- log.info("returning true");
return true;
}
long currentTime = System.currentTimeMillis();
- log.info(this + ": elapsed: " + (currentTime - lastPing));
- log.info("returning: " + ((currentTime - lastPing > pingWindow) ? false : true));
+
+ if (log.isTraceEnabled())
+ {
+ log.trace("elapsed: " + (currentTime - lastPing));
+ }
return (currentTime - lastPing <= pingWindow);
}
@@ -376,7 +367,6 @@
public void run()
{
- log.info(this + ": starting ControlConnectionThread");
running = true;
while (running)
{
@@ -390,8 +380,6 @@
switch (action)
{
case Bisocket.CREATE_ORDINARY_SOCKET:
-// String listenerId = dis.readUTF();
-// dis.readUTF();
InvokerLocator locator = (InvokerLocator) listenerIdToInvokerLocatorMap.get(listenerId);
if (socketFactory != null)
socket = socketFactory.createSocket(locator.getHost(), locator.getPort());
@@ -403,8 +391,6 @@
break;
case Bisocket.PING:
- log.info(this + ": got ping");
-// log.info("lastPing: " + lastPing);
continue;
case -1:
@@ -421,12 +407,13 @@
if (running)
{
if ("socket closed".equals(e.getMessage()) ||
- "Socket is closed".equals(e.getMessage()))
+ "Socket is closed".equals(e.getMessage()) ||
+ "Connection reset".equals(e.getMessage()))
{
shutdown();
return;
}
- log.error("Unable to create new Socket: " + e.getMessage());
+ log.error("Unable to read from control connection: " + e.getMessage());
e.printStackTrace();
if (++errorCount > 5)
{
@@ -450,7 +437,9 @@
clientpool, threadpool,
getTimeout(), serverSocketClass);
thread.start();
- log.info("created: " + thread);
+
+ if (log.isDebugEnabled())
+ log.debug("created: " + thread);
}
catch (Exception e)
{
@@ -469,12 +458,6 @@
}
-// protected ServerThread createServerThread(Socket socket) throws Exception
-// {
-// return new BisocketServerThread(socket, this, clientpool, threadpool, getTimeout(), serverSocketClass);
-// }
-
-
class SecondaryServerSocketThread extends Thread
{
private ServerSocket secondaryServerSocket;
@@ -489,7 +472,6 @@
{
running = false;
interrupt();
- log.info(this + ": shut down");
}
public void run()
@@ -499,32 +481,31 @@
try
{
Socket socket = secondaryServerSocket.accept();
- log.info(this + ": accepted: " + socket);
+ log.debug("accepted: " + socket);
DataInputStream dis = new DataInputStream(socket.getInputStream());
int action = dis.read();
- log.info(this + ": action: " + action);
String listenerId = dis.readUTF();
- log.info(this + ": listenerId: " + listenerId);
switch (action)
{
case Bisocket.CREATE_CONTROL_SOCKET:
BisocketClientInvoker invoker;
- invoker = BisocketClientInvoker.getBisocketClientInvoker(listenerId);
- log.info("calling replaceControlSocket()");
+ invoker = BisocketClientInvoker.getBisocketCallbackClientInvoker(listenerId);
if (invoker == null)
{
- log.error("unrecognized listenerId: " + listenerId);
- log.error("unable to create control socket");
- continue;
+ log.debug("SecondaryServerSocketThread: transferring socket: " + listenerId);
+ BisocketClientInvoker.transferSocket(listenerId, socket);
}
+ else
+ {
invoker.replaceControlSocket(socket);
- log.info("SecondaryServerSocketThread: created secondary socket: " + listenerId);
+ log.debug("SecondaryServerSocketThread: created secondary socket: " + listenerId);
+ }
break;
case Bisocket.CREATE_ORDINARY_SOCKET:
BisocketClientInvoker.transferSocket(listenerId, socket);
- log.info("SecondaryServerSocketThread: transferred socket: " + listenerId);
+ log.debug("SecondaryServerSocketThread: transferred socket: " + listenerId);
break;
default:
@@ -557,30 +538,43 @@
{
running = false;
cancel();
- log.info("shutting down " + this);
}
public void run()
{
+ if (log.isTraceEnabled())
+ log.trace("checking connections");
+
Collection controlConnectionThreads = null;
synchronized (controlConnectionThreadMap)
{
controlConnectionThreads = new HashSet(controlConnectionThreadMap.values());
}
+ if (controlConnectionThreads.isEmpty())
+ cancel();
+
Iterator it = controlConnectionThreads.iterator();
while (it.hasNext() & running)
{
- ControlConnectionThread t = (ControlConnectionThread) it.next();
+ final ControlConnectionThread t = (ControlConnectionThread) it.next();
if (!t.checkConnection())
{
- log.info(this + ": detected failure on control connection: requesting new control connection");
+ log.warn(this + ": detected failure on control connection " + t + ": requesting new control connection");
t.shutdown();
- it.remove();
+
+ synchronized (controlConnectionThreadMap)
+ {
+ controlConnectionThreadMap.remove(t.getListenerId());
+ }
if (!running)
return;
+ new Thread()
+ {
+ public void run()
+ {
try
{
createControlConnection(t.getListenerId(), null);
@@ -588,34 +582,13 @@
catch (IOException e)
{
InvokerLocator locator = (InvokerLocator) listenerIdToInvokerLocatorMap.get(t.getListenerId());
- log.error(this + ": " + "Unable to recreate control connection: " + locator, e);
- e.printStackTrace();
+ log.error("Unable to recreate control connection: " + locator, e);
+ }
}
+ }.start();
+
}
}
}
}
-
-
-// class InternalInvocationHandler implements ServerInvocationHandler
-// {
-// InvokerLocator locator;
-//
-// InternalInvocationHandler(ServerSocket ss)
-// {
-// String host = ss.getInetAddress().getHostAddress();
-// int port = ss.getLocalPort();
-// locator = new InvokerLocator(null, host, port, null, null);
-// }
-//
-// public Object invoke(InvocationRequest invocation) throws Throwable
-// {
-// return locator;
-// }
-//
-// public void setMBeanServer(MBeanServer server) {}
-// public void setInvoker(ServerInvoker invoker) {}
-// public void addListener(InvokerCallbackHandler callbackHandler) {}
-// public void removeListener(InvokerCallbackHandler callbackHandler) {}
-// }
}
\ No newline at end of file
1.3 +71 -44 JBossRemoting/src/main/org/jboss/remoting/transport/bisocket/BisocketClientInvoker.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: BisocketClientInvoker.java
===================================================================
RCS file: /cvsroot/jboss/JBossRemoting/src/main/org/jboss/remoting/transport/bisocket/BisocketClientInvoker.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -b -r1.2 -r1.3
--- BisocketClientInvoker.java 27 Dec 2006 05:58:10 -0000 1.2
+++ BisocketClientInvoker.java 16 Jan 2007 08:16:43 -0000 1.3
@@ -31,6 +31,7 @@
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
+import java.util.Timer;
import java.util.TimerTask;
import org.jboss.logging.Logger;
@@ -43,11 +44,20 @@
import org.jboss.remoting.marshal.UnMarshaller;
import org.jboss.remoting.transport.BidirectionalClientInvoker;
import org.jboss.remoting.transport.socket.SocketClientInvoker;
-import org.jboss.remoting.util.TimerUtil;
/**
- * BisocketClientInvoker uses Sockets to remotely connect to the a remote ServerInvoker,
- * which must be a BisocketServerInvoker.
+ * The bisocket transport, an extension of the socket transport, is designed to allow
+ * a callback server to function behind a firewall. All connections are created by
+ * a Socket constructor or factory on the client side connecting to a ServerSocket on
+ * the server side. When a callback client invoker on the server side needs to
+ * open a connection to the callback server, it requests a connection by sending a
+ * request message over a control connection to the client side.
+ *
+ * Because all connections are created in one direction, the bisocket transport is
+ * asymmetric, in the sense that client invokers and server invokers behave differently
+ * on the client side and on the server side.
+ *
+ *
*
* @author <a href="mailto:ron.sigal at jboss.com">Ron Sigal</a>
*/
@@ -57,15 +67,19 @@
{
private static final Logger log = Logger.getLogger(BisocketClientInvoker.class);
private static Map listenerIdToClientInvokerMap = Collections.synchronizedMap(new HashMap());
+ private static Map listenerIdToCallbackClientInvokerMap = Collections.synchronizedMap(new HashMap());
private static Map listenerIdToSocketsMap = new HashMap();
+ private static Timer timer;
- private int pingFrequency = Bisocket.PING_FREQUENCY_DEFAULT;;
protected String listenerId;
+
+ private int pingFrequency = Bisocket.PING_FREQUENCY_DEFAULT;;
private InvokerLocator secondaryLocator;
private Socket controlSocket;
private OutputStream controlOutputStream;
private Object controlLock = new Object();
private PingTimerTask pingTimerTask;
+ private boolean isCallbackInvoker;
static BisocketClientInvoker getBisocketClientInvoker(String listenerId)
@@ -74,6 +88,12 @@
}
+ static BisocketClientInvoker getBisocketCallbackClientInvoker(String listenerId)
+ {
+ return (BisocketClientInvoker) listenerIdToCallbackClientInvokerMap.get(listenerId);
+ }
+
+
static void transferSocket(String listenerId, Socket socket)
{
Set sockets = null;
@@ -111,7 +131,8 @@
listenerId = (String) config.get(Client.LISTENER_ID_KEY);
if (listenerId != null)
{
- listenerIdToClientInvokerMap.put(listenerId, this);
+ isCallbackInvoker = true;
+ listenerIdToCallbackClientInvokerMap.put(listenerId, this);
synchronized (listenerIdToSocketsMap)
{
@@ -119,7 +140,7 @@
listenerIdToSocketsMap.put(listenerId, new HashSet());
}
- log.info("registered " + listenerId + " -> " + this);
+ log.debug("registered " + listenerId + " -> " + this);
}
// look for socketTimeout param
@@ -151,7 +172,6 @@
public void setPingFrequency(int pingFrequency)
{
this.pingFrequency = pingFrequency;
- log.info("set ping frequency: " + pingFrequency);
}
@@ -162,22 +182,13 @@
// Callback client on server side.
if (listenerId != null)
{
-// pingTimerTask = new PingTimerTask();
-// TimerUtil.schedule(pingTimerTask, pingFrequency);
-// log.info("scheduled PingTimerTask");
return;
}
// Client on client side.
try
{
- InternalInvocation ii = new InternalInvocation(Bisocket.GET_SECONDARY_INVOKER_LOCATOR, null);
- InvocationRequest r = new InvocationRequest(null, null, ii, null, null, null);
-// secondaryLocator = (InvokerLocator) invoke(r);
- Object o = invoke(r);
- log.info("secondary locator: " + o);
- secondaryLocator = (InvokerLocator) o;
- log.info("got secondary locator: " + secondaryLocator);
+ secondaryLocator = getSecondaryLocator();
}
catch (Throwable e)
{
@@ -192,7 +203,11 @@
super.handleDisconnect();
if (listenerId != null)
{
+ if (isCallbackInvoker)
+ listenerIdToCallbackClientInvokerMap.remove(listenerId);
+ else
listenerIdToClientInvokerMap.remove(listenerId);
+
listenerIdToSocketsMap.remove(listenerId);
if (pingTimerTask != null)
pingTimerTask.shutDown();
@@ -215,6 +230,7 @@
{
Map requestPayload = ir.getRequestPayload();
String listenerId = (String) requestPayload.get(Client.LISTENER_ID_KEY);
+ listenerIdToClientInvokerMap.put(listenerId, this);
BisocketServerInvoker callbackServerInvoker;
callbackServerInvoker = BisocketServerInvoker.getBisocketServerInvoker(listenerId);
callbackServerInvoker.createControlConnection(listenerId, secondaryLocator);
@@ -270,18 +286,18 @@
controlSocket = (Socket) it.next();
it.remove();
controlOutputStream = controlSocket.getOutputStream();
- log.info("got control socket");
+ log.debug("got control socket");
pingTimerTask = new PingTimerTask();
- TimerUtil.schedule(pingTimerTask, pingFrequency);
- log.info("scheduled PingTimerTask: " + pingTimerTask);
+ if (timer == null)
+ {
+ timer = new Timer();
+ }
+ timer.schedule(pingTimerTask, pingFrequency, pingFrequency);
}
}
-
- log.info("requesting socket");
synchronized (controlLock)
{
- log.info("writing CREATE_ORDINARY_SOCKET");
controlOutputStream.write(Bisocket.CREATE_ORDINARY_SOCKET);
}
@@ -309,7 +325,7 @@
Iterator it = sockets.iterator();
Socket socket = (Socket) it.next();
it.remove();
- log.info("socket found");
+ log.debug("socket found");
return socket;
}
}
@@ -317,23 +333,35 @@
void replaceControlSocket(Socket socket) throws IOException
{
- log.info("entering replaceControlSocket");
synchronized (controlLock)
{
controlSocket = socket;
controlOutputStream = controlSocket.getOutputStream();
- log.info("replaced control socket");
+ log.debug("replaced control socket");
}
if (pingTimerTask != null)
pingTimerTask.cancel();
pingTimerTask = new PingTimerTask();
- TimerUtil.schedule(pingTimerTask, pingFrequency);
- log.info("replaced PingTimerTask: " + pingTimerTask);
+ if (timer == null)
+ {
+ timer = new Timer();
+ }
+ timer.schedule(pingTimerTask, pingFrequency, pingFrequency);
}
+ InvokerLocator getSecondaryLocator() throws Throwable
+ {
+ InternalInvocation ii = new InternalInvocation(Bisocket.GET_SECONDARY_INVOKER_LOCATOR, null);
+ InvocationRequest r = new InvocationRequest(null, null, ii, null, null, null);
+ Object o = invoke(r);
+ log.debug("secondary locator: " + o);
+ secondaryLocator = (InvokerLocator) o;
+ return secondaryLocator;
+ }
+
protected Object checkType(Object o, Class c) throws IOException
{
if (c.isInstance(o))
@@ -386,7 +414,6 @@
{
try
{
- log.info(this + ": sending ping");
controlOutputStream.write(Bisocket.PING);
}
catch (IOException e)
More information about the jboss-cvs-commits mailing list