[jboss-cvs] JBossRemoting/src/main/org/jboss/remoting/callback ...
Ron Sigal
ron_sigal at yahoo.com
Wed Aug 22 23:05:21 EDT 2007
User: rsigal
Date: 07/08/22 23:05:21
Modified: src/main/org/jboss/remoting/callback Tag: remoting_2_2_0_GA
ServerInvokerCallbackHandler.java
Log:
JBREM-641, JBREM-782: Merged changes from branch remoting_2_2_2_experimental..
Revision Changes Path
No revision
No revision
1.15.2.10.2.3 +135 -30 JBossRemoting/src/main/org/jboss/remoting/callback/ServerInvokerCallbackHandler.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: ServerInvokerCallbackHandler.java
===================================================================
RCS file: /cvsroot/jboss/JBossRemoting/src/main/org/jboss/remoting/callback/ServerInvokerCallbackHandler.java,v
retrieving revision 1.15.2.10.2.2
retrieving revision 1.15.2.10.2.3
diff -u -b -r1.15.2.10.2.2 -r1.15.2.10.2.3
--- ServerInvokerCallbackHandler.java 5 Aug 2007 21:04:19 -0000 1.15.2.10.2.2
+++ ServerInvokerCallbackHandler.java 23 Aug 2007 03:05:21 -0000 1.15.2.10.2.3
@@ -73,6 +73,8 @@
private String listenerId;
private String clientSessionId;
private InvokerLocator serverLocator;
+ private int blockingTimeout = ServerInvoker.DEFAULT_BLOCKING_TIMEOUT;
+
private SerializableStore callbackStore = null;
private CallbackErrorHandler callbackErrorHandler = null;
@@ -142,52 +144,68 @@
init(invocation, owner);
}
+ public void connect() throws Exception
+ {
+ if (callBackClient != null)
+ {
+ if (callBackClient.isConnected())
+ return;
+ callBackClient.connect();
+ }
+
+ }
+
private void init(InvocationRequest invocation, ServerInvoker owner) throws Exception
{
clientSessionId = invocation.getSessionId();
sessionId = invocation.getSessionId();
- Map metadata = invocation.getRequestPayload();
- if(metadata != null)
+
+ Map metadata = null;
+ if (owner.getConfiguration() == null)
+ {
+ metadata = new HashMap();
+ }
+ else
{
+ metadata = new HashMap(owner.getConfiguration());
+ }
+ if(invocation.getRequestPayload() != null)
+ {
+ metadata.putAll(invocation.getRequestPayload());
+ }
+
listenerId = (String) metadata.get(Client.LISTENER_ID_KEY);
if(listenerId != null)
{
sessionId = sessionId + "+" + listenerId;
}
- }
+ log.debug("Session id for callback handler is " + sessionId);
+
if(invocation.getLocator() != null)
{
- Map clientConfig = new HashMap();
- if(owner.getConfiguration() != null)
- {
- clientConfig.putAll(owner.getConfiguration());
- clientConfig.putAll(metadata);
- }
-
- Object o = clientConfig.get(CALLBACK_TIMEOUT);
- if (o instanceof String)
+ Object val = metadata.get(CALLBACK_TIMEOUT);
+ if (val instanceof String)
{
try
{
- Integer.parseInt((String) o);
- clientConfig.put(ServerInvoker.TIMEOUT, o);
- log.debug(this + " using callbackTimeout value " + o);
+ Integer.parseInt((String) val);
+ metadata.put(ServerInvoker.TIMEOUT, val);
+ log.debug(this + " using callbackTimeout value " + val);
}
catch (NumberFormatException e)
{
- log.warn("callbackTimeout value must have valid numeric format: " + o);
+ log.warn("callbackTimeout value must have valid numeric format: " + val);
}
}
- else if (o != null)
+ else if (val != null)
{
- log.warn("callbackTimeout value must be a String: " + o);
+ log.warn("callbackTimeout value must be a String: " + val);
}
// need to configure callback client with ssl config if one exists for server
- configureSocketFactory(clientConfig, owner);
+ configureSocketFactory(metadata, owner);
- callBackClient = new Client(invocation.getLocator(), invocation.getSubsystem(), clientConfig);
- callBackClient.connect();
+ callBackClient = new Client(invocation.getLocator(), invocation.getSubsystem(), metadata);
createCallbackErrorHandler(owner, invocation.getSubsystem());
}
else
@@ -195,7 +213,26 @@
createCallbackStore(owner, sessionId);
}
- log.debug("Session id for callback handler is " + sessionId);
+ Object val = metadata.get(ServerInvoker.BLOCKING_TIMEOUT);
+ if (val != null)
+ {
+ if (val instanceof String)
+ {
+ try
+ {
+ blockingTimeout = Integer.parseInt((String) val);
+ }
+ catch (NumberFormatException e)
+ {
+ log.warn("Error converting " + ServerInvoker.BLOCKING_TIMEOUT + " to type long. " + e.getMessage());
+ }
+ }
+ else
+ {
+ log.warn("Value for " + ServerInvoker.BLOCKING_TIMEOUT + " configuration must be of type " + String.class.getName() +
+ " and is " + val.getClass().getName());
+ }
+ }
}
/**
@@ -506,7 +543,69 @@
return sessionId;
}
- public List getCallbacks()
+ public List getCallbacks(Map metadata)
+ {
+ log.trace("entering getCallbacks()");
+
+ boolean blocking = false;
+ int currentBlockingTimeout = blockingTimeout;
+
+ if (metadata != null)
+ {
+ Object val = metadata.get(ServerInvoker.BLOCKING_MODE);
+ if (ServerInvoker.BLOCKING.equals(val))
+ blocking = true;
+
+ val = metadata.get(ServerInvoker.BLOCKING_TIMEOUT);
+ if (val != null)
+ {
+ if (val instanceof String)
+ {
+ try
+ {
+ currentBlockingTimeout = Integer.parseInt((String) val);
+ }
+ catch (NumberFormatException e)
+ {
+ log.warn("Error converting " + ServerInvoker.BLOCKING_TIMEOUT + " to type long. " + e.getMessage());
+ }
+ }
+ else
+ {
+ log.warn("Value for " + ServerInvoker.BLOCKING_TIMEOUT + " configuration must be of type " + String.class.getName() +
+ " and is " + val.getClass().getName());
+ }
+ }
+ }
+
+ if (trace)
+ {
+ log.trace("block: " + blocking);
+ log.trace("blocking timeout: " + currentBlockingTimeout);
+ }
+
+ synchronized (callbacks)
+ {
+ List callbackList = constructCallbackList();
+ if (blocking && callbackList.isEmpty())
+ {
+ try
+ {
+ callbacks.wait(currentBlockingTimeout);
+ callbackList = constructCallbackList();
+ }
+ catch (InterruptedException e)
+ {
+ log.debug("unexpected interrupt");
+ }
+ }
+
+ if (trace) log.trace("callbackList.size(): " + callbackList.size());
+ return callbackList;
+ }
+ }
+
+ private List constructCallbackList()
{
List callbackList = null;
synchronized(callbacks)
@@ -631,12 +730,17 @@
if(callBackClient == null)
{
- // need to check if shoudl persist callback instead of keeping in memory
+ // need to check if should persist callback instead of keeping in memory
if(shouldPersist())
{
try
{
+ synchronized (callbacks)
+ {
persistCallback(callback);
+ callbacks.notify();
+ }
+
callback = null;
// try to help out with the amount of memory usuage
new Thread()
@@ -660,6 +764,7 @@
{
if(trace){ log.debug(this + " got PULL callback. Adding to callback list ..."); }
callbacks.add(callback);
+ callbacks.notify();
}
}
}
More information about the jboss-cvs-commits
mailing list