[jboss-cvs] JBossAS SVN: r107007 - projects/cluster/ha-server-cache-ispn/trunk/src/main/java/org/jboss/ha/framework/server.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Jul 21 16:26:36 EDT 2010


Author: smarlow at redhat.com
Date: 2010-07-21 16:26:36 -0400 (Wed, 21 Jul 2010)
New Revision: 107007

Added:
   projects/cluster/ha-server-cache-ispn/trunk/src/main/java/org/jboss/ha/framework/server/DistributedStateImpl.java
Log:
JBCLUSTER-255 DistributedState.  listener needs to be implemented still...

Added: projects/cluster/ha-server-cache-ispn/trunk/src/main/java/org/jboss/ha/framework/server/DistributedStateImpl.java
===================================================================
--- projects/cluster/ha-server-cache-ispn/trunk/src/main/java/org/jboss/ha/framework/server/DistributedStateImpl.java	                        (rev 0)
+++ projects/cluster/ha-server-cache-ispn/trunk/src/main/java/org/jboss/ha/framework/server/DistributedStateImpl.java	2010-07-21 20:26:36 UTC (rev 107007)
@@ -0,0 +1,531 @@
+/*
+  * JBoss, Home of Professional Open Source
+  * Copyright 2010, JBoss Inc., and individual contributors as indicated
+  * by the @authors tag. See the copyright.txt in the distribution for a
+  * full listing of individual contributors.
+  *
+  * This is free software; you can redistribute it and/or modify it
+  * under the terms of the GNU Lesser General Public License as
+  * published by the Free Software Foundation; either version 2.1 of
+  * the License, or (at your option) any later version.
+  *
+  * This software is distributed in the hope that it will be useful,
+  * but WITHOUT ANY WARRANTY; without even the implied warranty of
+  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+  * Lesser General Public License for more details.
+  *
+  * You should have received a copy of the GNU Lesser General Public
+  * License along with this software; if not, write to the Free
+  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+  */
+package org.jboss.ha.framework.server;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.infinispan.Cache;
+import org.infinispan.config.Configuration;
+import org.infinispan.notifications.cachelistener.annotation.CacheEntryModified;
+import org.infinispan.notifications.cachelistener.annotation.CacheEntryRemoved;
+import org.infinispan.notifications.cachelistener.event.CacheEntryModifiedEvent;
+import org.infinispan.notifications.cachelistener.event.CacheEntryRemovedEvent;
+import org.infinispan.tree.TreeCache;
+import org.infinispan.CacheException;
+
+import org.infinispan.notifications.Listener;
+import org.infinispan.tree.Fqn;
+import org.infinispan.tree.Node;
+import org.infinispan.tree.TreeCacheFactory;
+import org.jboss.ha.framework.server.spi.ManagedDistributedState;
+import org.jboss.logging.Logger;
+
+/**
+ *   This class manages distributed state across the cluster.
+ *
+ * @author  <a href="mailto:sacha.labourey at cogito-info.ch">Sacha Labourey</a>.
+ * @author  <a href="mailto:bill at burkecentral.com">Bill Burke</a>.
+ * @author  Scott.Stark at jboss.org
+ * @author  Scott Marlow
+ * @version $Revision:77673 $
+ */
+ at Listener
+public class DistributedStateImpl 
+   implements ManagedDistributedState
+{
+   public static final Serializable ROOT = "__DISTRIBUTED_STATE__";
+   public static final Fqn  ROOTFQN = Fqn.fromElements(ROOT);
+   private static final Logger LOG = Logger.getLogger(DistributedStateImpl.class);
+
+   private final Map<String, List<Object>> keyListeners = new HashMap<String, List<Object>>();
+
+   private volatile TreeCache<Serializable, Serializable> cache;
+   private volatile boolean replAsync;
+   private volatile HAPartitionCacheHandlerImpl cacheHandler;
+
+   public HAPartitionCacheHandlerImpl getCacheHandler()
+   {
+      return cacheHandler;
+   }
+
+   public void setCacheHandler(HAPartitionCacheHandlerImpl cacheHandler)
+   {
+      this.cacheHandler = cacheHandler;
+   }
+
+   // Public --------------------------------------------------------
+
+   public void createService() throws Exception
+   {
+   }
+
+   public void startService() throws Exception
+   {
+      this.internalSetClusteredCache(cacheHandler.getCache());
+      this.cache.getCache().addListener(this);
+   }
+
+   public void stopService() throws Exception
+   {      
+      this.cache.getCache().removeListener(this);
+   }
+
+   public void destroyService() throws Exception
+   {
+   }
+
+   public String listContent() throws Exception
+   {
+      StringBuilder result = new StringBuilder();
+      Collection<String> cats = this.getAllCategories();
+      if (cats == null) return result.toString();
+
+      Iterator<String> catsIter = cats.iterator();
+      while (catsIter.hasNext())
+      {
+         String category = (String) catsIter.next();
+         Iterator<Serializable> keysIter = this.getAllKeys(category).iterator();
+
+         result.append("-----------------------------------------------\n");
+         result.append("Logger : ").append(category).append("\n\n");
+         result.append("KEY\t:\tVALUE\n");
+
+         while (keysIter.hasNext())
+         {
+            Serializable key = (Serializable) keysIter.next();
+            String value = this.get(category, key).toString();
+            result.append("'").append(key);
+            result.append("'\t:\t'");
+            result.append(value);
+            result.append("'\n");
+         }
+         result.append("\n");
+      }
+      return result.toString();
+   }
+
+   public String listXmlContent() throws Exception
+   {
+      StringBuilder result = new StringBuilder();
+      result.append("<DistributedState>\n");
+      Collection<String> cats = this.getAllCategories();
+      if (cats != null)
+      {
+         Iterator<String> catsIter = cats.iterator();
+         while (catsIter.hasNext())
+         {
+            String category = (String) catsIter.next();
+            Iterator<Serializable> keysIter = this.getAllKeys(category).iterator();
+
+            result.append("\t<Logger>\n");
+            result.append("\t\t<LoggerName>").append(category).append("</LoggerName>\n");
+
+            while (keysIter.hasNext())
+            {
+               Serializable key = (Serializable) keysIter.next();
+               String value = this.get(category, key).toString();
+               result.append("\t\t<Entry>\n");
+               result.append("\t\t\t<Key>").append(key).append("</Key>\n");
+               result.append("\t\t\t<Value>").append(value).append("</Value>\n");
+               result.append("\t\t</Entry>\n");
+            }
+            result.append("\t</Logger>\n");
+         }
+      }
+
+      result.append("</DistributedState>\n");
+
+      return result.toString();
+   }
+
+   // DistributedState implementation ----------------------------------------------
+   /*
+   * (non-Javadoc)
+   *
+   * @see org.jboss.ha.framework.interfaces.DistributedState#set(java.lang.String,
+   *      java.io.Serializable, java.io.Serializable)
+   */
+   @Override
+   public void set(String category, Serializable key, Serializable value) throws Exception
+   {
+      this.set(category, key, value, true);
+   }
+
+   /*
+    * (non-Javadoc)
+    *
+    * @see org.jboss.ha.framework.interfaces.DistributedState#set(java.lang.String,
+    *      java.io.Serializable, java.io.Serializable, boolean) @param
+    *      asynchronousCall is not supported yet. TreeCache cannot switch this
+    *      on the fly. Will take value from TreeCache-config instead.
+    */
+   @Override
+   public void set(String category, Serializable key, Serializable value, boolean asynchronousCall) throws Exception
+   {
+      //if (this.replAsync != asynchronousCall)
+      //{
+      //   if (asynchronousCall)
+      //   {
+      //      this.cache.put(this.buildFqn(category), key, value, Flag.FORCE_ASYNCHRONOUS);
+      //   }
+      //   else
+      //   {
+      //      this.cache.put(this.buildFqn(category), key, value, Flag.FORCE_SYNCHRONOUS);
+      //   }
+      //}
+
+      this.cache.put(this.buildFqn(category), key, value);
+   }
+
+   /*
+     * (non-Javadoc)
+     *
+     * @see org.jboss.ha.framework.interfaces.DistributedState#remove(java.lang.String,
+     *      java.io.Serializable) @return - returns null in case of
+     *      CacheException
+     */
+   @Override
+   public Serializable remove(String category, Serializable key) throws Exception
+   {
+      return this.remove(category, key, true);
+   }
+
+   /*
+    * (non-Javadoc)
+    *
+    * @see org.jboss.ha.framework.interfaces.DistributedState#remove(java.lang.String,
+    *      java.io.Serializable, boolean)
+    */
+   @Override
+   public Serializable remove(String category, Serializable key, boolean asynchronousCall) throws Exception
+   {
+      Serializable retVal = this.get(category, key);
+      if (retVal != null)
+      {
+      //   if (this.replAsync != asynchronousCall)
+      //   {
+      //      if (asynchronousCall)
+      //      {
+      //         this.cache.getInvocationContext().getOptionOverrides().setForceAsynchronous(true);
+      //      }
+      //      else
+      //      {
+      //         this.cache.getInvocationContext().getOptionOverrides().setForceSynchronous(true);
+      //      }
+      //   }
+         this.cache.remove(this.buildFqn(category), key);
+      }
+      return retVal;
+   }
+
+   /*
+     * (non-Javadoc)
+     *
+     * @see org.jboss.ha.framework.interfaces.DistributedState#get(java.lang.String,
+     *      java.io.Serializable)
+     */
+   @Override
+   public Serializable get(String category, Serializable key)
+   {
+      try
+      {
+         return this.cache.get(this.buildFqn(category), key);
+      }
+      catch (CacheException ce)
+      {
+         return null;
+      }
+   }
+
+   @Override
+   public Collection<String> getAllCategories()
+   {
+      try
+      {
+         Node base = this.cache.getRoot().getChild(ROOTFQN);
+         Collection<String> keys = (base == null ? null : base.getChildrenNames());
+         if (keys != null && keys.size() > 0)
+         {
+            keys = Collections.unmodifiableCollection(keys);
+         }
+         return keys;
+      }
+      catch (CacheException ce)
+      {
+         return null;
+      }
+   }
+
+   /*
+     * (non-Javadoc)
+     *
+     * @see org.jboss.ha.framework.interfaces.DistributedState#getAllKeys(java.lang.String)
+     *      @return - returns null in case of CacheException
+     */
+   @Override
+   public Collection<Serializable> getAllKeys(String category)
+   {
+      try
+      {
+         Node<Serializable, Serializable> node = this.getNode(category);
+         if (node == null) return null;
+         return node.getKeys();
+      }
+      catch (CacheException ce)
+      {
+         return null;
+      }
+   }
+
+   /*
+     * (non-Javadoc)
+     *
+     * @see org.jboss.ha.framework.interfaces.DistributedState#getAllValues(java.lang.String)
+     *      @return - returns null in case of CacheException
+     */
+   @Override
+   public Collection<Serializable> getAllValues(String category)
+   {
+      try
+      {
+         Node<Serializable, Serializable> categoryNode = this.getNode(category);
+         if (categoryNode == null) return null;
+         Set<Serializable> childNodes = categoryNode.getKeys();
+         if (childNodes == null) return null;
+         Map<Serializable, Serializable> entries = categoryNode.getData();
+         if (entries == null) return null;
+         Collection<Serializable> retVal = new HashSet<Serializable>(entries.values());
+         return Collections.unmodifiableCollection(retVal);
+      }
+      catch (CacheException ce)
+      {
+         return null;
+      }
+   }
+
+   @Override
+   public void registerDSListenerEx(String category, DSListenerEx subscriber)
+   {
+      this.registerListener(category, subscriber);
+   }
+
+   @Override
+   public void unregisterDSListenerEx(String category, DSListenerEx subscriber)
+   {
+      this.unregisterListener(category, subscriber);
+   }
+
+   @Override
+   public void registerDSListener(String category, DSListener subscriber)
+   {
+      this.registerListener(category, subscriber);
+   }
+
+   @Override
+   public void unregisterDSListener(String category, DSListener subscriber)
+   {
+      this.unregisterListener(category, subscriber);
+   }
+
+   // ChannelSource -------------------------------------------------
+
+//   public Channel getChannel()
+//   {
+//      Channel result = null;
+//      if (cache != null)
+//      {
+//         result = cache.getConfiguration().getRuntimeConfig().getChannel();
+//      }
+//      return result;
+//   }
+   
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   protected void registerListener(String category, Object subscriber)
+   {
+      synchronized (this.keyListeners)
+      {
+         List<Object> listeners = this.keyListeners.get(category);
+         if (listeners == null)
+         {
+            listeners = new ArrayList<Object>();
+            this.keyListeners.put(category, listeners);
+         }
+         listeners.add(subscriber);
+      }
+   }
+
+   protected void unregisterListener(String category, Object subscriber)
+   {
+      synchronized (this.keyListeners)
+      {
+         List<Object> listeners = this.keyListeners.get(category);
+         if (listeners == null) return;
+
+         listeners.remove(subscriber);
+         if (listeners.size() == 0)
+         {
+            this.keyListeners.remove(category);
+         }
+      }
+   }
+
+   protected void notifyKeyListeners(String category, Serializable key, Serializable value, boolean locallyModified)
+   {
+      synchronized (this.keyListeners)
+      {
+         List<Object> listeners = this.keyListeners.get(category);
+         if (listeners == null) return;
+         String strKey = key.toString();
+
+         for (Object listener: listeners)
+         {
+            if (listener instanceof DSListener)
+            {
+               DSListener dslistener = (DSListener) listener;
+               dslistener.valueHasChanged(category, strKey, value, locallyModified);
+            }
+            else
+            {
+               DSListenerEx dslistener = (DSListenerEx) listener;
+               dslistener.valueHasChanged(category, key, value, locallyModified);
+            }
+         }
+      }
+   }
+
+   protected void notifyKeyListenersOfRemove(String category, Serializable key, Serializable oldContent,
+         boolean locallyModified)
+   {
+      synchronized (this.keyListeners)
+      {
+         List<Object> listeners = this.keyListeners.get(category);
+         if (listeners == null) return;
+         String strKey = key.toString();
+
+         for (Object listener: listeners)
+         {
+            if (listener instanceof DSListener)
+            {
+               DSListener dslistener = (DSListener) listener;
+               dslistener.keyHasBeenRemoved(category, strKey, oldContent, locallyModified);
+            }
+            else
+            {
+               DSListenerEx dslistener = (DSListenerEx) listener;
+               dslistener.keyHasBeenRemoved(category, key, oldContent, locallyModified);
+            }
+         }
+      }
+   }
+
+   protected void cleanupKeyListeners()
+   {
+      // NOT IMPLEMENTED YET
+   }
+
+   /** ExtendedTreeCacheListener methods */
+
+   // Private -------------------------------------------------------
+   protected Fqn buildFqn(String category)
+   {
+      return Fqn.fromRelativeElements(ROOTFQN, category);
+   }
+
+   protected Fqn buildFqn(String category, Serializable key)
+   {
+      return Fqn.fromElements(ROOT, category, key);
+   }
+
+   protected Fqn buildFqn(String category, Serializable key, Serializable value)
+   {
+      return Fqn.fromElements(ROOT, category, key, value);
+   }
+
+   protected Node<Serializable, Serializable> getNode(String category) throws CacheException
+   {
+      return this.cache.getRoot().getChild(this.buildFqn(category));
+   }
+
+   // @CacheListener  -------------------------------------------------
+   // The following is at the (raw) Cache level and not reflective of tree operation.
+
+   @CacheEntryModified
+   public void nodeModified(CacheEntryModifiedEvent event)
+   {
+      if (event.isPre()) return;
+
+      Object key = event.getKey();
+      Object value = event.getValue();
+      String category = null;
+      notifyKeyListeners(category, (Serializable)key, (Serializable)value, event.isOriginLocal());
+   }
+
+   @CacheEntryRemoved
+   public void nodeRemoved(CacheEntryRemovedEvent event)
+   {
+      if (event.isPre()) return;
+
+      Object key = event.getKey();
+      Object value = event.getValue();
+      String category = null;
+      notifyKeyListenersOfRemove(category, (Serializable)key, (Serializable)value, event.isOriginLocal());
+   }
+
+   
+   // Private ------------------------------------------------------------
+   
+   private void internalSetClusteredCache(Cache<Serializable, Serializable> cache)
+   {
+      TreeCacheFactory tcf = new TreeCacheFactory();
+      this.cache = tcf.createTreeCache(cache);
+
+      if (this.cache != null)
+      {        
+        Configuration.CacheMode cm = cache.getConfiguration().getCacheMode();
+        if (Configuration.CacheMode.REPL_ASYNC == cm)
+        {
+           this.replAsync = true;
+        }
+        else if (Configuration.CacheMode.REPL_SYNC == cm)
+        {
+           this.replAsync = false;
+        }
+        else
+        {
+           throw new IllegalStateException("Cache must be configured for replication, not " + cm);
+        }
+      }
+   }
+
+}


Property changes on: projects/cluster/ha-server-cache-ispn/trunk/src/main/java/org/jboss/ha/framework/server/DistributedStateImpl.java
___________________________________________________________________
Name: svn:keywords
   + Id Revision
Name: svn:eol-style
   + LF



More information about the jboss-cvs-commits mailing list