[jboss-cvs] JBossCache/src/org/jboss/cache/notifications ...
Manik Surtani
msurtani at jboss.com
Wed Jan 3 10:33:08 EST 2007
User: msurtani
Date: 07/01/03 10:33:08
Modified: src/org/jboss/cache/notifications Notifier.java
Log:
Improved notification mechanism, added a notification interceptor
Revision Changes Path
1.14 +322 -114 JBossCache/src/org/jboss/cache/notifications/Notifier.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: Notifier.java
===================================================================
RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/notifications/Notifier.java,v
retrieving revision 1.13
retrieving revision 1.14
diff -u -b -r1.13 -r1.14
--- Notifier.java 30 Dec 2006 19:48:45 -0000 1.13
+++ Notifier.java 3 Jan 2007 15:33:08 -0000 1.14
@@ -13,10 +13,13 @@
import org.jboss.cache.CacheSPI;
import org.jboss.cache.Fqn;
import org.jboss.cache.InvocationContext;
+import org.jboss.cache.marshall.MethodCall;
import org.jgroups.View;
+import java.lang.reflect.Method;
import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
@@ -43,7 +46,34 @@
private Set<CacheListener> listeners = new CopyOnWriteArraySet();
private CacheImpl cache;
private InvocationContext tempCtx;
- private Log log = LogFactory.getLog(Notifier.class);
+ private static final Log log = LogFactory.getLog(Notifier.class);
+
+ // --- the java.lang.reflect.Methods of CacheListener
+ private static Method nodeCreated, nodeModified, nodeRemoved, nodeVisited, nodeEvicted, nodeLoaded, nodePassivated, nodeActivated, nodeMoved, cacheStarted, cacheStopped, viewChange;
+
+ static
+ {
+ try
+ {
+ nodeCreated = CacheListener.class.getMethod("nodeCreated", Fqn.class, boolean.class, boolean.class);
+ nodeModified = CacheListener.class.getMethod("nodeModified", Fqn.class, boolean.class, boolean.class, Map.class);
+ nodeRemoved = CacheListener.class.getMethod("nodeRemoved", Fqn.class, boolean.class, boolean.class, Map.class);
+ nodeVisited = CacheListener.class.getMethod("nodeVisited", Fqn.class, boolean.class);
+ nodeEvicted = CacheListener.class.getMethod("nodeEvicted", Fqn.class, boolean.class, boolean.class);
+ nodeLoaded = CacheListener.class.getMethod("nodeLoaded", Fqn.class, boolean.class, Map.class);
+ nodePassivated = CacheListener.class.getMethod("nodePassivated", Fqn.class, boolean.class);
+ nodeActivated = CacheListener.class.getMethod("nodeActivated", Fqn.class, boolean.class);
+ nodeMoved = CacheListener.class.getMethod("nodeMoved", Fqn.class, Fqn.class, boolean.class, boolean.class);
+ cacheStarted = CacheListener.class.getMethod("cacheStarted", CacheSPI.class);
+ cacheStopped = CacheListener.class.getMethod("cacheStopped", CacheSPI.class);
+ viewChange = CacheListener.class.getMethod("viewChange", View.class);
+ }
+ catch (Exception e)
+ {
+ log.error("Unable to initialise Notifier - unable to get Methods on CacheListener.class", e);
+ }
+ }
+
public Notifier(CacheImpl cache)
{
@@ -112,11 +142,14 @@
*
* @param fqn
* @param pre
+ * @param sendImmediately
*/
- public synchronized void notifyNodeCreated(Fqn fqn, boolean pre)
+ public synchronized void notifyNodeCreated(Fqn fqn, boolean pre, boolean sendImmediately)
{
boolean originLocal = cache.getInvocationContext().isOriginLocal();
+ if (sendImmediately)
+ {
resetInvocationContext();
if (evictionPolicyListener != null)
{
@@ -131,12 +164,11 @@
}
restoreInvocationContext();
}
-
- private Map copy(Map<Object, Object> data)
+ else
{
- if (data == null)
- return null;
- return Collections.unmodifiableMap(new HashMap<Object, Object>(data));
+ MethodCall call = new MethodCall(nodeCreated, new Object[]{fqn, pre, originLocal});
+ cache.getInvocationContext().addCacheListenerEvent(call);
+ }
}
/**
@@ -145,24 +177,34 @@
* @param fqn
* @param pre
* @param data
+ * @param sendImmediately
*/
- public synchronized void notifyNodeModified(Fqn fqn, boolean pre, Map<Object, Object> data)
+ public synchronized void notifyNodeModified(Fqn fqn, boolean pre, Map<Object, Object> data, boolean sendImmediately)
{
boolean originLocal = cache.getInvocationContext().isOriginLocal();
+ Map dataCopy = copy(data);
+ if (sendImmediately)
+ {
resetInvocationContext();
if (evictionPolicyListener != null)
{
- evictionPolicyListener.nodeModified(fqn, pre, originLocal, copy(data));
+ evictionPolicyListener.nodeModified(fqn, pre, originLocal, dataCopy);
}
if (hasListeners)
{
for (CacheListener listener : listeners)
{
- listener.nodeModified(fqn, pre, originLocal, copy(data));
+ listener.nodeModified(fqn, pre, originLocal, dataCopy);
}
}
restoreInvocationContext();
}
+ else
+ {
+ MethodCall call = new MethodCall(nodeModified, new Object[]{fqn, pre, originLocal, dataCopy});
+ cache.getInvocationContext().addCacheListenerEvent(call);
+ }
+ }
/**
* Notifies all registered listeners of a nodeRemoved event.
@@ -170,32 +212,46 @@
* @param fqn
* @param pre
* @param data
+ * @param sendImmediately
*/
- public synchronized void notifyNodeRemoved(Fqn fqn, boolean pre, Map<Object, Object> data)
+ public synchronized void notifyNodeRemoved(Fqn fqn, boolean pre, Map<Object, Object> data, boolean sendImmediately)
{
boolean originLocal = cache.getInvocationContext().isOriginLocal();
+ Map dataCopy = copy(data);
+ if (sendImmediately)
+ {
resetInvocationContext();
if (evictionPolicyListener != null)
{
- evictionPolicyListener.nodeRemoved(fqn, pre, originLocal, copy(data));
+ evictionPolicyListener.nodeRemoved(fqn, pre, originLocal, dataCopy);
}
if (hasListeners)
{
for (CacheListener listener : listeners)
{
- listener.nodeRemoved(fqn, pre, originLocal, copy(data));
+ listener.nodeRemoved(fqn, pre, originLocal, dataCopy);
}
}
restoreInvocationContext();
}
+ else
+ {
+ MethodCall call = new MethodCall(nodeRemoved, new Object[]{fqn, pre, originLocal, dataCopy});
+ cache.getInvocationContext().addCacheListenerEvent(call);
+ }
+
+ }
/**
* Notifies all registered listeners of a nodeVisited event.
*
* @param fqn
* @param pre
+ * @param sendImmediately
*/
- public synchronized void notifyNodeVisited(Fqn fqn, boolean pre)
+ public synchronized void notifyNodeVisited(Fqn fqn, boolean pre, boolean sendImmediately)
+ {
+ if (sendImmediately)
{
resetInvocationContext();
if (evictionPolicyListener != null)
@@ -211,16 +267,53 @@
}
restoreInvocationContext();
}
+ else
+ {
+ MethodCall call = new MethodCall(nodeVisited, new Object[]{fqn, pre});
+ cache.getInvocationContext().addCacheListenerEvent(call);
+ }
+
+ }
+
+ public synchronized void notifyNodeMoved(Fqn originalFqn, Fqn newFqn, boolean pre, boolean sendImmediately)
+ {
+ boolean originLocal = cache.getInvocationContext().isOriginLocal();
+ if (sendImmediately)
+ {
+ resetInvocationContext();
+ if (evictionPolicyListener != null)
+ {
+ evictionPolicyListener.nodeMoved(originalFqn, newFqn, pre, originLocal);
+ }
+ if (hasListeners)
+ {
+ for (CacheListener listener : listeners)
+ {
+ listener.nodeMoved(originalFqn, newFqn, pre, originLocal);
+ }
+ }
+ restoreInvocationContext();
+ }
+ else
+ {
+ MethodCall call = new MethodCall(nodeMoved, new Object[]{originalFqn, newFqn, pre, originLocal});
+ cache.getInvocationContext().addCacheListenerEvent(call);
+ }
+ }
+
/**
* Notifies all registered listeners of a nodeEvicted event.
*
* @param fqn
* @param pre
+ * @param sendImmediately
*/
- public synchronized void notifyNodeEvicted(Fqn fqn, boolean pre)
+ public synchronized void notifyNodeEvicted(Fqn fqn, boolean pre, boolean sendImmediately)
{
boolean originLocal = cache.getInvocationContext().isOriginLocal();
+ if (sendImmediately)
+ {
resetInvocationContext();
if (evictionPolicyListener != null)
{
@@ -235,6 +328,12 @@
}
restoreInvocationContext();
}
+ else
+ {
+ MethodCall call = new MethodCall(nodeEvicted, new Object[]{fqn, pre, originLocal});
+ cache.getInvocationContext().addCacheListenerEvent(call);
+ }
+ }
/**
* Notifies all registered listeners of a nodeLoaded event.
@@ -242,31 +341,44 @@
* @param fqn
* @param pre
* @param data
+ * @param sendImmediately
*/
- public synchronized void notifyNodeLoaded(Fqn fqn, boolean pre, Map<Object, Object> data)
+ public synchronized void notifyNodeLoaded(Fqn fqn, boolean pre, Map<Object, Object> data, boolean sendImmediately)
+ {
+ Map dataCopy = copy(data);
+ if (sendImmediately)
{
resetInvocationContext();
if (evictionPolicyListener != null)
{
- evictionPolicyListener.nodeLoaded(fqn, pre, data);
+ evictionPolicyListener.nodeLoaded(fqn, pre, dataCopy);
}
if (hasListeners)
{
for (CacheListener listener : listeners)
{
- listener.nodeLoaded(fqn, pre, data);
+ listener.nodeLoaded(fqn, pre, dataCopy);
}
}
restoreInvocationContext();
}
+ else
+ {
+ MethodCall call = new MethodCall(nodeLoaded, new Object[]{fqn, pre, dataCopy});
+ cache.getInvocationContext().addCacheListenerEvent(call);
+ }
+ }
/**
* Notifies all registered listeners of a nodeActivated event.
*
* @param fqn
* @param pre
+ * @param sendImmediately
*/
- public synchronized void notifyNodeActivated(Fqn fqn, boolean pre)
+ public synchronized void notifyNodeActivated(Fqn fqn, boolean pre, boolean sendImmediately)
+ {
+ if (sendImmediately)
{
resetInvocationContext();
if (evictionPolicyListener != null)
@@ -282,14 +394,23 @@
}
restoreInvocationContext();
}
+ else
+ {
+ MethodCall call = new MethodCall(nodeActivated, new Object[]{fqn, pre});
+ cache.getInvocationContext().addCacheListenerEvent(call);
+ }
+ }
/**
* Notifies all registered listeners of a nodePassivated event.
*
* @param fqn
* @param pre
+ * @param sendImmediately
*/
- public synchronized void notifyNodePassivated(Fqn fqn, boolean pre)
+ public synchronized void notifyNodePassivated(Fqn fqn, boolean pre, boolean sendImmediately)
+ {
+ if (sendImmediately)
{
resetInvocationContext();
if (evictionPolicyListener != null)
@@ -305,13 +426,22 @@
}
restoreInvocationContext();
}
+ else
+ {
+ MethodCall call = new MethodCall(nodePassivated, new Object[]{fqn, pre});
+ cache.getInvocationContext().addCacheListenerEvent(call);
+ }
+ }
/**
* Notifies all registered listeners of a cacheStarted event.
*
* @param cache
+ * @param sendImmediately
*/
- public synchronized void notifyCacheStarted(CacheSPI cache)
+ public synchronized void notifyCacheStarted(CacheSPI cache, boolean sendImmediately)
+ {
+ if (sendImmediately)
{
resetInvocationContext();
if (evictionPolicyListener != null)
@@ -327,13 +457,22 @@
}
restoreInvocationContext();
}
+ else
+ {
+ MethodCall call = new MethodCall(cacheStarted, new Object[]{cache});
+ cache.getInvocationContext().addCacheListenerEvent(call);
+ }
+ }
/**
* Notifies all registered listeners of a cacheStopped event.
*
* @param cache
+ * @param sendImmediately
*/
- public synchronized void notifyCacheStopped(CacheSPI cache)
+ public synchronized void notifyCacheStopped(CacheSPI cache, boolean sendImmediately)
+ {
+ if (sendImmediately)
{
resetInvocationContext();
if (evictionPolicyListener != null)
@@ -349,6 +488,50 @@
}
restoreInvocationContext();
}
+ else
+ {
+ MethodCall call = new MethodCall(cacheStopped, new Object[]{cache});
+ cache.getInvocationContext().addCacheListenerEvent(call);
+ }
+ }
+
+ /**
+ * Notifies all registered listeners of a viewChange event.
+ *
+ * @param new_view
+ * @param sendImmediately
+ */
+ public synchronized void notifyViewChange(View new_view, boolean sendImmediately)
+ {
+ if (sendImmediately)
+ {
+ resetInvocationContext();
+ if (evictionPolicyListener != null)
+ {
+ evictionPolicyListener.viewChange(new_view);
+ }
+ if (hasListeners)
+ {
+ for (CacheListener listener : listeners)
+ {
+ listener.viewChange(new_view);
+ }
+ }
+ restoreInvocationContext();
+ }
+ else
+ {
+ MethodCall call = new MethodCall(viewChange, new Object[]{new_view});
+ cache.getInvocationContext().addCacheListenerEvent(call);
+ }
+ }
+
+ private Map copy(Map<Object, Object> data)
+ {
+ if (data == null) return null;
+ if (data.isEmpty()) return Collections.emptyMap();
+ return Collections.unmodifiableMap(new HashMap<Object, Object>(data));
+ }
private void restoreInvocationContext()
{
@@ -369,24 +552,49 @@
}
/**
- * Notifies all registered listeners of a viewChange event.
+ * Fires off all notifications for a given queue.
*
- * @param new_view
+ * @param queue queue to process.
*/
- public synchronized void notifyViewChange(View new_view)
+ public void invokeQueuedNotifications(List<MethodCall> queue)
{
resetInvocationContext();
+ for (MethodCall c : queue)
+ {
if (evictionPolicyListener != null)
{
- evictionPolicyListener.viewChange(new_view);
+ try
+ {
+ c.invoke(evictionPolicyListener);
+ }
+ catch (Throwable throwable)
+ {
+ log.error("Unable to deliver queued notification " + c + " to eviction policy listener", throwable);
+ }
}
if (hasListeners)
{
for (CacheListener listener : listeners)
{
- listener.viewChange(new_view);
+ try
+ {
+ c.invoke(listener);
+ }
+ catch (Throwable throwable)
+ {
+ log.error("Unable to deliver queued notification " + c + " to listener " + listener, throwable);
+ }
+ }
}
}
restoreInvocationContext();
}
+
+ /**
+ * Fires off all notifications that have been registered within the current invocation, with sendImmediately set to false.
+ */
+ public void invokeQueuedNotifications()
+ {
+ invokeQueuedNotifications(cache.getInvocationContext().getCacheListenerEvents());
+ }
}
More information about the jboss-cvs-commits mailing list