[jboss-cvs] JBossAS SVN: r65923 - projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/impl.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Oct 8 13:27:20 EDT 2007
Author: bstansberry at jboss.com
Date: 2007-10-08 13:27:20 -0400 (Mon, 08 Oct 2007)
New Revision: 65923
Added:
projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/impl/CacheableTimestamp.java
projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/impl/GroupedPassivatingCacheImpl2.java
projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/impl/SerializationGroupContainer.java
projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/impl/SerializationGroupImpl.java
projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/impl/SimpleIntegratedObjectStore.java
projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/impl/SimplePassivatingCache2.java
Log:
First draft of clustering support
Added: projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/impl/CacheableTimestamp.java
===================================================================
--- projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/impl/CacheableTimestamp.java (rev 0)
+++ projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/impl/CacheableTimestamp.java 2007-10-08 17:27:20 UTC (rev 65923)
@@ -0,0 +1,100 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2006, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.ejb3.cache.impl;
+
+import org.jboss.ejb3.cache.Cacheable;
+import org.jboss.ejb3.cache.Identifiable;
+import org.jboss.ejb3.cache.IntegratedObjectStore;
+
+/**
+ * Encapsulation of the {@link Identifiable#getId() id} and
+ * {@link Cacheable#getLastUsed() last used timestamp} of
+ * a cached {@link Cacheable}.
+ * <p>
+ * Implements <code>Comparable</code> to make it easy to sort
+ * for LRU comparisons.
+ * </p>
+ *
+ * @see IntegratedObjectStore#getInMemoryEntries()
+ * @see IntegratedObjectStore#getPassivatedEntries()
+ *
+ * @author Brian Stansberry
+ * @version $Revision$
+ */
+public class CacheableTimestamp
+ implements Identifiable, Comparable<CacheableTimestamp>
+{
+ private Object id;
+ private long lastUsed;
+
+ public CacheableTimestamp(Object id, long lastUsed)
+ {
+ assert id != null : "id cannot be null";
+ assert lastUsed > 0 : "lastUsed must be positive";
+
+ this.id = id;
+ this.lastUsed = lastUsed;
+ }
+
+ public Object getId()
+ {
+ return id;
+ }
+
+ public long getLastUsed()
+ {
+ return lastUsed;
+ }
+
+ /**
+ * Compares based on {@link #getLastUsed() last used}, returning
+ * -1 for earlier timestamps.
+ */
+ public int compareTo(CacheableTimestamp o)
+ {
+ if (this.lastUsed < o.lastUsed)
+ return -1;
+ else if (this.lastUsed > o.lastUsed)
+ return 1;
+ return 0;
+ }
+
+ @Override
+ public boolean equals(Object obj)
+ {
+ if (this == obj)
+ return true;
+
+ if (obj instanceof CacheableTimestamp)
+ {
+ return this.id.equals(((CacheableTimestamp) obj).id);
+ }
+ return false;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return id.hashCode();
+ }
+}
Added: projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/impl/GroupedPassivatingCacheImpl2.java
===================================================================
--- projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/impl/GroupedPassivatingCacheImpl2.java (rev 0)
+++ projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/impl/GroupedPassivatingCacheImpl2.java 2007-10-08 17:27:20 UTC (rev 65923)
@@ -0,0 +1,456 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2007, Red Hat Middleware LLC, and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.ejb3.cache.impl;
+
+import java.io.Serializable;
+
+import javax.ejb.NoSuchEJBException;
+
+import org.jboss.ejb3.cache.Cacheable;
+import org.jboss.ejb3.cache.IntegratedObjectStore;
+import org.jboss.ejb3.cache.PassivatingCache;
+import org.jboss.ejb3.cache.PassivationManager;
+import org.jboss.ejb3.cache.StatefulObjectFactory;
+import org.jboss.ejb3.cache.grouped.GroupedPassivatingCache;
+import org.jboss.ejb3.cache.grouped.SerializationGroup;
+import org.jboss.ejb3.cache.grouped.SerializationGroupMember;
+import org.jboss.logging.Logger;
+
+/**
+ * {@link GroupedPassivatingCache} that uses an {@link IntegratedObjectStore}
+ * to manage data.
+ *
+ * @author Brian Stansberry
+ * @version $Revision$
+ */
+public class GroupedPassivatingCacheImpl2<T extends Cacheable & Serializable> implements GroupedPassivatingCache<T>
+{
+   private static final Logger log = Logger.getLogger(GroupedPassivatingCacheImpl2.class);
+
+   /**
+    * Cache that's managing the PassivationGroup
+    */
+   private PassivatingCache<SerializationGroup> groupCache;
+
+   /**
+    * Delegate that handles the usual details; ends up calling into
+    * our EntryContainer for StatefulObjectFactory, PassivationManager
+    * and IntegratedObjectStore functions.
+    */
+   private SimplePassivatingCache2<Entry<T>> delegate;
+
+   /**
+    * Do we support clustering?  This field is really just a minor
+    * optimization to avoid calling through to the underlying
+    * IntegratedObjectStore all the time.
+    */
+   private boolean clustered;
+
+   /**
+    * Wrapper stored in the delegate cache in place of the underlying
+    * object. It tracks the object's id, its (optional) serialization group
+    * and the last-used timestamp remembered while the object is detached.
+    * <p>
+    * NOTE(review): this is a non-static inner class that implements
+    * <code>Serializable</code>, so standard Java serialization would also
+    * try to serialize the enclosing cache instance, which is not
+    * <code>Serializable</code>. Confirm how the object store serializes
+    * entries before relying on this.
+    * </p>
+    */
+   public class Entry<C extends Cacheable & Serializable> implements Cacheable, SerializationGroupMember, Serializable
+   {
+      private static final long serialVersionUID = 1L;
+
+      Object id;
+      /**
+       * The underlying object (e.g. bean context).
+       * Preferably, this field would be transient. It isn't now because it is
+       * possible this entry will never be assigned to a PassivationGroup,
+       * in which case we need to serialize obj.
+       * TODO Relying on nulling this field is fragile. Can we make this
+       * field transient by ensuring we only use this cache class with bean
+       * classes that are sure to be part of a group?
+       */
+      C obj;
+      /** The group. Never serialize the group; only the groupCache does that */
+      transient SerializationGroup group;
+      /** Id of the group, kept so the group can be looked up again after activation. */
+      Object groupId;
+      /** Last-used value copied from obj in setInUse(); reported while obj is null. */
+      long lastUsed;
+
+      /**
+       * Wraps the given object, taking its id as the entry id.
+       *
+       * @param obj the underlying object; asserted to be non-null
+       */
+      Entry(C obj)
+      {
+         assert obj != null : "obj is null";
+
+         this.obj = obj;
+         this.id = obj.getId();
+      }
+
+      public Object getId()
+      {
+         return id;
+      }
+
+      public boolean isClustered()
+      {
+         // Value from the containing cache
+         return clustered;
+      }
+
+      @SuppressWarnings("unchecked")
+      public C getSerializableObject()
+      {
+         return obj;
+      }
+
+      // Called by PassivationGroup prior to its passivating
+      public void prePassivate()
+      {
+         // make sure we don't passivate the group twice
+         group = null;
+         // null out obj so when delegate passivates this entry
+         // we don't serialize it. It serializes with the PassivationGroup only
+         obj = null;
+
+         // NOTE(review): obj is already null here, so if this call reaches
+         // EntryContainer.prePassivate() the underlying PassivationManager
+         // is handed a null object -- confirm that is intended.
+         delegate.passivate(this.id);
+      }
+
+      // Called by PassivationGroup prior to its replicating
+      public void preReplicate()
+      {
+         // make sure we don't replicate the group twice
+         group = null;
+         // null out obj so when delegate passivates this entry
+         // we don't serialize it. It serializes with the PassivationGroup only
+         obj = null;
+
+         delegate.replicate(this.id);
+      }
+
+      /**
+       * @return the underlying object's last-used value, or the value
+       *         remembered by {@link #setInUse(boolean)} if the object is
+       *         currently detached
+       */
+      public long getLastUsed()
+      {
+         return obj == null ? lastUsed : obj.getLastUsed();
+      }
+
+      /**
+       * @return the underlying object's in-use flag, or <code>false</code>
+       *         if the object is currently detached
+       */
+      public boolean isInUse()
+      {
+         return obj == null ? false : obj.isInUse();
+      }
+
+      /**
+       * Forwards the flag to the underlying object (if attached) and
+       * records the object's resulting last-used value.
+       */
+      public void setInUse(boolean inUse)
+      {
+         if (obj != null)
+         {
+            obj.setInUse(inUse);
+            lastUsed = obj.getLastUsed();
+         }
+      }
+
+      @Override
+      public String toString()
+      {
+         return super.toString() + "{id=" + id + ",obj=" + obj + ",groupId=" + groupId + ",group=" + group + "}";
+      }
+   }
+
+   /**
+    * Adapter handed to the delegate cache as its factory, passivation
+    * manager and object store. It wraps and unwraps {@link Entry} objects
+    * and coordinates group passivation and replication with the groupCache.
+    */
+   private class EntryContainer
+      implements StatefulObjectFactory<Entry<T>>, PassivationManager<Entry<T>>, IntegratedObjectStore<Entry<T>>
+   {
+      private StatefulObjectFactory<T> factory;
+      private PassivationManager<T> passivationManager;
+      private IntegratedObjectStore<Entry<T>> store;
+
+      EntryContainer(StatefulObjectFactory<T> factory, PassivationManager<T> passivationManager, IntegratedObjectStore<Entry<T>> store)
+      {
+         this.factory = factory;
+         this.passivationManager = passivationManager;
+         this.store = store;
+      }
+
+      /** Creates the underlying object via the wrapped factory and wraps it in a new Entry. */
+      public Entry<T> create(Class<?>[] initTypes, Object[] initValues)
+      {
+         return new Entry<T>(factory.create(initTypes, initValues));
+      }
+
+      /**
+       * Destroys the underlying object and removes the entry from its
+       * group; a group left without members is removed from the groupCache.
+       */
+      public void destroy(Entry<T> entry)
+      {
+         factory.destroy(entry.obj);
+         if (entry.group != null)
+         {
+            entry.group.removeMember(entry.id);
+            if (entry.group.size() == 0)
+            {
+               groupCache.remove(entry.groupId);
+            }
+         }
+      }
+
+      /**
+       * Restores the entry's references to its group and object (if they
+       * were cleared), re-registers it as active with the group and
+       * invokes the activation callback on the underlying object.
+       */
+      @SuppressWarnings("unchecked")
+      public void postActivate(Entry<T> entry)
+      {
+         log.trace("post activate " + entry);
+
+         // Restore the entry's ref to the group and object
+         if(entry.obj == null)
+         {
+            if(entry.group == null)
+            {
+               // TODO: peek or get?
+               // BES 2007/10/06 I think peek is better; no
+               // sense marking the group as in-use and then having
+               // to release it or something
+               entry.group = groupCache.peek(entry.groupId);
+            }
+            entry.obj = (T) entry.group.getMemberObject(entry.id);
+         }
+
+         // Notify the group that this entry is active. An entry that was
+         // never assigned to a group was serialized together with its obj
+         // and has no group to notify.
+         if (entry.group != null)
+         {
+            entry.group.addActive(entry);
+         }
+
+         // Invoke callbacks on the underlying object
+         passivationManager.postActivate(entry.obj);
+      }
+
+      /**
+       * Invokes the passivation callback on the underlying object and, if
+       * this entry is the one controlling the passivation, passivates the
+       * whole group when none of its members is in use.
+       */
+      public void prePassivate(Entry<T> entry)
+      {
+         log.trace("pre-passivate " + entry);
+
+         // entry.obj may or may not get serialized (depends on if group
+         // is in use) but it's ok to invoke callbacks now. If a caller
+         // requests this entry again and the obj hadn't been serialized with
+         // the group, we'll just call postActivate on it then, which is OK.
+         // By always invoking the callbacks here, we avoid possible bugs
+         // where they sometimes don't get called.
+         passivationManager.prePassivate(entry.obj);
+
+         // If this call is coming via delegate.passivate(), entry.group will
+         // *not* be null.  In that case we are the controller for the
+         // group passivation.  If the call is coming via Entry.prePassivate(),
+         // entry.group *will* be null.  In that case we are not the controller
+         // of the passivation and can just return.
+         if(entry.group != null)
+         {
+            // Remove ourself from group's active list so we don't get
+            // called again via Entry.prePassivate()
+            entry.group.removeActive(entry.id);
+
+            // Only tell the group to passivate if no members are in use
+            if (!entry.group.isInUse())
+            {
+               // Tell group to prePassivate other active members
+               entry.group.prePassivate();
+               // Go ahead and do the real passivation
+               groupCache.passivate(entry.groupId);
+            }
+            // else {
+            // this turns into a pretty meaningless exercise of just
+            // passivating an empty Entry. TODO consider throwing
+            // ItemInUseException here, thus aborting everything.  Need to
+            // be sure that doesn't lead to problems as the exception propagates
+            // }
+
+            // This call didn't come through Entry.prePassivate() (which nulls
+            // group and obj) so we have to do it ourselves. Otherwise
+            // when this call returns, delegate will serialize the entry
+            // with a ref to group and obj.
+            entry.group = null;
+            entry.obj = null;
+         }
+      }
+
+      /**
+       * Replication counterpart of {@link #prePassivate(Entry)}; unlike
+       * passivation, the entry is put back on the group's active list.
+       */
+      public void preReplicate(Entry<T> entry)
+      {
+         // This method follows the same conceptual logic as prePassivate.
+         // See the detailed comments in that method.
+
+         log.trace("pre-replicate " + entry);
+
+         passivationManager.preReplicate(entry.obj);
+
+         if(entry.group != null)
+         {
+            entry.group.removeActive(entry.id);
+
+            try
+            {
+               if (!entry.group.isInUse())
+               {
+                  entry.group.preReplicate();
+                  groupCache.replicate(entry.groupId);
+               }
+            }
+            finally
+            {
+               // Here we differ from prePassivate!!
+               // Restore the entry as "active" so it can get
+               // passivation callbacks
+               entry.group.addActive(entry);
+            }
+
+            entry.group = null;
+            entry.obj = null;
+         }
+      }
+
+      /**
+       * Restores the entry's references to its group and object (if they
+       * were cleared), re-registers it as active with the group and
+       * invokes the post-replication callback on the underlying object.
+       */
+      @SuppressWarnings("unchecked")
+      public void postReplicate(Entry<T> entry)
+      {
+         log.trace("postreplicate " + entry);
+
+         // Restore the entry's ref to the group and object
+         if(entry.obj == null)
+         {
+            if(entry.group == null)
+            {
+               // TODO: peek or get?
+               // BES 2007/10/06 I think peek is better; no
+               // sense marking the group as in-use and then having
+               // to release it or something
+               entry.group = groupCache.peek(entry.groupId);
+            }
+            entry.obj = (T) entry.group.getMemberObject(entry.id);
+         }
+
+         // Notify the group that this entry is active; entries that were
+         // never assigned to a group have nothing to notify.
+         if (entry.group != null)
+         {
+            entry.group.addActive(entry);
+         }
+
+         // Invoke callbacks on the underlying object
+         passivationManager.postReplicate(entry.obj);
+      }
+
+      public void replicate(Entry<T> entry)
+      {
+         store.replicate(entry);
+      }
+
+      public boolean isClustered()
+      {
+         // Use value from containing cache; containing cache c'tor ensures
+         // the underlying store matches this
+         return clustered;
+      }
+
+      public Entry<T> get(Object key)
+      {
+         return store.get(key);
+      }
+
+      public void passivate(Entry<T> entry)
+      {
+         store.passivate(entry);
+      }
+
+      public void insert(Entry<T> entry)
+      {
+         store.insert(entry);
+      }
+
+      public Entry<T> remove(Object key)
+      {
+         return store.remove(key);
+      }
+
+      public void start()
+      {
+         store.start();
+      }
+
+      public void stop()
+      {
+         store.stop();
+      }
+   }
+
+   /**
+    * Creates a new GroupedPassivatingCacheImpl2.
+    *
+    * @param factory            creates and destroys the underlying objects
+    * @param passivationManager receives the lifecycle callbacks for the
+    *                           underlying objects; asserted non-null
+    * @param store              store for the {@link Entry} wrappers;
+    *                           asserted non-null
+    * @param groupCache         cache holding the serialization groups;
+    *                           asserted non-null and asserted to agree with
+    *                           store and passivationManager on clustering
+    */
+   public GroupedPassivatingCacheImpl2(StatefulObjectFactory<T> factory, PassivationManager<T> passivationManager, IntegratedObjectStore<Entry<T>> store, PassivatingCache<SerializationGroup> groupCache)
+   {
+      assert groupCache != null : "groupCache is null";
+      assert passivationManager != null : "passivationManager is null";
+      assert store != null : "store is null";
+      assert groupCache.isClustered() == store.isClustered(): "incompatible clustering support between groupCache and store";
+      assert groupCache.isClustered() == passivationManager.isClustered(): "incompatible clustering support between groupCache and passivationManager";
+
+      this.clustered = store.isClustered();
+      this.groupCache = groupCache;
+      EntryContainer container = new EntryContainer(factory, passivationManager, store);
+      this.delegate = new SimplePassivatingCache2<Entry<T>>(container, container, container);
+   }
+
+   public boolean isClustered()
+   {
+      return clustered;
+   }
+
+   public void replicate(Object key)
+   {
+      delegate.replicate(key);
+   }
+
+   public void passivate(Object key)
+   {
+      delegate.passivate(key);
+   }
+
+   /** Creates a new entry through the delegate and returns its underlying object. */
+   public T create(Class<?>[] initTypes, Object[] initValues)
+   {
+      return delegate.create(initTypes, initValues).obj;
+   }
+
+   /**
+    * Gets the object from the delegate and, if it belongs to a group,
+    * marks it as in use with that group.
+    */
+   public T get(Object key) throws NoSuchEJBException
+   {
+      Entry<T> entry = delegate.get(key);
+      if (entry.group != null)
+      {
+         entry.group.addInUse(key);
+      }
+      return entry.obj;
+   }
+
+   public T peek(Object key) throws NoSuchEJBException
+   {
+      return delegate.peek(key).obj;
+   }
+
+   /**
+    * Releases the object in the delegate and, if it belongs to a group,
+    * clears its in-use mark with that group.
+    */
+   public void release(T obj)
+   {
+      Object key = obj.getId();
+      Entry<T> entry = delegate.releaseByKey(key);
+      if (entry.group != null)
+      {
+         entry.group.removeInUse(key);
+      }
+   }
+
+   public void remove(Object key)
+   {
+      delegate.remove(key);
+   }
+
+   /**
+    * Associates the given object with a serialization group.
+    *
+    * @param obj   an object managed by this cache
+    * @param group the group the object should join
+    * @throws IllegalStateException if the object already has a group
+    */
+   public void setGroup(T obj, SerializationGroup group)
+   {
+      Object key = obj.getId();
+      Entry<T> entry = delegate.peek(key);
+      if(entry.group != null)
+         throw new IllegalStateException("object " + key + " already associated with a passivation group");
+
+      // Register with the group before recording the association, so a
+      // failure in addMember() doesn't leave the entry pointing at a group
+      // it is not a member of.
+      group.addMember(entry);
+      entry.group = group;
+      entry.groupId = group.getId();
+   }
+
+   public void start()
+   {
+      delegate.start();
+   }
+
+   public void stop()
+   {
+      delegate.stop();
+   }
+}
Added: projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/impl/SerializationGroupContainer.java
===================================================================
--- projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/impl/SerializationGroupContainer.java (rev 0)
+++ projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/impl/SerializationGroupContainer.java 2007-10-08 17:27:20 UTC (rev 65923)
@@ -0,0 +1,97 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2007, Red Hat Middleware LLC, and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.ejb3.cache.impl;
+
+import org.jboss.ejb3.cache.PassivationManager;
+import org.jboss.ejb3.cache.StatefulObjectFactory;
+import org.jboss.ejb3.cache.grouped.SerializationGroup;
+import org.jboss.logging.Logger;
+
+/**
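+ * {@link StatefulObjectFactory} and {@link PassivationManager} for
+ * {@link SerializationGroup}s: creates {@link SerializationGroupImpl}
+ * instances and forwards passivation and replication callbacks to the group.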
+ *
+ * FIXME determine whether SerializationGroup clustering support should
+ * be controlled by a property of this container or via a param passed
+ * to create().
+ *
+ * @author <a href="mailto:carlo.dewolf at jboss.com">Carlo de Wolf</a>
+ * @version $Revision: $
+ */
+public class SerializationGroupContainer implements StatefulObjectFactory<SerializationGroup>, PassivationManager<SerializationGroup>
+{
+   private static final Logger log = Logger.getLogger(SerializationGroupContainer.class);
+
+   /** Whether groups created by this container support clustering. */
+   private boolean clustered;
+
+   /**
+    * @return the clustering flag applied to newly created groups
+    */
+   public boolean isClustered()
+   {
+      return this.clustered;
+   }
+
+   /**
+    * @param clustered the clustering flag to apply to newly created groups
+    */
+   public void setClustered(boolean clustered)
+   {
+      this.clustered = clustered;
+   }
+
+   /**
+    * Creates a new {@link SerializationGroupImpl} carrying this
+    * container's clustering flag. The arguments are not used.
+    */
+   public SerializationGroup create(Class<?>[] initTypes, Object[] initValues)
+   {
+      SerializationGroup created = new SerializationGroupImpl();
+      // TODO should this be controlled via one of the initValues?
+      created.setClustered(this.clustered);
+      return created;
+   }
+
+   /** Does nothing. */
+   public void destroy(SerializationGroup obj)
+   {
+      // TODO: nothing?
+   }
+
+   /** Logs at trace level and forwards the callback to the group. */
+   public void postActivate(SerializationGroup obj)
+   {
+      log.trace("post activate " + obj);
+      obj.postActivate();
+   }
+
+   /** Logs at trace level and forwards the callback to the group. */
+   public void prePassivate(SerializationGroup obj)
+   {
+      log.trace("pre passivate " + obj);
+      obj.prePassivate();
+   }
+
+   /**
+    * Logs at trace level and forwards the callback to the group.
+    *
+    * @throws UnsupportedOperationException if this container is not clustered
+    */
+   public void postReplicate(SerializationGroup obj)
+   {
+      requireClustered();
+      log.trace("post replicate " + obj);
+      obj.postReplicate();
+   }
+
+   /**
+    * Logs at trace level and forwards the callback to the group.
+    *
+    * @throws UnsupportedOperationException if this container is not clustered
+    */
+   public void preReplicate(SerializationGroup obj)
+   {
+      requireClustered();
+      log.trace("pre replicate " + obj);
+      obj.preReplicate();
+   }
+
+   /** Rejects replication callbacks when clustering is switched off. */
+   private void requireClustered()
+   {
+      if (this.clustered)
+         return;
+      throw new UnsupportedOperationException("Clustering not supported");
+   }
+
+}
Property changes on: projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/impl/SerializationGroupContainer.java
___________________________________________________________________
Name: svn:executable
+ *
Added: projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/impl/SerializationGroupImpl.java
===================================================================
--- projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/impl/SerializationGroupImpl.java (rev 0)
+++ projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/impl/SerializationGroupImpl.java 2007-10-08 17:27:20 UTC (rev 65923)
@@ -0,0 +1,186 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2007, Red Hat Middleware LLC, and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.ejb3.cache.impl;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.jboss.ejb3.cache.grouped.SerializationGroup;
+import org.jboss.ejb3.cache.grouped.SerializationGroupMember;
+import org.jboss.logging.Logger;
+import org.jboss.util.id.GUID;
+
+/**
+ * Default implementation of {@link SerializationGroup}.
+ *
+ * @author <a href="mailto:carlo.dewolf at jboss.com">Carlo de Wolf</a>
+ * @author Brian Stansberry
+ * @version $Revision: $
+ */
+public class SerializationGroupImpl implements SerializationGroup
+{
+ private static final Logger log = Logger.getLogger(SerializationGroupImpl.class);
+ private static final long serialVersionUID = 1L;
+
+ private Object id = new GUID();
+ /**
+ * The actual underlying objects passed in via addMember(). We store them
+ * here so they aren't lost when they are cleared from the values
+ * stored in the "members" map.
+ */
+ private Map<Object, Object> memberObjects = new HashMap<Object, Object>();
+ /**
+ * The active group members. We don't serialized these. Rather, it is
+ * the responsibility of members to reassociate themselves with the
+ * group upon deserialization (via addActive())
+ */
+ private transient Map<Object, SerializationGroupMember> active =
+ new HashMap<Object, SerializationGroupMember>();
+ /**
+ * Set of keys passed to {@link #addInUse(Object)}
+ */
+ private transient Set<Object> inUseKeys = new HashSet<Object>();
+
+ private boolean clustered;
+
+ private long lastUsed;
+
+ public Object getId()
+ {
+ return id;
+ }
+
+ public boolean isClustered()
+ {
+ return clustered;
+ }
+
+ public void setClustered(boolean clustered)
+ {
+ this.clustered = clustered;
+ }
+
+ public void addMember(SerializationGroupMember member)
+ {
+ Object key = member.getId();
+ if (memberObjects.containsKey(key))
+ throw new IllegalStateException(member + " is already a member");
+ log.trace("add member " + key + ", " + member);
+ memberObjects.put(key, member.getSerializableObject());
+ active.put(key, member);
+ }
+
+ public void removeMember(Object key)
+ {
+ removeActive(key);
+ memberObjects.remove(key);
+ }
+
+ public int size()
+ {
+ return memberObjects.size();
+ }
+
+ public Object getMemberObject(Object key)
+ {
+ return memberObjects.get(key);
+ }
+
+ public void postActivate()
+ {
+ // do nothing
+ }
+
+ public void prePassivate()
+ {
+ for(SerializationGroupMember member : active.values())
+ {
+ member.prePassivate();
+ }
+ active.clear();
+ }
+
+ public void postReplicate()
+ {
+ // do nothing
+ }
+
+ public void preReplicate()
+ {
+ for(SerializationGroupMember member : active.values())
+ {
+ member.preReplicate();
+ }
+ active.clear();
+ }
+
+ public void addActive(SerializationGroupMember member)
+ {
+ Object key = member.getId();
+ if (!memberObjects.containsKey(key))
+ throw new IllegalStateException(member + " is not a member of " + this);
+ active.put(key, member);
+ }
+
+ public void removeActive(Object key)
+ {
+ active.remove(key);
+ }
+
+ public void addInUse(Object key)
+ {
+ if (!memberObjects.containsKey(key))
+ throw new IllegalStateException(key + " is not a member of " + this);
+ inUseKeys.add(key);
+ lastUsed = System.currentTimeMillis();
+ }
+
+ public void removeInUse(Object key)
+ {
+ if (inUseKeys.remove(key))
+ {
+ lastUsed = System.currentTimeMillis();
+ }
+ else if (!memberObjects.containsKey(key))
+ {
+ throw new IllegalStateException(key + " is not a member of " + this);
+ }
+ }
+
+ public long getLastUsed()
+ {
+ return lastUsed;
+ }
+
+ public boolean isInUse()
+ {
+ return inUseKeys.size() > 0;
+ }
+
+ public void setInUse(boolean inUse)
+ {
+ lastUsed = System.currentTimeMillis();
+ }
+
+}
Property changes on: projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/impl/SerializationGroupImpl.java
___________________________________________________________________
Name: svn:executable
+ *
Added: projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/impl/SimpleIntegratedObjectStore.java
===================================================================
--- projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/impl/SimpleIntegratedObjectStore.java (rev 0)
+++ projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/impl/SimpleIntegratedObjectStore.java 2007-10-08 17:27:20 UTC (rev 65923)
@@ -0,0 +1,328 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2006, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.ejb3.cache.impl;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.jboss.ejb3.cache.Cacheable;
+import org.jboss.ejb3.cache.IntegratedObjectStore;
+import org.jboss.ejb3.cache.ItemInUseException;
+import org.jboss.ejb3.cache.ObjectStore;
+import org.jboss.ejb3.cache.PassivatingCache;
+import org.jboss.ejb3.cache.PassivatingIntegratedObjectStore;
+import org.jboss.logging.Logger;
+
+/**
+ * A {@link IntegratedObjectStore} that delegates to a provided
+ * {@link ObjectStore} for persistence.
+ *
+ * @author Brian Stansberry
+ * @version $Revision$
+ */
+public class SimpleIntegratedObjectStore<T extends Cacheable & Serializable>
+   implements PassivatingIntegratedObjectStore<T>
+{
+   private static final Logger log = Logger.getLogger(SimpleIntegratedObjectStore.class);
+
+   private final ObjectStore<T> store;
+   /** In-memory entries by id; also the lock guarding both maps. */
+   private final Map<Object, T> cache;
+   /** Last-used timestamps of passivated entries by id; guarded by the cache lock. */
+   private final Map<Object, Long> passivatedEntries;
+
+   /**
+    * Support callbacks when our SessionTimeoutThread decides to
+    * evict an entry.
+    */
+   private PassivatingCache<T> owningCache;
+   /** Seconds between runs of the background timeout thread; not started if not positive. */
+   private int interval;
+   // NOTE(review): no setter for idleTimeSeconds or expirationTimeSeconds is
+   // visible in this class, so both remain 0 and runPassivation() and
+   // runExpiration() do nothing -- confirm how they are meant to be set.
+   private int idleTimeSeconds;
+   private int expirationTimeSeconds;
+   private SessionTimeoutRunner sessionTimeoutRunner;
+   /** Used in the name of the background thread. */
+   private String name;
+
+   /**
+    * Create a new SimpleIntegratedObjectStore.
+    *
+    * @param store the store used to persist passivated entries
+    */
+   public SimpleIntegratedObjectStore(ObjectStore<T> store)
+   {
+      this.store = store;
+      this.cache = new HashMap<Object, T>();
+      this.passivatedEntries = new HashMap<Object, Long>();
+   }
+
+   /**
+    * @return always <code>false</code>
+    */
+   public boolean isClustered()
+   {
+      return false;
+   }
+
+   /**
+    * Gets the entry with the given key, loading it from the underlying
+    * store (and moving it back into memory) if it is not in memory.
+    *
+    * @return the entry, or <code>null</code> if neither the in-memory map
+    *         nor the underlying store has it
+    */
+   public T get(Object key)
+   {
+      synchronized (cache)
+      {
+         T entry = cache.get(key);
+         if(entry == null)
+         {
+            entry = store.load(key);
+            if(entry != null)
+            {
+               cache.put(key, entry);
+               passivatedEntries.remove(key);
+            }
+         }
+         return entry;
+      }
+   }
+
+   /** Puts the entry into the in-memory map under its id. */
+   public void insert(T entry)
+   {
+      Object key = entry.getId();
+      synchronized (cache)
+      {
+         cache.put(key, entry);
+      }
+   }
+
+   /**
+    * Not supported.
+    *
+    * @throws UnsupportedOperationException always
+    */
+   public void replicate(T entry)
+   {
+      throw new UnsupportedOperationException("Clustering is not supported by " +
+                                              getClass().getName());
+   }
+
+   /**
+    * Writes the entry to the underlying store, remembers its last-used
+    * timestamp and drops it from the in-memory map.
+    */
+   public void passivate(T entry)
+   {
+      synchronized (cache)
+      {
+         Object key = entry.getId();
+         store.store(entry);
+         passivatedEntries.put(key, Long.valueOf(entry.getLastUsed()));
+         cache.remove(key);
+      }
+   }
+
+   /**
+    * Removes the entry with the given id, first loading it via
+    * {@link #get(Object)} if it is passivated.
+    *
+    * @return the removed entry, or <code>null</code> if there was none
+    */
+   public T remove(Object id)
+   {
+      synchronized (cache)
+      {
+         T entry = get(id);
+         if (entry != null)
+         {
+            cache.remove(id);
+         }
+         return entry;
+      }
+   }
+
+   /** Starts the background timeout thread if the interval is positive. */
+   public void start()
+   {
+      if (interval > 0)
+      {
+         if (sessionTimeoutRunner == null)
+         {
+            assert name != null : "name has not been set";
+
+            sessionTimeoutRunner = new SessionTimeoutRunner();
+         }
+         sessionTimeoutRunner.start();
+      }
+   }
+
+   /** Stops the background timeout thread, if one was created. */
+   public void stop()
+   {
+      if (sessionTimeoutRunner != null)
+      {
+         sessionTimeoutRunner.stop();
+      }
+   }
+
+   // ---------------------------------------  PassivatingIntegratedObjectStore
+
+
+   public void setPassivatingCache(PassivatingCache<T> cache)
+   {
+      this.owningCache = cache;
+   }
+
+   public int getInterval()
+   {
+      return interval;
+   }
+
+   public void setInterval(int seconds)
+   {
+      this.interval = seconds;
+   }
+
+   /**
+    * Removes every passivated entry whose last use is at least
+    * expirationTimeSeconds in the past. Does nothing if
+    * expirationTimeSeconds is not positive.
+    */
+   public void runExpiration()
+   {
+      if (expirationTimeSeconds > 0)
+      {
+         long now = System.currentTimeMillis();
+         // 1000L forces long arithmetic; int math overflows above ~24.8 days
+         long minRemovalUse = now - (expirationTimeSeconds * 1000L);
+         for (CacheableTimestamp ts : getPassivatedEntries())
+         {
+            try
+            {
+               if (minRemovalUse >= ts.getLastUsed())
+               {
+                  remove(ts.getId());
+               }
+            }
+            catch (ItemInUseException ignored)
+            {
+               log.trace("skipping in-use entry " + ts.getId());
+            }
+         }
+      }
+   }
+
+   /**
+    * Asks the owning cache to passivate every in-memory entry whose last
+    * use is at least idleTimeSeconds in the past. Does nothing if
+    * idleTimeSeconds is not positive.
+    */
+   public void runPassivation()
+   {
+      if (idleTimeSeconds > 0)
+      {
+         long now = System.currentTimeMillis();
+         // 1000L forces long arithmetic; int math overflows above ~24.8 days
+         long minPassUse = now - (idleTimeSeconds * 1000L);
+
+         // Scan the in-memory entries for passivation or removal
+         for (CacheableTimestamp ts : getInMemoryEntries())
+         {
+            try
+            {
+               long lastUsed = ts.getLastUsed();
+               if (minPassUse >= lastUsed)
+               {
+                  owningCache.passivate(ts.getId());
+               }
+            }
+            catch (ItemInUseException ignored)
+            {
+               log.trace("skipping in-use entry " + ts.getId());
+            }
+         }
+      }
+
+   }
+
+   /**
+    * Takes a sorted snapshot of the in-memory entries. The snapshot is
+    * built under the cache lock because this runs on the timeout thread
+    * while other threads modify the map.
+    * <p>
+    * NOTE(review): a TreeSet drops elements whose compareTo() returns 0,
+    * so entries that compare as equal collapse into one -- confirm that
+    * CacheableTimestamp's ordering distinguishes different ids.
+    * </p>
+    */
+   private SortedSet<CacheableTimestamp> getInMemoryEntries()
+   {
+      SortedSet<CacheableTimestamp> set = new TreeSet<CacheableTimestamp>();
+      synchronized (cache)
+      {
+         for (Map.Entry<Object, T> entry : cache.entrySet())
+         {
+            set.add(new CacheableTimestamp(entry.getKey(), entry.getValue().getLastUsed()));
+         }
+      }
+      return set;
+   }
+
+   /**
+    * Takes a sorted snapshot of the passivated entries, under the same
+    * lock that guards every modification of the map.
+    */
+   private SortedSet<CacheableTimestamp> getPassivatedEntries()
+   {
+      SortedSet<CacheableTimestamp> set = new TreeSet<CacheableTimestamp>();
+      synchronized (cache)
+      {
+         for (Map.Entry<Object, Long> entry : passivatedEntries.entrySet())
+         {
+            set.add(new CacheableTimestamp(entry.getKey(), entry.getValue().longValue()));
+         }
+      }
+      return set;
+   }
+
+   public void setName(String name)
+   {
+      this.name = name;
+   }
+
+
+   /**
+    * Background task that alternates passivation and expiration runs,
+    * sleeping for the configured interval in between.
+    */
+   private class SessionTimeoutRunner implements Runnable
+   {
+      /** Written by the thread calling start()/stop(), read by the runner thread. */
+      private volatile boolean stopped = true;
+      private Thread thread;
+
+      public void run()
+      {
+         while (!stopped)
+         {
+            try
+            {
+               runPassivation();
+            }
+            catch (Exception e)
+            {
+               log.error("Caught exception processing passivations", e);
+            }
+
+            if (!stopped)
+            {
+               try
+               {
+                  runExpiration();
+               }
+               catch (Exception e)
+               {
+                  log.error("Caught exception processing expirations", e);
+               }
+            }
+
+            if (!stopped)
+            {
+               try
+               {
+                  // 1000L forces long arithmetic before the value is used
+                  Thread.sleep(interval * 1000L);
+               }
+               catch (InterruptedException ignored)
+               {
+                  // stop() interrupts us to cut the sleep short; the loop
+                  // condition decides whether to carry on
+               }
+            }
+         }
+      }
+
+      /** Starts a new daemon thread unless the runner is already running. */
+      void start()
+      {
+         if (stopped)
+         {
+            thread = new Thread(this, "SessionTimeoutRunner-" + name);
+            thread.setDaemon(true);
+            stopped = false;
+            thread.start();
+         }
+      }
+
+      /**
+       * Signals the thread to stop, gives it up to a second to finish
+       * and interrupts it if it is still alive after that.
+       */
+      void stop()
+      {
+         stopped = true;
+         if (thread != null && thread.isAlive())
+         {
+            try
+            {
+               thread.join(1000);
+            }
+            catch (InterruptedException e)
+            {
+               // Preserve the caller's interrupt status
+               Thread.currentThread().interrupt();
+            }
+
+            if (thread.isAlive())
+            {
+               thread.interrupt();
+            }
+
+         }
+      }
+
+   }
+
+}
Added: projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/impl/SimplePassivatingCache2.java
===================================================================
--- projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/impl/SimplePassivatingCache2.java (rev 0)
+++ projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/impl/SimplePassivatingCache2.java 2007-10-08 17:27:20 UTC (rev 65923)
@@ -0,0 +1,210 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2007, Red Hat Middleware LLC, and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.ejb3.cache.impl;
+
+import java.io.Serializable;
+
+import javax.ejb.NoSuchEJBException;
+
+import org.jboss.ejb3.cache.Cacheable;
+import org.jboss.ejb3.cache.IntegratedObjectStore;
+import org.jboss.ejb3.cache.ItemInUseException;
+import org.jboss.ejb3.cache.PassivatingCache;
+import org.jboss.ejb3.cache.PassivationManager;
+import org.jboss.ejb3.cache.StatefulObjectFactory;
+import org.jboss.logging.Logger;
+
+/**
+ * Non group-aware {@link PassivatingCache} that uses an {@link IntegratedObjectStore}
+ * to manage data.
+ * <p>
+ * All calls that look up, insert or remove entries in the store are made
+ * while synchronized on the store instance.
+ *
+ * @author Brian Stansberry
+ * @version $Revision: 65339 $
+ */
+public class SimplePassivatingCache2<T extends Cacheable & Serializable> implements PassivatingCache<T>
+{
+   private static final Logger log = Logger.getLogger(SimplePassivatingCache2.class);
+
+   /** Creates new entries and destroys removed ones. */
+   private final StatefulObjectFactory<T> factory;
+   /** Receives the lifecycle callbacks around passivation and replication. */
+   private final PassivationManager<T> passivationManager;
+   /** Holds the entries; also serves as the lock for all entry access. */
+   private final IntegratedObjectStore<T> store;
+
+   /**
+    * Creates a cache on top of the given collaborators.
+    *
+    * @param factory            creates and destroys entries
+    * @param passivationManager receives lifecycle callbacks
+    * @param store              stores the entries
+    */
+   public SimplePassivatingCache2(StatefulObjectFactory<T> factory, PassivationManager<T> passivationManager, IntegratedObjectStore<T> store)
+   {
+      assert factory != null : "factory is null";
+      assert passivationManager != null : "passivationManager is null";
+      assert store != null : "store is null";
+
+      this.factory = factory;
+      this.passivationManager = passivationManager;
+      this.store = store;
+   }
+
+   /**
+    * Reports whether the underlying store is clustered.
+    *
+    * @return the value of <code>store.isClustered()</code>
+    */
+   public boolean isClustered()
+   {
+      return store.isClustered();
+   }
+
+   /**
+    * Replicates the entry with the given key: calls
+    * <code>preReplicate</code> on the passivation manager and then hands the
+    * entry to the store's <code>replicate</code>.
+    *
+    * @param key identifies the entry
+    * @throws UnsupportedOperationException if the store is not clustered
+    * @throws IllegalArgumentException if no entry is found for the key
+    * @throws ItemInUseException if the entry is in use
+    */
+   public void replicate(Object key)
+   {
+      if (!isClustered())
+      {
+         throw new UnsupportedOperationException("Clustering is not supported by " +
+               store.getClass().getName());
+      }
+
+      log.trace("replicate " + key);
+      synchronized (store)
+      {
+         T entry = store.get(key);
+
+         if (entry == null)
+            throw new IllegalArgumentException("entry " + key + " not found in cache " + this);
+
+         if (entry.isInUse())
+         {
+            throw new ItemInUseException("entry " + entry + " is in use");
+         }
+
+         passivationManager.preReplicate(entry);
+
+         store.replicate(entry);
+      }
+   }
+
+   /**
+    * Creates a new object via the factory and inserts it into the store.
+    *
+    * @param initTypes  passed through to the factory
+    * @param initValues passed through to the factory
+    * @return the newly created object
+    */
+   public T create(Class<?>[] initTypes, Object[] initValues)
+   {
+      T obj = factory.create(initTypes, initValues);
+      synchronized (store)
+      {
+         store.insert(obj);
+      }
+      return obj;
+   }
+
+   /**
+    * Looks up the entry for the key, invokes <code>postReplicate</code> (only
+    * if the store is clustered) and <code>postActivate</code> on it, and
+    * marks it as in use.
+    *
+    * @param key identifies the entry
+    * @return the entry, now marked in use
+    * @throws NoSuchEJBException if the store has no entry for the key
+    */
+   public T get(Object key) throws NoSuchEJBException
+   {
+      synchronized (store)
+      {
+         T entry = store.get(key);
+         if (entry == null)
+            throw new NoSuchEJBException(String.valueOf(key));
+
+         if (isClustered())
+         {
+            passivationManager.postReplicate(entry);
+         }
+
+         passivationManager.postActivate(entry);
+
+//         if(entry.getCacheState() != Cacheable.State.READY)
+//         {
+//            throw new IllegalStateException("entry " + key + " is not ready");
+//         }
+         entry.setInUse(true);
+         return entry;
+      }
+   }
+
+   /**
+    * Passivates the entry with the given key: calls
+    * <code>prePassivate</code> on the passivation manager and then hands the
+    * entry to the store's <code>passivate</code>.
+    *
+    * @param key identifies the entry
+    * @throws IllegalArgumentException if no entry is found for the key
+    * @throws ItemInUseException if the entry is in use
+    */
+   public void passivate(Object key)
+   {
+      log.trace("passivate " + key);
+      synchronized (store)
+      {
+         T entry = store.get(key);
+
+         if (entry == null)
+            throw new IllegalArgumentException("entry " + key + " not found in cache " + this);
+
+         if (entry.isInUse())
+         {
+            throw new ItemInUseException("entry " + entry + " is in use");
+         }
+
+         passivationManager.prePassivate(entry);
+
+         store.passivate(entry);
+      }
+   }
+
+   /**
+    * Same lookup and callbacks as {@link #get(Object)}, but the entry is not
+    * marked as in use.
+    *
+    * @param key identifies the entry
+    * @return the entry
+    * @throws NoSuchEJBException if the store has no entry for the key
+    */
+   public T peek(Object key) throws NoSuchEJBException
+   {
+      synchronized (store)
+      {
+         T entry = store.get(key);
+         if (entry == null)
+            throw new NoSuchEJBException(String.valueOf(key));
+
+         if (isClustered())
+         {
+            passivationManager.postReplicate(entry);
+         }
+
+         passivationManager.postActivate(entry);
+
+         return entry;
+      }
+   }
+
+   /**
+    * Marks the entry with the same id as the given object as no longer in use.
+    *
+    * @param obj the object being released; its id is used for the lookup
+    * @throws IllegalStateException if the store has no entry for the id, or
+    *                               the entry is not in use
+    */
+   public void release(T obj)
+   {
+      releaseByKey(obj.getId());
+   }
+
+   /**
+    * Clears the in-use flag of the entry with the given key.
+    *
+    * @param key identifies the entry
+    * @return the released entry
+    * @throws IllegalStateException if the store has no entry for the key, or
+    *                               the entry is not in use
+    */
+   protected T releaseByKey(Object key)
+   {
+      synchronized (store)
+      {
+         T entry = store.get(key);
+         if (entry == null)
+            throw new IllegalStateException("object " + key + " not from this cache");
+         if (!entry.isInUse())
+            throw new IllegalStateException("entry " + entry + " is not in use");
+         entry.setInUse(false);
+         return entry;
+      }
+   }
+
+   /**
+    * Removes the entry with the given key from the store and destroys it via
+    * the factory. An unknown key is ignored.
+    * <p>
+    * The in-use check is made before anything is removed, so a rejected
+    * removal leaves the entry in the store.
+    *
+    * @param key identifies the entry
+    * @throws ItemInUseException if the entry is in use
+    */
+   public void remove(Object key)
+   {
+      T entry;
+      synchronized (store)
+      {
+         // Look up first, as passivate() and replicate() do, so that an
+         // in-use entry is rejected without being taken out of the store.
+         entry = store.get(key);
+         if (entry != null)
+         {
+            if (entry.isInUse())
+               throw new ItemInUseException("entry " + entry + " is in use");
+            store.remove(key);
+         }
+      }
+      // Destroy outside the lock so the store is not held during the callback.
+      if (entry != null)
+         factory.destroy(entry);
+   }
+
+   /**
+    * Starts the underlying store.
+    */
+   public void start()
+   {
+      store.start();
+   }
+
+   /**
+    * Stops the underlying store.
+    */
+   public void stop()
+   {
+      store.stop();
+   }
+
+}
More information about the jboss-cvs-commits
mailing list