[jboss-cvs] JBossRemoting/src/main/org/jboss/remoting/transport/bisocket ...
Ron Sigal
ron_sigal at yahoo.com
Sun Mar 11 16:11:16 EDT 2007
User: rsigal
Date: 07/03/11 16:11:16
Modified: src/main/org/jboss/remoting/transport/bisocket
BisocketServerInvoker.java
Log:
JBREM-721, JBREM-722, JBREM-723: (1) Made ControlMonitorTimerTask static; (2) added destroyControlConnection(); (3) handleInternalInvocation() looks for REMOVECLIENTLISTENER messages; (4) ControlConnectionThread gives a new control connection 5 ping cycles before declaring it nonfunctional.
Revision Changes Path
1.8 +99 -28 JBossRemoting/src/main/org/jboss/remoting/transport/bisocket/BisocketServerInvoker.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: BisocketServerInvoker.java
===================================================================
RCS file: /cvsroot/jboss/JBossRemoting/src/main/org/jboss/remoting/transport/bisocket/BisocketServerInvoker.java,v
retrieving revision 1.7
retrieving revision 1.8
diff -u -b -r1.7 -r1.8
--- BisocketServerInvoker.java 23 Feb 2007 07:04:00 -0000 1.7
+++ BisocketServerInvoker.java 11 Mar 2007 20:11:16 -0000 1.8
@@ -57,7 +57,7 @@
/**
*
* @author <a href="ron.sigal at jboss.com">Ron Sigal</a>
- * @version $Revision: 1.7 $
+ * @version $Revision: 1.8 $
* <p>
* Copyright Nov 23, 2006
* </p>
@@ -149,15 +149,16 @@
{
boolean firstConnection;
- // restarting connection
if (locator == null)
{
+ // restarting connection
firstConnection = false;
BisocketClientInvoker clientInvoker = BisocketClientInvoker.getBisocketClientInvoker(listenerId);
try
{
locator = clientInvoker.getSecondaryLocator();
+ listenerIdToInvokerLocatorMap.put(listenerId, locator);
}
catch (Throwable t)
{
@@ -165,9 +166,9 @@
throw new IOException("unable to get secondary locator: " + t.getMessage());
}
}
- // first connection
else
{
+ // first connection
firstConnection = true;
listenerIdToInvokerLocatorMap.put(listenerId, locator);
}
@@ -208,12 +209,22 @@
{
timer = new Timer(true);
}
- controlMonitorTimerTask = new ControlMonitorTimerTask();
+ controlMonitorTimerTask = new ControlMonitorTimerTask(this);
timer.schedule(controlMonitorTimerTask, pingFrequency, pingFrequency);
}
}
+ public void destroyControlConnection(String listenerId) throws IOException
+ {
+ listenerIdToInvokerLocatorMap.remove(listenerId);
+ Thread t = (Thread) controlConnectionThreadMap.remove(listenerId);
+ ((ControlConnectionThread)t).shutdown();
+ controlMonitorTimerTask.shutdown();
+ controlMonitorTimerTask = null;
+ }
+
+
public int getPingFrequency()
{
return pingFrequency;
@@ -432,6 +443,18 @@
}
}
}
+ else if(InternalInvocation.REMOVECLIENTLISTENER.equals(ii.getMethodName()))
+ {
+ Map metadata = ir.getRequestPayload();
+ if(metadata != null)
+ {
+ String listenerId = (String) metadata.get(Client.LISTENER_ID_KEY);
+ if (listenerId != null)
+ {
+ listenerIdToServerInvokerMap.remove(listenerId);
+ }
+ }
+ }
return response;
}
@@ -445,6 +468,7 @@
private boolean running;
private int errorCount;
private long lastPing = -1;
+ private int initialAttempts;
ControlConnectionThread(Socket socket, String listenerId) throws IOException
{
@@ -473,10 +497,15 @@
boolean checkConnection()
{
- if (lastPing < 0)
+ if (lastPing < 0 && initialAttempts++ < MAX_INITIAL_ATTEMPTS)
{
return true;
}
+ else if (lastPing < 0)
+ {
+ return false;
+ }
+
long currentTime = System.currentTimeMillis();
if (log.isTraceEnabled())
@@ -507,10 +536,34 @@
{
case Bisocket.CREATE_ORDINARY_SOCKET:
InvokerLocator locator = (InvokerLocator) listenerIdToInvokerLocatorMap.get(listenerId);
+
+ for (int i = 0; i < 10; i++)
+ {
+ try
+ {
if (socketFactory != null)
socket = socketFactory.createSocket(locator.getHost(), locator.getPort());
else
socket = new Socket(locator.getHost(), locator.getPort());
+ }
+ catch (Exception e)
+ {
+ log.debug("Error creating a socket", e);
+ }
+
+ if (socket != null)
+ break;
+
+ try
+ {
+ Thread.sleep(1000);
+ }
+ catch (InterruptedException e)
+ {
+ log.debug("unexpected interruption");
+ }
+ }
+
DataOutputStream dos = new DataOutputStream(socket.getOutputStream());
dos.write(Bisocket.CREATE_ORDINARY_SOCKET);
dos.writeUTF(listenerId);
@@ -539,8 +592,7 @@
shutdown();
return;
}
- log.error("Unable to read from control connection: " + e.getMessage());
- e.printStackTrace();
+ log.error("Unable to process control connection: " + e.getMessage(), e);
if (++errorCount > 5)
{
shutdown();
@@ -656,18 +708,34 @@
}
- class ControlMonitorTimerTask extends TimerTask
+ static class ControlMonitorTimerTask extends TimerTask
{
private boolean running = true;
+ private BisocketServerInvoker invoker;
+ private Map listenerIdToInvokerLocatorMap;
+ private Map controlConnectionThreadMap;
- void shutdown()
+ ControlMonitorTimerTask(BisocketServerInvoker invoker)
+ {
+ this.invoker = invoker;
+ listenerIdToInvokerLocatorMap = invoker.listenerIdToInvokerLocatorMap;
+ controlConnectionThreadMap = invoker.controlConnectionThreadMap;
+ }
+
+ synchronized void shutdown()
{
running = false;
+ invoker = null;
+ listenerIdToInvokerLocatorMap = null;
+ controlConnectionThreadMap = null;
cancel();
}
- public void run()
+ public synchronized void run()
{
+ if (!running)
+ return;
+
if (log.isTraceEnabled())
log.trace("checking connections");
@@ -678,10 +746,10 @@
}
if (controlConnectionThreads.isEmpty())
- cancel();
+ shutdown();
Iterator it = controlConnectionThreads.iterator();
- while (it.hasNext() & running)
+ while (it.hasNext())
{
final ControlConnectionThread t = (ControlConnectionThread) it.next();
if (!t.checkConnection())
@@ -694,16 +762,18 @@
controlConnectionThreadMap.remove(t.getListenerId());
}
- if (!running)
- return;
-
new Thread()
{
public void run()
{
+ synchronized (ControlMonitorTimerTask.this)
+ {
+ if (!running)
+ return;
+
try
{
- createControlConnection(t.getListenerId(), null);
+ invoker.createControlConnection(t.getListenerId(), null);
}
catch (IOException e)
{
@@ -711,6 +781,7 @@
log.error("Unable to recreate control connection: " + locator, e);
}
}
+ }
}.start();
}
More information about the jboss-cvs-commits
mailing list