Author: ron.sigal(a)jboss.com
Date: 2009-04-30 17:32:26 -0400 (Thu, 30 Apr 2009)
New Revision: 5094
Modified:
remoting2/branches/2.2.2-SP11_JBREM-1112/src/main/org/jboss/remoting/Client.java
remoting2/branches/2.2.2-SP11_JBREM-1112/src/main/org/jboss/remoting/ConnectionNotifier.java
remoting2/branches/2.2.2-SP11_JBREM-1112/src/main/org/jboss/remoting/ConnectionValidator.java
remoting2/branches/2.2.2-SP11_JBREM-1112/src/main/org/jboss/remoting/Lease.java
remoting2/branches/2.2.2-SP11_JBREM-1112/src/main/org/jboss/remoting/LeasePinger.java
remoting2/branches/2.2.2-SP11_JBREM-1112/src/main/org/jboss/remoting/MicroRemoteClientInvoker.java
remoting2/branches/2.2.2-SP11_JBREM-1112/src/main/org/jboss/remoting/Remoting.java
remoting2/branches/2.2.2-SP11_JBREM-1112/src/main/org/jboss/remoting/ServerInvoker.java
remoting2/branches/2.2.2-SP11_JBREM-1112/src/main/org/jboss/remoting/Version.java
Log:
JBREM-1112 (and others to be named): LeasePingerID, single ConnectionValidator per client
invoker.
Modified:
remoting2/branches/2.2.2-SP11_JBREM-1112/src/main/org/jboss/remoting/Client.java
===================================================================
---
remoting2/branches/2.2.2-SP11_JBREM-1112/src/main/org/jboss/remoting/Client.java 2009-04-30
05:27:56 UTC (rev 5093)
+++
remoting2/branches/2.2.2-SP11_JBREM-1112/src/main/org/jboss/remoting/Client.java 2009-04-30
21:32:26 UTC (rev 5094)
@@ -49,6 +49,7 @@
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.io.StreamCorruptedException;
+import java.lang.ref.WeakReference;
import java.net.InetAddress;
import java.net.SocketTimeoutException;
import java.rmi.MarshalException;
@@ -152,6 +153,9 @@
public static final int DEFAULT_DISCONNECT_TIMEOUT = -1;
public static final String THROW_CALLBACK_EXCEPTION =
"throwCallbackException";
+
+ private static Map connectionValidators = new HashMap();
+ private static Object connectionValidatorLock = new Object();
private static final Logger log = Logger.getLogger(Client.class);
@@ -175,6 +179,7 @@
private InvokerLocator locator;
private ConnectionValidator connectionValidator = null;
+ private ConnectionValidatorKey connectionValidatorKey;
private Map configuration = new HashMap();
private Map callbackConnectors = new HashMap();
@@ -187,6 +192,8 @@
private int disconnectTimeout = DEFAULT_DISCONNECT_TIMEOUT;
private boolean connected = false;
+
+ private Set connectionListeners = new HashSet();
// Constructors
---------------------------------------------------------------------------------
@@ -384,11 +391,54 @@
}
}
- if (connectionValidator == null)
+ synchronized (connectionValidatorLock)
{
- connectionValidator = new ConnectionValidator(this, metadata);
+ if (connectionValidator == null)
+ {
+ Map map = new HashMap(configuration);
+ map.putAll(metadata);
+ connectionValidatorKey = new ConnectionValidatorKey(invoker, map);
+ WeakReference ref = (WeakReference)
connectionValidators.get(connectionValidatorKey);
+ if (ref == null)
+ {
+ connectionValidator = new ConnectionValidator(this, metadata);
+ connectionValidators.put(connectionValidatorKey, new
WeakReference(connectionValidator));
+ connectionValidator.addConnectionListener(this, listener);
+ log.debug(this + ": created " + connectionValidator);
+ }
+ else
+ {
+ connectionValidator = (ConnectionValidator) ref.get();
+ if (connectionValidator.addConnectionListener(this, listener))
+ {
+ log.debug(this + ": reusing from static table: " +
connectionValidator);
+ }
+ else
+ {
+ connectionValidator = new ConnectionValidator(this, metadata);
+ connectionValidators.put(connectionValidatorKey, new
WeakReference(connectionValidator));
+ connectionValidator.addConnectionListener(this, listener);
+ log.debug(this + ": current ConnectionValidator is stopped:
created " + connectionValidator);
+ }
+ }
+ }
+ else
+ {
+ if (connectionValidator.addConnectionListener(this, listener))
+ {
+ log.debug(this + ": reusing from local reference: " +
connectionValidator);
+ }
+ else
+ {
+ connectionValidator = new ConnectionValidator(this, metadata);
+ connectionValidators.put(connectionValidatorKey, new
WeakReference(connectionValidator));
+ connectionValidator.addConnectionListener(this, listener);
+ log.debug(this + ": current ConnectionValidator is stopped: created
" + connectionValidator);
+ }
+ }
+
+ connectionListeners.add(listener);
}
- connectionValidator.addConnectionListener(listener);
}
/**
@@ -397,11 +447,29 @@
*/
public boolean removeConnectionListener(ConnectionListener listener)
{
- if (connectionValidator == null)
+ boolean isRemoved = false;
+ synchronized (connectionValidatorLock)
{
- return false;
+ if (connectionValidator == null)
+ {
+ return false;
+ }
+ isRemoved = connectionValidator.removeConnectionListener(this, listener);
+ if (connectionValidator.isStopped())
+ {
+ connectionValidators.remove(connectionValidatorKey);
+ log.debug(this + " removed from static map: " +
connectionValidator);
+ connectionValidator = null;
+ connectionValidatorKey = null;
+ }
+ connectionListeners.remove(listener);
+ if (connectionListeners.isEmpty())
+ {
+ connectionValidator = null;
+ connectionValidatorKey = null;
+ }
}
- return connectionValidator.removeConnectionListener(listener);
+ return isRemoved;
}
/**
@@ -483,10 +551,25 @@
// this is a noop if no lease is active
invoker.terminateLease(sessionId, disconnectTimeout);
- if (connectionValidator != null)
+ synchronized (connectionValidatorLock)
{
- connectionValidator.stop();
+ if (connectionValidator != null)
+ {
+ Iterator it = connectionListeners.iterator();
+ while (it.hasNext())
+ {
+ ConnectionListener listener = (ConnectionListener) it.next();
+ connectionValidator.removeConnectionListener(this, listener);
+ }
+ if (connectionValidator.isStopped())
+ {
+ connectionValidators.remove(connectionValidatorKey);
+ log.debug(this + " removed from static map: " +
connectionValidator);
+ }
+ }
+ // connectionValidator.stop();
connectionValidator = null;
+ connectionValidatorKey = null;
}
// Need to remove myself from registry so will not keep reference to me since I
am of no
@@ -1537,6 +1620,7 @@
e.initCause(throwable);
throw e;
}
+ log.debug(this + " connected to " + locator);
}
else
{
@@ -1726,4 +1810,32 @@
// Inner classes
--------------------------------------------------------------------------------
+
+ static class ConnectionValidatorKey
+ {
+ private ClientInvoker invoker;
+ private Map metadata;
+
+ ConnectionValidatorKey(ClientInvoker invoker, Map metadata)
+ {
+ this.invoker = invoker;
+ this.metadata = metadata;
+ }
+
+ public boolean equals(Object o)
+ {
+ if (o == null)
+ return false;
+ if (! (o instanceof ConnectionValidatorKey))
+ return false;
+ ConnectionValidatorKey holder = (ConnectionValidatorKey) o;
+ boolean metadataEquals = (metadata == null && holder.metadata == null)
|| metadata.equals(holder.metadata);
+ return invoker == holder.invoker && metadataEquals;
+ }
+
+ public int hashCode()
+ {
+ return invoker.hashCode() * metadata.hashCode();
+ }
+ }
}
Modified:
remoting2/branches/2.2.2-SP11_JBREM-1112/src/main/org/jboss/remoting/ConnectionNotifier.java
===================================================================
---
remoting2/branches/2.2.2-SP11_JBREM-1112/src/main/org/jboss/remoting/ConnectionNotifier.java 2009-04-30
05:27:56 UTC (rev 5093)
+++
remoting2/branches/2.2.2-SP11_JBREM-1112/src/main/org/jboss/remoting/ConnectionNotifier.java 2009-04-30
21:32:26 UTC (rev 5094)
@@ -43,7 +43,7 @@
{
try
{
- log.debug("Server connection lost to client (session id = " +
clientSessionId);
+ log.debug(this + " Server connection lost to client (session id = " +
clientSessionId);
Client client = new Client(new InvokerLocator(locatorurl), requestPayload);
client.setSessionId(clientSessionId);
@@ -52,7 +52,9 @@
Iterator it = listeners.iterator();
while (it.hasNext())
{
- ((ConnectionListener) it.next()).handleConnectionException(null, client);
+ ConnectionListener listener = (ConnectionListener) it.next();
+ listener.handleConnectionException(null, client);
+ log.debug("notified " + listener + " of connection lost to:
" + clientSessionId);
}
}
}
@@ -66,9 +68,9 @@
{
try
{
- if(log.isTraceEnabled())
+// if(log.isTraceEnabled())
{
- log.trace("Client disconnected (session id = " + clientSessionId);
+ log.debug(this + " Client disconnected (session id = " +
clientSessionId);
}
Client client = new Client(new InvokerLocator(locatorURL), requestPayload);
client.setSessionId(clientSessionId);
Modified:
remoting2/branches/2.2.2-SP11_JBREM-1112/src/main/org/jboss/remoting/ConnectionValidator.java
===================================================================
---
remoting2/branches/2.2.2-SP11_JBREM-1112/src/main/org/jboss/remoting/ConnectionValidator.java 2009-04-30
05:27:56 UTC (rev 5093)
+++
remoting2/branches/2.2.2-SP11_JBREM-1112/src/main/org/jboss/remoting/ConnectionValidator.java 2009-04-30
21:32:26 UTC (rev 5094)
@@ -22,11 +22,11 @@
package org.jboss.remoting;
-import java.util.ArrayList;
import java.util.HashMap;
-import java.util.List;
-import java.util.ListIterator;
+import java.util.HashSet;
+import java.util.Iterator;
import java.util.Map;
+import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
@@ -230,7 +230,7 @@
private Map metadata;
private InvokerLocator locator;
private Map configMap;
- private List listeners;
+ private Map listeners;
private ClientInvoker clientInvoker;
private Object lock = new Object();
private Object notificationLock = new Object();
@@ -243,6 +243,8 @@
private int failureDisconnectTimeout = -1;
private boolean isValid;
private Timer timer;
+ private MicroRemoteClientInvoker sharedInvoker;
+ private LeasePinger leasePinger;
// Constructors
---------------------------------------------------------------------------------
@@ -254,8 +256,9 @@
public ConnectionValidator(Client client, long pingPeriod)
{
this.client = client;
+ this.locator = client.getInvoker().getLocator();
this.pingPeriod = pingPeriod;
- listeners = new ArrayList();
+ listeners = new HashMap();
stopped = false;
getParameters(client, new HashMap());
log.debug(this + " created");
@@ -264,8 +267,9 @@
public ConnectionValidator(Client client, Map metadata)
{
this.client = client;
+ this.locator = client.getInvoker().getLocator();
pingPeriod = DEFAULT_PING_PERIOD;
- listeners = new ArrayList();
+ listeners = new HashMap();
stopped = false;
this.metadata = new HashMap(metadata);
getParameters(client, metadata);
@@ -367,36 +371,79 @@
// Public
---------------------------------------------------------------------------------------
- public void addConnectionListener(ConnectionListener listener)
+ public boolean addConnectionListener(Client client, ConnectionListener listener)
{
+ boolean doStart = false;
if (listener != null)
{
synchronized (lock)
{
+ if (stopped)
+ {
+ log.debug(this + " is stopped. Cannot add ConnectionListener: "
+ listener + " for " + client);
+ return false;
+ }
if (listeners.size() == 0)
{
- start();
+ doStart = true;
}
- listeners.add(listener);
+ Set s = (Set) listeners.get(listener);
+ if (s == null)
+ {
+ s = new HashSet();
+ }
+ s.add(client);
+ listeners.put(listener, s);
+ log.debug(this + " added ConnectionListener: " + listener + "
for " + client);
}
+ if (doStart)
+ {
+ start();
+ }
}
+
+ return true;
}
- public boolean removeConnectionListener(ConnectionListener listener)
+ public boolean removeConnectionListener(Client client, ConnectionListener listener)
{
- boolean isRemoved = false;
- if (listener != null)
+ if (listener == null)
{
- synchronized (lock)
+ log.debug(this + " ConnectionListener is null");
+ return false;
+ }
+ synchronized (lock)
+ {
+ if (stopped)
{
- isRemoved = listeners.remove(listener);
- if (listeners.size() == 0)
- {
- stop();
- }
+ log.debug(this + " is stopped. It's too late to remove " +
listener);
+ return false;
}
+ Set s = (Set) listeners.get(listener);
+ if (s == null)
+ {
+ log.debug(this + ": " + listener + " is not
registered");
+ return false;
+ }
+ if (s.remove(client))
+ {
+ log.debug(this + " removed ConnectionListener: " + listener +
" for " + client);
+ }
+ else
+ {
+ log.debug(this + ": " + listener + " is not registered for
" + client);
+ return false;
+ }
+ if (s.size() == 0)
+ {
+ listeners.remove(listener);
+ }
+ if (listeners.size() == 0)
+ {
+ stop();
+ }
}
- return isRemoved;
+ return true;
}
public long getPingPeriod()
@@ -413,6 +460,11 @@
{
return "ConnectionValidator[" +
Integer.toHexString(System.identityHashCode(this)) + ":" + clientInvoker +
", pingPeriod=" + pingPeriod + " ms]";
}
+
+ public boolean isStopped()
+ {
+ return stopped;
+ }
// Package protected
----------------------------------------------------------------------------
@@ -434,6 +486,14 @@
{
throw new RuntimeException("creating a ConnectionValidator on a local
connection");
}
+ if (stopLeaseOnFailure)
+ {
+ sharedInvoker = (MicroRemoteClientInvoker) client.getInvoker();
+ if (sharedInvoker != null)
+ {
+ leasePinger = sharedInvoker.getLeasePinger();
+ }
+ }
}
private void getParametersFromMap(Map config)
@@ -505,8 +565,17 @@
" to a boolean: must be a String");
}
}
-
+ ClientInvoker invoker = client.getInvoker();
+ if (invoker == null)
+ {
+ log.debug(this + " client invoker == null");
+ }
+ else
+ {
+ log.debug(this + " InvokerLocator: " + invoker.getLocator());
+ }
o = config.get(FAILURE_DISCONNECT_TIMEOUT);
+ log.debug(this + " \"failureDisconnectTimeout\" set to " +
o);
if (o != null)
{
if (o instanceof String)
@@ -514,6 +583,7 @@
try
{
failureDisconnectTimeout = Integer.valueOf(((String) o)).intValue();
+ log.debug(this + " setting failureDisconnectTimeout to " +
failureDisconnectTimeout);
}
catch (Exception e)
{
@@ -537,7 +607,6 @@
log.debug(this + " timeout: " + pingTimeout);
log.debug(this + " ping retries: " +
configMap.get("NumberOfCallRetries"));
log.debug(this + " connection retries: " +
configMap.get("NumberOfRetries"));
- locator = client.getInvoker().getLocator();
try
{
@@ -555,7 +624,14 @@
clientInvoker.connect();
}
+ try
+ {
TimerUtil.schedule(this, pingPeriod);
+ }
+ catch (Exception e)
+ {
+ log.error(this + " unable to schedule on TimerUtil", e);
+ }
started = true;
timer = new Timer(true);
log.debug(this + " started");
@@ -623,6 +699,7 @@
private boolean doStop()
{
+ log.debug("entering doStop()");
synchronized(lock)
{
if (stopped)
@@ -659,22 +736,33 @@
{
return;
}
- ListIterator itr = listeners.listIterator();
+ stopped = true;
+ log.debug(this + " is stopped. No more listeners will be
accepted.");
+
+ Iterator itr = listeners.keySet().iterator();
while (itr.hasNext())
{
final ConnectionListener listener = (ConnectionListener) itr.next();
- new Thread()
+ Set clients = (Set) listeners.get(listener);
+ Iterator itr2 = clients.iterator();
+ while (itr2.hasNext())
{
- public void run()
+ final Client client = (Client) itr2.next();
+ new Thread()
{
- log.debug(this + " calling " + listener +
".handleConnectionException()");
- listener.handleConnectionException(t, client);
- }
- }.start();
+ public void run()
+ {
+ log.debug(this + " calling " + listener +
".handleConnectionException() for " + client);
+ listener.handleConnectionException(t, client);
+ }
+ }.start();
+ }
}
+
+ listeners.clear();
}
+
stop();
- listeners.clear();
}
// Inner classes
--------------------------------------------------------------------------------
@@ -715,19 +803,23 @@
if (stopLeaseOnFailure)
{
- log.debug(this + " detected connection failure: stopping
LeasePinger");
- MicroRemoteClientInvoker invoker = (MicroRemoteClientInvoker)
client.getInvoker();
-
- if (invoker != null)
+ log.debug(ConnectionValidator.this + " detected connection failure:
stopping LeasePinger");
+// MicroRemoteClientInvoker invoker = (MicroRemoteClientInvoker)
client.getInvoker();
+//
+// if (invoker != null)
+// {
+ if (leasePinger == null)
{
+ leasePinger = sharedInvoker.getLeasePinger();
+ }
+ log.debug(ConnectionValidator.this + " shutting down lease pinger:
" + leasePinger);
int disconnectTimeout = (failureDisconnectTimeout == -1) ?
client.getDisconnectTimeout() : failureDisconnectTimeout;
- invoker.terminateLease(null, disconnectTimeout);
- log.debug(ConnectionValidator.this + " shut down lease
pinger");
- }
- else
- {
- log.debug(ConnectionValidator.this + " unable to shut down lease
pinger: client must have shut down");
- }
+ sharedInvoker.terminateLease(null, disconnectTimeout);
+// }
+// else
+// {
+// log.debug(ConnectionValidator.this + " unable to shut down lease
pinger: " + leasePinger + ". Client must have shut down");
+// }
cancel();
}
Modified: remoting2/branches/2.2.2-SP11_JBREM-1112/src/main/org/jboss/remoting/Lease.java
===================================================================
---
remoting2/branches/2.2.2-SP11_JBREM-1112/src/main/org/jboss/remoting/Lease.java 2009-04-30
05:27:56 UTC (rev 5093)
+++
remoting2/branches/2.2.2-SP11_JBREM-1112/src/main/org/jboss/remoting/Lease.java 2009-04-30
21:32:26 UTC (rev 5094)
@@ -25,6 +25,7 @@
import org.jboss.remoting.util.TimerUtil;
import java.util.Collection;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.TimerTask;
@@ -48,6 +49,9 @@
private long leaseWindow = -1;
private long pingStart = -1;
private Map clientLeases = null;
+ private Object lock = new Object();
+ private String leasePingerId;
+ private boolean stopped;
private boolean leaseUpdated = false;
@@ -64,7 +68,9 @@
if(requestPayload != null)
{
this.requestPayload = (Map)requestPayload.get(ClientHolder.CLIENT_HOLDER_KEY);
+ this.leasePingerId = (String) requestPayload.get(LeasePinger.LEASE_PINGER_ID);
}
+ log.debug("leasePingerId: " + leasePingerId);
this.leaseWindow = leasePeriod * 2;
this.clientLeases = clientLeases;
}
@@ -72,9 +78,9 @@
public void startLease()
{
- if(isTraceEnabled)
+ if(true)
{
- log.trace("Starting lease for client invoker (session id = " +
clientSessionId + ") with lease window time of " + leaseWindow);
+ log.debug("Starting lease for client invoker (session id = " +
clientSessionId + ") with lease window time of " + leaseWindow);
}
leaseTimerTask = new LeaseTimerTask();
TimerUtil.schedule(leaseTimerTask, leaseWindow);
@@ -84,8 +90,28 @@
{
if(requestMap != null)
{
- this.requestPayload = (Map)requestMap.get(ClientHolder.CLIENT_HOLDER_KEY);
+ synchronized (lock)
+ {
+ this.requestPayload = (Map)requestMap.get(ClientHolder.CLIENT_HOLDER_KEY);
+
+ log.debug(this + " updating: new Client list:");
+ Collection clientHoldersCol = requestPayload.values();
+ Iterator itr = clientHoldersCol.iterator();
+ while (itr.hasNext())
+ {
+ Object val = itr.next();
+ if (val != null && val instanceof ClientHolder)
+ {
+ ClientHolder clientHolder = (ClientHolder) val;
+ log.debug(this + " " + clientHolder.getSessionId());
+ }
+ }
+ }
}
+ else
+ {
+ log.debug(this + " requestPayload == null");
+ }
updateLease(leasePeriod);
}
@@ -124,13 +150,14 @@
public void terminateLease(String sessionId)
{
- if(isTraceEnabled)
- {
- log.trace("Terminating lease for session id " + sessionId);
- }
// is this terminate for all clients
if (clientSessionId.equals(sessionId))
{
+ if(true)
+ {
+ log.debug(this + " Terminating lease group for session id " +
sessionId);
+ }
+
stopLease();
// should be ok to call this will null as all the client should have
// already been disconnected and there been a notification for each
@@ -140,33 +167,101 @@
}
else
{
+ if(true)
+ {
+ log.debug(this + " Terminating individual lease for session id " +
sessionId);
+ }
notifyClientTermination(sessionId);
}
}
-
+
+ public void terminateLeaseUponFailure(String sessionId)
+ {
+ // is this terminate for all clients
+ if (clientSessionId.equals(sessionId))
+ {
+ if(true)
+ {
+ log.debug(this + " Terminating lease group for session id " +
sessionId);
+ }
+
+ stopLease();
+ // should be ok to call this will null as all the client should have
+ // already been disconnected and there been a notification for each
+ // of these client disconnections (which would remove the client from
+ // the lease, thus leaving the collection empty
+ notifyClientLost();
+ }
+ else
+ {
+ if(true)
+ {
+ log.warn(this + " Expected invoker session id: " + sessionId);
+ }
+ notifyClientLost();
+ }
+ }
+
+ public String toString()
+ {
+ String hash = Integer.toHexString(System.identityHashCode(this));
+ return "Lease[" + hash + ":" + clientSessionId + ":"
+ leasePingerId + "]";
+ }
+
private void notifyClientTermination(String sessionId)
{
- // is for a particular client, so need to inspect request payload for client
- if (requestPayload != null)
+ Map localRequestPayload = null;
+ synchronized (lock)
{
+ if (requestPayload != null)
+ {
+ localRequestPayload = new HashMap(requestPayload);
+ if (sessionId != null)
+ {
+ requestPayload.remove(sessionId);
+ }
+ }
+ }
+
+ if (localRequestPayload != null)
+ {
// should notify for one client or all?
if (sessionId != null)
{
- Object clientHolderObj = requestPayload.remove(sessionId);
+ synchronized (lock)
+ {
+ if (stopped)
+ {
+ log.debug(this + " already stopped");
+ return;
+ }
+ }
+
+ Object clientHolderObj = localRequestPayload.get(sessionId);
if (clientHolderObj != null && clientHolderObj instanceof
ClientHolder)
{
ClientHolder clientHolder = (ClientHolder) clientHolderObj;
notifier.connectionTerminated(locatorURL, clientHolder.getSessionId(),
clientHolder.getConfig());
- if(isTraceEnabled)
+ if(true)
{
- log.trace("Notified connection listener of lease termination due
to disconnect from client (client session id = " + clientHolder.getSessionId());
+ log.debug(this + " Notified connection listener of lease
termination due to disconnect from client (client session id = " +
clientHolder.getSessionId());
}
}
}
else
{
+ synchronized (lock)
+ {
+ if (stopped)
+ {
+ log.debug(this + " already stopped");
+ return;
+ }
+ stopped = true;
+ }
+
// loop through and notify for all clients
- Collection clientHoldersCol = requestPayload.values();
+ Collection clientHoldersCol = localRequestPayload.values();
if (clientHoldersCol != null && clientHoldersCol.size() > 0)
{
Iterator itr = clientHoldersCol.iterator();
@@ -177,9 +272,9 @@
{
ClientHolder clientHolder = (ClientHolder) val;
notifier.connectionTerminated(locatorURL,
clientHolder.getSessionId(), clientHolder.getConfig());
- if(isTraceEnabled)
+ if(true)
{
- log.trace("Notified connection listener of lease termination
due to disconnect from client (client session id = " + clientHolder.getSessionId());
+ log.debug(this + " Notified connection listener of lease
termination due to disconnect from client (client session id = " +
clientHolder.getSessionId());
}
}
}
@@ -188,17 +283,32 @@
}
else
{
- log.warn("Tried to terminate lease for session id " + sessionId +
", but no collection of clients have been set.");
+ log.warn(this + " Tried to terminate lease for session id " +
sessionId + ", but no collection of clients have been set.");
}
}
private void notifyClientLost()
{
- // is not for a particular client (but all clients associated with client invoker),
so need to inspect request payload for client
- if (requestPayload != null)
+ Map localRequestPayload = null;
+ synchronized (lock)
{
+ if (stopped)
+ {
+ log.debug(this + " already stopped");
+ return;
+ }
+ stopped = true;
+ if (requestPayload != null)
+ {
+ localRequestPayload = new HashMap(requestPayload);
+ }
+ }
+
+ if (localRequestPayload != null)
+ {
// loop through and notify for all clients
- Collection clientHoldersCol = requestPayload.values();
+ Collection clientHoldersCol = localRequestPayload.values();
+ log.debug(this + " notifying listeners about " +
clientHoldersCol.size() + " expired client(s)");
if (clientHoldersCol != null && clientHoldersCol.size() > 0)
{
Iterator itr = clientHoldersCol.iterator();
@@ -209,9 +319,9 @@
{
ClientHolder clientHolder = (ClientHolder) val;
notifier.connectionLost(locatorURL, clientHolder.getSessionId(),
clientHolder.getConfig());
- if(isTraceEnabled)
+ if(true)
{
- log.trace("Notified connection listener of lease expired due to
lost connection from client (client session id = " + clientHolder.getSessionId());
+ log.debug(this + " Notified connection listener of lease
expired due to lost connection from client (client session id = " +
clientHolder.getSessionId());
}
}
}
@@ -219,11 +329,16 @@
}
else
{
- log.debug("requestPayload == null, calling
ConnectionNotifier.connectionTerminated()");
- notifier.connectionTerminated(locatorURL, clientSessionId, null);
+ log.debug(this + " requestPayload == null, calling
ConnectionNotifier.connectionLost()");
+ notifier.connectionLost(locatorURL, clientSessionId, null);
}
}
+ protected String getLeasePingerId()
+ {
+ return leasePingerId;
+ }
+
private void stopLease()
{
leaseTimerTask.cancel();
@@ -245,14 +360,14 @@
{
try
{
- if (log.isTraceEnabled()) log.trace("did not receive ping: " +
clientSessionId);
+ if (true) log.debug(Lease.this + " did not receive ping: " +
clientSessionId);
stopLease();
notifyClientLost();
if (clientLeases != null)
{
clientLeases.remove(clientSessionId);
}
- if (log.isTraceEnabled()) log.trace("removed lease:" +
clientSessionId);
+ if (true) log.debug(Lease.this + " removed lease:" +
clientSessionId);
}
catch (Throwable thr)
{
Modified:
remoting2/branches/2.2.2-SP11_JBREM-1112/src/main/org/jboss/remoting/LeasePinger.java
===================================================================
---
remoting2/branches/2.2.2-SP11_JBREM-1112/src/main/org/jboss/remoting/LeasePinger.java 2009-04-30
05:27:56 UTC (rev 5093)
+++
remoting2/branches/2.2.2-SP11_JBREM-1112/src/main/org/jboss/remoting/LeasePinger.java 2009-04-30
21:32:26 UTC (rev 5094)
@@ -2,6 +2,7 @@
import org.jboss.logging.Logger;
import org.jboss.remoting.transport.ClientInvoker;
+import org.jboss.util.id.GUID;
import java.util.HashMap;
import java.util.Iterator;
@@ -25,6 +26,7 @@
public static final long DEFAULT_LEASE_PERIOD = 5000;
public static final int DEFAULT_DISCONNECT_TIMEOUT = -1;
+ public static final String LEASE_PINGER_ID = "leasePingerId";
// Static
---------------------------------------------------------------------------------------
@@ -44,6 +46,8 @@
private long pingPeriod = -1;
private int disconnectTimeout = DEFAULT_DISCONNECT_TIMEOUT;
+
+ private String leasePingerId;
// Constructors
---------------------------------------------------------------------------------
@@ -100,6 +104,24 @@
e.initCause(throwable);
throw e;
}
+
+// if (trace)
+ {
+ log.debug(this + " shut down");
+ if (!clients.isEmpty())
+ {
+ log.debug(this + " " + clients.size() + " remaining
clients:");
+ Iterator it = clients.keySet().iterator();
+ while (it.hasNext())
+ {
+ log.debug(this + ": " + it.next());
+ }
+ }
+ else
+ {
+ log.debug(this + " No remaining clients");
+ }
+ }
}
}
@@ -138,7 +160,9 @@
if(trace) { log.trace(this + " removing client with session ID " +
sessionID); }
- ClientHolder holder = (ClientHolder)clients.remove(sessionID);
+ // Don't remove holder until after client has been removed from server side
Lease, to
+ // avoid a race with LeaseTimerTask sending a PING without the Client being
removed.
+ ClientHolder holder = (ClientHolder)clients.get(sessionID);
if (holder != null)
{
@@ -166,11 +190,13 @@
log.warn(this + " failed sending disconnect for client lease for "
+
"client with session ID " + sessionID);
}
+
+ clients.remove(sessionID);
}
else
{
- log.warn(this + " tried to remove lease for client with session ID " +
sessionID +
- ", but no such lease was found");
+ log.debug(this + " tried to remove lease for client with session ID "
+ sessionID +
+ ", but no such lease was found: probably it was registered with
an older LeasePinger");
}
if (clients.isEmpty())
@@ -233,7 +259,7 @@
public String toString()
{
- return "LeasePinger[" + invoker + "(" + invokerSessionID +
")]";
+ return "LeasePinger[" + leasePingerId + ":" + invoker +
"(" + invokerSessionID + ")]";
}
// Package protected
----------------------------------------------------------------------------
@@ -252,6 +278,16 @@
log.debug(this + " setting disconnect timeout to: " +
disconnectTimeout);
}
+ protected String getLeasePingerId()
+ {
+ return leasePingerId;
+ }
+
+ protected void setLeasePingerId(String leasePingerId)
+ {
+ this.leasePingerId = leasePingerId;
+ }
+
// Private
--------------------------------------------------------------------------------------
private void sendClientPing()
@@ -277,10 +313,9 @@
Map clientsClone = new ConcurrentHashMap(clients);
Map requestClients = new ConcurrentHashMap();
requestClients.put(ClientHolder.CLIENT_HOLDER_KEY, clientsClone);
-
- InvocationRequest ir =
- new InvocationRequest(invokerSessionID, null, "$PING$",
requestClients, null, null);
-
+ requestClients.put(LeasePinger.LEASE_PINGER_ID, leasePingerId);
+
+ InvocationRequest ir = new InvocationRequest(invokerSessionID, null,
"$PING$", requestClients, null, null);
invoker.invoke(ir);
if(trace) { log.trace(this + " successfully pinged the server"); }
Modified:
remoting2/branches/2.2.2-SP11_JBREM-1112/src/main/org/jboss/remoting/MicroRemoteClientInvoker.java
===================================================================
---
remoting2/branches/2.2.2-SP11_JBREM-1112/src/main/org/jboss/remoting/MicroRemoteClientInvoker.java 2009-04-30
05:27:56 UTC (rev 5093)
+++
remoting2/branches/2.2.2-SP11_JBREM-1112/src/main/org/jboss/remoting/MicroRemoteClientInvoker.java 2009-04-30
21:32:26 UTC (rev 5094)
@@ -345,6 +345,7 @@
if (sessionId == null)
{
+ log.debug(this + " shutting down LeasePinger: " + leasePinger);
// Independent of any particular Client - force LeasePinger shutdown.
// Should be called only if there is a reasonable belief that the lease
// has already stopped on the server side.
@@ -361,9 +362,11 @@
else
{
// Remove a particular Client.
+ log.debug(this + " removing client " + sessionId + " from
LeasePinger: " + leasePinger);
boolean isLastClientLease = leasePinger.removeClient(sessionId);
if(isLastClientLease)
{
+ log.debug(this + " shutting down LeasePinger, " + sessionId +
" was last client lease: " + leasePinger);
try
{
leasePinger.stopPing();
@@ -376,6 +379,10 @@
}
}
}
+ else
+ {
+ log.debug(this + " leasePinger is null: must have been shut down
already");
+ }
}
}
@@ -413,8 +420,11 @@
// configuration should NOT be passed as want ping to be specific to client
invoker
// and NOT to the client.
- InvocationRequest ir =
- new InvocationRequest(invokerSessionID, null, "$PING$", null,
new HashMap(), null);
+ String leasePingerId = new GUID().toString();
+ Map requestMap = new HashMap();
+ requestMap.put(LeasePinger.LEASE_PINGER_ID, leasePingerId);
+ log.info(this + " initiating lease for leasePingerId " +
leasePingerId);
+ InvocationRequest ir = new InvocationRequest(invokerSessionID, null,
"$PING$", requestMap, new HashMap(), null);
Object ret = invoke(ir);
@@ -441,6 +451,7 @@
if(trace) { log.trace("server does have leasing enabled (with
default lease period of " + defaultLeasePeriod + ") and will start a new lease
pinger."); }
leasePinger = new LeasePinger(this, invokerSessionID,
defaultLeasePeriod);
+ leasePinger.setLeasePingerId(leasePingerId);
leasePinger.addClient(clientSessionID, configuration, leasePeriod);
leasePinger.startPing();
}
@@ -579,4 +590,11 @@
super.finalize();
}
+ protected LeasePinger getLeasePinger()
+ {
+ synchronized(clientLeaseLock)
+ {
+ return leasePinger;
+ }
+ }
}
Modified:
remoting2/branches/2.2.2-SP11_JBREM-1112/src/main/org/jboss/remoting/Remoting.java
===================================================================
---
remoting2/branches/2.2.2-SP11_JBREM-1112/src/main/org/jboss/remoting/Remoting.java 2009-04-30
05:27:56 UTC (rev 5093)
+++
remoting2/branches/2.2.2-SP11_JBREM-1112/src/main/org/jboss/remoting/Remoting.java 2009-04-30
21:32:26 UTC (rev 5094)
@@ -76,4 +76,12 @@
* org.jboss.remoting.ServerInvoker.InvalidStateException to an
org.jboss.remoting.CannotConnectException.
*/
public static final String CHANGE_INVALID_STATE_TO_CANNOT_CONNECT =
"changeInvalidStateToCannotConnect";
+
+ /**
+ * Flags indicating that connection monitoring should treat a connection as being
defined
+ * by one or two of its endpoints. I.e., if a client invoker or server invoker stops
and restarts, then
+ * all connections it participated in are now gone.
+ */
+ public static final String USE_CLIENT_CONNECTION_IDENTITY =
"useClientConnectionIdentity";
+ public static final String USE_SERVER_CONNECTION_IDENTITY =
"useServerConnectionIdentity";
}
Modified:
remoting2/branches/2.2.2-SP11_JBREM-1112/src/main/org/jboss/remoting/ServerInvoker.java
===================================================================
---
remoting2/branches/2.2.2-SP11_JBREM-1112/src/main/org/jboss/remoting/ServerInvoker.java 2009-04-30
05:27:56 UTC (rev 5093)
+++
remoting2/branches/2.2.2-SP11_JBREM-1112/src/main/org/jboss/remoting/ServerInvoker.java 2009-04-30
21:32:26 UTC (rev 5094)
@@ -262,6 +262,8 @@
protected ServerSocketFactory serverSocketFactory = null;
protected boolean registerCallbackListeners = true;
+
+ protected boolean useClientConnectionIdentity;
// Constructors
---------------------------------------------------------------------------------
@@ -702,6 +704,16 @@
return (ServerInvocationHandler) handlers.get(subsystem.toUpperCase());
}
+ protected boolean isUseClientConnectionIdentity()
+ {
+ return useClientConnectionIdentity;
+ }
+
+ protected void setUseClientConnectionIdentity(boolean useClientConnectionIdentity)
+ {
+ this.useClientConnectionIdentity = useClientConnectionIdentity;
+ }
+
public Object invoke(Object invoke) throws IOException
{
InvocationRequest request = null;
@@ -798,6 +810,7 @@
if ("$DISCONNECT$".equals(param))
{
+ log.debug(this + " got $DISCONNECT$");
if (leaseManagement)
{
terminateLease(invocation);
@@ -1177,6 +1190,13 @@
}
}
+ // config for useClientConnectionIdentity
+ String useClientConnectionIdentityString =
(String)config.get(Remoting.USE_CLIENT_CONNECTION_IDENTITY);
+ if(useClientConnectionIdentityString != null)
+ {
+ useClientConnectionIdentity =
Boolean.parseBoolean(useClientConnectionIdentityString);
+ }
+
// Inject ConnectionListener
String connectionListener = (String)config.get(CONNECTION_LISTENER);
if (connectionListener != null)
@@ -1694,6 +1714,7 @@
{
if (invocation != null)
{
+ // clientSessionId == MicroRemoteClientInvoker.invokerSessionID.
String clientSessionId = invocation.getSessionId();
Lease clientLease = (Lease)clientLeases.get(clientSessionId);
@@ -1711,9 +1732,9 @@
{
// just a client that disconnected, so only need to terminate lease
for
// that particular client (by client session id).
- if (trace) log.trace("terminating client lease: " +
clientSessionId);
ClientHolder holder = (ClientHolder) holderObj;
clientLease.terminateLease(holder.getSessionId());
+ if (true) log.debug("terminating client lease: " +
clientSessionId + ":" + holder.getSessionId());
clientOnlyTerminated = true;
}
}
@@ -1721,7 +1742,7 @@
// now see if client invoker needs to be terminated
if (!clientOnlyTerminated)
{
- if (trace) log.trace("terminating invoker lease: " +
clientSessionId);
+ if (true) log.debug("terminating invoker lease: " +
clientLease);
clientLease.terminateLease(clientSessionId);
clientLeases.remove(clientSessionId);
}
@@ -1729,7 +1750,7 @@
else
{
String type = "invoker";
- Map reqMap = invocation.getRequestPayload();
+ Map reqMap = invocation.getRequestPayload();
if (reqMap != null)
{
Object holderObj = reqMap.get(ClientHolder.CLIENT_HOLDER_KEY);
@@ -1738,8 +1759,9 @@
type = "client";
}
}
- log.warn("Asked to terminate " + type + " lease for client
session id " + clientSessionId +
- ", but lease for this id could not be found." + ":
" + clientLeases);
+ log.debug("Asked to terminate " + type + " lease for invoker
session id "
+ + clientSessionId + ", but lease for this id could not be
found." +"" +
+ "Probably has been removed due to connection
failure.");
}
}
}
@@ -1751,7 +1773,7 @@
String clientSessionId = invocation.getSessionId();
if(clientSessionId != null)
{
- if(trace) { log.trace("Getting lease for client session id: " +
clientSessionId); }
+ if(trace) { log.trace("Getting lease for invoker session id: " +
clientSessionId); }
Lease clientLease = (Lease)clientLeases.get(clientSessionId);
if(clientLease == null)
@@ -1764,15 +1786,48 @@
clientLeases.put(clientSessionId, newClientLease);
newClientLease.startLease();
-
- if(trace) { log.trace("No lease established for client session id
(" + clientSessionId + "), so starting a new one."); }
+
+ if(true) { log.debug("No lease established for invoker session id
(" + clientSessionId +
+ "), so starting a new one:" +
newClientLease); }
}
else
{
- // including request payload from invocation as may contain updated list
of clients.
- clientLease.updateLease(leasePeriod, invocation.getRequestPayload());
+ if (useClientConnectionIdentity)
+ {
+ String leasePingerId = (String)
invocation.getRequestPayload().get(LeasePinger.LEASE_PINGER_ID);;
+ if (leasePingerId.equals(clientLease.getLeasePingerId()))
+ {
+ // including request payload from invocation as may contain updated
list of clients.
+ log.debug(clientLease + " matches: leasePingerId: " +
leasePingerId);
+ clientLease.updateLease(leasePeriod,
invocation.getRequestPayload());
+ if(trace) { log.trace("Updated lease for invoker session id
(" + clientSessionId + ")"); }
+ }
+ else
+ {
+ log.debug(clientLease + " does not match: leasePingerId: "
+ leasePingerId);
+ if (true) log.debug("terminating invoker lease: " +
clientLease);
+ clientLease.terminateLeaseUponFailure(clientSessionId);
+ clientLeases.remove(clientSessionId);
- if(trace) { log.trace("Updated lease for client session id (" +
clientSessionId + ")"); }
+ Lease newClientLease = new Lease(clientSessionId, leasePeriod,
+ locator.getLocatorURI(),
+ invocation.getRequestPayload(),
+ connectionNotifier,
+ clientLeases);
+
+ clientLeases.put(clientSessionId, newClientLease);
+ newClientLease.startLease();
+
+ if(true) { log.debug("starting a new lease:" +
newClientLease); }
+ }
+ }
+ else
+ {
+ // including request payload from invocation as may contain updated
list of clients.
+ clientLease.updateLease(leasePeriod, invocation.getRequestPayload());
+
+ if(trace) { log.trace("Updated lease for client session id ("
+ clientSessionId + ")"); }
+ }
}
}
}
@@ -1782,7 +1837,7 @@
{
if(leaseManagement && invokerSessionId != null)
{
- if(trace) { log.trace("Checking lease for client session id: " +
invokerSessionId); }
+ if(trace) { log.trace("Checking lease for invoker session id: " +
invokerSessionId); }
Lease clientLease = (Lease)clientLeases.get(invokerSessionId);
if(clientLease == null)
Modified:
remoting2/branches/2.2.2-SP11_JBREM-1112/src/main/org/jboss/remoting/Version.java
===================================================================
---
remoting2/branches/2.2.2-SP11_JBREM-1112/src/main/org/jboss/remoting/Version.java 2009-04-30
05:27:56 UTC (rev 5093)
+++
remoting2/branches/2.2.2-SP11_JBREM-1112/src/main/org/jboss/remoting/Version.java 2009-04-30
21:32:26 UTC (rev 5094)
@@ -32,7 +32,7 @@
public static final byte VERSION_2 = 2;
public static final byte VERSION_2_2 = 22;
- public static final String VERSION = "2.2.2.SP11";
+ public static final String VERSION = "2.2.2.SP11_JBREM-1112:042409";
private static final byte byteVersion = VERSION_2_2;
private static byte defaultByteVersion = byteVersion;
private static boolean performVersioning = true;