[jboss-cvs] JBossAS SVN: r107007 - projects/cluster/ha-server-cache-ispn/trunk/src/main/java/org/jboss/ha/framework/server.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Jul 21 16:26:36 EDT 2010
Author: smarlow at redhat.com
Date: 2010-07-21 16:26:36 -0400 (Wed, 21 Jul 2010)
New Revision: 107007
Added:
projects/cluster/ha-server-cache-ispn/trunk/src/main/java/org/jboss/ha/framework/server/DistributedStateImpl.java
Log:
JBCLUSTER-255 DistributedState. listener needs to be implemented still...
Added: projects/cluster/ha-server-cache-ispn/trunk/src/main/java/org/jboss/ha/framework/server/DistributedStateImpl.java
===================================================================
--- projects/cluster/ha-server-cache-ispn/trunk/src/main/java/org/jboss/ha/framework/server/DistributedStateImpl.java (rev 0)
+++ projects/cluster/ha-server-cache-ispn/trunk/src/main/java/org/jboss/ha/framework/server/DistributedStateImpl.java 2010-07-21 20:26:36 UTC (rev 107007)
@@ -0,0 +1,531 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2010, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.ha.framework.server;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.infinispan.Cache;
+import org.infinispan.config.Configuration;
+import org.infinispan.notifications.cachelistener.annotation.CacheEntryModified;
+import org.infinispan.notifications.cachelistener.annotation.CacheEntryRemoved;
+import org.infinispan.notifications.cachelistener.event.CacheEntryModifiedEvent;
+import org.infinispan.notifications.cachelistener.event.CacheEntryRemovedEvent;
+import org.infinispan.tree.TreeCache;
+import org.infinispan.CacheException;
+
+import org.infinispan.notifications.Listener;
+import org.infinispan.tree.Fqn;
+import org.infinispan.tree.Node;
+import org.infinispan.tree.TreeCacheFactory;
+import org.jboss.ha.framework.server.spi.ManagedDistributedState;
+import org.jboss.logging.Logger;
+
+/**
+ * This class manages distributed state across the cluster.
+ *
+ * @author <a href="mailto:sacha.labourey at cogito-info.ch">Sacha Labourey</a>.
+ * @author <a href="mailto:bill at burkecentral.com">Bill Burke</a>.
+ * @author Scott.Stark at jboss.org
+ * @author Scott Marlow
+ * @version $Revision:77673 $
+ */
+@Listener
+public class DistributedStateImpl
+ implements ManagedDistributedState
+{
+ public static final Serializable ROOT = "__DISTRIBUTED_STATE__"; // name of the top-level tree node; every Fqn built by this class starts with it
+ public static final Fqn ROOTFQN = Fqn.fromElements(ROOT); // Fqn of that top-level node; buildFqn(category) appends the category to it
+ private static final Logger LOG = Logger.getLogger(DistributedStateImpl.class); // class logger
+
+ private final Map<String, List<Object>> keyListeners = new HashMap<String, List<Object>>(); // category -> registered listener objects; all visible accesses synchronize on this map
+
+ private volatile TreeCache<Serializable, Serializable> cache; // tree view over the clustered cache; assigned by internalSetClusteredCache()
+ private volatile boolean replAsync; // true for cache mode REPL_ASYNC, false for REPL_SYNC; set by internalSetClusteredCache()
+ private volatile HAPartitionCacheHandlerImpl cacheHandler; // source of the clustered cache; read by startService()
+
+ /**
+  * Returns the handler from which this service obtains its clustered cache.
+  *
+  * @return the configured cache handler, or <code>null</code> if none has been set
+  */
+ public HAPartitionCacheHandlerImpl getCacheHandler()
+ {
+    return this.cacheHandler;
+ }
+
+ /**
+  * Sets the handler from which {@link #startService()} obtains the clustered cache.
+  *
+  * @param cacheHandler the cache handler to use
+  */
+ public void setCacheHandler(HAPartitionCacheHandlerImpl cacheHandler)
+ {
+    // The field is volatile, so the new handler is visible to other threads immediately.
+    this.cacheHandler = cacheHandler;
+ }
+
+ // Public --------------------------------------------------------
+
+ /**
+  * Lifecycle callback for the "create" phase. This implementation has nothing to set up.
+  *
+  * @throws Exception never thrown by this implementation
+  */
+ public void createService() throws Exception
+ {
+    // intentionally empty
+ }
+
+ /**
+  * Lifecycle callback for the "start" phase: wraps the handler's cache in a
+  * tree cache and registers this object as a listener on the underlying cache.
+  *
+  * @throws IllegalStateException if no cache handler has been set, or if the
+  *         cache is not configured for replication
+  * @throws Exception if obtaining the cache fails
+  */
+ public void startService() throws Exception
+ {
+    HAPartitionCacheHandlerImpl handler = this.cacheHandler;
+    if (handler == null)
+    {
+       // Fail with a descriptive message instead of a bare NullPointerException.
+       throw new IllegalStateException("cacheHandler must be set before the service is started");
+    }
+
+    this.internalSetClusteredCache(handler.getCache());
+    this.cache.getCache().addListener(this);
+ }
+
+ /**
+  * Lifecycle callback for the "stop" phase: removes this object from the
+  * listeners of the underlying cache.
+  * <p>
+  * Does nothing when no tree cache has been created (for example because
+  * {@link #startService()} was never called or failed early).
+  *
+  * @throws Exception if removing the listener fails
+  */
+ public void stopService() throws Exception
+ {
+    TreeCache<Serializable, Serializable> treeCache = this.cache;
+    if (treeCache != null)
+    {
+       treeCache.getCache().removeListener(this);
+    }
+ }
+
+ /**
+  * Lifecycle callback for the "destroy" phase. This implementation has nothing to release.
+  *
+  * @throws Exception never thrown by this implementation
+  */
+ public void destroyService() throws Exception
+ {
+    // intentionally empty
+ }
+
+ /**
+  * Produces a plain-text dump of every category with its key/value pairs.
+  * <p>
+  * A category whose keys cannot be read is listed with its header only, and a
+  * key whose value cannot be read is printed with the value <code>null</code>;
+  * both lookups report failures by returning <code>null</code>.
+  *
+  * @return the textual dump; an empty string when no categories are available
+  * @throws Exception declared for management-interface compatibility
+  */
+ public String listContent() throws Exception
+ {
+    StringBuilder result = new StringBuilder();
+    Collection<String> cats = this.getAllCategories();
+    if (cats == null) return result.toString();
+
+    for (String category : cats)
+    {
+       result.append("-----------------------------------------------\n");
+       result.append("Logger : ").append(category).append("\n\n");
+       result.append("KEY\t:\tVALUE\n");
+
+       // getAllKeys() returns null for an unknown category or on a cache error.
+       Collection<Serializable> keys = this.getAllKeys(category);
+       if (keys != null)
+       {
+          for (Serializable key : keys)
+          {
+             result.append("'").append(key);
+             result.append("'\t:\t'");
+             // append(Object) prints "null" rather than failing when the value is gone
+             result.append(this.get(category, key));
+             result.append("'\n");
+          }
+       }
+       result.append("\n");
+    }
+    return result.toString();
+ }
+
+ /**
+  * Produces an XML-formatted dump of every category with its key/value pairs.
+  * <p>
+  * Keys and values are written as-is, without XML escaping. A category whose
+  * keys cannot be read is emitted without entries, and a value that cannot be
+  * read is written as <code>null</code>; both lookups report failures by
+  * returning <code>null</code>.
+  *
+  * @return the XML dump; only the enclosing root element when no categories are available
+  * @throws Exception declared for management-interface compatibility
+  */
+ public String listXmlContent() throws Exception
+ {
+    StringBuilder result = new StringBuilder();
+    result.append("<DistributedState>\n");
+    Collection<String> cats = this.getAllCategories();
+    if (cats != null)
+    {
+       for (String category : cats)
+       {
+          result.append("\t<Logger>\n");
+          result.append("\t\t<LoggerName>").append(category).append("</LoggerName>\n");
+
+          // getAllKeys() returns null for an unknown category or on a cache error.
+          Collection<Serializable> keys = this.getAllKeys(category);
+          if (keys != null)
+          {
+             for (Serializable key : keys)
+             {
+                result.append("\t\t<Entry>\n");
+                result.append("\t\t\t<Key>").append(key).append("</Key>\n");
+                // append(Object) prints "null" rather than failing when the value is gone
+                result.append("\t\t\t<Value>").append(this.get(category, key)).append("</Value>\n");
+                result.append("\t\t</Entry>\n");
+             }
+          }
+          result.append("\t</Logger>\n");
+       }
+    }
+
+    result.append("</DistributedState>\n");
+
+    return result.toString();
+ }
+
+ // DistributedState implementation ----------------------------------------------
+ /**
+  * Stores <code>value</code> under <code>key</code> in the given category.
+  * Delegates to the four-argument overload with <code>asynchronousCall</code>
+  * set to <code>true</code>.
+  *
+  * @see org.jboss.ha.framework.interfaces.DistributedState#set(java.lang.String,
+  *      java.io.Serializable, java.io.Serializable)
+  */
+ @Override
+ public void set(String category, Serializable key, Serializable value) throws Exception
+ {
+    final boolean asynchronousCall = true;
+    this.set(category, key, value, asynchronousCall);
+ }
+
+ /**
+  * Stores <code>value</code> under <code>key</code> in the node of the given category.
+  *
+  * @param asynchronousCall not supported yet; the value is ignored and the
+  *        replication mode configured on the cache applies
+  * @see org.jboss.ha.framework.interfaces.DistributedState#set(java.lang.String,
+  *      java.io.Serializable, java.io.Serializable, boolean)
+  */
+ @Override
+ public void set(String category, Serializable key, Serializable value, boolean asynchronousCall) throws Exception
+ {
+    // A per-call override (forcing a synchronous or asynchronous put whenever
+    // asynchronousCall differs from replAsync) was sketched here but is not enabled.
+    Fqn categoryFqn = this.buildFqn(category);
+    this.cache.put(categoryFqn, key, value);
+ }
+
+ /**
+  * Removes <code>key</code> from the given category. Delegates to the
+  * three-argument overload with <code>asynchronousCall</code> set to
+  * <code>true</code>.
+  *
+  * @return the value that was stored under the key, or <code>null</code> if
+  *         none could be read
+  * @see org.jboss.ha.framework.interfaces.DistributedState#remove(java.lang.String,
+  *      java.io.Serializable)
+  */
+ @Override
+ public Serializable remove(String category, Serializable key) throws Exception
+ {
+    final boolean asynchronousCall = true;
+    return this.remove(category, key, asynchronousCall);
+ }
+
+ /**
+  * Removes <code>key</code> from the given category and returns the value
+  * that was read for it beforehand. The cache is only touched when a
+  * non-null value was found.
+  *
+  * @param asynchronousCall currently ignored; the replication mode configured
+  *        on the cache applies
+  * @return the previously stored value, or <code>null</code> if none could be read
+  * @see org.jboss.ha.framework.interfaces.DistributedState#remove(java.lang.String,
+  *      java.io.Serializable, boolean)
+  */
+ @Override
+ public Serializable remove(String category, Serializable key, boolean asynchronousCall) throws Exception
+ {
+    Serializable previous = this.get(category, key);
+    if (previous == null)
+    {
+       return null;
+    }
+
+    // A per-call synchronous/asynchronous override was sketched here but is not enabled.
+    this.cache.remove(this.buildFqn(category), key);
+    return previous;
+ }
+
+ /**
+  * Looks up the value stored under <code>key</code> in the given category.
+  *
+  * @return the stored value, or <code>null</code> if the cache returned none
+  *         or a CacheException occurred (the exception is logged at debug level)
+  * @see org.jboss.ha.framework.interfaces.DistributedState#get(java.lang.String,
+  *      java.io.Serializable)
+  */
+ @Override
+ public Serializable get(String category, Serializable key)
+ {
+    try
+    {
+       return this.cache.get(this.buildFqn(category), key);
+    }
+    catch (CacheException ce)
+    {
+       // Callers rely on the null return; record the cause rather than discarding it.
+       LOG.debug("Failed to read key " + key + " of category " + category, ce);
+       return null;
+    }
+ }
+
+ /**
+  * Lists the names of the child nodes directly below the distributed-state root node.
+  *
+  * @return the category names (wrapped as unmodifiable when non-empty), or
+  *         <code>null</code> if the root node does not exist or a
+  *         CacheException occurred (the exception is logged at debug level)
+  */
+ @Override
+ public Collection<String> getAllCategories()
+ {
+    try
+    {
+       // Raw Node on purpose: child names are handed out untyped and are
+       // treated as Strings here, exactly as before.
+       Node base = this.cache.getRoot().getChild(ROOTFQN);
+       Collection<String> keys = (base == null ? null : base.getChildrenNames());
+       if (keys != null && !keys.isEmpty())
+       {
+          keys = Collections.unmodifiableCollection(keys);
+       }
+       return keys;
+    }
+    catch (CacheException ce)
+    {
+       // Callers rely on the null return; record the cause rather than discarding it.
+       LOG.debug("Failed to list categories", ce);
+       return null;
+    }
+ }
+
+ /**
+  * Lists the keys stored in the node of the given category.
+  *
+  * @return the keys of the category, or <code>null</code> if the category
+  *         node does not exist or a CacheException occurred (the exception
+  *         is logged at debug level)
+  * @see org.jboss.ha.framework.interfaces.DistributedState#getAllKeys(java.lang.String)
+  */
+ @Override
+ public Collection<Serializable> getAllKeys(String category)
+ {
+    try
+    {
+       Node<Serializable, Serializable> node = this.getNode(category);
+       if (node == null) return null;
+       return node.getKeys();
+    }
+    catch (CacheException ce)
+    {
+       // Callers rely on the null return; record the cause rather than discarding it.
+       LOG.debug("Failed to list keys of category " + category, ce);
+       return null;
+    }
+ }
+
+ /**
+  * Collects the values stored in the node of the given category.
+  * <p>
+  * The values are copied into a HashSet, so equal values stored under
+  * different keys appear only once in the result.
+  *
+  * @return an unmodifiable collection of the values, or <code>null</code> if
+  *         the category node, its key set or its data map is unavailable, or
+  *         a CacheException occurred (the exception is logged at debug level)
+  * @see org.jboss.ha.framework.interfaces.DistributedState#getAllValues(java.lang.String)
+  */
+ @Override
+ public Collection<Serializable> getAllValues(String category)
+ {
+    try
+    {
+       Node<Serializable, Serializable> categoryNode = this.getNode(category);
+       if (categoryNode == null) return null;
+       // Only the null check on the key set is needed; the keys themselves are not used.
+       if (categoryNode.getKeys() == null) return null;
+       Map<Serializable, Serializable> entries = categoryNode.getData();
+       if (entries == null) return null;
+       Collection<Serializable> retVal = new HashSet<Serializable>(entries.values());
+       return Collections.unmodifiableCollection(retVal);
+    }
+    catch (CacheException ce)
+    {
+       // Callers rely on the null return; record the cause rather than discarding it.
+       LOG.debug("Failed to list values of category " + category, ce);
+       return null;
+    }
+ }
+
+ /**
+  * Registers an extended listener for the given category by delegating to
+  * {@link #registerListener(String, Object)}.
+  *
+  * @param category the category to observe
+  * @param subscriber the listener to add
+  */
+ @Override
+ public void registerDSListenerEx(String category, DSListenerEx subscriber)
+ {
+    registerListener(category, subscriber);
+ }
+
+ /**
+  * Removes an extended listener from the given category by delegating to
+  * {@link #unregisterListener(String, Object)}.
+  *
+  * @param category the category the listener was registered for
+  * @param subscriber the listener to remove
+  */
+ @Override
+ public void unregisterDSListenerEx(String category, DSListenerEx subscriber)
+ {
+    unregisterListener(category, subscriber);
+ }
+
+ /**
+  * Registers a listener for the given category by delegating to
+  * {@link #registerListener(String, Object)}.
+  *
+  * @param category the category to observe
+  * @param subscriber the listener to add
+  */
+ @Override
+ public void registerDSListener(String category, DSListener subscriber)
+ {
+    registerListener(category, subscriber);
+ }
+
+ /**
+  * Removes a listener from the given category by delegating to
+  * {@link #unregisterListener(String, Object)}.
+  *
+  * @param category the category the listener was registered for
+  * @param subscriber the listener to remove
+  */
+ @Override
+ public void unregisterDSListener(String category, DSListener subscriber)
+ {
+    unregisterListener(category, subscriber);
+ }
+
+ // ChannelSource -------------------------------------------------
+
+// public Channel getChannel()
+// {
+// Channel result = null;
+// if (cache != null)
+// {
+// result = cache.getConfiguration().getRuntimeConfig().getChannel();
+// }
+// return result;
+// }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ /**
+  * Appends <code>subscriber</code> to the listener list of the given category,
+  * creating the list on first use. The same subscriber may be added more than once.
+  *
+  * @param category the category to observe
+  * @param subscriber the listener object to add
+  */
+ protected void registerListener(String category, Object subscriber)
+ {
+    synchronized (this.keyListeners)
+    {
+       List<Object> subscribers = this.keyListeners.get(category);
+       if (subscribers != null)
+       {
+          subscribers.add(subscriber);
+          return;
+       }
+
+       // First listener for this category: create and publish its list.
+       subscribers = new ArrayList<Object>();
+       this.keyListeners.put(category, subscribers);
+       subscribers.add(subscriber);
+    }
+ }
+
+ /**
+  * Removes one occurrence of <code>subscriber</code> from the listener list of
+  * the given category and drops the category entry once its list is empty.
+  * Does nothing when the category has no listeners.
+  *
+  * @param category the category the listener was registered for
+  * @param subscriber the listener object to remove
+  */
+ protected void unregisterListener(String category, Object subscriber)
+ {
+    synchronized (this.keyListeners)
+    {
+       List<Object> subscribers = this.keyListeners.get(category);
+       if (subscribers != null)
+       {
+          subscribers.remove(subscriber);
+          if (subscribers.isEmpty())
+          {
+             this.keyListeners.remove(category);
+          }
+       }
+    }
+ }
+
+ /**
+  * Tells every listener registered for <code>category</code> that the value
+  * of <code>key</code> has changed.
+  * <p>
+  * The listener list is copied while holding the registry lock and the
+  * callbacks run after the lock is released. A listener may therefore register
+  * or unregister listeners from inside its callback without corrupting the
+  * iteration, and slow listeners do not block registration. As a consequence,
+  * a listener unregistered concurrently may still receive this notification.
+  *
+  * @param category the category the change belongs to
+  * @param key the changed key; DSListener instances receive its string form
+  * @param value the new value
+  * @param locallyModified whether the change originated on this node
+  */
+ protected void notifyKeyListeners(String category, Serializable key, Serializable value, boolean locallyModified)
+ {
+    List<Object> snapshot;
+    synchronized (this.keyListeners)
+    {
+       List<Object> listeners = this.keyListeners.get(category);
+       if (listeners == null) return;
+       snapshot = new ArrayList<Object>(listeners);
+    }
+
+    // Null-safe: a missing key must not abort the whole notification round.
+    String strKey = (key == null) ? null : key.toString();
+
+    for (Object listener : snapshot)
+    {
+       if (listener instanceof DSListener)
+       {
+          ((DSListener) listener).valueHasChanged(category, strKey, value, locallyModified);
+       }
+       else if (listener instanceof DSListenerEx)
+       {
+          ((DSListenerEx) listener).valueHasChanged(category, key, value, locallyModified);
+       }
+       // Anything else (including null) was registered through the untyped
+       // registerListener() and cannot be notified; skip it.
+    }
+ }
+
+ /**
+  * Tells every listener registered for <code>category</code> that
+  * <code>key</code> has been removed.
+  * <p>
+  * The listener list is copied while holding the registry lock and the
+  * callbacks run after the lock is released. A listener may therefore register
+  * or unregister listeners from inside its callback without corrupting the
+  * iteration, and slow listeners do not block registration. As a consequence,
+  * a listener unregistered concurrently may still receive this notification.
+  *
+  * @param category the category the removal belongs to
+  * @param key the removed key; DSListener instances receive its string form
+  * @param oldContent the value that was stored under the key
+  * @param locallyModified whether the removal originated on this node
+  */
+ protected void notifyKeyListenersOfRemove(String category, Serializable key, Serializable oldContent,
+       boolean locallyModified)
+ {
+    List<Object> snapshot;
+    synchronized (this.keyListeners)
+    {
+       List<Object> listeners = this.keyListeners.get(category);
+       if (listeners == null) return;
+       snapshot = new ArrayList<Object>(listeners);
+    }
+
+    // Null-safe: a missing key must not abort the whole notification round.
+    String strKey = (key == null) ? null : key.toString();
+
+    for (Object listener : snapshot)
+    {
+       if (listener instanceof DSListener)
+       {
+          ((DSListener) listener).keyHasBeenRemoved(category, strKey, oldContent, locallyModified);
+       }
+       else if (listener instanceof DSListenerEx)
+       {
+          ((DSListenerEx) listener).keyHasBeenRemoved(category, key, oldContent, locallyModified);
+       }
+       // Anything else (including null) was registered through the untyped
+       // registerListener() and cannot be notified; skip it.
+    }
+ }
+
+ /**
+  * Placeholder for listener cleanup. Not implemented yet; calling it has no effect.
+  */
+ protected void cleanupKeyListeners()
+ {
+    // NOT IMPLEMENTED YET
+ }
+
+ /** ExtendedTreeCacheListener methods */
+
+ // Protected helpers ---------------------------------------------
+ /**
+  * Builds the Fqn of a category node: {@link #ROOTFQN} extended by the category name.
+  *
+  * @param category the category name
+  * @return the Fqn addressing the category
+  */
+ protected Fqn buildFqn(String category)
+ {
+    Fqn categoryFqn = Fqn.fromRelativeElements(ROOTFQN, category);
+    return categoryFqn;
+ }
+
+ /**
+  * Builds an Fqn made of {@link #ROOT}, the category name and the key, in that order.
+  *
+  * @param category the category name
+  * @param key the key within the category
+  * @return the three-element Fqn
+  */
+ protected Fqn buildFqn(String category, Serializable key)
+ {
+    Fqn keyFqn = Fqn.fromElements(ROOT, category, key);
+    return keyFqn;
+ }
+
+ /**
+  * Builds an Fqn made of {@link #ROOT}, the category name, the key and the
+  * value, in that order.
+  *
+  * @param category the category name
+  * @param key the key within the category
+  * @param value the value used as the last Fqn element
+  * @return the four-element Fqn
+  */
+ protected Fqn buildFqn(String category, Serializable key, Serializable value)
+ {
+    Fqn valueFqn = Fqn.fromElements(ROOT, category, key, value);
+    return valueFqn;
+ }
+
+ /**
+  * Fetches the tree node of the given category, looked up from the cache
+  * root by the category's Fqn.
+  *
+  * @param category the category name
+  * @return the child node found under the category Fqn; callers in this class
+  *         treat <code>null</code> as "no such category"
+  * @throws CacheException if the cache lookup fails
+  */
+ protected Node<Serializable, Serializable> getNode(String category) throws CacheException
+ {
+    Node<Serializable, Serializable> root = this.cache.getRoot();
+    return root.getChild(this.buildFqn(category));
+ }
+
+ // @CacheListener -------------------------------------------------
+ // The following is at the (raw) Cache level and not reflective of tree operation.
+
+ /**
+  * Cache callback for modified entries. Pre-modification events are ignored;
+  * for the event delivered after the change, the key and value are forwarded to
+  * {@link #notifyKeyListeners(String, Serializable, Serializable, boolean)}.
+  * <p>
+  * The category is always passed as <code>null</code>, so only listeners
+  * registered under the <code>null</code> category can be reached.
+  * NOTE(review): deriving the real category from the raw cache key is still
+  * to be implemented.
+  *
+  * @param event the modification event delivered by the cache
+  */
+ @CacheEntryModified
+ public void nodeModified(CacheEntryModifiedEvent event)
+ {
+    if (!event.isPre())
+    {
+       final String category = null;
+       Serializable key = (Serializable) event.getKey();
+       Serializable value = (Serializable) event.getValue();
+       this.notifyKeyListeners(category, key, value, event.isOriginLocal());
+    }
+ }
+
+ /**
+  * Cache callback for removed entries. Pre-removal events are ignored; for
+  * the event delivered after the removal, the key and the event's value are forwarded to
+  * {@link #notifyKeyListenersOfRemove(String, Serializable, Serializable, boolean)}.
+  * <p>
+  * The category is always passed as <code>null</code>, so only listeners
+  * registered under the <code>null</code> category can be reached.
+  * NOTE(review): deriving the real category from the raw cache key is still
+  * to be implemented.
+  *
+  * @param event the removal event delivered by the cache
+  */
+ @CacheEntryRemoved
+ public void nodeRemoved(CacheEntryRemovedEvent event)
+ {
+    if (!event.isPre())
+    {
+       final String category = null;
+       Serializable key = (Serializable) event.getKey();
+       Serializable value = (Serializable) event.getValue();
+       this.notifyKeyListenersOfRemove(category, key, value, event.isOriginLocal());
+    }
+ }
+
+
+ // Private ------------------------------------------------------------
+
+ /**
+  * Wraps the given cache in a tree cache, stores it in {@link #cache} and
+  * records whether the cache replicates asynchronously.
+  *
+  * @param cache the clustered cache to wrap
+  * @throws IllegalStateException if the cache mode is neither REPL_ASYNC nor REPL_SYNC
+  */
+ private void internalSetClusteredCache(Cache<Serializable, Serializable> cache)
+ {
+    TreeCache<Serializable, Serializable> treeCache = new TreeCacheFactory().createTreeCache(cache);
+    this.cache = treeCache;
+    if (treeCache == null)
+    {
+       return;
+    }
+
+    Configuration.CacheMode mode = cache.getConfiguration().getCacheMode();
+    boolean async = (Configuration.CacheMode.REPL_ASYNC == mode);
+    if (!async && Configuration.CacheMode.REPL_SYNC != mode)
+    {
+       // replAsync is left untouched when the mode is rejected.
+       throw new IllegalStateException("Cache must be configured for replication, not " + mode);
+    }
+    this.replAsync = async;
+ }
+
+}
Property changes on: projects/cluster/ha-server-cache-ispn/trunk/src/main/java/org/jboss/ha/framework/server/DistributedStateImpl.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
More information about the jboss-cvs-commits
mailing list