[jboss-cvs] JBossAS SVN: r65923 - projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/impl.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Oct 8 13:27:20 EDT 2007


Author: bstansberry at jboss.com
Date: 2007-10-08 13:27:20 -0400 (Mon, 08 Oct 2007)
New Revision: 65923

Added:
   projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/impl/CacheableTimestamp.java
   projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/impl/GroupedPassivatingCacheImpl2.java
   projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/impl/SerializationGroupContainer.java
   projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/impl/SerializationGroupImpl.java
   projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/impl/SimpleIntegratedObjectStore.java
   projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/impl/SimplePassivatingCache2.java
Log:
First draft of clustering support

Added: projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/impl/CacheableTimestamp.java
===================================================================
--- projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/impl/CacheableTimestamp.java	                        (rev 0)
+++ projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/impl/CacheableTimestamp.java	2007-10-08 17:27:20 UTC (rev 65923)
@@ -0,0 +1,100 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2006, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.ejb3.cache.impl;
+
+import org.jboss.ejb3.cache.Cacheable;
+import org.jboss.ejb3.cache.Identifiable;
+import org.jboss.ejb3.cache.IntegratedObjectStore;
+
+/**
+ * Encapsulation of the {@link Identifiable#getId() id} and
+ * {@link Cacheable#getLastUsed() last used timestamp} of
+ * a cached {@link Cacheable}.
+ * <p>
+ * Implements <code>Comparable</code> to make it easy to sort
+ * for LRU comparisons.
+ * </p>
+ * 
+ * @see IntegratedObjectStore#getInMemoryEntries()
+ * @see IntegratedObjectStore#getPassivatedEntries()
+ * 
+ * @author Brian Stansberry
+ * @version $Revision$
+ */
+public class CacheableTimestamp 
+   implements Identifiable, Comparable<CacheableTimestamp>
+{
+   private Object id;
+   private long lastUsed;
+   
+   public CacheableTimestamp(Object id, long lastUsed)
+   {
+      assert id != null : "id cannot be null";
+      assert lastUsed > 0 : "lastUsed must be positive";
+      
+      this.id = id;
+      this.lastUsed = lastUsed;
+   }
+   
+   public Object getId()
+   {
+      return id;
+   }
+   
+   public long getLastUsed()
+   {
+      return lastUsed;
+   }
+
+   /**
+    * Compares based on {@link #getLastUsed() last used}, returning
+    * -1 for earlier timestamps.
+    */
+   public int compareTo(CacheableTimestamp o)
+   {
+      if (this.lastUsed < o.lastUsed)
+         return -1;
+      else if (this.lastUsed > o.lastUsed)
+         return 1;
+      return 0;
+   }
+
+   @Override
+   public boolean equals(Object obj)
+   {
+      if (this == obj)
+         return true;
+      
+      if (obj instanceof CacheableTimestamp)
+      {
+         return this.id.equals(((CacheableTimestamp) obj).id);
+      }
+      return false;
+   }
+
+   @Override
+   public int hashCode()
+   {
+      return id.hashCode();
+   }   
+}

