[jboss-cvs] JBossRemoting/src/main/org/jboss/remoting ...
Tom Elrod
tom.elrod at jboss.com
Thu Nov 9 16:35:23 EST 2006
User: telrod
Date: 06/11/09 16:35:23
Modified: src/main/org/jboss/remoting Version.java Client.java
Lease.java AbstractInvoker.java ServerInvoker.java
Log:
JBREM-622 & JBREM-629 - sync with remoting_2_x branch to pull in bug fixes.
Revision Changes Path
1.13 +1 -1 JBossRemoting/src/main/org/jboss/remoting/Version.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: Version.java
===================================================================
RCS file: /cvsroot/jboss/JBossRemoting/src/main/org/jboss/remoting/Version.java,v
retrieving revision 1.12
retrieving revision 1.13
diff -u -b -r1.12 -r1.13
--- Version.java 21 Sep 2006 22:23:07 -0000 1.12
+++ Version.java 9 Nov 2006 21:35:23 -0000 1.13
@@ -31,7 +31,7 @@
public static final byte VERSION_1 = 1;
public static final byte VERSION_2 = 2;
- public static final String VERSION = "2.2.0 Alpha2 (Bluto)";
+ public static final String VERSION = "3.0.0 Alpha1 (Otter)";
private static final byte byteVersion = VERSION_2;
private static byte defaultByteVersion = byteVersion;
private static boolean performVersioning = true;
1.54 +126 -22 JBossRemoting/src/main/org/jboss/remoting/Client.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: Client.java
===================================================================
RCS file: /cvsroot/jboss/JBossRemoting/src/main/org/jboss/remoting/Client.java,v
retrieving revision 1.53
retrieving revision 1.54
diff -u -b -r1.53 -r1.54
--- Client.java 28 Oct 2006 17:49:13 -0000 1.53
+++ Client.java 9 Nov 2006 21:35:23 -0000 1.54
@@ -63,7 +63,7 @@
*
* @author <a href="mailto:jhaynie at vocalocity.net">Jeff Haynie</a>
* @author <a href="mailto:telrod at e2technologies.net">Tom Elrod</a>
- * @version $Revision: 1.53 $
+ * @version $Revision: 1.54 $
*/
public class Client implements Externalizable
{
@@ -153,6 +153,8 @@
private Map callbackConnectors = new HashMap();
private Map callbackPollers = new HashMap();
+ private Map listeners = new HashMap();
+
private SocketFactory socketFactory;
private static final long serialVersionUID = 5679279425009837934L;
@@ -931,15 +933,42 @@
InvokerLocator callbackLocator, Object callbackHandlerObject)
throws Throwable
{
- Map internalMetadata = createListenerMetadata(callbackhandler);
+ // if the callback locator is null, these are pull callbacks, so the callback handler
+ // must be tracked per Client (not by the client invoker).
+ if (callbackLocator == null)
+ {
+ String listenerId = generateListenerId(callbackhandler);
+
+ // if listenerId is null, this Client has already registered the callback handler reference
+ // as a listener, so there is no need to add it again.
+ if (listenerId != null)
+ {
+ Map internalMetadata = new HashMap();
+ internalMetadata.put(LISTENER_ID_KEY, listenerId);
if (metadata != null)
{
internalMetadata.putAll(metadata);
}
- String listenerId = (String) internalMetadata.get(LISTENER_ID_KEY);
- invoker.addClientLocator(listenerId, callbackLocator);
- if (callbackLocator != null)
+ // now call server to add listener
+ invoke(new InternalInvocation(InternalInvocation.ADDLISTENER, null), internalMetadata, callbackLocator);
+ }
+ }
+ else
{
+ // these will be push callbacks, which means a callback server locator is involved,
+ // so registration has to be delegated to the client invoker.
+ String listenerId = invoker.addClientLocator(callbackhandler, callbackLocator);
+
+ if (listenerId != null)
+ {
+
+ Map internalMetadata = new HashMap();
+ internalMetadata.put(LISTENER_ID_KEY, listenerId);
+ if(metadata != null)
+ {
+ internalMetadata.putAll(metadata);
+ }
+
Client client = new Client(callbackLocator, subsystem);
client.setSessionId(getSessionId());
client.connect();
@@ -954,10 +983,23 @@
{
client.disconnect();
}
- }
// now call server to add listener
invoke(new InternalInvocation(InternalInvocation.ADDLISTENER, null), internalMetadata, callbackLocator);
}
+ }
+ }
+
+ private String generateListenerId(InvokerCallbackHandler callbackhandler)
+ {
+ String listenerId = null;
+ Object obj = listeners.get(callbackhandler);
+ if(obj == null)
+ {
+ listenerId = new GUID().toString();
+ listeners.put(callbackhandler, listenerId);
+ }
+ return listenerId;
+ }
/**
* Adds the specified handler as a callback listener for pull (sync) callbacks.
@@ -1044,12 +1086,30 @@
{
if (callbackHandler != null)
{
- Map metadata = createListenerMetadata(callbackHandler);
- String listenerId = (String) metadata.get(LISTENER_ID_KEY);
- // connect to the given client locator and remove handler as listener
- InvokerLocator locator = invoker.getClientLocator(listenerId);
- if (locator != null) // async callback
+ // first need to determine whether this is a push or pull callback (i.e. whether it has a locator associated with it)
+ String listenerId = (String)listeners.remove(callbackHandler);
+ if(listenerId != null)
+ {
+ // have a pull callback handler
+ Map metadata = new HashMap();
+ metadata.put(LISTENER_ID_KEY, listenerId);
+ invoke(new InternalInvocation(InternalInvocation.REMOVELISTENER, null), metadata);
+ }
+ else
{
+ // have a push callback handler
+ List holderList = invoker.getClientLocators(callbackHandler);
+ if(holderList != null && holderList.size() > 0)
+ {
+ for(int x = 0; x < holderList.size(); x++)
+ {
+ AbstractInvoker.CallbackLocatorHolder holder = (AbstractInvoker.CallbackLocatorHolder)holderList.get(x);
+ listenerId = holder.getListenerId();
+ InvokerLocator locator = holder.getLocator();
+ Map metadata = new HashMap();
+ metadata.put(LISTENER_ID_KEY, listenerId);
+
+ // call to callback server to remove listener
Client client = new Client(locator, subsystem);
client.setSessionId(getSessionId());
client.connect();
@@ -1057,10 +1117,34 @@
new Object[]{callbackHandler}),
metadata);
client.disconnect();
- }
- // now call server to remove listener
+
+ // now call target server to remove listener
invoke(new InternalInvocation(InternalInvocation.REMOVELISTENER, null), metadata);
+ }
+ }
+ }
+
+
+
+
+// Map metadata = createListenerMetadata(callbackHandler);
+// String listenerId = (String) metadata.get(LISTENER_ID_KEY);
+// // connect to the given client locator and remove handler as listener
+// InvokerLocator locator = invoker.getClientLocator(listenerId);
+// if (locator != null) // async callback
+// {
+// Client client = new Client(locator, subsystem);
+// client.setSessionId(getSessionId());
+// client.connect();
+// client.invoke(new InternalInvocation(InternalInvocation.REMOVECLIENTLISTENER,
+// new Object[]{callbackHandler}),
+// metadata);
+// client.disconnect();
+// }
+// // now call server to remove listener
+// invoke(new InternalInvocation(InternalInvocation.REMOVELISTENER, null), metadata);
+
// clean up callback server connector if one exists
Connector callbackConnector = (Connector) callbackConnectors.remove(callbackHandler);
if (callbackConnector != null)
@@ -1100,11 +1184,21 @@
{
if (callbackHandler != null)
{
- Map metadata = createListenerMetadata(callbackHandler);
+ String listenerId = (String)listeners.get(callbackHandler);
+ if(listenerId != null)
+ {
+ Map metadata = new HashMap();
+ metadata.put(LISTENER_ID_KEY, listenerId);
return (List) invoke(new InternalInvocation(InternalInvocation.GETCALLBACKS, null), metadata);
}
else
{
+ log.error("Could not find listener id for InvokerCallbackHandler (" + callbackHandler + "), please verify handler has been registered as listener.");
+ return null;
+ }
+ }
+ else
+ {
throw new NullPointerException("Can not remove null InvokerCallbackHandler listener.");
}
}
@@ -1150,7 +1244,17 @@
if (callbackIds.size() == 0)
return 0;
- Map metadata = createListenerMetadata(callbackHandler);
+ Map metadata = new HashMap();
+ String listenerId = (String)listeners.get(callbackHandler);
+ if(listenerId != null)
+ {
+ metadata.put(LISTENER_ID_KEY, listenerId);
+ }
+ else
+ {
+ log.error("Could not find listener id for InvokerCallbackHandler (" + callbackHandler + "), please verify handler has been registered as listener.");
+ }
+
Object[] callbackIdArray = callbackIds.toArray();
InternalInvocation invocation = new InternalInvocation(InternalInvocation.ACKNOWLEDGECALLBACK, callbackIdArray);
invoke(invocation, metadata);
1.12 +11 -4 JBossRemoting/src/main/org/jboss/remoting/Lease.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: Lease.java
===================================================================
RCS file: /cvsroot/jboss/JBossRemoting/src/main/org/jboss/remoting/Lease.java,v
retrieving revision 1.11
retrieving revision 1.12
diff -u -b -r1.11 -r1.12
--- Lease.java 26 Sep 2006 02:27:52 -0000 1.11
+++ Lease.java 9 Nov 2006 21:35:23 -0000 1.12
@@ -242,6 +242,8 @@
}
else
{
+ try
+ {
stopLease();
notifyClientLost();
if (clientLeases != null)
@@ -249,6 +251,11 @@
clientLeases.remove(clientSessionId);
}
}
+ catch (Throwable thr)
+ {
+ log.error("Error terminating client lease and sending notification of lost client.", thr);
+ }
+ }
}
}
}
\ No newline at end of file
1.13 +133 -27 JBossRemoting/src/main/org/jboss/remoting/AbstractInvoker.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: AbstractInvoker.java
===================================================================
RCS file: /cvsroot/jboss/JBossRemoting/src/main/org/jboss/remoting/AbstractInvoker.java,v
retrieving revision 1.12
retrieving revision 1.13
diff -u -b -r1.12 -r1.13
--- AbstractInvoker.java 19 Jul 2006 16:39:17 -0000 1.12
+++ AbstractInvoker.java 9 Nov 2006 21:35:23 -0000 1.13
@@ -23,16 +23,23 @@
package org.jboss.remoting;
import org.jboss.logging.Logger;
+import org.jboss.remoting.callback.InvokerCallbackHandler;
import org.jboss.remoting.loading.ClassByteClassLoader;
import org.jboss.remoting.marshal.MarshallLoaderFactory;
import org.jboss.remoting.security.SSLSocketBuilder;
import org.jboss.remoting.serialization.SerializationStreamFactory;
+import org.jboss.util.id.GUID;
import javax.net.SocketFactory;
import java.io.IOException;
import java.lang.reflect.Constructor;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
import java.util.Map;
+import java.util.Set;
/**
* AbstractInvoker is an abstract handler part that contains common methods between both
@@ -40,7 +47,7 @@
*
* @author <a href="mailto:jhaynie at vocalocity.net">Jeff Haynie</a>
* @author <a href="mailto:telrod at e2technologies.net">Tom Elrod</a>
- * @version $Revision: 1.12 $
+ * @version $Revision: 1.13 $
*/
public abstract class AbstractInvoker implements Invoker
{
@@ -105,18 +112,34 @@
}
/**
- * Sets the callback server locator for the specified callback listener id
+ * Generates a listener id for the callbackhandler and callback server locator combination
*
* @param locator
*/
- public void addClientLocator(String listenerId, InvokerLocator locator)
+ public String addClientLocator(InvokerCallbackHandler callbackhandler, InvokerLocator locator)
{
- Object obj = localServerLocators.put(listenerId, locator);
- if(obj != null)
+ String listenerId = null;
+ synchronized(localServerLocators)
{
- throw new RuntimeException("InvokerLocator already exists for listener id " + listenerId + ". " +
- "Remove this listener before adding again.");
+ Collection holders = localServerLocators.values();
+ Iterator itr = holders.iterator();
+ while(itr.hasNext())
+ {
+ CallbackHandlerHolder holder = (CallbackHandlerHolder)itr.next();
+ if(holder.getHandler().equals(callbackhandler) && holder.getLocator().equals(locator))
+ {
+ // the entry already exists for this pair, so return null
+ return null;
+ }
}
+
+ // if got this far, the entry does not exist, so need to add it and create a listener id
+ CallbackHandlerHolder holder = new CallbackHandlerHolder(callbackhandler, locator);
+ listenerId = new GUID().toString();
+ localServerLocators.put(listenerId, holder);
+ }
+
+ return listenerId;
}
/**
@@ -127,7 +150,46 @@
*/
public InvokerLocator getClientLocator(String listenerId)
{
- return (InvokerLocator) localServerLocators.get(listenerId);
+ InvokerLocator locator = null;
+ if(listenerId != null)
+ {
+ CallbackHandlerHolder holder = (CallbackHandlerHolder) localServerLocators.get(listenerId);
+ if(holder != null)
+ {
+ locator = holder.getLocator();
+ }
+ }
+ return locator;
+ }
+
+ public List getClientLocators(InvokerCallbackHandler handler)
+ {
+ List holderList = new ArrayList();
+ if(handler != null)
+ {
+ synchronized(localServerLocators)
+ {
+ Set entries = localServerLocators.entrySet();
+ Iterator itr = entries.iterator();
+ while(itr.hasNext())
+ {
+ Map.Entry entry = (Map.Entry)itr.next();
+ String listenerId = (String)entry.getKey();
+ CallbackHandlerHolder holder = (CallbackHandlerHolder)entry.getValue();
+ CallbackLocatorHolder locatorHolder = new CallbackLocatorHolder(listenerId, holder.getLocator());
+ holderList.add(locatorHolder);
+ }
+ // now remove holders
+ if(holderList.size() > 0)
+ {
+ for(int x = 0; x < holderList.size(); x++)
+ {
+ localServerLocators.remove(((CallbackLocatorHolder)holderList.get(x)).getListenerId());
+ }
+ }
+ }
+ }
+ return holderList;
}
/**
@@ -274,4 +336,48 @@
else
return false;
}
+
+ private class CallbackHandlerHolder
+ {
+ private InvokerCallbackHandler handler;
+ private InvokerLocator locator;
+
+ private CallbackHandlerHolder(InvokerCallbackHandler handler, InvokerLocator locator)
+ {
+ this.handler = handler;
+ this.locator = locator;
+}
+
+ public InvokerCallbackHandler getHandler()
+ {
+ return handler;
+ }
+
+ public InvokerLocator getLocator()
+ {
+ return locator;
+ }
+ }
+
+ public class CallbackLocatorHolder
+ {
+ private InvokerLocator locator;
+ private String listenerId;
+
+ public CallbackLocatorHolder(String listenerId, InvokerLocator locator)
+ {
+ this.listenerId = listenerId;
+ this.locator = locator;
+ }
+
+ public String getListenerId()
+ {
+ return listenerId;
+ }
+
+ public InvokerLocator getLocator()
+ {
+ return locator;
+ }
+ }
}
1.54 +10 -7 JBossRemoting/src/main/org/jboss/remoting/ServerInvoker.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: ServerInvoker.java
===================================================================
RCS file: /cvsroot/jboss/JBossRemoting/src/main/org/jboss/remoting/ServerInvoker.java,v
retrieving revision 1.53
retrieving revision 1.54
diff -u -b -r1.53 -r1.54
--- ServerInvoker.java 2 Nov 2006 17:40:45 -0000 1.53
+++ ServerInvoker.java 9 Nov 2006 21:35:23 -0000 1.54
@@ -58,7 +58,7 @@
*
* @author <a href="mailto:jhaynie at vocalocity.net">Jeff Haynie</a>
* @author <a href="mailto:tom.elrod at jboss.com">Tom Elrod</a>
- * @version $Revision: 1.53 $
+ * @version $Revision: 1.54 $
*/
public abstract class ServerInvoker extends AbstractInvoker implements ServerInvokerMBean
{
@@ -180,7 +180,7 @@
private long leasePeriod = DEFAULT_CLIENT_LEASE_PERIOD;
private boolean leaseManagement = false;
private Map clientLeases = new HashMap();
- protected ConnectionNotifier connectionNotifier = null;
+ protected ConnectionNotifier connectionNotifier = new ConnectionNotifier();
protected ServerSocketFactory serverSocketFactory = null;
@@ -541,11 +541,8 @@
public void addConnectionListener(ConnectionListener listener)
{
- if(connectionNotifier == null)
+ if(listener != null)
{
- connectionNotifier = new ConnectionNotifier();
- }
-
connectionNotifier.addListener(listener);
if(leasePeriod > 0)
@@ -553,13 +550,18 @@
leaseManagement = true;
}
}
+ else
+ {
+ throw new IllegalArgumentException("Can not add null ConnectionListener.");
+ }
+ }
public void removeConnectionListener(ConnectionListener listener)
{
if(connectionNotifier != null)
{
connectionNotifier.removeListener(listener);
- }
+
// turn off lease management if no listeners (since no one to tell client died)
if(connectionNotifier.size() == 0)
{
@@ -577,6 +579,7 @@
clientLeases.clear();
}
}
+ }
/**
* Sets the amount of time (in milliseconds) that a client should renew its lease.
More information about the jboss-cvs-commits
mailing list