[jboss-cvs] JBossAS SVN: r70922 - in projects/ejb3/branches/cluster-dev/ejb3-cache/src: main/java/org/jboss/ejb3/cache/impl/backing and 4 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Mar 17 16:08:59 EDT 2008
Author: bstansberry at jboss.com
Date: 2008-03-17 16:08:59 -0400 (Mon, 17 Mar 2008)
New Revision: 70922
Added:
projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/spi/PassivatingBackingCacheEntry.java
Modified:
projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/impl/GroupAwareTransactionalCache.java
projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/impl/TransactionalCache.java
projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/impl/backing/GroupAwareBackingCacheImpl.java
projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/impl/backing/NonPassivatingBackingCacheEntry.java
projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/impl/backing/PassivatingBackingCacheImpl.java
projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/impl/backing/SerializationGroupImpl.java
projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/impl/backing/SerializationGroupMemberContainer.java
projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/impl/backing/SerializationGroupMemberImpl.java
projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/impl/backing/SimplePassivatingIntegratedObjectStore.java
projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/spi/GroupAwareBackingCache.java
projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/spi/IntegratedObjectStore.java
projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/spi/PassivatingBackingCache.java
projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/spi/PassivatingIntegratedObjectStore.java
projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/spi/SerializationGroup.java
projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/spi/SerializationGroupMember.java
projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/spi/impl/AbstractPassivatingIntegratedObjectStore.java
projects/ejb3/branches/cluster-dev/ejb3-cache/src/test/java/org/jboss/ejb3/test/cache/distributed/GroupedPassivatingUnitTestCase.java
projects/ejb3/branches/cluster-dev/ejb3-cache/src/test/java/org/jboss/ejb3/test/cache/distributed/MockJBCIntegratedObjectStore.java
projects/ejb3/branches/cluster-dev/ejb3-cache/src/test/java/org/jboss/ejb3/test/cache/mock/MockBeanContext.java
projects/ejb3/branches/cluster-dev/ejb3-cache/src/test/java/org/jboss/ejb3/test/cache/mock/MockCacheItem.java
projects/ejb3/branches/cluster-dev/ejb3-cache/src/test/java/org/jboss/ejb3/test/cache/mock/MockEntity.java
projects/ejb3/branches/cluster-dev/ejb3-cache/src/test/java/org/jboss/ejb3/test/cache/mock/MockIdentifiable.java
projects/ejb3/branches/cluster-dev/ejb3-cache/src/test/java/org/jboss/ejb3/test/cache/mock/MockXPC.java
Log:
[EJBTHREE-1026] More locking tweaks; initial replication fixes
Modified: projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/impl/GroupAwareTransactionalCache.java
===================================================================
--- projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/impl/GroupAwareTransactionalCache.java 2008-03-17 19:54:44 UTC (rev 70921)
+++ projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/impl/GroupAwareTransactionalCache.java 2008-03-17 20:08:59 UTC (rev 70922)
@@ -30,24 +30,21 @@
import org.jboss.ejb3.cache.api.Cache;
import org.jboss.ejb3.cache.api.CacheItem;
-import org.jboss.ejb3.cache.spi.BackingCacheEntry;
import org.jboss.ejb3.cache.spi.GroupAwareBackingCache;
import org.jboss.ejb3.cache.spi.SerializationGroup;
+import org.jboss.ejb3.cache.spi.SerializationGroupMember;
import org.jboss.ejb3.cache.spi.SynchronizationCoordinator;
import org.jboss.ejb3.cache.spi.impl.GroupCreationContext;
import org.jboss.ejb3.cache.spi.impl.ItemCachePair;
-import org.jboss.logging.Logger;
/**
* {@link Cache#isGroupAware Group-aware} version of {@link TransactionalCache}.
*
* @author Brian Stansberry
*/
-public class GroupAwareTransactionalCache<C extends CacheItem, T extends BackingCacheEntry<C>>
+public class GroupAwareTransactionalCache<C extends CacheItem, T extends SerializationGroupMember<C>>
extends TransactionalCache<C, T>
{
- private static final Logger log = Logger.getLogger(GroupAwareTransactionalCache.class);
-
/**
* Another ref to super.delegate. Just saves having to do casts all the time.
*/
Modified: projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/impl/TransactionalCache.java
===================================================================
--- projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/impl/TransactionalCache.java 2008-03-17 19:54:44 UTC (rev 70921)
+++ projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/impl/TransactionalCache.java 2008-03-17 20:08:59 UTC (rev 70922)
@@ -40,6 +40,7 @@
import org.jboss.ejb3.cache.spi.IntegratedObjectStore;
import org.jboss.ejb3.cache.spi.SynchronizationCoordinator;
import org.jboss.ejb3.cache.spi.impl.GroupCreationContext;
+import org.jboss.logging.Logger;
/**
* Non-group-aware <code>Cache</code> implementation
@@ -61,6 +62,8 @@
*/
public class TransactionalCache<C extends CacheItem, T extends BackingCacheEntry<C>> implements Cache<C>
{
+ protected final Logger log = Logger.getLogger(getClass().getName());
+
/** BackingCache that handles passivation, groups, etc */
private final BackingCache<C, T> delegate;
@@ -322,6 +325,8 @@
if (entry == null)
{
// TODO is this correct?
+ if (log.isTraceEnabled())
+ log.trace("Item " + key + " is not in use; cannot release");
return;
}
Modified: projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/impl/backing/GroupAwareBackingCacheImpl.java
===================================================================
--- projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/impl/backing/GroupAwareBackingCacheImpl.java 2008-03-17 19:54:44 UTC (rev 70921)
+++ projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/impl/backing/GroupAwareBackingCacheImpl.java 2008-03-17 20:08:59 UTC (rev 70922)
@@ -21,8 +21,6 @@
*/
package org.jboss.ejb3.cache.impl.backing;
-import javax.ejb.NoSuchEJBException;
-
import org.jboss.ejb3.cache.api.CacheItem;
import org.jboss.ejb3.cache.spi.GroupAwareBackingCache;
import org.jboss.ejb3.cache.spi.PassivatingBackingCache;
@@ -44,6 +42,8 @@
*/
private PassivatingBackingCache<C, SerializationGroup<C>> groupCache;
+ private SerializationGroupMemberContainer<C> memberContainer;
+
/**
* Creates a new GroupAwareCacheImpl.
*
@@ -69,47 +69,53 @@
{
Object key = obj.getId();
SerializationGroupMember<C> entry = peek(key);
- if(entry.getGroup() != null)
- throw new IllegalStateException("object " + key + " is already associated with passivation group " + entry.getGroup());
-
- // Validate we share a common groupCache with the group
- if (groupCache != group.getGroupCache())
- throw new IllegalStateException(obj + " and " + group + " use different group caches");
-
- entry.setGroup(group);
- entry.getGroup().addMember(entry);
+ entry.lock();
+ try
+ {
+ if(entry.getGroup() != null)
+ throw new IllegalStateException("object " + key + " is already associated with passivation group " + entry.getGroup());
+
+ // Validate we share a common groupCache with the group
+ if (groupCache != group.getGroupCache())
+ throw new IllegalStateException(obj + " and " + group + " use different group caches");
+
+
+ entry.setGroup(group);
+ entry.getGroup().addMember(entry);
+ }
+ finally
+ {
+ entry.unlock();
+ }
}
- public SerializationGroup<C> getGroup(C obj)
+ public void notifyPreReplicate(SerializationGroupMember<C> entry)
{
- Object key = obj.getId();
- try
+ log.trace("notifyPreReplicate " + entry);
+
+ if (!entry.isPreReplicated())
{
- SerializationGroupMember<C> entry = peek(key);
- synchronized (entry)
+ // We just *try* to lock; a preReplication is low priority.
+ if (!entry.tryLock())
+ throw new IllegalStateException("entry " + entry + " is in use");
+
+ try
{
- SerializationGroup<C> group = entry.getGroup();
- if (group == null && entry.getGroupId() != null)
+ if(entry.isInUse())
{
- // Need to use get() to postActivate it
- entry = get(key);
- try
- {
- group = entry.getGroup();
- }
- finally
- {
- release(key);
- }
+ throw new IllegalStateException("entry " + entry + " is in use");
}
- return group;
+
+ memberContainer.preReplicate(entry);
+
+ entry.setPreReplicated(true);
}
+ finally
+ {
+ entry.unlock();
+ }
}
- catch (NoSuchEJBException nsee)
- {
- return null;
- }
- }
+ }
Modified: projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/impl/backing/NonPassivatingBackingCacheEntry.java
===================================================================
--- projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/impl/backing/NonPassivatingBackingCacheEntry.java 2008-03-17 19:54:44 UTC (rev 70921)
+++ projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/impl/backing/NonPassivatingBackingCacheEntry.java 2008-03-17 20:08:59 UTC (rev 70922)
@@ -65,6 +65,12 @@
return wrapped.isModified();
}
+ /**
+ * {@inheritDoc}
+ *
+ * @return the id of the {@link BackingCacheEntry#getUnderlyingItem() underlying item}.
+ * Cannot be <code>null</code>.
+ */
public Object getId()
{
return wrapped.getId();
Modified: projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/impl/backing/PassivatingBackingCacheImpl.java
===================================================================
--- projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/impl/backing/PassivatingBackingCacheImpl.java 2008-03-17 19:54:44 UTC (rev 70921)
+++ projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/impl/backing/PassivatingBackingCacheImpl.java 2008-03-17 20:08:59 UTC (rev 70922)
@@ -28,8 +28,8 @@
import org.jboss.ejb3.cache.api.CacheItem;
import org.jboss.ejb3.cache.api.PassivationManager;
import org.jboss.ejb3.cache.api.StatefulObjectFactory;
-import org.jboss.ejb3.cache.spi.BackingCacheEntry;
import org.jboss.ejb3.cache.spi.PassivatingBackingCache;
+import org.jboss.ejb3.cache.spi.PassivatingBackingCacheEntry;
import org.jboss.ejb3.cache.spi.PassivatingIntegratedObjectStore;
import org.jboss.logging.Logger;
@@ -40,10 +40,10 @@
* @author Brian Stansberry
* @version $Revision: 65339 $
*/
-public class PassivatingBackingCacheImpl<C extends CacheItem, T extends BackingCacheEntry<C>>
+public class PassivatingBackingCacheImpl<C extends CacheItem, T extends PassivatingBackingCacheEntry<C>>
implements PassivatingBackingCache<C, T>
{
- private static final Logger log = Logger.getLogger(PassivatingBackingCacheImpl.class);
+ protected final Logger log = Logger.getLogger(getClass().getName());
private StatefulObjectFactory<T> factory;
private PassivationManager<T> passivationManager;
@@ -77,12 +77,16 @@
public T get(Object key) throws NoSuchEJBException
{
+ if (log.isTraceEnabled())
+ log.trace("get(): " + key);
+
T entry = store.get(key);
if(entry == null)
throw new NoSuchEJBException(String.valueOf(key));
- synchronized (entry)
+ entry.lock();
+ try
{
if (isClustered())
{
@@ -96,18 +100,26 @@
entry.setInUse(true);
return entry;
}
+ finally
+ {
+ entry.unlock();
+ }
}
public void passivate(Object key)
{
- log.trace("passivate " + key);
+ log.trace("passivate(): " + key);
T entry = store.get(key);
if(entry == null)
throw new IllegalArgumentException("entry " + key + " not found in cache " + this);
-
- synchronized (entry)
+
+ // We just *try* to lock; a passivation is low priority.
+ if (!entry.tryLock())
+ throw new IllegalStateException("entry " + entry + " is in use");
+
+ try
{
if(entry.isInUse())
{
@@ -120,10 +132,17 @@
store.passivate(entry);
}
+ finally
+ {
+ entry.unlock();
+ }
}
public T peek(Object key) throws NoSuchEJBException
{
+ if (log.isTraceEnabled())
+ log.trace("peek(): " + key);
+
T entry = store.get(key);
if(entry == null)
throw new NoSuchEJBException(String.valueOf(key));
@@ -132,40 +151,57 @@
public T release(Object key)
{
+ if (log.isTraceEnabled())
+ log.trace("release(): " + key);
+
T entry = store.get(key);
if(entry == null)
throw new IllegalStateException("object " + key + " not from this cache");
- synchronized (entry)
+ entry.lock();
+ try
{
entry.setInUse(false);
- if (entry.isModified())
+ boolean modified = entry.isModified();
+ if (modified)
{
if (isClustered())
{
passivationManager.preReplicate(entry);
}
- store.update(entry);
}
-
+
+ store.update(entry, modified);
return entry;
}
+ finally
+ {
+ entry.unlock();
+ }
}
public void remove(Object key)
{
+ if (log.isTraceEnabled())
+ log.trace("remove(): " + key);
+
T entry = store.remove(key);
- if (entry != null)
+
+ if(entry == null)
+ throw new NoSuchEJBException(String.valueOf(key));
+
+ entry.lock();
+ try
{
- synchronized (entry)
- {
- if(entry.isInUse())
- entry.setInUse(false);
- factory.destroy(entry);
- }
+ if(entry.isInUse())
+ entry.setInUse(false);
+ factory.destroy(entry);
+ }
+ finally
+ {
+ entry.unlock();
}
-
}
public void start()
Modified: projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/impl/backing/SerializationGroupImpl.java
===================================================================
--- projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/impl/backing/SerializationGroupImpl.java 2008-03-17 19:54:44 UTC (rev 70921)
+++ projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/impl/backing/SerializationGroupImpl.java 2008-03-17 20:08:59 UTC (rev 70922)
@@ -27,6 +27,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantLock;
import org.jboss.ejb3.cache.api.CacheItem;
import org.jboss.ejb3.cache.api.Identifiable;
@@ -92,6 +93,8 @@
private transient boolean invalid;
+ private transient ReentrantLock lock = new ReentrantLock();
+
public Object getId()
{
return id;
@@ -185,9 +188,7 @@
{
for(SerializationGroupMember<T> member : active.values())
{
- member.releaseReferences();
- if(true)
- throw new IllegalStateException("this doesn't invoke prePassivate callbacks!");
+ member.prePassivate();
}
active.clear();
}
@@ -207,9 +208,7 @@
{
for(SerializationGroupMember<T> member : active.values())
{
- member.releaseReferences();
- if(true)
- throw new IllegalStateException("this doesn't invoke preReplicate callbacks!");
+ member.preReplicate();
}
active.clear();
}
@@ -283,8 +282,11 @@
public void removeInUse(Object key)
{
if (inUseKeys.remove(key))
- {
- setLastUsed(System.currentTimeMillis());
+ {
+ if (inUseKeys.size() == 0)
+ setInUse(false);
+ else
+ setLastUsed(System.currentTimeMillis());
}
else if (!getMemberObjects().containsKey(key))
{
@@ -336,7 +338,32 @@
{
return groupCache;
}
+
+ public void lock()
+ {
+ try
+ {
+ lock.lockInterruptibly();
+ }
+ catch (InterruptedException ie)
+ {
+ throw new RuntimeException("interrupted waiting for lock");
+ }
+ }
+
+ public boolean tryLock()
+ {
+ return lock.tryLock();
+ }
+
+ public void unlock()
+ {
+ if (lock.isHeldByCurrentThread())
+ lock.unlock();
+
+ }
+
@Override
public String toString()
{
@@ -374,6 +401,7 @@
throws IOException, ClassNotFoundException
{
in.defaultReadObject();
+ lock = new ReentrantLock();
marshalledMembers= (MarshalledObject) in.readObject();
active = new HashMap<Object, SerializationGroupMember<T>>();
inUseKeys = new HashSet<Object>();
Modified: projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/impl/backing/SerializationGroupMemberContainer.java
===================================================================
--- projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/impl/backing/SerializationGroupMemberContainer.java 2008-03-17 19:54:44 UTC (rev 70921)
+++ projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/impl/backing/SerializationGroupMemberContainer.java 2008-03-17 20:08:59 UTC (rev 70922)
@@ -27,6 +27,7 @@
import org.jboss.ejb3.cache.api.CacheItem;
import org.jboss.ejb3.cache.api.PassivationManager;
import org.jboss.ejb3.cache.api.StatefulObjectFactory;
+import org.jboss.ejb3.cache.spi.GroupAwareBackingCache;
import org.jboss.ejb3.cache.spi.PassivatingBackingCache;
import org.jboss.ejb3.cache.spi.PassivatingIntegratedObjectStore;
import org.jboss.ejb3.cache.spi.SerializationGroup;
@@ -47,7 +48,7 @@
private StatefulObjectFactory<C> factory;
private PassivationManager<C> passivationManager;
private PassivatingIntegratedObjectStore<C, SerializationGroupMember<C>> store;
- private PassivatingBackingCache<C, SerializationGroupMember<C>> delegate;
+ private GroupAwareBackingCache<C, SerializationGroupMember<C>> delegate;
/**
* Cache that's managing the PassivationGroup
@@ -95,7 +96,7 @@
public void postActivate(SerializationGroupMember<C> entry)
{
- log.trace("post activate " + entry);
+ log.trace("postActivate(): " + entry);
boolean groupOK = false;
while (!groupOK)
@@ -114,7 +115,8 @@
if(group != null)
{
- synchronized (group)
+ group.lock();
+ try
{
if (!group.isInvalid())
{
@@ -126,6 +128,10 @@
}
// else groupOK == false and we loop again
}
+ finally
+ {
+ group.unlock();
+ }
}
else
{
@@ -162,7 +168,9 @@
SerializationGroup<C> group = entry.getGroup();
if(group != null)
{
- synchronized (group)
+ if (!group.tryLock())
+ throw new IllegalStateException("Cannot obtain lock on " + group.getId() + " to passivate " + entry);
+ try
{
if (!group.isInvalid())
{
@@ -180,12 +188,18 @@
// we realize they have a ref to an out-of-date group
group.setInvalid(true);
}
- // else {
- // this turns into a pretty meaningless exercise of just
- // passivating an empty entry. TODO consider throwing
- // ItemInUseException here, thus aborting everything. Need to
- // be sure that doesn't lead to problems as the exception propagates
- // }
+ else {
+ // this turns into a pretty meaningless exercise of just
+ // passivating an empty entry. TODO consider throwing
+ // ItemInUseException here, thus aborting everything. Need to
+ // be sure that doesn't lead to problems as the exception propagates
+ if (log.isTraceEnabled())
+ {
+ log.trace("Group " + group.getId() + " has " +
+ group.getInUseCount() + " in-use members; " +
+ "not passivating group for " + entry.getId());
+ }
+ }
// This call didn't come through entry.prePassivate() (which nulls
// group and obj) so we have to do it ourselves. Otherwise
@@ -195,6 +209,10 @@
entry.setUnderlyingItem(null);
}
}
+ finally
+ {
+ group.unlock();
+ }
}
}
@@ -214,7 +232,8 @@
SerializationGroup<C> group = entry.getGroup();
if(group != null)
{
- synchronized (group)
+ group.lock();
+ try
{
// Remove ourself from group's active list so we don't get
// called again via entry.prePassivate()
@@ -224,7 +243,7 @@
{
if (group.getInUseCount() == 0)
{
- groupCache.release(group);
+ group.getGroupCache().release(group.getId());
}
}
finally
@@ -238,12 +257,16 @@
// entry.setGroup(null);
// entry.setUnderlyingItem(null);
}
+ finally
+ {
+ group.unlock();
+ }
}
}
public void postReplicate(SerializationGroupMember<C> entry)
{
- log.trace("postreplicate " + entry);
+ log.trace("postReplicate(): " + entry);
boolean groupOK = false;
while (!groupOK)
@@ -262,7 +285,8 @@
if(group != null)
{
- synchronized (group)
+ group.lock();
+ try
{
if (!group.isInvalid())
{
@@ -275,6 +299,10 @@
}
// else groupOK == false and we loop again
}
+ finally
+ {
+ group.unlock();
+ }
}
else
{
@@ -290,9 +318,9 @@
}
}
- public void update(SerializationGroupMember<C> entry)
+ public void update(SerializationGroupMember<C> entry, boolean modified)
{
- store.update(entry);
+ store.update(entry, modified);
}
public boolean isClustered()
@@ -346,7 +374,12 @@
public void setPassivatingCache(PassivatingBackingCache<C, SerializationGroupMember<C>> cache)
{
- this.delegate= cache;
+ if (! (cache instanceof GroupAwareBackingCache))
+ {
+ throw new IllegalArgumentException("cache must implement GroupAwareBackingCache");
+ }
+
+ this.delegate= (GroupAwareBackingCache<C, SerializationGroupMember<C>>) cache;
this.store.setPassivatingCache(delegate);
}
Modified: projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/impl/backing/SerializationGroupMemberImpl.java
===================================================================
--- projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/impl/backing/SerializationGroupMemberImpl.java 2008-03-17 19:54:44 UTC (rev 70921)
+++ projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/impl/backing/SerializationGroupMemberImpl.java 2008-03-17 20:08:59 UTC (rev 70922)
@@ -23,12 +23,16 @@
package org.jboss.ejb3.cache.impl.backing;
import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
import org.jboss.ejb3.cache.api.CacheItem;
+import org.jboss.ejb3.cache.spi.GroupAwareBackingCache;
import org.jboss.ejb3.cache.spi.PassivatingBackingCache;
import org.jboss.ejb3.cache.spi.SerializationGroup;
import org.jboss.ejb3.cache.spi.SerializationGroupMember;
import org.jboss.ejb3.cache.spi.impl.AbstractBackingCacheEntry;
+import org.jboss.logging.Logger;
import org.jboss.serial.io.MarshalledObject;
/**
@@ -43,6 +47,8 @@
/** The serialVersionUID */
private static final long serialVersionUID = 7268142730501106252L;
+ private static final Logger log = Logger.getLogger(SerializationGroupMemberImpl.class);
+
/**
* Identifier for our underlying object
*/
@@ -77,17 +83,20 @@
private boolean preReplicated;
+ private transient ReentrantLock lock = new ReentrantLock();
+ private transient boolean groupLockHeld;
+
/** The cache that's handling us */
-// private transient PassivatingBackingCache<T, SerializationGroupMember<T>> cache;
+ private transient GroupAwareBackingCache<T, SerializationGroupMember<T>> cache;
- public SerializationGroupMemberImpl(T obj, PassivatingBackingCache<T, SerializationGroupMember<T>> cache)
+ public SerializationGroupMemberImpl(T obj, GroupAwareBackingCache<T, SerializationGroupMember<T>> cache)
{
assert obj != null : "obj is null";
assert cache != null : "cache is null";
this.obj = transientObj = obj;
this.id = obj.getId();
-// this.cache = cache;
+ this.cache = cache;
this.clustered = cache.isClustered();
}
@@ -140,11 +149,25 @@
SerializationGroup<T> result = group;
if (result != null)
{
- synchronized (result)
+ boolean localGroupLock = false;
+ if (!groupLockHeld)
{
+ group.lock();
+ localGroupLock = groupLockHeld = true;
+ }
+ try
+ {
if (result.isInvalid())
result = null;
}
+ finally
+ {
+ if (localGroupLock)
+ {
+ group.unlock();
+ groupLockHeld = false;
+ }
+ }
}
return result;
}
@@ -156,9 +179,26 @@
*/
public void setGroup(SerializationGroup<T> group)
{
- this.group = group;
- if (this.groupId == null && group != null)
- this.groupId = group.getId();
+ if (this.group != group)
+ {
+ // Remove any lock held on existing group
+ if (group == null && groupLockHeld)
+ {
+ this.group.unlock();
+ groupLockHeld = false;
+ }
+
+ this.group = group;
+
+ if (this.groupId == null && group != null)
+ this.groupId = group.getId();
+
+ if (group != null && lock.isHeldByCurrentThread())
+ {
+ group.lock();
+ groupLockHeld = true;
+ }
+ }
}
/**
@@ -170,26 +210,79 @@
{
return groupId;
}
-
- /**
- * Prepares the group member for group passivation by ensuring any
- * reference to the {@link #getUnderlyingItem() underlying object} or to the
- * {@link #getGroup()} is nulled.
- */
- public void releaseReferences()
+
+ public void prePassivate()
{
- // make sure we don't passivate the group twice
- group = null;
+ // By the time we get here this thread has already locked the
+ // group. It's possible another thread has locked this member
+ // but not yet the group. We're willing to wait 2ms for that
+ // other thread to see it can't lock the group and release this
+ // member's lock, which it will only do if it is another
+ // passivation thread. A request thread will hold the lock and
+ // we'll fail (as intended)
+ if (tryLock(2))
+ {
+ try
+ {
+ // make sure we don't passivate the group twice
+ // use the setter to clear any group lock
+ setGroup(null);
+
+ // null out obj so when delegate passivates this entry
+ // we don't serialize it. It serializes with the PassivationGroup only
+ // We still have a ref to transientObj, so it can be retrieved
+ // for passivation callbacks
+ obj = null;
+
+ cache.passivate(this.id);
+ }
+ finally
+ {
+ unlock();
+ }
+ }
+ else
+ {
+ throw new IllegalStateException(id + " is in use");
+ }
- // null out obj so when delegate passivates this entry
- // we don't serialize it. It serializes with the PassivationGroup only
- // We still have a ref to transientObj, so it can be retrieved
- // for passivation callbacks
- obj = null;
-
-// cache.passivate(this.id);
}
+ public void preReplicate()
+ {
+ // By the time we get here this thread has already locked the
+ // group. It's possible another thread has locked this member
+ // but not yet the group. We're willing to wait 2ms for that
+ // other thread to see it can't lock the group and release this
+ // member's lock, which it will only do if it is a passivation thread
+ // A request thread will hold the lock and we'll fail (as intended)
+ if (tryLock(2))
+ {
+ try
+ {
+ // make sure we don't passivate the group twice
+ // use the setter to clear any group lock
+ setGroup(null);
+
+ // null out obj so when delegate passivates this entry
+ // we don't serialize it. It serializes with the PassivationGroup only
+ // We still have a ref to transientObj, so it can be retrieved
+ // for passivation callbacks
+ obj = null;
+
+ cache.notifyPreReplicate(this);
+ }
+ finally
+ {
+ unlock();
+ }
+ }
+ else
+ {
+ throw new IllegalStateException(id + " is in use");
+ }
+ }
+
/**
* Notification that the group has been activated from a passivated state.
*/
@@ -224,13 +317,27 @@
// Tell our group about it
if (group != null)
{
- synchronized (group)
+ boolean localGroupLock = false;
+ if (!groupLockHeld)
{
+ group.lock();
+ localGroupLock = groupLockHeld = true;
+ }
+ try
+ {
if (inUse)
- group.addActive(this);
+ group.addInUse(id);
else
- group.removeActive(id);
+ group.removeInUse(id);
}
+ finally
+ {
+ if (localGroupLock)
+ {
+ group.unlock();
+ groupLockHeld = false;
+ }
+ }
}
}
@@ -240,13 +347,107 @@
*
* @param delegate
*/
- public void setPassivatingCache(PassivatingBackingCache<T, SerializationGroupMember<T>> delegate)
+ public void setPassivatingCache(GroupAwareBackingCache<T, SerializationGroupMember<T>> delegate)
{
-// assert delegate != null : "delegate is null";
-//
-// this.cache = delegate;
+ assert delegate != null : "delegate is null";
+
+ this.cache = delegate;
+ }
+
+ public void lock()
+ {
+ try
+ {
+ lock.lockInterruptibly();
+ }
+ catch (InterruptedException ie)
+ {
+ throw new RuntimeException("interrupted waiting for lock");
+ }
+
+ if (!groupLockHeld && group != null)
+ {
+ try
+ {
+ group.lock();
+ groupLockHeld = true;
+ }
+ finally
+ {
+ if (!groupLockHeld)
+ lock.unlock();
+ }
+ }
}
+ public boolean tryLock()
+ {
+ boolean success = lock.tryLock();
+ if (success)
+ {
+ success = (groupLockHeld || group == null);
+ if (!success)
+ {
+ try
+ {
+ success = groupLockHeld = group.tryLock();
+ if (!success && log.isTraceEnabled())
+ log.trace("Member " + id + " cannot lock group " + groupId);
+ }
+ finally
+ {
+ if (!success)
+ lock.unlock();
+ }
+ }
+ }
+ return success;
+ }
+
+ private boolean tryLock(long wait)
+ {
+ boolean success = false;
+ try
+ {
+ success = lock.tryLock(wait, TimeUnit.MILLISECONDS);
+ }
+ catch (InterruptedException ie)
+ {
+ // success remains false
+ }
+ if (success)
+ {
+ success = (groupLockHeld || group == null);
+ if (!success)
+ {
+ try
+ {
+ success = groupLockHeld = group.tryLock();
+ if (!success && log.isTraceEnabled())
+ log.trace("Member " + id + " cannot lock group " + groupId);
+ }
+ finally
+ {
+ if (!success)
+ lock.unlock();
+ }
+ }
+ }
+ return success;
+
+ }
+
+ public void unlock()
+ {
+ if (groupLockHeld && lock.getHoldCount() == 1)
+ {
+ // time to release our group lock
+ group.unlock();
+ groupLockHeld = false;
+ }
+ lock.unlock();
+ }
+
@Override
public String toString()
{
@@ -278,6 +479,7 @@
throws IOException, ClassNotFoundException
{
in.defaultReadObject();
+ lock = new ReentrantLock();
if (groupId == null)
{
marshalledObj = (MarshalledObject) in.readObject();
@@ -288,7 +490,7 @@
{
if (groupId != null)
{
- group = null;
+ setGroup(null);
obj = null;
}
out.defaultWriteObject();
Modified: projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/impl/backing/SimplePassivatingIntegratedObjectStore.java
===================================================================
--- projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/impl/backing/SimplePassivatingIntegratedObjectStore.java 2008-03-17 19:54:44 UTC (rev 70921)
+++ projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/impl/backing/SimplePassivatingIntegratedObjectStore.java 2008-03-17 20:08:59 UTC (rev 70922)
@@ -26,11 +26,12 @@
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
import org.jboss.ejb3.annotation.CacheConfig;
import org.jboss.ejb3.cache.api.CacheItem;
-import org.jboss.ejb3.cache.spi.BackingCacheEntry;
import org.jboss.ejb3.cache.spi.ObjectStore;
+import org.jboss.ejb3.cache.spi.PassivatingBackingCacheEntry;
import org.jboss.ejb3.cache.spi.PassivatingIntegratedObjectStore;
import org.jboss.ejb3.cache.spi.impl.AbstractPassivatingIntegratedObjectStore;
import org.jboss.ejb3.cache.spi.impl.CacheableTimestamp;
@@ -44,7 +45,7 @@
* @author Brian Stansberry
* @version $Revision$
*/
-public class SimplePassivatingIntegratedObjectStore<C extends CacheItem, T extends BackingCacheEntry<C>>
+public class SimplePassivatingIntegratedObjectStore<C extends CacheItem, T extends PassivatingBackingCacheEntry<C>>
extends AbstractPassivatingIntegratedObjectStore<C, T>
{
private static final Logger log = Logger.getLogger(SimplePassivatingIntegratedObjectStore.class);
@@ -66,7 +67,7 @@
assert store != null : "store is null";
this.store = store;
- this.cache = new HashMap<Object, T>();
+ this.cache = new ConcurrentHashMap<Object, T>();
if (!forGroups)
{
this.passivatedEntries = new HashMap<Object, Long>();
@@ -80,57 +81,48 @@
public T get(Object key)
{
- synchronized (cache)
+ T entry = cache.get(key);
+ if(entry == null)
{
- T entry = cache.get(key);
- if(entry == null)
+ entry = store.load(key);
+ if(entry != null)
{
- entry = store.load(key);
- if(entry != null)
+ cache.put(key, entry);
+ if (!isForGroups())
{
- cache.put(key, entry);
- if (!isForGroups())
- {
- passivatedEntries.remove(key);
- }
+ passivatedEntries.remove(key);
}
}
- return entry;
}
+ return entry;
}
public void insert(T entry)
{
Object key = entry.getId();
- synchronized (cache)
+ if (cache.containsKey(key)
+ || (!isForGroups() && passivatedEntries.containsKey(key)))
{
- if (cache.containsKey(key)
- || (!isForGroups() && passivatedEntries.containsKey(key)))
- {
- throw new IllegalStateException(key + " is already in store");
- }
- cache.put(key, entry);
+ throw new IllegalStateException(key + " is already in store");
}
+ cache.put(key, entry);
}
- public void update(T entry)
+ public void update(T entry, boolean modified)
{
Object key = entry.getId();
- synchronized (cache)
+ if (!cache.containsKey(key) &&
+ (isForGroups() || !passivatedEntries.containsKey(key)))
{
- if (!cache.containsKey(key) &&
- (isForGroups() || !passivatedEntries.containsKey(key)))
- {
- throw new IllegalStateException(key + " is not managed by this store");
- }
-
- // Otherwise we do nothing; we already have a ref to the entry
+ throw new IllegalStateException(key + " is not managed by this store");
}
+
+ // Otherwise we do nothing; we already have a ref to the entry
}
public void passivate(T entry)
{
- synchronized (cache)
+ synchronized (entry)
{
Object key = entry.getId();
store.store(entry);
@@ -144,15 +136,12 @@
public T remove(Object id)
{
- synchronized (cache)
+ T entry = get(id);
+ if (entry != null)
{
- T entry = get(id);
- if (entry != null)
- {
- cache.remove(id);
- }
- return entry;
+ cache.remove(id);
}
+ return entry;
}
public void start()
Modified: projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/spi/GroupAwareBackingCache.java
===================================================================
--- projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/spi/GroupAwareBackingCache.java 2008-03-17 19:54:44 UTC (rev 70921)
+++ projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/spi/GroupAwareBackingCache.java 2008-03-17 20:08:59 UTC (rev 70922)
@@ -31,7 +31,7 @@
* @author Brian Stansberry
* @version $Revision$
*/
-public interface GroupAwareBackingCache<C extends CacheItem, T extends BackingCacheEntry<C>>
+public interface GroupAwareBackingCache<C extends CacheItem, T extends SerializationGroupMember<C>>
extends PassivatingBackingCache<C, T>
{
/**
@@ -54,13 +54,26 @@
* with ourself.
*/
void setGroup(C obj, SerializationGroup<C> group);
-
+
/**
- * Gets the group the given object is a member of
+ * Callback from the group informing the cache it needs to invoke
+ * pre-passivation callbacks on the member.
*
- * @param obj the object
- * @return the group, or <code>null</code> if the object is not a member
- * of a group
+ * @param member the group member
+ *
+ * @throws IllegalStateException if the member is
+ * {@link BackingCacheEntry#isInUse() in-use}.
*/
- SerializationGroup<C> getGroup(C obj);
+// void prePassivate(T member);
+
+ /**
+ * Callback from the group informing the cache it needs to invoke
+ * pre-replication callbacks on the member.
+ *
+ * @param member the group member
+ *
+ * @throws IllegalStateException if the member is
+ * {@link BackingCacheEntry#isInUse() in-use}.
+ */
+ void notifyPreReplicate(T member);
}
Modified: projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/spi/IntegratedObjectStore.java
===================================================================
--- projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/spi/IntegratedObjectStore.java 2008-03-17 19:54:44 UTC (rev 70921)
+++ projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/spi/IntegratedObjectStore.java 2008-03-17 20:08:59 UTC (rev 70922)
@@ -63,6 +63,7 @@
* Update an already cached item.
*
* @param entry the entry to update
+ * @param modified was the entry modified since {@link #get(Object)} was called?
*
* @throws IllegalStateException if the store isn't already managing an entry
* with the same {@link Identifiable#getId() id}.
@@ -72,7 +73,7 @@
* callers to ensure {@link #insert(Cacheable)}
* is invoked before the first replication.
*/
- void update(T entry);
+ void update(T entry, boolean modified);
/**
* Remove the object with the given key from the store.
Modified: projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/spi/PassivatingBackingCache.java
===================================================================
--- projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/spi/PassivatingBackingCache.java 2008-03-17 19:54:44 UTC (rev 70921)
+++ projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/spi/PassivatingBackingCache.java 2008-03-17 20:08:59 UTC (rev 70922)
@@ -37,7 +37,7 @@
*
* @version $Revision: 65977 $
*/
-public interface PassivatingBackingCache<C extends CacheItem, T extends BackingCacheEntry<C>>
+public interface PassivatingBackingCache<C extends CacheItem, T extends PassivatingBackingCacheEntry<C>>
extends BackingCache<C, T>
{
/**
Added: projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/spi/PassivatingBackingCacheEntry.java
===================================================================
--- projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/spi/PassivatingBackingCacheEntry.java (rev 0)
+++ projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/spi/PassivatingBackingCacheEntry.java 2008-03-17 20:08:59 UTC (rev 70922)
@@ -0,0 +1,39 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.ejb3.cache.spi;
+
+import org.jboss.ejb3.cache.api.CacheItem;
+
+/**
+ * @author Brian Stansberry
+ *
+ */
+public interface PassivatingBackingCacheEntry<T extends CacheItem>
+ extends BackingCacheEntry<T>
+{
+
+ boolean tryLock();
+ void lock();
+ void unlock();
+
+}
Modified: projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/spi/PassivatingIntegratedObjectStore.java
===================================================================
--- projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/spi/PassivatingIntegratedObjectStore.java 2008-03-17 19:54:44 UTC (rev 70921)
+++ projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/spi/PassivatingIntegratedObjectStore.java 2008-03-17 20:08:59 UTC (rev 70922)
@@ -33,7 +33,7 @@
* @author Brian Stansberry
* @version $Revision$
*/
-public interface PassivatingIntegratedObjectStore<C extends CacheItem, T extends BackingCacheEntry<C>>
+public interface PassivatingIntegratedObjectStore<C extends CacheItem, T extends PassivatingBackingCacheEntry<C>>
extends IntegratedObjectStore<T>, PassivationExpirationProcessor
{
/**
Modified: projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/spi/SerializationGroup.java
===================================================================
--- projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/spi/SerializationGroup.java 2008-03-17 19:54:44 UTC (rev 70921)
+++ projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/spi/SerializationGroup.java 2008-03-17 20:08:59 UTC (rev 70922)
@@ -1,3 +1,25 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
package org.jboss.ejb3.cache.spi;
import org.jboss.ejb3.cache.api.CacheItem;
@@ -2,20 +24,137 @@
+/**
+ * Specialized {@link BackingCacheEntry} that represents a group of
+ * underlying items that must always be serialized as a group and whose
+ * members must have coordinated calls to passivation and replication
+ * related callbacks.
+ * <p>
+ * The underlying items in the group are represented as instances of
+ * {@link SerializationGroupMember}.
+ * </p>
+ *
+ * @author Brian Stansberry
+ *
+ * @param <T> the type of the underlying items that form the group.
+ */
public interface SerializationGroup<T extends CacheItem>
- extends BackingCacheEntry<T>
+ extends PassivatingBackingCacheEntry<T>
{
+ /**
+ * Adds a member to the group.
+ *
+ * @param member the member. Cannot be <code>null</code>.
+ */
void addMember(SerializationGroupMember<T> member);
+
+ /**
+ * Removes a member from the group.
+ *
+ * @param key the id of the member. Cannot be <code>null</code>.
+ */
void removeMember(Object key);
+
+ /**
+ * Gets the number of group members.
+ */
int size();
+
+ /**
+ * Gets whether the group should be considered invalid. A reference
+ * to an invalid group should be replaced with a fresh reference gotten
+ * from the group's cache.
+ *
+ * @return <code>true</code> if the group should be considered invalid
+ */
boolean isInvalid();
+
+ /**
+ * Marks the group as invalid (or once again as valid).
+ */
void setInvalid(boolean invalid);
+
+ /**
+ * Gets the {@link BackingCacheEntry#getUnderlyingItem() underlying item}
+ * whose {@link SerializationGroupMember#getId() id} matches <code>key</code>.
+ *
+ * @param key the id of the member. Cannot be <code>null</code>.
+ *
+ * @return the member's underlying item.
+ */
T getMemberObject(Object key);
+
+ /**
+ * Marks the given member as being "active", i.e. in need of pre-passivation
+ * and pre-replication callbacks before the group is passivated or replicated.
+ *
+ * @param member the member. Cannot be <code>null</code>.
+ */
void addActive(SerializationGroupMember<T> member);
+ /**
+ * Marks the given member as no longer being "active", i.e. as no longer
+ * in need of pre-passivation and pre-replication callbacks before the group
+ * is passivated or replicated.
+ *
+ * @param key the id of the member. Cannot be <code>null</code>.
+ */
void removeActive(Object key);
+
+ /**
+ * Tells the group the given member is "in use". A group should not be
+ * serialized while any members are in use.
+ *
+ * @param key the id of the member. Cannot be <code>null</code>.
+ */
+ void addInUse(Object key);
+
+ /**
+ * Tells the group the given member is no longer "in use".
+ *
+ * @param key the id of the member. Cannot be <code>null</code>.
+ */
+ void removeInUse(Object key);
+
+ /**
+ * Gets the number of group members currently
+ * {@link BackingCacheEntry#setInUse(boolean) "in use"}.
+ */
int getInUseCount();
+
+ /**
+ * Gets the cache used to manage the group.
+ *
+ * @return the cache. Will not return <code>null</code> if the group is
+ * {@link #isInvalid() valid}.
+ */
PassivatingBackingCache<T, SerializationGroup<T>> getGroupCache();
+
+ /**
+ * Handback provided by the cache managing the group.
+ *
+ * @param groupCache the cache. Cannot be <code>null</code>.
+ */
void setGroupCache(PassivatingBackingCache<T, SerializationGroup<T>> groupCache);
- void postActivate();
- void prePassivate();
+
+ /**
+ * Callback that must be invoked before the group is replicated.
+ */
void preReplicate();
+
+ /**
+ * Callback that must be invoked some time after the group has been
+ * replicated but before a reference to any member of the group is provided
+ * to an external caller.
+ */
void postReplicate();
+
+ /**
+ * Callback that must be invoked before the group is passivated.
+ */
+ void prePassivate();
+
+ /**
+ * Callback that must be invoked some time after the group has been
+ * activated but before a reference to any member of the group is provided
+ * to an external caller.
+ */
+ void postActivate();
}
\ No newline at end of file
Modified: projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/spi/SerializationGroupMember.java
===================================================================
--- projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/spi/SerializationGroupMember.java 2008-03-17 19:54:44 UTC (rev 70921)
+++ projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/spi/SerializationGroupMember.java 2008-03-17 20:08:59 UTC (rev 70922)
@@ -1,3 +1,25 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
package org.jboss.ejb3.cache.spi;
import org.jboss.ejb3.cache.api.CacheItem;
@@ -2,13 +24,132 @@
+/**
+ * Specialized {@link BackingCacheEntry} that wraps an {@link #setUnderlyingItem(CacheItem) underlying item}
+ * and represents it as a <strong>potential</strong> member of a
+ * {@link SerializationGroup}. Note that a particular instance need not actually
+ * be a member of a group; such an instance will return <code>null</code>
+ * from {@link #getGroupId()}.
+ *
+ * @author Brian Stansberry
+ *
+ * @param <T> the type of the underlying item
+ */
public interface SerializationGroupMember<T extends CacheItem>
- extends BackingCacheEntry<T>
-{
- Object getGroupId();
+ extends PassivatingBackingCacheEntry<T>
+{
+ /**
+ * {@inheritDoc}
+ *
+ * @return the id of the {@link BackingCacheEntry#getUnderlyingItem() underlying item}.
+ * Cannot be <code>null</code>.
+ */
+ Object getId();
+
+ /**
+ * Gets the group of which this object is a member.
+ *
+ * @return the group. May return <code>null</code> if the instance is
+ * not a member of a group (use {@link #getGroupId()} to check
+ * for this) or does not currently have a reference to its group.
+ */
SerializationGroup<T> getGroup();
+
+ /**
+ * Sets the group to which this object belongs. The first time a non-null
+ * value is passed to this method, a member should set its
+ * {@link #getGroupId() groupId}.
+ *
+ * @param group May be <code>null</code>, which does not mean the member
+ * is no longer part of a group, but rather that any
+ * reference to the group should be released.
+ */
void setGroup(SerializationGroup<T> group);
- void releaseReferences();
- void setPassivatingCache(PassivatingBackingCache<T, SerializationGroupMember<T>> cache);
+
+ /**
+ * Gets the {@link Identifiable#getId()} of this object's
+ * {@link SerializationGroup}. Will return <code>null</code> if this
+ * object has not yet been
+ * {@link #setGroup(SerializationGroup) assigned to a group}.
+ *
+ * @return the group id, or <code>null</code>
+ */
+ Object getGroupId();
+
+ /**
+ * Instructs this group member to ensure any prePassivate callbacks are
+ * invoked on the underlying item and any internal references to its
+ * group and underlying item are released. Called in preparation for
+ * serialization of the group. Clearing references is necessary in order to
+ * ensure no stale references to group content are retained outside of
+ * the {@link SerializationGroup} itself.
+ */
+ void prePassivate();
+
+ /**
+ * Instructs this group member to ensure any preReplicate callbacks are
+ * invoked on the underlying item and any internal references to its
+ * group and underlying item are released. Called in preparation for
+ * serialization of the group. Clearing references is necessary in order to
+ * ensure no stale references to group content are retained outside of
+ * the {@link SerializationGroup} itself.
+ */
+ void preReplicate();
+
+ /**
+ * Provides a reference to the cache that is managing this item.
+ *
+ * @param cache the cache. May be <code>null</code>.
+ */
+ void setPassivatingCache(GroupAwareBackingCache<T, SerializationGroupMember<T>> cache);
+
+ /**
+ * Sets this object's underlying item.
+ *
+ * @param obj the underlying item. Cannot be <code>null</code>.
+ */
void setUnderlyingItem(T obj);
+
+ /**
+ * Gets whether pre-replication callbacks have been invoked on
+ * the underlying item.
+ */
boolean isPreReplicated();
+
+ /**
+ * Sets whether pre-replication callbacks have been invoked on the
+ * underlying item.
+ */
void setPreReplicated(boolean preReplicated);
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * If the lock on this member can be acquired and the member has
+ * {@link #getGroup() a reference to a group},
+ * <code>SerializationGroup.tryLock()</code> will also be invoked. If the
+ * group lock cannot be obtained, the member lock will be released and
+ * <code>false</code> returned.
+ * </p>
+ */
+ boolean tryLock();
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * If the lock on this member is acquired and the member has
+ * {@link #getGroup() a reference to a group},
+ * <code>SerializationGroup.lock()</code> will also be invoked. If the
+ * thread is interrupted waiting for the group lock, the member lock will be
+ * released.
+ * </p>
+ */
+ void lock();
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * If the member has {@link #getGroup() a reference to a group},
+ * <code>SerializationGroup.unlock()</code> will first be invoked.
+ * </p>
+ */
+ void unlock();
}
\ No newline at end of file
Modified: projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/spi/impl/AbstractPassivatingIntegratedObjectStore.java
===================================================================
--- projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/spi/impl/AbstractPassivatingIntegratedObjectStore.java 2008-03-17 19:54:44 UTC (rev 70921)
+++ projects/ejb3/branches/cluster-dev/ejb3-cache/src/main/java/org/jboss/ejb3/cache/spi/impl/AbstractPassivatingIntegratedObjectStore.java 2008-03-17 20:08:59 UTC (rev 70922)
@@ -24,8 +24,8 @@
import org.jboss.ejb3.annotation.CacheConfig;
import org.jboss.ejb3.cache.api.CacheItem;
-import org.jboss.ejb3.cache.spi.BackingCacheEntry;
import org.jboss.ejb3.cache.spi.PassivatingBackingCache;
+import org.jboss.ejb3.cache.spi.PassivatingBackingCacheEntry;
import org.jboss.ejb3.cache.spi.PassivatingIntegratedObjectStore;
import org.jboss.logging.Logger;
@@ -36,7 +36,7 @@
* @author Brian Stansberry
* @version $Revision$
*/
-public abstract class AbstractPassivatingIntegratedObjectStore<C extends CacheItem, T extends BackingCacheEntry<C>>
+public abstract class AbstractPassivatingIntegratedObjectStore<C extends CacheItem, T extends PassivatingBackingCacheEntry<C>>
implements PassivatingIntegratedObjectStore<C, T>
{
private static final Logger log = Logger.getLogger(AbstractPassivatingIntegratedObjectStore.class);
Modified: projects/ejb3/branches/cluster-dev/ejb3-cache/src/test/java/org/jboss/ejb3/test/cache/distributed/GroupedPassivatingUnitTestCase.java
===================================================================
--- projects/ejb3/branches/cluster-dev/ejb3-cache/src/test/java/org/jboss/ejb3/test/cache/distributed/GroupedPassivatingUnitTestCase.java 2008-03-17 19:54:44 UTC (rev 70921)
+++ projects/ejb3/branches/cluster-dev/ejb3-cache/src/test/java/org/jboss/ejb3/test/cache/distributed/GroupedPassivatingUnitTestCase.java 2008-03-17 20:08:59 UTC (rev 70922)
@@ -27,6 +27,7 @@
import org.jboss.ejb3.test.cache.mock.MockBeanContainer;
import org.jboss.ejb3.test.cache.mock.MockBeanContext;
import org.jboss.ejb3.test.cache.mock.MockCacheConfig;
+import org.jboss.ejb3.test.cache.mock.MockEntity;
import org.jboss.ejb3.test.cache.mock.MockPassivationManager;
import org.jboss.ejb3.test.cache.mock.MockXPC;
import org.jboss.logging.Logger;
@@ -55,7 +56,7 @@
public void testSimpleGroupPassivation() throws Exception
{
- log.info("testSimpleGroupPassivation()");
+ log.info("====== testSimpleGroupPassivation() ======");
MockCluster cluster = new MockCluster(false);
MockClusterMember node0 = cluster.getNode0();
@@ -108,7 +109,7 @@
assertTrue("ctx1 must be different than firstCtx1 (else no passivation has taken place)", ctx1 != firstCtx1);
assertNotNull(ctx1.getXPC());
- assertEquals(ctx1.getXPC(), ctx2.getXPC());
+ assertSame(ctx1.getXPC(), ctx2.getXPC());
}
finally
{
@@ -123,4 +124,110 @@
}
}
}
+
+ public void testSimpleGroupReplication() throws Exception
+ {
+ log.info("====== testSimpleGroupReplication() ======");
+
+ MockCluster cluster = new MockCluster(false);
+ MockCacheConfig cacheConfig = new MockCacheConfig();
+ cacheConfig.setIdleTimeoutSeconds(1);
+ MockXPC sharedXPC = new MockXPC("XPCA");
+ MockBeanContainer[] firstSet = cluster.deployBeanContainer("MockBeanContainer1", null, CacheType.DISTRIBUTED, cacheConfig, sharedXPC.getName(), sharedXPC.getName());
+ MockBeanContainer[] secondSet = cluster.deployBeanContainer("MockBeanContainer2", "MockBeanContainer1", CacheType.DISTRIBUTED, cacheConfig, sharedXPC.getName(), sharedXPC.getName());
+ MockBeanContainer container1A = firstSet[0];
+ MockBeanContainer container1B = firstSet[1];
+ MockBeanContainer container2A = secondSet[0];
+ MockBeanContainer container2B = secondSet[1];
+
+ Object key1 = null;
+ Object key2 = null;
+ MockEntity entityA = null;
+
+ cluster.getNode0().setTCCL();
+ try
+ {
+ key1 = container1A.getCache().create(null, null);
+ MockBeanContext ctx1A = container1A.getCache().get(key1);
+
+ key2 = ctx1A.getChild(container2A.getName());
+ MockBeanContext ctx2A = container2A.getCache().get(key2);
+
+ assertNotNull(ctx1A.getXPC());
+ assertSame(ctx1A.getXPC(), ctx2A.getXPC());
+
+ entityA = ctx2A.createEntity();
+ assertSame(entityA, ctx1A.getEntity());
+
+ container2A.getCache().finished(ctx2A);
+ container1A.getCache().finished(ctx1A);
+
+ MockPassivationManager pass1A = (MockPassivationManager) container1A.getPassivationManager();
+ MockPassivationManager pass2A = (MockPassivationManager) container2A.getPassivationManager();
+
+ assertEquals("ctx1 should have been replicated", 1, pass1A.getPreReplicateCount());
+ assertEquals("ctx2 should have been passivated", 1, pass2A.getPreReplicateCount());
+
+ ctx2A = container2A.getCache().get(key2);
+
+ log.info("ctx2 = " + ctx2A);
+ assertNotNull(ctx2A);
+
+ assertEquals("ctx2 should have been postReplicated", 1, pass2A.getPostReplicateCount());
+ assertEquals("ctx2 should not have been activated", 0, pass2A.getPostActivateCount());
+
+ ctx1A = container1A.getCache().get(key1);
+
+ log.info("ctx1 = " + ctx1A);
+ assertNotNull(ctx1A);
+
+ assertEquals("ctx1 should have been postReplicated", 1, pass1A.getPostReplicateCount());
+ assertEquals("ctx1 should not have been activated", 0, pass1A.getPostActivateCount());
+
+ assertNotNull(ctx1A.getXPC());
+ assertSame(ctx1A.getXPC(), ctx2A.getXPC());
+
+ MockEntity entity1x = ctx1A.getEntity();
+ assertEquals(entityA, entity1x);
+
+ container1A.getCache().finished(ctx1A);
+ container2A.getCache().finished(ctx2A);
+ }
+ catch (Exception e)
+ {
+ container1A.stop();
+ container2A.stop();
+ throw e;
+ }
+ finally
+ {
+ cluster.getNode0().restoreTCCL();
+ }
+
+ // Switch to second node
+ cluster.getNode1().setTCCL();
+ try
+ {
+ MockBeanContext ctx1B = container1B.getCache().get(key1);
+ MockBeanContext ctx2B = container2B.getCache().get(key2);
+
+ MockEntity entityB = ctx2B.getEntity();
+ assertSame(entityB, ctx1B.getEntity());
+ assertEquals(entityA, entityB);
+ assertNotSame(entityA, entityB);
+
+ container2B.getCache().finished(ctx2B);
+ container1B.getCache().finished(ctx1B);
+ }
+ catch (Exception e)
+ {
+ container1B.stop();
+ container2B.stop();
+ throw e;
+ }
+ finally
+ {
+ cluster.getNode1().restoreTCCL();
+ }
+ }
}
Modified: projects/ejb3/branches/cluster-dev/ejb3-cache/src/test/java/org/jboss/ejb3/test/cache/distributed/MockJBCIntegratedObjectStore.java
===================================================================
--- projects/ejb3/branches/cluster-dev/ejb3-cache/src/test/java/org/jboss/ejb3/test/cache/distributed/MockJBCIntegratedObjectStore.java 2008-03-17 19:54:44 UTC (rev 70921)
+++ projects/ejb3/branches/cluster-dev/ejb3-cache/src/test/java/org/jboss/ejb3/test/cache/distributed/MockJBCIntegratedObjectStore.java 2008-03-17 20:08:59 UTC (rev 70922)
@@ -35,9 +35,9 @@
import org.jboss.ejb3.annotation.CacheConfig;
import org.jboss.ejb3.cache.api.CacheItem;
-import org.jboss.ejb3.cache.spi.BackingCacheEntry;
import org.jboss.ejb3.cache.spi.IntegratedObjectStore;
import org.jboss.ejb3.cache.spi.PassivatingBackingCache;
+import org.jboss.ejb3.cache.spi.PassivatingBackingCacheEntry;
import org.jboss.ejb3.cache.spi.PassivatingIntegratedObjectStore;
import org.jboss.ejb3.cache.spi.impl.CacheableTimestamp;
import org.jboss.ejb3.cache.spi.impl.PassivationExpirationRunner;
@@ -50,7 +50,7 @@
* @author Brian Stansberry
* @version $Revision$
*/
-public class MockJBCIntegratedObjectStore<C extends CacheItem, T extends BackingCacheEntry<C>>
+public class MockJBCIntegratedObjectStore<C extends CacheItem, T extends PassivatingBackingCacheEntry<C>>
implements PassivatingIntegratedObjectStore<C, T>
{
private static final Logger log = Logger.getLogger(MockJBCIntegratedObjectStore.class);
@@ -135,12 +135,17 @@
putInCache(entry.getId(), entry);
}
- public void update(T entry)
+ public void update(T entry, boolean modified)
{
- if (entry.isModified())
+ log.trace("updating " + entry.getId());
+ if (modified)
{
putInCache(entry.getId(), entry);
}
+ else
+ {
+ timestamps.put(entry.getId(), new Long(entry.getLastUsed()));
+ }
}
@SuppressWarnings("unchecked")
Modified: projects/ejb3/branches/cluster-dev/ejb3-cache/src/test/java/org/jboss/ejb3/test/cache/mock/MockBeanContext.java
===================================================================
--- projects/ejb3/branches/cluster-dev/ejb3-cache/src/test/java/org/jboss/ejb3/test/cache/mock/MockBeanContext.java 2008-03-17 19:54:44 UTC (rev 70921)
+++ projects/ejb3/branches/cluster-dev/ejb3-cache/src/test/java/org/jboss/ejb3/test/cache/mock/MockBeanContext.java 2008-03-17 20:08:59 UTC (rev 70922)
@@ -165,14 +165,17 @@
// -- Underlying bean operations
- public void createEntity()
+ public MockEntity createEntity()
{
entity = xpc.createEntity();
setModified(true);
+ log.trace(getId() + ": createEntity()");
+ return entity;
}
public MockEntity getEntity()
{
+ log.trace(getId() + ": getEntity()");
MockEntity was = entity;
entity = xpc.getEntity();
if (was != entity)
@@ -182,6 +185,7 @@
public void removeEntity()
{
+ log.trace(getId() + ": removeEntity()");
xpc.removeEntity();
if (entity != null)
{
@@ -192,11 +196,12 @@
public void invokeNonModifying()
{
- // no-op
+ log.trace(getId() + ": invokeNonModifying()");
}
public void invokeModifying()
{
- setModified(true);
+ log.trace(getId() + ": invokeModifying()");
+ setModified(true);
}
}
Modified: projects/ejb3/branches/cluster-dev/ejb3-cache/src/test/java/org/jboss/ejb3/test/cache/mock/MockCacheItem.java
===================================================================
--- projects/ejb3/branches/cluster-dev/ejb3-cache/src/test/java/org/jboss/ejb3/test/cache/mock/MockCacheItem.java 2008-03-17 19:54:44 UTC (rev 70921)
+++ projects/ejb3/branches/cluster-dev/ejb3-cache/src/test/java/org/jboss/ejb3/test/cache/mock/MockCacheItem.java 2008-03-17 20:08:59 UTC (rev 70922)
@@ -25,8 +25,9 @@
import org.jboss.ejb3.cache.api.CacheItem;
/**
+ * Mock implementation of a CacheItem.
+ *
* @author Brian Stansberry
- *
*/
public class MockCacheItem
extends MockIdentifiable
@@ -37,7 +38,7 @@
private boolean modified;
- public MockCacheItem(long id)
+ public MockCacheItem(int id)
{
super(id);
}
Modified: projects/ejb3/branches/cluster-dev/ejb3-cache/src/test/java/org/jboss/ejb3/test/cache/mock/MockEntity.java
===================================================================
--- projects/ejb3/branches/cluster-dev/ejb3-cache/src/test/java/org/jboss/ejb3/test/cache/mock/MockEntity.java 2008-03-17 19:54:44 UTC (rev 70921)
+++ projects/ejb3/branches/cluster-dev/ejb3-cache/src/test/java/org/jboss/ejb3/test/cache/mock/MockEntity.java 2008-03-17 20:08:59 UTC (rev 70922)
@@ -24,8 +24,9 @@
import java.io.Serializable;
+
/**
- * A SharedObject.
+ * A mock entity identified by an int id; equality and hash code are based solely on that id.
*
* @author Brian Stansberry
* @version $Revision$
@@ -33,4 +34,41 @@
public class MockEntity implements Serializable
{
private static final long serialVersionUID = 1L;
+
+ private int id;
+
+ public MockEntity(int id)
+ {
+ this.id = id;
+ }
+
+ public Object getId()
+ {
+ return id;
+ }
+
+ @Override
+ public boolean equals(Object obj)
+ {
+ if (this == obj)
+ return true;
+ if (obj instanceof MockEntity)
+ {
+ MockEntity other = (MockEntity) obj;
+ return this.id == other.id;
+ }
+ return false;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return id;
+ }
+
+ @Override
+ public String toString()
+ {
+ return super.toString() + "{id=" + id + "}";
+ }
}
Modified: projects/ejb3/branches/cluster-dev/ejb3-cache/src/test/java/org/jboss/ejb3/test/cache/mock/MockIdentifiable.java
===================================================================
--- projects/ejb3/branches/cluster-dev/ejb3-cache/src/test/java/org/jboss/ejb3/test/cache/mock/MockIdentifiable.java 2008-03-17 19:54:44 UTC (rev 70921)
+++ projects/ejb3/branches/cluster-dev/ejb3-cache/src/test/java/org/jboss/ejb3/test/cache/mock/MockIdentifiable.java 2008-03-17 20:08:59 UTC (rev 70922)
@@ -24,9 +24,10 @@
import java.io.Serializable;
import org.jboss.ejb3.cache.api.Identifiable;
+import org.jboss.logging.Logger;
/**
- * Comment
+ * Mock implementation of an Identifiable.
*
* @author <a href="mailto:carlo.dewolf at jboss.com">Carlo de Wolf</a>
* @version $Revision: $
@@ -36,16 +37,18 @@
/** The serialVersionUID */
private static final long serialVersionUID = 828205583403324513L;
- private static volatile long currentId = 0;
+ private static volatile int currentId = 0;
- public static long createId()
+ protected Logger log = Logger.getLogger(getClass());
+
+ public static int createId()
{
return ++currentId;
}
- private long id;
+ private int id;
- public MockIdentifiable(long id)
+ public MockIdentifiable(int id)
{
this.id = id;
}
Modified: projects/ejb3/branches/cluster-dev/ejb3-cache/src/test/java/org/jboss/ejb3/test/cache/mock/MockXPC.java
===================================================================
--- projects/ejb3/branches/cluster-dev/ejb3-cache/src/test/java/org/jboss/ejb3/test/cache/mock/MockXPC.java 2008-03-17 19:54:44 UTC (rev 70921)
+++ projects/ejb3/branches/cluster-dev/ejb3-cache/src/test/java/org/jboss/ejb3/test/cache/mock/MockXPC.java 2008-03-17 20:08:59 UTC (rev 70922)
@@ -55,7 +55,7 @@
{
if (entity != null)
throw new IllegalStateException("entity already created");
- entity = new MockEntity();
+ entity = new MockEntity(MockIdentifiable.createId());
return entity;
}
More information about the jboss-cvs-commits
mailing list