Added: projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/impl/GroupedPassivatingCacheImpl2.java
===================================================================
--- projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/impl/GroupedPassivatingCacheImpl2.java	                        (rev 0)
+++ projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/impl/GroupedPassivatingCacheImpl2.java	2007-10-08 17:27:20 UTC (rev 65923)
@@ -0,0 +1,456 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2007, Red Hat Middleware LLC, and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.ejb3.cache.impl;
+
+import java.io.Serializable;
+
+import javax.ejb.NoSuchEJBException;
+
+import org.jboss.ejb3.cache.Cacheable;
+import org.jboss.ejb3.cache.IntegratedObjectStore;
+import org.jboss.ejb3.cache.PassivatingCache;
+import org.jboss.ejb3.cache.PassivationManager;
+import org.jboss.ejb3.cache.StatefulObjectFactory;
+import org.jboss.ejb3.cache.grouped.GroupedPassivatingCache;
+import org.jboss.ejb3.cache.grouped.SerializationGroup;
+import org.jboss.ejb3.cache.grouped.SerializationGroupMember;
+import org.jboss.logging.Logger;
+
+/**
+ * {@link GroupedPassivatingCache} that uses an {@link IntegratedObjectStore}
+ * to manage data.
+ *
+ * @author Brian Stansberry
+ * @version $Revision$
+ */
+public class GroupedPassivatingCacheImpl2<T extends Cacheable & Serializable> implements GroupedPassivatingCache<T>
+{
+   private static final Logger log = Logger.getLogger(GroupedPassivatingCacheImpl2.class);
+   
+   /**
+    * Cache that's managing the PassivationGroup
+    */
+   private PassivatingCache<SerializationGroup> groupCache;
+   
+   /**
+    *  Delegate that handles the usual details; ends up calling into
+    *  our EntryContainer for StatefulObjectFactory, PassivationManager
+    *  and IntegratedObjectStore functions.
+    */
+   private SimplePassivatingCache2<Entry<T>> delegate;
+   
+   /** 
+    * Do we support clustering? This field is really just a minor
+    * optimization to avoid calling through to the underlying 
+    * IntegratedObjectStore all the time.
+    */
+   private boolean clustered;
+   
+   public class Entry<C extends Cacheable & Serializable> implements Cacheable, SerializationGroupMember, Serializable
+   {
+      private static final long serialVersionUID = 1L;
+      
+      Object id;
+      /**
+       * The underlying object (e.g. bean context).
+       * Preferably, this field would be transient. It isn't now because it is 
+       * possible this entry will never be assigned to a PassivationGroup,
+       * in which case we need to serialize obj.
+       * TODO Relying on nulling this field is fragile. Can we make this
+       * field transient by ensuring we only use this cache class with bean 
+       * classes that are sure to be part of a group?
+       */
+      C obj;
+      /** The group. Never serialize the group; only the groupCache does that */
+      transient SerializationGroup group;
+      Object groupId;
+      long lastUsed;
+      
+      Entry(C obj)
+      {
+         assert obj != null : "obj is null";
+         
+         this.obj = obj;
+         this.id = obj.getId();
+      }
+      
+      public Object getId()
+      {
+         return id;
+      }
+      
+      public boolean isClustered()
+      {
+         // Value from the containing cache
+         return clustered;
+      }
+      
+      @SuppressWarnings("unchecked")
+      public C getSerializableObject()
+      {
+         return obj;
+      }
+      
+      // Called by PassivationGroup prior to its passivating
+      public void prePassivate()
+      {
+         // make sure we don't passivate the group twice
+         group = null;
+         // null out obj so when delegate passivates this entry
+         // we don't serialize it. It serializes with the PassivationGroup only
+         obj = null;
+         
+         delegate.passivate(this.id);
+      }
+      
+      // Called by PassivationGroup prior to its replicating
+      public void preReplicate()
+      {
+         // make sure we don't replicate the group twice
+         group = null;
+         // null out obj so when delegate passivates this entry
+         // we don't serialize it. It serializes with the PassivationGroup only
+         obj = null;
+         
+         delegate.replicate(this.id);
+      }
+      
+      public long getLastUsed()
+      {
+         return obj == null ? lastUsed : obj.getLastUsed();
+      }
+
+      public boolean isInUse()
+      {
+         return obj == null ? false : obj.isInUse();
+      }
+
+      public void setInUse(boolean inUse)
+      {
+         if (obj != null)
+         {
+            obj.setInUse(inUse);
+            lastUsed = obj.getLastUsed();
+         }
+      }
+
+      @Override
+      public String toString()
+      {
+         return super.toString() + "{id=" + id + ",obj=" + obj + ",groupId=" + groupId + ",group=" + group + "}";
+      }
+   }
+   
+   private class EntryContainer   
+      implements StatefulObjectFactory<Entry<T>>, PassivationManager<Entry<T>>, IntegratedObjectStore<Entry<T>>
+   {
+      private StatefulObjectFactory<T> factory;
+      private PassivationManager<T> passivationManager;
+      private IntegratedObjectStore<Entry<T>> store;
+      
+      EntryContainer(StatefulObjectFactory<T> factory, PassivationManager<T> passivationManager, IntegratedObjectStore<Entry<T>> store)
+      {
+         this.factory = factory;
+         this.passivationManager = passivationManager;
+         this.store = store;
+      }
+      
+      public Entry<T> create(Class<?>[] initTypes, Object[] initValues)
+      {
+         return new Entry<T>(factory.create(initTypes, initValues));
+      }
+
+      public void destroy(Entry<T> entry)
+      {
+         factory.destroy(entry.obj);
+         if (entry.group != null) 
+         {
+            entry.group.removeMember(entry.id);
+            if (entry.group.size() == 0)
+            {
+               groupCache.remove(entry.groupId);
+            }
+         }
+      }
+      
+      @SuppressWarnings("unchecked")
+      public void postActivate(Entry<T> entry)
+      {
+         log.trace("post activate " + entry);
+         
+         // Restore the entry's ref to the group and object
+         if(entry.obj == null)
+         {
+            if(entry.group == null)
+            {
+               // TODO: peek or get?
+               // BES 2007/10/06 I think peek is better; no
+               // sense marking the group as in-use and then having
+               // to release it or something
+               entry.group = groupCache.peek(entry.groupId);
+            }
+            entry.obj = (T) entry.group.getMemberObject(entry.id);
+         }
+         
+         // Notify the group that this entry is active
+         entry.group.addActive(entry);
+         
+         // Invoke callbacks on the underlying object
+         passivationManager.postActivate(entry.obj);
+      }
+      
+      public void prePassivate(Entry<T> entry)
+      {
+         log.trace("pre-passivate " + entry);
+         
+         // entry.obj may or may not get serialized (depends on if group
+         // is in use) but it's ok to invoke callbacks now. If a caller
+         // requests this entry again and the obj hadn't been serialized with
+         // the group, we'll just call postActivate on it then, which is OK.
+         // By always invoking the callbacks here, we avoid possible bugs
+         // where they sometimes don't get called.
+         passivationManager.prePassivate(entry.obj);
+         
+         // If this call is coming via delegate.passivate(), entry.group will
+         // *not* be null.  In that case we are the controller for the
+         // group passivation. If the call is coming via Entry.prePassivate(), 
+         // entry.group *will* be null. In that case we are not the controller 
+         // of the passivation and can just return.
+         if(entry.group != null)
+         {
+            // Remove ourself from group's active list so we don't get
+            // called again via Entry.prePassivate()            
+            entry.group.removeActive(entry.id);
+            
+            // Only tell the group to passivate if no members are in use
+            if (!entry.group.isInUse())
+            {
+               // Tell group to prePassivate other active members
+               entry.group.prePassivate();
+               // Go ahead and do the real passivation
+               groupCache.passivate(entry.groupId);
+            }
+            // else {
+            // this turns into a pretty meaningless exercise of just
+            // passivating an empty Entry. TODO consider throwing 
+            // ItemInUseException here, thus aborting everything.  Need to
+            // be sure that doesn't lead to problems as the exception propagates
+            // }
+            
+            // This call didn't come through Entry.prePassivate() (which nulls
+            // group and obj) so we have to do it ourselves. Otherwise
+            // when this call returns, delegate will serialize the entry
+            // with a ref to group and obj.            
+            entry.group = null;
+            entry.obj = null;
+         }
+      }
+      
+      public void preReplicate(Entry<T> entry)
+      {         
+         // This method follows the same conceptual logic as prePassivate.
+         // See the detailed comments in that method.
+         
+         log.trace("pre-replicate " + entry);
+         
+         passivationManager.preReplicate(entry.obj);
+         
+         if(entry.group != null)
+         {
+            entry.group.removeActive(entry.id);
+            
+            try
+            {
+               if (!entry.group.isInUse())
+               {
+                  entry.group.preReplicate();
+                  groupCache.replicate(entry.groupId);
+               }
+            }
+            finally
+            {
+               // Here we differ from prePassivate!!
+               // Restore the entry as "active" so it can get
+               // passivation callbacks
+               entry.group.addActive(entry);
+            }
+            
+            entry.group = null;
+            entry.obj = null;
+         }
+      }
+      
+      @SuppressWarnings("unchecked")
+      public void postReplicate(Entry<T> entry)
+      {
+         log.trace("postreplicate " + entry);
+         
+         // Restore the entry's ref to the group and object
+         if(entry.obj == null)
+         {
+            if(entry.group == null)
+            {
+               // TODO: peek or get?
+               // BES 2007/10/06 I think peek is better; no
+               // sense marking the group as in-use and then having
+               // to release it or something
+               entry.group = groupCache.peek(entry.groupId);
+            }
+            entry.obj = (T) entry.group.getMemberObject(entry.id);
+         }
+         
+         // Notify the group that this entry is active
+         entry.group.addActive(entry);
+         
+         // Invoke callbacks on the underlying object
+         passivationManager.postReplicate(entry.obj);
+      }
+
+      public void replicate(Entry<T> entry)
+      {
+         store.replicate(entry);
+      }
+
+      public boolean isClustered()
+      {
+         // Use value from containing cache; containing cache c'tor ensures
+         // the underlying store matches this
+         return clustered;
+      }
+      
+      public Entry<T> get(Object key)
+      {
+         return store.get(key);
+      }
+
+      public void passivate(Entry<T> entry)
+      {
+         store.passivate(entry);         
+      }
+
+      public void insert(Entry<T> entry)
+      {
+         store.insert(entry);         
+      }
+
+      public Entry<T> remove(Object key)
+      {
+         return store.remove(key);
+      }
+
+      public void start()
+      {         
+         store.start();
+      }
+
+      public void stop()
+      {
+         store.stop();
+      }      
+   }
+   
+   public GroupedPassivatingCacheImpl2(StatefulObjectFactory<T> factory, PassivationManager<T> passivationManager, IntegratedObjectStore<Entry<T>> store, PassivatingCache<SerializationGroup> groupCache)
+   {
+      assert groupCache != null : "groupCache is null";
+      assert passivationManager != null : "passivationManager is null";
+      assert store != null : "store is null";
+      assert groupCache.isClustered() == store.isClustered(): "incompatible clustering support between groupCache and store";
+      assert groupCache.isClustered() == passivationManager.isClustered(): "incompatible clustering support between groupCache and passivationManager";
+
+      this.clustered = store.isClustered();
+      this.groupCache = groupCache;
+      EntryContainer container = new EntryContainer(factory, passivationManager, store);
+      this.delegate = new SimplePassivatingCache2<Entry<T>>(container, container, container);
+   }
+   
+   public boolean isClustered()
+   {
+      return clustered;
+   }
+   
+   public void replicate(Object key)
+   {
+      delegate.replicate(key);
+   }
+   
+   public void passivate(Object key)
+   {
+      delegate.passivate(key);
+   }
+
+   public T create(Class<?>[] initTypes, Object[] initValues)
+   {
+      return delegate.create(initTypes, initValues).obj;
+   }
+
+   public T get(Object key) throws NoSuchEJBException
+   {
+      Entry<T> entry = delegate.get(key);
+      if (entry.group != null)
+      {
+         entry.group.addInUse(key);
+      }
+      return entry.obj;
+   }
+
+   public T peek(Object key) throws NoSuchEJBException
+   {
+      return delegate.peek(key).obj;
+   }
+
+   public void release(T obj)
+   {
+      Object key = obj.getId();
+      Entry<T> entry = delegate.releaseByKey(key);
+      if (entry.group != null)
+      {
+         entry.group.removeInUse(key);
+      }      
+   }
+
+   public void remove(Object key)
+   {
+      delegate.remove(key);
+   }
+   
+   public void setGroup(T obj, SerializationGroup group)
+   {
+      Entry<T> entry;
+      Object key = obj.getId();
+      entry = delegate.peek(key);
+      if(entry.group != null)
+         throw new IllegalStateException("object " + key + " already associated with a passivation group");
+      entry.group = group;
+      entry.groupId = group.getId();
+      entry.group.addMember(entry);
+   }
+   
+   public void start()
+   {
+      delegate.start();
+   }
+
+   public void stop()
+   {
+      delegate.stop();
+   }
+}

