[infinispan-commits] Infinispan SVN: r991 - trunk/core/src/main/java/org/infinispan/container.
infinispan-commits at lists.jboss.org
infinispan-commits at lists.jboss.org
Fri Oct 23 11:17:11 EDT 2009
Author: mircea.markus
Date: 2009-10-23 11:17:11 -0400 (Fri, 23 Oct 2009)
New Revision: 991
Added:
trunk/core/src/main/java/org/infinispan/container/EntryFactoryImpl.java
Removed:
trunk/core/src/main/java/org/infinispan/container/EntryFactoryImpl.java
Log:
[ISPN-236]-(cache.putIfAbsent() is not atomic) - fix
Deleted: trunk/core/src/main/java/org/infinispan/container/EntryFactoryImpl.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/container/EntryFactoryImpl.java 2009-10-23 14:10:13 UTC (rev 990)
+++ trunk/core/src/main/java/org/infinispan/container/EntryFactoryImpl.java 2009-10-23 15:17:11 UTC (rev 991)
@@ -1,238 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.infinispan.container;
-
-import org.infinispan.config.Configuration;
-import org.infinispan.container.entries.CacheEntry;
-import org.infinispan.container.entries.MVCCEntry;
-import org.infinispan.container.entries.NullMarkerEntry;
-import org.infinispan.container.entries.NullMarkerEntryForRemoval;
-import org.infinispan.container.entries.ReadCommittedEntry;
-import org.infinispan.container.entries.RepeatableReadEntry;
-import org.infinispan.context.Flag;
-import org.infinispan.context.InvocationContext;
-import org.infinispan.factories.annotations.Inject;
-import org.infinispan.factories.annotations.Start;
-import org.infinispan.notifications.cachelistener.CacheNotifier;
-import org.infinispan.util.Util;
-import org.infinispan.util.concurrent.IsolationLevel;
-import org.infinispan.util.concurrent.TimeoutException;
-import org.infinispan.util.concurrent.locks.LockManager;
-import org.infinispan.util.logging.Log;
-import org.infinispan.util.logging.LogFactory;
-
-public class EntryFactoryImpl implements EntryFactory {
- private boolean useRepeatableRead;
- DataContainer container;
- boolean writeSkewCheck;
- LockManager lockManager;
- Configuration configuration;
- CacheNotifier notifier;
-
- private static final Log log = LogFactory.getLog(EntryFactoryImpl.class);
- private static final boolean trace = log.isTraceEnabled();
-
- @Inject
- public void injectDependencies(DataContainer dataContainer, LockManager lockManager, Configuration configuration, CacheNotifier notifier) {
- this.container = dataContainer;
- this.configuration = configuration;
- this.lockManager = lockManager;
- this.notifier = notifier;
- }
-
- @Start
- public void init() {
- useRepeatableRead = configuration.getIsolationLevel() == IsolationLevel.REPEATABLE_READ;
- writeSkewCheck = configuration.isWriteSkewCheck();
- }
-
- private MVCCEntry createWrappedEntry(Object key, Object value, boolean isForInsert, boolean forRemoval, long lifespan) {
- if (value == null && !isForInsert) return useRepeatableRead ?
- forRemoval ? new NullMarkerEntryForRemoval(key) : NullMarkerEntry.getInstance()
- : null;
-
- return useRepeatableRead ? new RepeatableReadEntry(key, value, lifespan) : new ReadCommittedEntry(key, value, lifespan);
- }
-
- public final CacheEntry wrapEntryForReading(InvocationContext ctx, Object key) throws InterruptedException {
- CacheEntry cacheEntry;
- if (ctx.hasFlag(Flag.FORCE_WRITE_LOCK)) {
- if (trace) log.trace("Forcing lock on reading");
- return wrapEntryForWriting(ctx, key, false, false, false, false);
- } else if ((cacheEntry = ctx.lookupEntry(key)) == null) {
- if (trace) log.trace("Key {0} is not in context, fetching from container.", key);
- // simple implementation. Peek the entry, wrap it, put wrapped entry in the context.
- cacheEntry = container.get(key);
-
- // do not bother wrapping though if this is not in a tx. repeatable read etc are all meaningless unless there is a tx.
- if (useRepeatableRead && ctx.isInTxScope()) {
- MVCCEntry mvccEntry = cacheEntry == null ?
- createWrappedEntry(key, null, false, false, -1) :
- createWrappedEntry(key, cacheEntry.getValue(), false, false, cacheEntry.getLifespan());
- if (mvccEntry != null) ctx.putLookedUpEntry(key, mvccEntry);
- return mvccEntry;
- }
- // if not in transaction and repeatable read, or simply read committed (regardless of whether in TX or not), do not wrap
- if (cacheEntry != null) ctx.putLookedUpEntry(key, cacheEntry);
- return cacheEntry;
- } else {
- if (trace) log.trace("Key is already in context");
- return cacheEntry;
- }
- }
-
- public final MVCCEntry wrapEntryForWriting(InvocationContext ctx, Object key, boolean createIfAbsent, boolean forceLockIfAbsent, boolean alreadyLocked, boolean forRemoval) throws InterruptedException {
- CacheEntry cacheEntry = ctx.lookupEntry(key);
- MVCCEntry mvccEntry = null;
- if (createIfAbsent && cacheEntry != null && cacheEntry.isNull()) cacheEntry = null;
- if (cacheEntry != null) // exists in context! Just acquire lock if needed, and wrap.
- {
- if (trace) log.trace("Exists in context.");
- // acquire lock if needed
- if (alreadyLocked || acquireLock(ctx, key)) {
-
- if (cacheEntry instanceof MVCCEntry && (!forRemoval || !(cacheEntry instanceof NullMarkerEntry))) {
- mvccEntry = (MVCCEntry) cacheEntry;
- } else {
- // this is a read-only entry that needs to be copied to a proper read-write entry!!
- mvccEntry = createWrappedEntry(key, cacheEntry.getValue(), false, forRemoval, cacheEntry.getLifespan());
- cacheEntry = mvccEntry;
- ctx.putLookedUpEntry(key, cacheEntry);
- }
-
- // create a copy of the underlying entry
- mvccEntry.copyForUpdate(container, writeSkewCheck);
- }
-
- if (cacheEntry.isRemoved() && createIfAbsent) {
- if (trace) log.trace("Entry is deleted in current scope. Need to un-delete.");
- if (mvccEntry != cacheEntry) mvccEntry = (MVCCEntry) cacheEntry;
- mvccEntry.setRemoved(false);
- mvccEntry.setValid(true);
- }
-
- return mvccEntry;
-
- } else {
- // else, fetch from dataContainer.
- cacheEntry = container.get(key);
- if (cacheEntry != null) {
- if (trace) log.trace("Retrieved from container.");
- // exists in cache! Just acquire lock if needed, and wrap.
- // do we need a lock?
- boolean lockAcquired = acquireLock(ctx, key);
- mvccEntry = wrapExistingEntryForWriting(ctx, key, alreadyLocked, cacheEntry, lockAcquired);
- } else if (createIfAbsent) {
- // this is the *only* point where new entries can be created!!
- if (trace) log.trace("Creating new entry.");
-
- boolean lockAcquired = false;
- // now to lock and create the entry. Lock first to prevent concurrent creation!
- if (!alreadyLocked) {
- lockAcquired = acquireLock(ctx, key);
- }
- if (lockAcquired) {
- //double check existance, as the prev lock owner might have created a cache entry for the same key
- cacheEntry = container.get(key);
- if (cacheEntry != null) {
- mvccEntry = wrapExistingEntryForWriting(ctx, key, true, cacheEntry, lockAcquired);
- } else {
- mvccEntry = createAndWrapEntryForWriting(ctx, key);
- }
- } else {
- mvccEntry = createAndWrapEntryForWriting(ctx, key);
- }
- }
- }
-
- // see if we need to force the lock on nonexistent entries.
- if (mvccEntry == null && forceLockIfAbsent) {
- // make sure we record this! Null value since this is a forced lock on the key
- if (acquireLock(ctx, key)) ctx.putLookedUpEntry(key, null);
- }
-
- return mvccEntry;
- }
-
- private MVCCEntry createAndWrapEntryForWriting(InvocationContext ctx, Object key) {
- MVCCEntry mvccEntry;
- notifier.notifyCacheEntryCreated(key, true, ctx);
- mvccEntry = createWrappedEntry(key, null, true, false, -1);
- mvccEntry.setCreated(true);
- ctx.putLookedUpEntry(key, mvccEntry);
- mvccEntry.copyForUpdate(container, writeSkewCheck);
- notifier.notifyCacheEntryCreated(key, false, ctx);
- return mvccEntry;
- }
-
- private MVCCEntry wrapExistingEntryForWriting(InvocationContext ctx, Object key, boolean alreadyLocked, CacheEntry cacheEntry, boolean lockAquired) {
- MVCCEntry mvccEntry;
- mvccEntry = createWrappedEntry(key, cacheEntry.getValue(), false, false, cacheEntry.getLifespan());
- ctx.putLookedUpEntry(key, mvccEntry);
- boolean needToCopy = alreadyLocked || lockAquired || ctx.hasFlag(Flag.SKIP_LOCKING); // even if we do not acquire a lock, if skip-locking is enabled we should copy
- if (needToCopy) mvccEntry.copyForUpdate(container, writeSkewCheck);
- return mvccEntry;
- }
-
- /**
- * Attempts to lock an entry if the lock isn't already held in the current scope, and records the lock in the
- * context.
- *
- * @param ctx context
- * @param key Key to lock
- * @return true if a lock was needed and acquired, false if it didn't need to acquire the lock (i.e., lock was
- * already held)
- * @throws InterruptedException if interrupted
- * @throws org.infinispan.util.concurrent.TimeoutException
- * if we are unable to acquire the lock after a specified timeout.
- */
- public final boolean acquireLock(InvocationContext ctx, Object key) throws InterruptedException, TimeoutException {
- // don't EVER use lockManager.isLocked() since with lock striping it may be the case that we hold the relevant
- // lock which may be shared with another key that we have a lock for already.
- // nothing wrong, just means that we fail to record the lock. And that is a problem.
- // Better to check our records and lock again if necessary.
-
- boolean shouldSkipLocking = ctx.hasFlag(Flag.SKIP_LOCKING);
-
- if (!ctx.hasLockedKey(key) && !shouldSkipLocking) {
- if (lockManager.lockAndRecord(key, ctx)) {
- // successfully locked!
- return true;
- } else {
- Object owner = lockManager.getOwner(key);
- throw new TimeoutException("Unable to acquire lock after [" + Util.prettyPrintTime(getLockAcquisitionTimeout(ctx)) + "] on key [" + key + "] for requestor [" +
- ctx.getLockOwner() + "]! Lock held by [" + owner + "]");
- }
- }
-
- return false;
- }
-
- private long getLockAcquisitionTimeout(InvocationContext ctx) {
- return ctx.hasFlag(Flag.ZERO_LOCK_ACQUISITION_TIMEOUT) ?
- 0 : configuration.getLockAcquisitionTimeout();
- }
-
- public final void releaseLock(Object key) {
- lockManager.unlock(key, lockManager.getOwner(key));
- }
-}
Added: trunk/core/src/main/java/org/infinispan/container/EntryFactoryImpl.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/container/EntryFactoryImpl.java (rev 0)
+++ trunk/core/src/main/java/org/infinispan/container/EntryFactoryImpl.java 2009-10-23 15:17:11 UTC (rev 991)
@@ -0,0 +1,216 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.infinispan.container;
+
+import org.infinispan.config.Configuration;
+import org.infinispan.container.entries.CacheEntry;
+import org.infinispan.container.entries.MVCCEntry;
+import org.infinispan.container.entries.NullMarkerEntry;
+import org.infinispan.container.entries.NullMarkerEntryForRemoval;
+import org.infinispan.container.entries.ReadCommittedEntry;
+import org.infinispan.container.entries.RepeatableReadEntry;
+import org.infinispan.context.Flag;
+import org.infinispan.context.InvocationContext;
+import org.infinispan.factories.annotations.Inject;
+import org.infinispan.factories.annotations.Start;
+import org.infinispan.notifications.cachelistener.CacheNotifier;
+import org.infinispan.util.Util;
+import org.infinispan.util.concurrent.IsolationLevel;
+import org.infinispan.util.concurrent.TimeoutException;
+import org.infinispan.util.concurrent.locks.LockManager;
+import org.infinispan.util.logging.Log;
+import org.infinispan.util.logging.LogFactory;
+
+/**
+ * {@link EntryFactory} implementation that wraps entries found in the invocation context or the
+ * {@link DataContainer} for reading and writing, locking keys through the {@link LockManager} before writes.
+ */
+public class EntryFactoryImpl implements EntryFactory {
+   private boolean useRepeatableRead;
+   DataContainer container;
+   boolean writeSkewCheck;
+   LockManager lockManager;
+   Configuration configuration;
+   CacheNotifier notifier;
+
+   private static final Log log = LogFactory.getLog(EntryFactoryImpl.class);
+   private static final boolean trace = log.isTraceEnabled();
+
+   /** Stores the collaborators this factory delegates to. */
+   @Inject
+   public void injectDependencies(DataContainer dataContainer, LockManager lockManager, Configuration configuration, CacheNotifier notifier) {
+      this.container = dataContainer;
+      this.configuration = configuration;
+      this.lockManager = lockManager;
+      this.notifier = notifier;
+   }
+
+   /** Reads the isolation level and write-skew-check setting from the injected configuration. */
+   @Start
+   public void init() {
+      useRepeatableRead = configuration.getIsolationLevel() == IsolationLevel.REPEATABLE_READ;
+      writeSkewCheck = configuration.isWriteSkewCheck();
+   }
+
+   /**
+    * Creates the wrapper for a key. A {@code null} value that is not for insert yields a null marker under
+    * repeatable read (a removal-specific one if {@code forRemoval}) and {@code null} otherwise.
+    */
+   private MVCCEntry createWrappedEntry(Object key, Object value, boolean isForInsert, boolean forRemoval, long lifespan) {
+      if (value == null && !isForInsert) return useRepeatableRead ?
+            forRemoval ? new NullMarkerEntryForRemoval(key) : NullMarkerEntry.getInstance()
+            : null;
+      return useRepeatableRead ? new RepeatableReadEntry(key, value, lifespan) : new ReadCommittedEntry(key, value, lifespan);
+   }
+
+   /**
+    * Returns the entry for {@code key}, preferring the one already in {@code ctx} over the container. With
+    * {@link Flag#FORCE_WRITE_LOCK} set this delegates to the write path. May return {@code null}.
+    */
+   public final CacheEntry wrapEntryForReading(InvocationContext ctx, Object key) throws InterruptedException {
+      CacheEntry cacheEntry;
+      if (ctx.hasFlag(Flag.FORCE_WRITE_LOCK)) {
+         if (trace) log.trace("Forcing lock on reading");
+         return wrapEntryForWriting(ctx, key, false, false, false, false);
+      } else if ((cacheEntry = ctx.lookupEntry(key)) == null) {
+         if (trace) log.trace("Key {0} is not in context, fetching from container.", key);
+         cacheEntry = container.get(key);
+         // do not bother wrapping though if this is not in a tx. repeatable read etc are all meaningless unless there is a tx.
+         if (useRepeatableRead && ctx.isInTxScope()) {
+            MVCCEntry mvccEntry = cacheEntry == null ?
+                  createWrappedEntry(key, null, false, false, -1) :
+                  createWrappedEntry(key, cacheEntry.getValue(), false, false, cacheEntry.getLifespan());
+            if (mvccEntry != null) ctx.putLookedUpEntry(key, mvccEntry);
+            return mvccEntry;
+         }
+         // if not in transaction and repeatable read, or simply read committed (regardless of whether in TX or not), do not wrap
+         if (cacheEntry != null) ctx.putLookedUpEntry(key, cacheEntry);
+         return cacheEntry;
+      } else {
+         if (trace) log.trace("Key is already in context");
+         return cacheEntry;
+      }
+   }
+
+   /**
+    * Wraps the entry for {@code key} for writing, locking the key first unless {@code alreadyLocked}. An entry
+    * missing from both the context and the container is created when {@code createIfAbsent}; otherwise a lock
+    * taken by this call is released, and {@link #acquireLock} is retried when {@code forceLockIfAbsent}.
+    * @return the wrapped entry, or {@code null} if this call did not wrap one
+    */
+   public final MVCCEntry wrapEntryForWriting(InvocationContext ctx, Object key, boolean createIfAbsent, boolean forceLockIfAbsent, boolean alreadyLocked, boolean forRemoval) throws InterruptedException {
+      CacheEntry cacheEntry = ctx.lookupEntry(key);
+      MVCCEntry mvccEntry = null;
+      if (createIfAbsent && cacheEntry != null && cacheEntry.isNull()) cacheEntry = null;
+      if (cacheEntry != null) { // exists in context! Just acquire lock if needed, and wrap.
+         if (trace) log.trace("Exists in context.");
+         if (alreadyLocked || acquireLock(ctx, key)) {
+            if (cacheEntry instanceof MVCCEntry && (!forRemoval || !(cacheEntry instanceof NullMarkerEntry))) {
+               mvccEntry = (MVCCEntry) cacheEntry;
+            } else {
+               // this is a read-only entry that needs to be copied to a proper read-write entry!!
+               mvccEntry = createWrappedEntry(key, cacheEntry.getValue(), false, forRemoval, cacheEntry.getLifespan());
+               cacheEntry = mvccEntry;
+               ctx.putLookedUpEntry(key, cacheEntry);
+            }
+            mvccEntry.copyForUpdate(container, writeSkewCheck);
+         }
+         if (cacheEntry.isRemoved() && createIfAbsent) {
+            if (trace) log.trace("Entry is deleted in current scope. Need to un-delete.");
+            if (mvccEntry != cacheEntry) mvccEntry = (MVCCEntry) cacheEntry;
+            mvccEntry.setRemoved(false);
+            mvccEntry.setValid(true);
+         }
+         return mvccEntry;
+      } else {
+         // Lock first, then read the container, so the existence check and the creation below happen under the lock.
+         boolean lockAcquired = !alreadyLocked && acquireLock(ctx, key);
+         cacheEntry = container.get(key);
+         if (cacheEntry != null) {
+            if (trace) log.trace("Retrieved from container.");
+            boolean needToCopy = alreadyLocked || lockAcquired || ctx.hasFlag(Flag.SKIP_LOCKING); // even if we do not acquire a lock, if skip-locking is enabled we should copy
+            mvccEntry = createWrappedEntry(key, cacheEntry.getValue(), false, false, cacheEntry.getLifespan());
+            ctx.putLookedUpEntry(key, mvccEntry);
+            if (needToCopy) mvccEntry.copyForUpdate(container, writeSkewCheck);
+         } else if (createIfAbsent) {
+            // this is the *only* point where new entries can be created!!
+            if (trace) log.trace("Creating new entry.");
+            notifier.notifyCacheEntryCreated(key, true, ctx);
+            mvccEntry = createWrappedEntry(key, null, true, false, -1);
+            mvccEntry.setCreated(true);
+            ctx.putLookedUpEntry(key, mvccEntry);
+            mvccEntry.copyForUpdate(container, writeSkewCheck);
+            notifier.notifyCacheEntryCreated(key, false, ctx);
+         } else if (lockAcquired) {
+            // Nothing to write: give back only the lock taken above. A lock the caller or context already held,
+            // or one never taken (SKIP_LOCKING), is not ours to release.
+            releaseLock(key);
+         }
+      }
+      // see if we need to force the lock on nonexistent entries.
+      if (mvccEntry == null && forceLockIfAbsent) {
+         // make sure we record this! Null value since this is a forced lock on the key
+         if (acquireLock(ctx, key)) ctx.putLookedUpEntry(key, null);
+      }
+      return mvccEntry;
+   }
+
+   /**
+    * Attempts to lock an entry if the lock isn't already held in the current scope, and records the lock in the
+    * context.
+    *
+    * @param ctx context
+    * @param key Key to lock
+    * @return true if a lock was needed and acquired, false if it didn't need to acquire the lock (i.e., lock was
+    *         already held, or {@link Flag#SKIP_LOCKING} is set on the context)
+    * @throws InterruptedException if interrupted
+    * @throws TimeoutException if we are unable to acquire the lock after a specified timeout.
+    */
+   public final boolean acquireLock(InvocationContext ctx, Object key) throws InterruptedException, TimeoutException {
+      // don't EVER use lockManager.isLocked() since with lock striping it may be the case that we hold the relevant
+      // lock which may be shared with another key that we have a lock for already.
+      // nothing wrong, just means that we fail to record the lock. And that is a problem.
+      // Better to check our records and lock again if necessary.
+      boolean shouldSkipLocking = ctx.hasFlag(Flag.SKIP_LOCKING);
+      if (!ctx.hasLockedKey(key) && !shouldSkipLocking) {
+         if (lockManager.lockAndRecord(key, ctx)) {
+            // successfully locked!
+            return true;
+         } else {
+            Object owner = lockManager.getOwner(key);
+            throw new TimeoutException("Unable to acquire lock after [" + Util.prettyPrintTime(getLockAcquisitionTimeout(ctx)) + "] on key [" + key + "] for requestor [" +
+                  ctx.getLockOwner() + "]! Lock held by [" + owner + "]");
+         }
+      }
+      return false;
+   }
+
+   /** Returns 0 when {@link Flag#ZERO_LOCK_ACQUISITION_TIMEOUT} is set, else the configured lock acquisition timeout. */
+   private long getLockAcquisitionTimeout(InvocationContext ctx) {
+      return ctx.hasFlag(Flag.ZERO_LOCK_ACQUISITION_TIMEOUT) ? 0 : configuration.getLockAcquisitionTimeout();
+   }
+
+   /** Unlocks {@code key} on behalf of whichever owner the lock manager currently reports for it. */
+   public final void releaseLock(Object key) {
+      lockManager.unlock(key, lockManager.getOwner(key));
+   }
+}
More information about the infinispan-commits
mailing list