[jboss-cvs] JBossCache/src/org/jboss/cache/loader ...

Galder Zamarreno galder.zamarreno at jboss.com
Fri Sep 29 14:27:21 EDT 2006


  User: gzamarreno
  Date: 06/09/29 14:27:21

  Modified:    src/org/jboss/cache/loader      AsyncCacheLoader.java
                        CacheLoaderManager.java
  Added:       src/org/jboss/cache/loader     
                        AbstractDelegatingCacheLoader.java
                        SingletonStoreCacheLoader.java
  Removed:     src/org/jboss/cache/loader      SharedStoreCacheLoader.java
  Log:
  [JBCACHE-650] - SharedStoreCacheLoader refactored to SingletonStoreCacheLoader 
  adding the ability to push the in-memory state to the underlying cache loader when 
  assuming the coordinator role. Created AbstractDelegatingCacheLoader class that 
  contains basic delegating functionality. SingletonStoreCacheLoader and 
  AsyncCacheLoader now extend this class. Necessary configuration options added 
  and documentation has been updated to include explanation. Two singleton cache 
  sample configurations added too.
  
  Revision  Changes    Path
  1.19      +17 -91    JBossCache/src/org/jboss/cache/loader/AsyncCacheLoader.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: AsyncCacheLoader.java
  ===================================================================
  RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/loader/AsyncCacheLoader.java,v
  retrieving revision 1.18
  retrieving revision 1.19
  diff -u -b -r1.18 -r1.19
  --- AsyncCacheLoader.java	22 Sep 2006 16:27:56 -0000	1.18
  +++ AsyncCacheLoader.java	29 Sep 2006 18:27:21 -0000	1.19
  @@ -26,8 +26,9 @@
   import java.util.Set;
   
   /**
  - * The AsyncCacheLoader is a delegating cache loader that passes on all
  - * operations to an underlying CacheLoader.
  + * The AsyncCacheLoader is a delegating cache loader that extends
  + * AbstractDelegatingCacheLoader overriding methods to that should not
  + * just delegate the operation to the underlying cache loader.
    * <p/>
    * Read operations are done synchronously, while write (CRUD - Create, Remove,
    * Update, Delete) operations are done asynchronously.  There is no provision
  @@ -79,7 +80,7 @@
    *
    * @author Manik Surtani (manik.surtani at jboss.com)
    */
  -public class AsyncCacheLoader extends AbstractCacheLoader
  +public class AsyncCacheLoader extends AbstractDelegatingCacheLoader
   {
   
      private static final Log log = LogFactory.getLog(AsyncCacheLoader.class);
  @@ -91,7 +92,6 @@
       */
      public static final int DEFAULT_QUEUE_SIZE = 10000;
   
  -   private CacheLoader delegateTo;
      private AsyncProcessor processor;
      private SynchronizedBoolean stopped = new SynchronizedBoolean(true);
      private BoundedLinkedQueue queue = new BoundedLinkedQueue(DEFAULT_QUEUE_SIZE);
  @@ -105,19 +105,12 @@
   
      public AsyncCacheLoader()
      {
  +      super(null);
      }
   
      public AsyncCacheLoader(CacheLoader cacheLoader)
      {
  -      delegateTo = cacheLoader;
  -   }
  -
  -   /**
  -    * Returns the delegate cache loader.
  -    */
  -   public CacheLoader getCacheLoader()
  -   {
  -      return delegateTo;
  +      super(cacheLoader);
      }
   
      public void setConfig(Properties props)
  @@ -159,25 +152,14 @@
            asyncPut = Boolean.valueOf(s).booleanValue();
         }
   
  -      delegateTo.setConfig(props);
  -   }
  -
  -   public void setCache(CacheSPI c)
  -   {
  -      super.setCache(c);
  -      delegateTo.setCache(c);
  -   }
  -
  -   public Set<String> getChildrenNames(Fqn fqn) throws Exception
  -   {
  -      return delegateTo.getChildrenNames(fqn);
  +      super.setConfig(props);
      }
   
      public Map get(Fqn name) throws Exception
      {
         try
         {
  -         return delegateTo.get(name);
  +         return super.get(name);
         }
         catch (IOException e)
         {
  @@ -187,18 +169,13 @@
         }
      }
   
  -   public boolean exists(Fqn name) throws Exception
  -   {
  -      return delegateTo.exists(name);
  -   }
  -
      Object get(Fqn name, Object key) throws Exception
      {
         if (returnOld)
         {
            try
            {
  -            Map map = delegateTo.get(name);
  +            Map map = super.get(name);
               if (map != null)
               {
                  return map.get(key);
  @@ -224,7 +201,7 @@
         }
         else
         {
  -         return delegateTo.put(name, key, value);
  +         return super.put(name, key, value);
         }
      }
   
  @@ -239,7 +216,7 @@
         }
         else
         {
  -         delegateTo.put(name, attributes); // Let delegate make its own defensive copy
  +         super.put(name, attributes); // Let delegate make its own defensive copy
         }
      }
   
  @@ -255,7 +232,7 @@
         }
         else
         {
  -         delegateTo.put(modifications);
  +         super.put(modifications);
         }
      }
   
  @@ -279,57 +256,11 @@
         enqueue(mod);
      }
   
  -   public void prepare(Object tx, List modifications, boolean one_phase)
  -           throws Exception
  -   {
  -      delegateTo.prepare(tx, modifications, one_phase);
  -   }
  -
  -   public void commit(Object tx) throws Exception
  -   {
  -      delegateTo.commit(tx);
  -   }
  -
  -   public void rollback(Object tx)
  -   {
  -      delegateTo.rollback(tx);
  -   }
  -
  -   public void setRegionManager(RegionManager manager)
  -   {
  -      delegateTo.setRegionManager(manager);
  -   }
  -
  -   public void loadEntireState(ObjectOutputStream os) throws Exception
  -   {
  -      delegateTo.loadEntireState(os);
  -   }
  -
  -   public void loadState(Fqn subtree, ObjectOutputStream os) throws Exception
  -   {
  -      delegateTo.loadState(subtree, os);
  -   }
  -
  -   public void storeEntireState(ObjectInputStream is) throws Exception
  -   {
  -      delegateTo.storeEntireState(is);
  -   }
  -
  -   public void storeState(Fqn subtree, ObjectInputStream is) throws Exception
  -   {
  -      delegateTo.storeState(subtree, is);
  -   }
  -
  -   public void create() throws Exception
  -   {
  -      delegateTo.create();
  -   }
  -
      public void start() throws Exception
      {
         if (log.isInfoEnabled()) log.info("Async cache loader starting: " + this);
         stopped.set(false);
  -      delegateTo.start();
  +      super.start();
         processor = new AsyncProcessor();
         processor.start();
      }
  @@ -341,12 +272,7 @@
         {
            processor.stop();
         }
  -      delegateTo.stop();
  -   }
  -
  -   public void destroy()
  -   {
  -      delegateTo.destroy();
  +      super.stop();
      }
   
      private void enqueue(Modification mod)
  @@ -465,11 +391,11 @@
            }
         }
   
  -      private void put(List mods)
  +      private void put(List<Modification> mods)
         {
            try
            {
  -            delegateTo.put(mods);
  +            AsyncCacheLoader.super.put(mods);
            }
            catch (Exception e)
            {
  @@ -488,7 +414,7 @@
      public String toString()
      {
         return super.toString() +
  -             " delegate=[" + delegateTo + "]" +
  +             " delegate=[" + super.getCacheLoader() + "]" +
                " processor=" + processor +
                " stopped=" + stopped +
                " batchSize=" + batchSize +
  
  
  
  1.25      +21 -0     JBossCache/src/org/jboss/cache/loader/CacheLoaderManager.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: CacheLoaderManager.java
  ===================================================================
  RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/loader/CacheLoaderManager.java,v
  retrieving revision 1.24
  retrieving revision 1.25
  diff -u -b -r1.24 -r1.25
  --- CacheLoaderManager.java	5 Sep 2006 13:26:00 -0000	1.24
  +++ CacheLoaderManager.java	29 Sep 2006 18:27:21 -0000	1.25
  @@ -10,6 +10,7 @@
   import org.apache.commons.logging.LogFactory;
   import org.jboss.cache.CacheSPI;
   import org.jboss.cache.Fqn;
  +import org.jboss.cache.CacheListener;
   import org.jboss.cache.config.CacheLoaderConfig;
   import org.jboss.cache.xml.XmlHelper;
   import org.w3c.dom.Element;
  @@ -90,6 +91,8 @@
               clc.setPurgeOnStartup(XmlHelper.readBooleanContents(element, "purgeOnStartup", false));
               clc.setClassName(XmlHelper.readStringContents(element, "class"));
               clc.setProperties(XmlHelper.readPropertiesContents(element, "properties"));
  +            clc.setSingletonStore(XmlHelper.readBooleanContents(element, "singletonStore", false));
  +            clc.setPushStateWhenCoordinator(XmlHelper.readBooleanAttribute(element, "singletonStore", "pushStateWhenCoordinator", false));
               config.addIndividualCacheLoaderConfig(clc);
            }
         }
  @@ -146,6 +149,11 @@
               {
                  throw new Exception("Invalid cache loader configuration!!  Only ONE cache loader may have fetchPersistentState set to true.  Cache will not start!");
               }
  +            if (cfg.isSingletonStore() && config.isShared())
  +            {
  +               throw new Exception("Invalid cache loader configuration!!  If a cache loader is configured as a singleton, the cache loader cannot be shared in a cluster!");
  +            }
  +
               CacheLoader l = createCacheLoader(cfg, cache);
               // Only loaders that deal w/ state transfer factor into
               // whether the overall chain supports ExtendedCacheLoader
  @@ -185,6 +193,14 @@
               tmpLoader = asyncDecorator;
            }
   
  +          // singleton?
  +         if (cfg.isSingletonStore())
  +         {
  +            SingletonStoreCacheLoader singletonDecorator = new SingletonStoreCacheLoader(tmpLoader, cfg.isPushStateWhenCoordinator());
  +            addCacheListener(cache, singletonDecorator.getCacheListener());
  +            tmpLoader = singletonDecorator;
  +         }
  +
            // load props
            tmpLoader.setConfig(cfg.getProperties());
   
  @@ -357,4 +373,9 @@
            }
         }
      }
  +
  +   protected void addCacheListener(CacheSPI cache, CacheListener listener)
  +   {
  +      cache.addCacheListener(listener);
  +   }
   }
  
  
  
  1.1      date: 2006/09/29 18:27:21;  author: gzamarreno;  state: Exp;JBossCache/src/org/jboss/cache/loader/AbstractDelegatingCacheLoader.java
  
  Index: AbstractDelegatingCacheLoader.java
  ===================================================================
  /*
   * JBoss, the OpenSource J2EE webOS
   * 
   * Distributable under LGPL license.
   * See terms of license at gnu.org.
   */
  package org.jboss.cache.loader;
  
  import org.jboss.cache.CacheSPI;
  import org.jboss.cache.Fqn;
  import org.jboss.cache.Modification;
  import org.jboss.cache.marshall.RegionManager;
  
  import java.util.Properties;
  import java.util.Set;
  import java.util.Map;
  import java.util.List;
  import java.io.ObjectOutputStream;
  import java.io.ObjectInputStream;
  
  /**
   * AbstractDelegatingCacheLoader provides standard functionality for a cache loader that simply delegates each
   * operation defined in the cache loader interface to the underlying cache loader, basically acting as a proxy to the
   * real cache loader. 
   *
   * Any cache loader implementation that extends this class would be required to override any of the methods in
   * order to provide a different or added behaviour.
   *
   * @author <a href="mailto:galder.zamarreno at jboss.com">Galder Zamarreno</a>
   */
  public abstract class AbstractDelegatingCacheLoader extends AbstractCacheLoader
  {
     private CacheLoader cacheLoader;
  
     public AbstractDelegatingCacheLoader(CacheLoader cl)
     {
        cacheLoader = cl;
     }
  
     public CacheLoader getCacheLoader()
     {
        return cacheLoader;
     }
  
     public void setCacheLoader(CacheLoader cacheLoader)
     {
        this.cacheLoader = cacheLoader;
     }
  
     public void setConfig(Properties properties)
     {
        cacheLoader.setConfig(properties);
     }
  
     public void setCache(CacheSPI c)
     {
        super.setCache(c);
        cacheLoader.setCache(c);
     }
  
     public Set getChildrenNames(Fqn fqn) throws Exception
     {
        return cacheLoader.getChildrenNames(fqn);
     }
  
     public Map get(Fqn name) throws Exception
     {
        return cacheLoader.get(name);
     }
  
     public boolean exists(Fqn name) throws Exception
     {
        return cacheLoader.exists(name);
     }
  
     public Object put(Fqn name, Object key, Object value) throws Exception
     {
        return cacheLoader.put(name, key, value);
     }
  
     public void put(Fqn name, Map attributes) throws Exception
     {
        cacheLoader.put(name, attributes);
     }
  
     public void put(List<Modification> modifications) throws Exception
     {
        cacheLoader.put(modifications);
     }
  
     public Object remove(Fqn fqn, Object key) throws Exception
     {
        return cacheLoader.remove(fqn, key);
     }
  
     public void remove(Fqn fqn) throws Exception
     {
        cacheLoader.remove(fqn);
     }
  
     public void removeData(Fqn fqn) throws Exception
     {
       cacheLoader.removeData(fqn);
     }
  
     public void prepare(Object tx, List<Modification> modifications, boolean one_phase) throws Exception
     {
        cacheLoader.prepare(tx, modifications, one_phase);
     }
  
     public void commit(Object tx) throws Exception
     {
        cacheLoader.commit(tx);
     }
  
     public void rollback(Object tx)
     {
        cacheLoader.rollback(tx);
     }
  
     public void loadEntireState(ObjectOutputStream os) throws Exception
     {
        cacheLoader.loadEntireState(os);
     }
  
     public void storeEntireState(ObjectInputStream is) throws Exception
     {
        cacheLoader.storeEntireState(is);
     }
  
     public void loadState(Fqn subtree, ObjectOutputStream os) throws Exception
     {
        cacheLoader.loadState(subtree, os);
     }
  
     public void storeState(Fqn subtree, ObjectInputStream is) throws Exception
     {
        cacheLoader.storeState(subtree, is);
     }
  
     public void setRegionManager(RegionManager manager)
     {
        cacheLoader.setRegionManager(manager);
     }
  
     public void create() throws Exception
     {
        cacheLoader.create();
     }
  
     public void start() throws Exception
     {
        cacheLoader.start();
     }
  
     public void stop()
     {
        cacheLoader.stop();
     }
  
     public void destroy()
     {
        cacheLoader.destroy();
     }
  }
  
  
  
  1.1      date: 2006/09/29 18:27:21;  author: gzamarreno;  state: Exp;JBossCache/src/org/jboss/cache/loader/SingletonStoreCacheLoader.java
  
  Index: SingletonStoreCacheLoader.java
  ===================================================================
  package org.jboss.cache.loader;
  
  import org.apache.commons.logging.Log;
  import org.apache.commons.logging.LogFactory;
  import org.jboss.cache.AbstractCacheListener;
  import org.jboss.cache.CacheListener;
  import org.jboss.cache.CacheSPI;
  import org.jboss.cache.Fqn;
  import org.jboss.cache.Modification;
  import org.jboss.cache.Node;
  import org.jgroups.Address;
  import org.jgroups.View;
  
  import java.io.ObjectInputStream;
  import java.util.Collection;
  import java.util.List;
  import java.util.Map;
  import java.util.Set;
  import java.util.Vector;
  
  /**
   * SingletonStoreCacheLoader is a delegating cache loader used for situations when only one node should interact with
   * the underlying store. The coordinator of the cluster will be responsible for the underlying CacheLoader.
   * SingletonStoreCacheLoader is a simply facade to a real CacheLoader implementation. It always delegates reads to the
   * real CacheLoader.
   *
   * Writes are forwarded only if this SingletonStoreCacheLoader is currently the cordinator. This avoid having all
   * CacheLoaders in a cluster writing the same data to the same underlying store. Although not incorrect (e.g. a DB
   * will just discard additional INSERTs for the same key, and throw an exception), this will avoid a lot of
   * redundant work.<br/>
   *
   * Whenever the current coordinator dies (or leaves), the second in line will take over. That SingletonStoreCacheLoader
   * will then pass writes through to its underlying CacheLoader. Optionally, when a new coordinator takes over the
   * Singleton, it can push the in-memory state to the cache cacheLoader.
   *
   * @author Bela Ban
   * @author <a href="mailto:galder.zamarreno at jboss.com">Galder Zamarreno</a>
   */
  public class SingletonStoreCacheLoader extends AbstractDelegatingCacheLoader
  {
     private static final Log log = LogFactory.getLog(SingletonStoreCacheLoader.class);
     private Address localAddress;
     private boolean active; // only active if coordinator
     private boolean pushStateWhenCoordinator;
     private Thread pushStateThread;
     private CacheListener cacheListener;
  
     public SingletonStoreCacheLoader(CacheLoader cacheLoader, boolean pushConfiguration)
     {
        super(cacheLoader);
        pushStateWhenCoordinator = pushConfiguration;
        cacheListener = new SingletonStoreListener();
     }
  
     public CacheListener getCacheListener()
     {
        return cacheListener;
     }
  
     protected void activeStatusChanged(boolean newActiveState)
     {
        active = newActiveState;
        log.debug("changed mode: " + this);
        if (active && pushStateWhenCoordinator)
        {
           if (pushStateThread == null || !pushStateThread.isAlive())
           {
              pushStateThread = createPushStateThread();
              pushStateThread.setName("InMemoryToCacheLoaderPusher");
              pushStateThread.start();
           }
           else
           {
              try
              {
                 log.debug("joining currently running state push thread");
                 pushStateThread.join();
              } catch (InterruptedException e)
              {
                 log.error("joining existing push state thread was interrupted", e);
              }
           }
        }
     }
  
     protected Thread createPushStateThread()
     {
        return new Thread(new Runnable()
        {
           public void run()
           {
              log.debug("start pushing in-memory state to cache cacheLoader");
              try
              {
                 pushState(cache.getRoot());
                 log.debug("in-memory state passed to cache cacheLoader successfully");
              } catch (Exception e)
              {
                 log.error("unable to finish pushing the state", e);
              }
           }
        });
     }
  
     private boolean isCoordinator(View newView)
     {
        if (newView != null && localAddress != null)
        {
           Vector mbrs = newView.getMembers();
           if (mbrs != null)
           {
              if (mbrs.size() > 0 && localAddress.equals(mbrs.firstElement()))
              {
                 /* This node is the coordinator */
                 return true;
              }
           }
  
           return false;
        }
  
        /* Invalid new view, so previous value returned */
        return active;
     }
  
     private void pushState(Node node) throws Exception
     {
        /* Put the node's data first */
        Set keys = node.getKeys();
        Fqn fqn = node.getFqn();
  
        for (Object aKey : keys)
        {
           Object value = cache.get(fqn, aKey);
           put(fqn, aKey, value);
        }
  
        /* Navigates to the children */
        Collection<Node> children = node.getChildren();
        for (Node aChildren : children)
        {
           //Map.Entry entry = (Map.Entry) aChildren;
           pushState(aChildren);
        }
     }
  
     public Object put(Fqn name, Object key, Object value) throws Exception
     {
        if (active)
        {
           return super.put(name, key, value);
        }
  
        return null;
     }
  
     public void put(Fqn name, Map attributes) throws Exception
     {
        if (active)
        {
           super.put(name, attributes);
        }
     }
  
     public void put(List<Modification> modifications) throws Exception
     {
        if (active)
        {
           super.put(modifications);
        }
     }
  
     public Object remove(Fqn fqn, Object key) throws Exception
     {
        if (active)
        {
           return super.remove(fqn, key);
        }
  
        return null;
     }
  
     public void remove(Fqn fqn) throws Exception
     {
        if (active)
        {
           super.remove(fqn);
        }
     }
  
     public void removeData(Fqn fqn) throws Exception
     {
        if (active)
        {
           super.removeData(fqn);
        }
     }
  
     public void prepare(Object tx, List<Modification> modifications, boolean one_phase) throws Exception
     {
        if (active)
        {
           super.prepare(tx, modifications, one_phase);
        }
     }
  
     public void commit(Object tx) throws Exception
     {
        if (active)
        {
           super.commit(tx);
        }
     }
  
     public void rollback(Object tx)
     {
        if (active)
        {
           super.rollback(tx);
        }
     }
  
     public void storeEntireState(ObjectInputStream is) throws Exception
     {
        if (active)
        {
           super.storeEntireState(is);
        }
     }
  
     public void storeState(Fqn subtree, ObjectInputStream is) throws Exception
     {
        if (active)
        {
           super.storeState(subtree, is);
        }
     }
  
     public Thread getPushStateThread()
     {
        return pushStateThread;
     }
  
     public String toString()
     {
        return "loc_addr=" + localAddress + ", active=" + active;
     }
  
     /**
      * Cache listener that reacts to cluster topology changes to find out whether a new coordinator is elected.
      * SingletonStoreCacheLoader reacts to these changes in order to decide which node should interact with the
      * underlying cache store. 
      */
     private class SingletonStoreListener extends AbstractCacheListener
     {
        public void cacheStarted(CacheSPI cache)
        {
           localAddress = (Address) cache.getLocalAddress();
           active = cache.getRPCManager().isCoordinator();
           log.debug("cache started: " + this);
        }
  
        public void cacheStopped(CacheSPI cache)
        {
           log.debug("cache stopped: " + this);
        }
  
        public void viewChange(View newView)
        {
           boolean tmp = isCoordinator(newView);
  
           if (active != tmp)
           {
              activeStatusChanged(tmp);
           }
        }
     }
  }
  
  



More information about the jboss-cvs-commits mailing list