Added: projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/impl/SerializationGroupContainer.java
===================================================================
--- projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/impl/SerializationGroupContainer.java	                        (rev 0)
+++ projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/impl/SerializationGroupContainer.java	2007-10-08 17:27:20 UTC (rev 65923)
@@ -0,0 +1,97 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2007, Red Hat Middleware LLC, and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.ejb3.cache.impl;
+
+import org.jboss.ejb3.cache.PassivationManager;
+import org.jboss.ejb3.cache.StatefulObjectFactory;
+import org.jboss.ejb3.cache.grouped.SerializationGroup;
+import org.jboss.logging.Logger;
+
+/**
+ * Comment
+ *  
+ *  FIXME determine whether SerializationGroup clustering support should
+ *  be controlled by a property of this container or via a param passed
+ *  to create(). 
+ *
+ * @author <a href="mailto:carlo.dewolf at jboss.com">Carlo de Wolf</a>
+ * @version $Revision: $
+ */
+public class SerializationGroupContainer implements StatefulObjectFactory<SerializationGroup>, PassivationManager<SerializationGroup>
+{
+   private static final Logger log = Logger.getLogger(SerializationGroupContainer.class);
+   
+   private boolean clustered;
+   
+   public boolean isClustered()
+   {
+      return clustered;
+   }
+   
+   public void setClustered(boolean clustered)
+   {
+      this.clustered = clustered;
+   }
+
+   public SerializationGroup create(Class<?>[] initTypes, Object[] initValues)
+   {
+      SerializationGroup group = new SerializationGroupImpl();
+      // TODO should this be controlled via one of the initValues?
+      group.setClustered(clustered);
+      return group;
+   }
+
+   public void destroy(SerializationGroup obj)
+   {
+      // TODO: nothing?
+   }
+
+   public void postActivate(SerializationGroup obj)
+   {
+      log.trace("post activate " + obj);
+      obj.postActivate();
+   }
+
+   public void prePassivate(SerializationGroup obj)
+   {
+      log.trace("pre passivate " + obj);
+      obj.prePassivate();
+   }
+
+   public void postReplicate(SerializationGroup obj)
+   {
+      if (!clustered)
+         throw new UnsupportedOperationException("Clustering not supported");
+      log.trace("post replicate " + obj);
+      obj.postReplicate();
+   }
+
+   public void preReplicate(SerializationGroup obj)
+   {
+      if (!clustered)
+         throw new UnsupportedOperationException("Clustering not supported");
+      log.trace("pre replicate " + obj);
+      obj.preReplicate();
+   }
+
+   
+}


