[jboss-cvs] JBossRemoting/src/main/org/jboss/remoting/transport/bisocket ...
Ron Sigal
ron_sigal at yahoo.com
Sat Dec 30 02:57:15 EST 2006
User: rsigal
Date: 06/12/30 02:57:15
Modified: src/main/org/jboss/remoting/transport/bisocket Tag:
remoting_2_x BisocketServerInvoker.java
Log:
JBREM-650: (1) Replaced TimerUtil with separate Timer; (2) get new secondary locator on control connection restart; (3) control connection monitor doesn't fail if callback client never registered; (4) control socket can be created before client invoker is created; (5) control connection is restarted in new thread.
Revision Changes Path
No revision
No revision
1.1.2.4 +86 -92 JBossRemoting/src/main/org/jboss/remoting/transport/bisocket/BisocketServerInvoker.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: BisocketServerInvoker.java
===================================================================
RCS file: /cvsroot/jboss/JBossRemoting/src/main/org/jboss/remoting/transport/bisocket/BisocketServerInvoker.java,v
retrieving revision 1.1.2.3
retrieving revision 1.1.2.4
diff -u -b -r1.1.2.3 -r1.1.2.4
--- BisocketServerInvoker.java 20 Dec 2006 08:21:01 -0000 1.1.2.3
+++ BisocketServerInvoker.java 30 Dec 2006 07:57:15 -0000 1.1.2.4
@@ -35,6 +35,7 @@
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
+import java.util.Timer;
import java.util.TimerTask;
import org.jboss.remoting.Client;
@@ -46,13 +47,12 @@
import org.jboss.remoting.transport.socket.LRUPool;
import org.jboss.remoting.transport.socket.ServerThread;
import org.jboss.remoting.transport.socket.SocketServerInvoker;
-import org.jboss.remoting.util.TimerUtil;
/**
*
* @author <a href="ron.sigal at jboss.com">Ron Sigal</a>
- * @version $Revision: 1.1.2.3 $
+ * @version $Revision: 1.1.2.4 $
* <p>
* Copyright Nov 23, 2006
* </p>
@@ -60,6 +60,7 @@
public class BisocketServerInvoker extends SocketServerInvoker
{
private static Map listenerIdToServerInvokerMap = Collections.synchronizedMap(new HashMap());
+ private static Timer timer;
private Map listenerIdToInvokerLocatorMap = new HashMap();
private ServerSocket secondaryServerSocket;
@@ -78,12 +79,6 @@
}
-// public static void addBisocketServerInvoker(String listenerId, BisocketServerInvoker invoker)
-// {
-// listenerIdToServerInvokerMap.put(listenerId, invoker);
-// }
-
-
public BisocketServerInvoker(InvokerLocator locator)
{
super(locator);
@@ -128,10 +123,8 @@
secondaryServerSocketThread.setName("secondaryServerSocketThread");
secondaryServerSocketThread.setDaemon(true);
secondaryServerSocketThread.start();
- log.info("started secondary port: " + host + ":" + freePort);
+ log.debug("started secondary port: " + host + ":" + freePort);
}
-// ServerInvocationHandler handler = new InternalInvocationHandler(secondaryServerSocket);
-// addInvocationHandler(Bisocket.BISOCKET_INTERNAL_SUBSYSTEM, handler);
}
@@ -146,20 +139,30 @@
{
boolean firstConnection;
- // first connection
+ // restarting connection
if (locator == null)
{
firstConnection = false;
- locator = (InvokerLocator) listenerIdToInvokerLocatorMap.get(listenerId);
+ BisocketClientInvoker clientInvoker = BisocketClientInvoker.getBisocketClientInvoker(listenerId);
+
+ try
+ {
+ locator = clientInvoker.getSecondaryLocator();
}
- // restarted connection
+ catch (Throwable t)
+ {
+ log.error("unable to get secondary locator");
+ throw new IOException("unable to get secondary locator: " + t.getMessage());
+ }
+ }
+ // first connection
else
{
firstConnection = true;
listenerIdToInvokerLocatorMap.put(listenerId, locator);
}
- log.info("creating control connection: " + locator);
+ log.debug("creating control connection: " + locator);
Socket socket = null;
if (socketFactory != null)
@@ -187,14 +190,16 @@
}
thread.start();
- log.info("created control connection: " + locator);
+ log.debug("created control connection: " + socket.toString());
if (controlMonitorTimerTask == null)
{
+ if (timer == null)
+ {
+ timer = new Timer();
+ }
controlMonitorTimerTask = new ControlMonitorTimerTask();
- TimerUtil.schedule(controlMonitorTimerTask, pingFrequency);
- log.info(this + ": scheduled ControlMonitorTimerTask: " + controlMonitorTimerTask);
- log.info("pingFrequency: " + pingFrequency);
+ timer.schedule(controlMonitorTimerTask, pingFrequency, pingFrequency);
}
}
@@ -209,7 +214,6 @@
{
this.pingFrequency = pingFrequency;
pingWindow = 2 * pingFrequency;
- log.info("set ping frequency: " + pingFrequency);
}
@@ -235,13 +239,10 @@
protected void cleanup()
{
super.cleanup();
- log.info(this + ": $$$entering cleanup()");
- log.info("controlMonitorTimerTask: " + this.controlMonitorTimerTask);
if (controlMonitorTimerTask != null)
- {
controlMonitorTimerTask.shutdown();
- }
+
Iterator it = controlConnectionThreadMap.values().iterator();
while (it.hasNext())
{
@@ -249,19 +250,15 @@
t.shutdown();
it.remove();
}
- log.info("secondaryServerSocketThread: " + secondaryServerSocketThread);
+
if (secondaryServerSocketThread != null)
- {
secondaryServerSocketThread.shutdown();
- }
+
if (secondaryServerSocket != null)
{
try
{
secondaryServerSocket.close();
- log.info("closed secondary port: " +
- secondaryServerSocket.getInetAddress() + ":" +
- secondaryServerSocket.getLocalPort());
}
catch (IOException e)
{
@@ -289,7 +286,6 @@
{
if(Bisocket.GET_SECONDARY_INVOKER_LOCATOR.equals(ii.getMethodName()))
{
- log.info("returning secondaryLocator: " + secondaryLocator);
return secondaryLocator;
}
@@ -304,7 +300,6 @@
if (listenerId != null)
{
listenerIdToServerInvokerMap.put(listenerId, this);
- log.info("registered " + listenerId + " -> " + this);
}
}
}
@@ -320,7 +315,7 @@
private DataInputStream dis;
private boolean running;
private int errorCount;
- private long lastPing = System.currentTimeMillis();
+ private long lastPing = -1;
ControlConnectionThread(Socket socket, String listenerId) throws IOException
{
@@ -349,9 +344,16 @@
boolean checkConnection()
{
+ if (lastPing < 0)
+ {
+ return true;
+ }
long currentTime = System.currentTimeMillis();
- log.info("elapsed: " + (currentTime - lastPing));
- log.info("returning: " + ((currentTime - lastPing > pingWindow) ? false : true));
+
+ if (log.isTraceEnabled())
+ {
+ log.trace("elapsed: " + (currentTime - lastPing));
+ }
return (currentTime - lastPing <= pingWindow);
}
@@ -362,7 +364,6 @@
public void run()
{
- log.info("starting ControlConnectionThread");
running = true;
while (running)
{
@@ -376,8 +377,6 @@
switch (action)
{
case Bisocket.CREATE_ORDINARY_SOCKET:
-// String listenerId = dis.readUTF();
-// dis.readUTF();
InvokerLocator locator = (InvokerLocator) listenerIdToInvokerLocatorMap.get(listenerId);
if (socketFactory != null)
socket = socketFactory.createSocket(locator.getHost(), locator.getPort());
@@ -389,8 +388,6 @@
break;
case Bisocket.PING:
- log.info("got ping");
-// log.info("lastPing: " + lastPing);
continue;
case -1:
@@ -407,12 +404,13 @@
if (running)
{
if ("socket closed".equals(e.getMessage()) ||
- "Socket is closed".equals(e.getMessage()))
+ "Socket is closed".equals(e.getMessage()) ||
+ "Connection reset".equals(e.getMessage()))
{
shutdown();
return;
}
- log.error("Unable to create new Socket: " + e.getMessage());
+ log.error("Unable to read from control connection: " + e.getMessage());
e.printStackTrace();
if (++errorCount > 5)
{
@@ -436,7 +434,9 @@
clientpool, threadpool,
getTimeout(), serverSocketClass);
thread.start();
- log.info("created: " + thread);
+
+ if (log.isDebugEnabled())
+ log.debug("created: " + thread);
}
catch (Exception e)
{
@@ -455,12 +455,6 @@
}
-// protected ServerThread createServerThread(Socket socket) throws Exception
-// {
-// return new BisocketServerThread(socket, this, clientpool, threadpool, getTimeout(), serverSocketClass);
-// }
-
-
class SecondaryServerSocketThread extends Thread
{
private ServerSocket secondaryServerSocket;
@@ -475,7 +469,6 @@
{
running = false;
interrupt();
- log.info(this + ": shut down");
}
public void run()
@@ -485,6 +478,7 @@
try
{
Socket socket = secondaryServerSocket.accept();
+ log.debug("accepted: " + socket);
DataInputStream dis = new DataInputStream(socket.getInputStream());
int action = dis.read();
String listenerId = dis.readUTF();
@@ -493,14 +487,22 @@
{
case Bisocket.CREATE_CONTROL_SOCKET:
BisocketClientInvoker invoker;
- invoker = BisocketClientInvoker.getBisocketClientInvoker(listenerId);
+ invoker = BisocketClientInvoker.getBisocketCallbackClientInvoker(listenerId);
+ if (invoker == null)
+ {
+ log.debug("SecondaryServerSocketThread: transferring socket: " + listenerId);
+ BisocketClientInvoker.transferSocket(listenerId, socket);
+ }
+ else
+ {
invoker.replaceControlSocket(socket);
- log.info("SecondaryServerSocketThread: created secondary socket: " + listenerId);
+ log.debug("SecondaryServerSocketThread: created secondary socket: " + listenerId);
+ }
break;
case Bisocket.CREATE_ORDINARY_SOCKET:
BisocketClientInvoker.transferSocket(listenerId, socket);
- log.info("SecondaryServerSocketThread: transferred socket: " + listenerId);
+ log.debug("SecondaryServerSocketThread: transferred socket: " + listenerId);
break;
default:
@@ -533,30 +535,43 @@
{
running = false;
cancel();
- log.info("shutting down " + this);
}
public void run()
{
+ if (log.isTraceEnabled())
+ log.trace("checking connections");
+
Collection controlConnectionThreads = null;
synchronized (controlConnectionThreadMap)
{
controlConnectionThreads = new HashSet(controlConnectionThreadMap.values());
}
+ if (controlConnectionThreads.isEmpty())
+ cancel();
+
Iterator it = controlConnectionThreads.iterator();
while (it.hasNext() & running)
{
- ControlConnectionThread t = (ControlConnectionThread) it.next();
+ final ControlConnectionThread t = (ControlConnectionThread) it.next();
if (!t.checkConnection())
{
- log.info("detected failure on control connection: requesting new control connection");
+ log.warn(this + ": detected failure on control connection " + t + ": requesting new control connection");
t.shutdown();
- it.remove();
+
+ synchronized (controlConnectionThreadMap)
+ {
+ controlConnectionThreadMap.remove(t.getListenerId());
+ }
if (!running)
return;
+ new Thread()
+ {
+ public void run()
+ {
try
{
createControlConnection(t.getListenerId(), null);
@@ -564,34 +579,13 @@
catch (IOException e)
{
InvokerLocator locator = (InvokerLocator) listenerIdToInvokerLocatorMap.get(t.getListenerId());
- log.error(this + ": " + "Unable to recreate control connection: " + locator, e);
- e.printStackTrace();
+ log.error("Unable to recreate control connection: " + locator, e);
+ }
}
+ }.start();
+
}
}
}
}
-
-
-// class InternalInvocationHandler implements ServerInvocationHandler
-// {
-// InvokerLocator locator;
-//
-// InternalInvocationHandler(ServerSocket ss)
-// {
-// String host = ss.getInetAddress().getHostAddress();
-// int port = ss.getLocalPort();
-// locator = new InvokerLocator(null, host, port, null, null);
-// }
-//
-// public Object invoke(InvocationRequest invocation) throws Throwable
-// {
-// return locator;
-// }
-//
-// public void setMBeanServer(MBeanServer server) {}
-// public void setInvoker(ServerInvoker invoker) {}
-// public void addListener(InvokerCallbackHandler callbackHandler) {}
-// public void removeListener(InvokerCallbackHandler callbackHandler) {}
-// }
}
\ No newline at end of file
More information about the jboss-cvs-commits
mailing list