Property changes on: projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/impl/SerializationGroupContainer.java
___________________________________________________________________
Name: svn:executable
   + *

Added: projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/impl/SerializationGroupImpl.java
===================================================================
--- projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/impl/SerializationGroupImpl.java	                        (rev 0)
+++ projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/impl/SerializationGroupImpl.java	2007-10-08 17:27:20 UTC (rev 65923)
@@ -0,0 +1,186 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2007, Red Hat Middleware LLC, and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.ejb3.cache.impl;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.jboss.ejb3.cache.grouped.SerializationGroup;
+import org.jboss.ejb3.cache.grouped.SerializationGroupMember;
+import org.jboss.logging.Logger;
+import org.jboss.util.id.GUID;
+
+/**
+ * Default implementation of {@link SerializationGroup}.
+ *
+ * @author <a href="mailto:carlo.dewolf at jboss.com">Carlo de Wolf</a>
+ * @author Brian Stansberry
+ * @version $Revision: $
+ */
+public class SerializationGroupImpl implements SerializationGroup
+{
+   private static final Logger log = Logger.getLogger(SerializationGroupImpl.class);
+   private static final long serialVersionUID = 1L;
+
+   private Object id = new GUID();
+   /** 
+    * The actual underlying objects passed in via addMember(). We store them 
+    * here so they aren't lost when they are cleared from the values
+    * stored in the "members" map.
+    */
+   private Map<Object, Object> memberObjects = new HashMap<Object, Object>();
+   /** 
+    * The active group members.  We don't serialized these. Rather, it is
+    * the responsibility of members to reassociate themselves with the 
+    * group upon deserialization (via addActive())
+    */
+   private transient Map<Object, SerializationGroupMember> active = 
+         new HashMap<Object, SerializationGroupMember>();
+   /**
+    * Set of keys passed to {@link #addInUse(Object)}
+    */
+   private transient Set<Object> inUseKeys = new HashSet<Object>();
+   
+   private boolean clustered;
+   
+   private long lastUsed;
+   
+   public Object getId()
+   {
+      return id;
+   }
+   
+   public boolean isClustered()
+   {
+      return clustered;
+   }
+   
+   public void setClustered(boolean clustered)
+   {
+      this.clustered = clustered;
+   }
+   
+   public void addMember(SerializationGroupMember member)
+   {
+      Object key = member.getId();
+      if (memberObjects.containsKey(key))
+         throw new IllegalStateException(member + " is already a member");
+      log.trace("add member " + key + ", " + member);
+      memberObjects.put(key, member.getSerializableObject());
+      active.put(key, member);
+   }
+   
+   public void removeMember(Object key)
+   {
+      removeActive(key);
+      memberObjects.remove(key);
+   }
+   
+   public int size()
+   {
+      return memberObjects.size();
+   }
+   
+   public Object getMemberObject(Object key)
+   {
+      return memberObjects.get(key);
+   }
+   
+   public void postActivate()
+   {
+      // do nothing
+   }
+   
+   public void prePassivate()
+   {
+      for(SerializationGroupMember member : active.values())
+      {
+         member.prePassivate();
+      }
+      active.clear();
+   }
+   
+   public void postReplicate()
+   {
+      // do nothing
+   }
+   
+   public void preReplicate()
+   {
+      for(SerializationGroupMember member : active.values())
+      {
+         member.preReplicate();
+      }
+      active.clear();
+   }
+   
+   public void addActive(SerializationGroupMember member)
+   {
+      Object key = member.getId();
+      if (!memberObjects.containsKey(key))
+         throw new IllegalStateException(member + " is not a member of " + this);
+      active.put(key, member);
+   }
+   
+   public void removeActive(Object key)
+   {
+      active.remove(key);
+   }
+
+   public void addInUse(Object key)
+   {
+      if (!memberObjects.containsKey(key))
+         throw new IllegalStateException(key + " is not a member of " + this);
+      inUseKeys.add(key);
+      lastUsed = System.currentTimeMillis();
+   }
+
+   public void removeInUse(Object key)
+   {
+      if (inUseKeys.remove(key))
+      {
+         lastUsed = System.currentTimeMillis();
+      }
+      else if (!memberObjects.containsKey(key))
+      {
+            throw new IllegalStateException(key + " is not a member of " + this);
+      }      
+   }
+
+   public long getLastUsed()
+   {
+      return lastUsed;
+   }
+
+   public boolean isInUse()
+   {
+      return inUseKeys.size() > 0;
+   }
+
+   public void setInUse(boolean inUse)
+   {
+      lastUsed = System.currentTimeMillis();
+   }   
+   
+}


Property changes on: projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/impl/SerializationGroupImpl.java
___________________________________________________________________
Name: svn:executable
   + *

Added: projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/impl/SimpleIntegratedObjectStore.java
===================================================================
--- projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/impl/SimpleIntegratedObjectStore.java	                        (rev 0)
+++ projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/impl/SimpleIntegratedObjectStore.java	2007-10-08 17:27:20 UTC (rev 65923)
@@ -0,0 +1,328 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2006, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.ejb3.cache.impl;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.jboss.ejb3.cache.Cacheable;
+import org.jboss.ejb3.cache.IntegratedObjectStore;
+import org.jboss.ejb3.cache.ItemInUseException;
+import org.jboss.ejb3.cache.ObjectStore;
+import org.jboss.ejb3.cache.PassivatingCache;
+import org.jboss.ejb3.cache.PassivatingIntegratedObjectStore;
+import org.jboss.logging.Logger;
+
+/**
+ * A {@link IntegratedObjectStore} that delegates to a provided
+ * {@link ObjectStore} for persistence.
+ * 
+ * @author Brian Stansberry
+ * @version $Revision$
+ */
+public class SimpleIntegratedObjectStore<T extends Cacheable & Serializable> 
+      implements PassivatingIntegratedObjectStore<T>
+{
+   private static final Logger log = Logger.getLogger(SimpleIntegratedObjectStore.class);
+   
+   private final ObjectStore<T> store;
+   private Map<Object, T> cache;
+   private Map<Object, Long> passivatedEntries;
+   
+   /**
+    * Support callbacks when our SessionTimeoutThread decides to
+    * evict an entry.
+    */
+   private PassivatingCache<T> owningCache;   
+   private int interval;
+   private int idleTimeSeconds;
+   private int expirationTimeSeconds;   
+   private SessionTimeoutRunner sessionTimeoutRunner;
+   private String name;
+   
+   /**
+    * Create a new SimpleIntegratedObjectStore.
+    * 
+    */
+   public SimpleIntegratedObjectStore(ObjectStore<T> store)
+   {
+      this.store = store;
+      this.cache = new HashMap<Object, T>();
+      this.passivatedEntries = new HashMap<Object, Long>();
+   }
+   
+   public boolean isClustered()
+   {
+      return false;
+   }
+
+   public T get(Object key)
+   {
+      synchronized (cache)
+      {
+         T entry = cache.get(key);
+         if(entry == null)
+         {
+            entry = store.load(key);
+            if(entry != null)
+            {
+               cache.put(key, entry);
+               passivatedEntries.remove(key);
+            }
+         }
+         return entry;
+      }
+   }
+
+   public void insert(T entry)
+   {
+      Object key = entry.getId();
+      synchronized (cache)
+      {
+         cache.put(key, entry);
+      }
+   }
+   
+   public void replicate(T entry)
+   {
+      throw new UnsupportedOperationException("Clustering is not supported by " + 
+                                              getClass().getName());      
+   }
+
+   public void passivate(T entry)
+   {
+      synchronized (cache)
+      {
+         Object key = entry.getId();
+         store.store(entry);  
+         passivatedEntries.put(key, new Long(entry.getLastUsed()));
+         cache.remove(key);
+      }
+   }
+
+   public T remove(Object id)
+   {
+      synchronized (cache)
+      {
+         T entry = get(id);
+         if (entry != null)
+         {
+            cache.remove(id);
+         }
+         return entry;
+      }
+   } 
+
+   public void start()
+   {
+      if (interval > 0)
+      {
+         if (sessionTimeoutRunner == null)
+         {
+            assert name != null : "name has not been set";
+            
+            sessionTimeoutRunner = new SessionTimeoutRunner();
+         }
+         sessionTimeoutRunner.start();
+      }
+   }
+
+   public void stop()
+   {
+      if (sessionTimeoutRunner != null)
+      {
+         sessionTimeoutRunner.stop();
+      }      
+   }
+
+   // ---------------------------------------  PassivatingIntegratedObjectStore
+
+
+   public void setPassivatingCache(PassivatingCache<T> cache)
+   {
+      this.owningCache = cache;      
+   }
+   
+   public int getInterval()
+   {
+      return interval;
+   }
+
+   public void setInterval(int seconds)
+   {
+      this.interval = seconds;      
+   }
+   
+   public void runExpiration()
+   {
+      if (expirationTimeSeconds > 0)
+      {
+         long now = System.currentTimeMillis();
+         long minRemovalUse = now - (expirationTimeSeconds * 1000);                     
+         for (CacheableTimestamp ts : getPassivatedEntries())
+         {
+            try
+            {
+               if (minRemovalUse >= ts.getLastUsed())
+               {
+                  remove(ts.getId());
+               }
+            }
+            catch (ItemInUseException ignored)
+            {
+               log.trace("skipping in-use entry " + ts.getId());
+            }
+         }    
+      }      
+   }
+
+   public void runPassivation()
+   {
+      if (idleTimeSeconds > 0)
+      {
+         long now = System.currentTimeMillis();
+         long minPassUse = now - (idleTimeSeconds * 1000);
+         
+         // Scan the in-memory entries for passivation or removal
+         for (CacheableTimestamp ts : getInMemoryEntries())
+         {
+            try
+            {
+               long lastUsed = ts.getLastUsed();
+               if (minPassUse >= lastUsed)
+               {
+                  owningCache.passivate(ts.getId());
+               }
+            }
+            catch (ItemInUseException ignored)
+            {
+               log.trace("skipping in-use entry " + ts.getId());
+            }
+         }
+      }
+      
+   }   
+
+   private SortedSet<CacheableTimestamp> getInMemoryEntries()
+   {      
+      SortedSet<CacheableTimestamp> set = new TreeSet<CacheableTimestamp>();
+      for (Map.Entry<Object, T> entry : cache.entrySet())
+      {
+         set.add(new CacheableTimestamp(entry.getKey(), entry.getValue().getLastUsed()));
+      }
+      return set;
+   }
+
+   private SortedSet<CacheableTimestamp> getPassivatedEntries()
+   {
+      SortedSet<CacheableTimestamp> set = new TreeSet<CacheableTimestamp>();
+      for (Map.Entry<Object, Long> entry : passivatedEntries.entrySet())
+      {
+         set.add(new CacheableTimestamp(entry.getKey(), entry.getValue().longValue()));
+      }
+      return set;
+   }
+
+   public void setName(String name)
+   {
+      this.name = name;
+   }
+   
+
+   private class SessionTimeoutRunner implements Runnable
+   {
+      private boolean stopped = true;
+      private Thread thread;
+      
+      public void run()
+      {
+         while (!stopped)
+         {
+            try
+            {
+               runPassivation();               
+            }
+            catch (Exception e)
+            {
+               log.error("Caught exception processing passivations", e);
+            }
+            
+            if (!stopped)
+            {
+               try
+               {
+                  runExpiration();               
+               }
+               catch (Exception e)
+               {
+                  log.error("Caught exception processing expirations", e);
+               }               
+            }
+            
+            if (!stopped)
+            {
+               try
+               {
+                  Thread.sleep(interval * 1000);
+               }
+               catch (InterruptedException ignored) {}
+            }
+         }
+      }
+      
+      void start()
+      {
+         if (stopped)
+         {
+            thread = new Thread(this, "SessionTimeoutRunner-" + name);
+            thread.setDaemon(true);
+            stopped = false;
+            thread.start();
+         }
+      }
+      
+      void stop()
+      {
+         stopped = true;
+         if (thread != null && thread.isAlive())
+         {
+            try
+            {
+               thread.join(1000);
+            }
+            catch (InterruptedException ignored) {}
+            
+            if (thread.isAlive())
+            {
+               thread.interrupt();
+            }
+            
+         }
+      }
+      
+   }
+
+}

Added: projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/impl/SimplePassivatingCache2.java
===================================================================
--- projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/impl/SimplePassivatingCache2.java	                        (rev 0)
+++ projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/impl/SimplePassivatingCache2.java	2007-10-08 17:27:20 UTC (rev 65923)
@@ -0,0 +1,210 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2007, Red Hat Middleware LLC, and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.ejb3.cache.impl;
+
+import java.io.Serializable;
+
+import javax.ejb.NoSuchEJBException;
+
+import org.jboss.ejb3.cache.Cacheable;
+import org.jboss.ejb3.cache.IntegratedObjectStore;
+import org.jboss.ejb3.cache.ItemInUseException;
+import org.jboss.ejb3.cache.PassivatingCache;
+import org.jboss.ejb3.cache.PassivationManager;
+import org.jboss.ejb3.cache.StatefulObjectFactory;
+import org.jboss.logging.Logger;
+
+/**
+ * Non group-aware {@link PassivatingCache} that uses an {@link IntegratedObjectStore}
+ * to manage data.
+ *
+ * @author Brian Stansberry
+ * @version $Revision: 65339 $
+ */
+public class SimplePassivatingCache2<T extends Cacheable & Serializable> implements PassivatingCache<T>
+{
+   private static final Logger log = Logger.getLogger(SimplePassivatingCache2.class);
+   
+   private StatefulObjectFactory<T> factory;
+   private PassivationManager<T> passivationManager;
+   private IntegratedObjectStore<T> store;
+   
+   public SimplePassivatingCache2(StatefulObjectFactory<T> factory, PassivationManager<T> passivationManager, IntegratedObjectStore<T> store)
+   {
+      assert factory != null : "factory is null";
+      assert passivationManager != null : "passivationManager is null";
+      assert store != null : "store is null";
+      
+      this.factory = factory;
+      this.passivationManager = passivationManager;
+      this.store = store;
+   }
+   
+   public boolean isClustered()
+   {
+      return store.isClustered();
+   }
+   
+   public void replicate(Object key)
+   {
+      if (!isClustered())
+      {
+         throw new UnsupportedOperationException("Clustering is not supported by " + 
+                                                 store.getClass().getName());
+      }
+      
+      log.trace("replicate " + key);
+      synchronized (store)
+      {
+         T entry = store.get(key);
+         
+         if(entry == null)
+            throw new IllegalArgumentException("entry " + key + " not found in cache " + this);
+         
+         if(entry.isInUse())
+         {
+            throw new ItemInUseException("entry " + entry + " is in use");
+         }
+         
+         passivationManager.preReplicate(entry);
+         
+         store.replicate(entry);
+      }
+   }
+   
+   public T create(Class<?>[] initTypes, Object[] initValues)
+   {
+      T obj = factory.create(initTypes, initValues);
+      synchronized (store)
+      {
+         store.insert(obj);
+      }
+      return obj;
+   }
+
+   public T get(Object key) throws NoSuchEJBException
+   {
+      synchronized (store)
+      {
+         T entry = store.get(key);
+         if(entry == null)
+            throw new NoSuchEJBException(String.valueOf(key));
+         
+         if (isClustered())
+         {
+            passivationManager.postReplicate(entry);
+         }
+         
+         passivationManager.postActivate(entry);
+         
+//         if(entry.getCacheState() != Cacheable.State.READY)
+//         {
+//            throw new IllegalStateException("entry " + key + " is not ready");
+//         }
+         entry.setInUse(true);
+         return entry;
+      }
+   }
+
+   public void passivate(Object key)
+   {
+      log.trace("passivate " + key);
+      synchronized (store)
+      {
+         T entry = store.get(key);
+         
+         if(entry == null)
+            throw new IllegalArgumentException("entry " + key + " not found in cache " + this);
+         
+         if(entry.isInUse())
+         {
+            throw new ItemInUseException("entry " + entry + " is in use");
+         }
+
+         passivationManager.prePassivate(entry);
+         
+         store.passivate(entry);
+      }
+   }
+   
+   public T peek(Object key) throws NoSuchEJBException
+   {
+      synchronized (store)
+      {
+         T entry = store.get(key);
+         if(entry == null)
+            throw new NoSuchEJBException(String.valueOf(key));
+         
+         if (isClustered())
+         {
+            passivationManager.postReplicate(entry);
+         }
+         
+         passivationManager.postActivate(entry);
+         
+         return entry;
+      }
+   }
+
+   public void release(T obj)
+   {
+      releaseByKey(obj.getId());
+   }
+
+   protected T releaseByKey(Object key)
+   {
+      synchronized (store)
+      {
+         T entry = store.get(key);
+         if(entry == null)
+            throw new IllegalStateException("object " + key + " not from this cache");
+         if(!entry.isInUse())
+            throw new IllegalStateException("entry " + entry + " is not in use");
+         entry.setInUse(false);
+         return entry;
+      }
+   }
+   
+   public void remove(Object key)
+   {
+      T entry;
+      synchronized (store)
+      {
+         entry = store.remove(key);
+         if(entry.isInUse())
+            throw new ItemInUseException("entry " + entry + " is in use");
+      }
+      if(entry != null)
+         factory.destroy(entry);
+   }
+   
+   public void start()
+   {
+      store.start();
+   }
+
+   public void stop()
+   {
+      store.stop();
+   }
+
+}




More information about the jboss-cvs-commits mailing list