[infinispan-commits] Infinispan SVN: r991 - trunk/core/src/main/java/org/infinispan/container.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Fri Oct 23 11:17:11 EDT 2009


Author: mircea.markus
Date: 2009-10-23 11:17:11 -0400 (Fri, 23 Oct 2009)
New Revision: 991

Added:
   trunk/core/src/main/java/org/infinispan/container/EntryFactoryImpl.java
Removed:
   trunk/core/src/main/java/org/infinispan/container/EntryFactoryImpl.java
Log:
[ISPN-236]-(cache.putIfAbsent() is not atomic) - fix

Deleted: trunk/core/src/main/java/org/infinispan/container/EntryFactoryImpl.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/container/EntryFactoryImpl.java	2009-10-23 14:10:13 UTC (rev 990)
+++ trunk/core/src/main/java/org/infinispan/container/EntryFactoryImpl.java	2009-10-23 15:17:11 UTC (rev 991)
@@ -1,238 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.infinispan.container;
-
-import org.infinispan.config.Configuration;
-import org.infinispan.container.entries.CacheEntry;
-import org.infinispan.container.entries.MVCCEntry;
-import org.infinispan.container.entries.NullMarkerEntry;
-import org.infinispan.container.entries.NullMarkerEntryForRemoval;
-import org.infinispan.container.entries.ReadCommittedEntry;
-import org.infinispan.container.entries.RepeatableReadEntry;
-import org.infinispan.context.Flag;
-import org.infinispan.context.InvocationContext;
-import org.infinispan.factories.annotations.Inject;
-import org.infinispan.factories.annotations.Start;
-import org.infinispan.notifications.cachelistener.CacheNotifier;
-import org.infinispan.util.Util;
-import org.infinispan.util.concurrent.IsolationLevel;
-import org.infinispan.util.concurrent.TimeoutException;
-import org.infinispan.util.concurrent.locks.LockManager;
-import org.infinispan.util.logging.Log;
-import org.infinispan.util.logging.LogFactory;
-
-public class EntryFactoryImpl implements EntryFactory {
-   private boolean useRepeatableRead;
-   DataContainer container;
-   boolean writeSkewCheck;
-   LockManager lockManager;
-   Configuration configuration;
-   CacheNotifier notifier;
-
-   private static final Log log = LogFactory.getLog(EntryFactoryImpl.class);
-   private static final boolean trace = log.isTraceEnabled();
-
-   @Inject
-   public void injectDependencies(DataContainer dataContainer, LockManager lockManager, Configuration configuration, CacheNotifier notifier) {
-      this.container = dataContainer;
-      this.configuration = configuration;
-      this.lockManager = lockManager;
-      this.notifier = notifier;
-   }
-
-   @Start
-   public void init() {
-      useRepeatableRead = configuration.getIsolationLevel() == IsolationLevel.REPEATABLE_READ;
-      writeSkewCheck = configuration.isWriteSkewCheck();
-   }
-
-   private MVCCEntry createWrappedEntry(Object key, Object value, boolean isForInsert, boolean forRemoval, long lifespan) {
-      if (value == null && !isForInsert) return useRepeatableRead ?
-            forRemoval ? new NullMarkerEntryForRemoval(key) : NullMarkerEntry.getInstance()
-            : null;
-
-      return useRepeatableRead ? new RepeatableReadEntry(key, value, lifespan) : new ReadCommittedEntry(key, value, lifespan);
-   }
-
-   public final CacheEntry wrapEntryForReading(InvocationContext ctx, Object key) throws InterruptedException {
-      CacheEntry cacheEntry;
-      if (ctx.hasFlag(Flag.FORCE_WRITE_LOCK)) {
-         if (trace) log.trace("Forcing lock on reading");
-         return wrapEntryForWriting(ctx, key, false, false, false, false);
-      } else if ((cacheEntry = ctx.lookupEntry(key)) == null) {
-         if (trace) log.trace("Key {0} is not in context, fetching from container.", key);
-         // simple implementation.  Peek the entry, wrap it, put wrapped entry in the context.
-         cacheEntry = container.get(key);
-
-         // do not bother wrapping though if this is not in a tx.  repeatable read etc are all meaningless unless there is a tx.
-         if (useRepeatableRead && ctx.isInTxScope()) {
-            MVCCEntry mvccEntry = cacheEntry == null ?
-                     createWrappedEntry(key, null, false, false, -1) :
-                     createWrappedEntry(key, cacheEntry.getValue(), false, false, cacheEntry.getLifespan());
-               if (mvccEntry != null) ctx.putLookedUpEntry(key, mvccEntry);
-               return mvccEntry;
-         }
-         // if not in transaction and repeatable read, or simply read committed (regardless of whether in TX or not), do not wrap
-         if (cacheEntry != null) ctx.putLookedUpEntry(key, cacheEntry);
-            return cacheEntry;
-      } else {
-         if (trace) log.trace("Key is already in context");
-         return cacheEntry;
-      }
-   }
-
-   public final MVCCEntry wrapEntryForWriting(InvocationContext ctx, Object key, boolean createIfAbsent, boolean forceLockIfAbsent, boolean alreadyLocked, boolean forRemoval) throws InterruptedException {
-      CacheEntry cacheEntry = ctx.lookupEntry(key);
-      MVCCEntry mvccEntry = null;
-      if (createIfAbsent && cacheEntry != null && cacheEntry.isNull()) cacheEntry = null;
-      if (cacheEntry != null) // exists in context!  Just acquire lock if needed, and wrap.
-      {
-         if (trace) log.trace("Exists in context.");
-         // acquire lock if needed
-         if (alreadyLocked || acquireLock(ctx, key)) {
-
-            if (cacheEntry instanceof MVCCEntry && (!forRemoval || !(cacheEntry instanceof NullMarkerEntry))) {
-               mvccEntry = (MVCCEntry) cacheEntry;
-            } else {
-               // this is a read-only entry that needs to be copied to a proper read-write entry!!
-               mvccEntry = createWrappedEntry(key, cacheEntry.getValue(), false, forRemoval, cacheEntry.getLifespan());
-               cacheEntry = mvccEntry;
-               ctx.putLookedUpEntry(key, cacheEntry);
-            }
-
-            // create a copy of the underlying entry
-            mvccEntry.copyForUpdate(container, writeSkewCheck);
-         }
-
-         if (cacheEntry.isRemoved() && createIfAbsent) {
-            if (trace) log.trace("Entry is deleted in current scope.  Need to un-delete.");
-            if (mvccEntry != cacheEntry) mvccEntry = (MVCCEntry) cacheEntry;
-            mvccEntry.setRemoved(false);
-            mvccEntry.setValid(true);
-         }
-
-         return mvccEntry;
-
-      } else {
-         // else, fetch from dataContainer.
-         cacheEntry = container.get(key);
-         if (cacheEntry != null) {
-            if (trace) log.trace("Retrieved from container.");
-            // exists in cache!  Just acquire lock if needed, and wrap.
-            // do we need a lock?
-            boolean lockAcquired = acquireLock(ctx, key);
-            mvccEntry = wrapExistingEntryForWriting(ctx, key, alreadyLocked, cacheEntry, lockAcquired);
-         } else if (createIfAbsent) {
-            // this is the *only* point where new entries can be created!!
-            if (trace) log.trace("Creating new entry.");
-
-            boolean lockAcquired = false;
-            // now to lock and create the entry.  Lock first to prevent concurrent creation!
-            if (!alreadyLocked)  {
-               lockAcquired = acquireLock(ctx, key);
-            }
-            if (lockAcquired) {
-               //double check existence, as the prev lock owner might have created a cache entry for the same key
-               cacheEntry = container.get(key);
-               if (cacheEntry != null) {
-                 mvccEntry = wrapExistingEntryForWriting(ctx, key, true, cacheEntry, lockAcquired);
-               } else {
-                  mvccEntry = createAndWrapEntryForWriting(ctx, key);
-               }
-            } else {
-               mvccEntry = createAndWrapEntryForWriting(ctx, key);
-            }
-         }
-      }
-
-      // see if we need to force the lock on nonexistent entries.
-      if (mvccEntry == null && forceLockIfAbsent) {
-         // make sure we record this! Null value since this is a forced lock on the key
-         if (acquireLock(ctx, key)) ctx.putLookedUpEntry(key, null);
-      }
-
-      return mvccEntry;
-   }
-
-   private MVCCEntry createAndWrapEntryForWriting(InvocationContext ctx, Object key) {
-      MVCCEntry mvccEntry;
-      notifier.notifyCacheEntryCreated(key, true, ctx);
-      mvccEntry = createWrappedEntry(key, null, true, false, -1);
-      mvccEntry.setCreated(true);
-      ctx.putLookedUpEntry(key, mvccEntry);
-      mvccEntry.copyForUpdate(container, writeSkewCheck);
-      notifier.notifyCacheEntryCreated(key, false, ctx);
-      return mvccEntry;
-   }
-
-   private MVCCEntry wrapExistingEntryForWriting(InvocationContext ctx, Object key, boolean alreadyLocked, CacheEntry cacheEntry, boolean lockAquired) {
-      MVCCEntry mvccEntry;
-      mvccEntry = createWrappedEntry(key, cacheEntry.getValue(), false, false, cacheEntry.getLifespan());
-      ctx.putLookedUpEntry(key, mvccEntry);
-      boolean needToCopy = alreadyLocked || lockAquired || ctx.hasFlag(Flag.SKIP_LOCKING); // even if we do not acquire a lock, if skip-locking is enabled we should copy
-      if (needToCopy) mvccEntry.copyForUpdate(container, writeSkewCheck);
-      return mvccEntry;
-   }
-
-   /**
-    * Attempts to lock an entry if the lock isn't already held in the current scope, and records the lock in the
-    * context.
-    *
-    * @param ctx context
-    * @param key Key to lock
-    * @return true if a lock was needed and acquired, false if it didn't need to acquire the lock (i.e., lock was
-    *         already held)
-    * @throws InterruptedException if interrupted
-    * @throws org.infinispan.util.concurrent.TimeoutException
-    *                              if we are unable to acquire the lock after a specified timeout.
-    */
-   public final boolean acquireLock(InvocationContext ctx, Object key) throws InterruptedException, TimeoutException {
-      // don't EVER use lockManager.isLocked() since with lock striping it may be the case that we hold the relevant
-      // lock which may be shared with another key that we have a lock for already.
-      // nothing wrong, just means that we fail to record the lock.  And that is a problem.
-      // Better to check our records and lock again if necessary.
-
-      boolean shouldSkipLocking = ctx.hasFlag(Flag.SKIP_LOCKING);
-
-      if (!ctx.hasLockedKey(key) && !shouldSkipLocking) {
-         if (lockManager.lockAndRecord(key, ctx)) {
-            // successfully locked!
-            return true;
-         } else {
-            Object owner = lockManager.getOwner(key);
-            throw new TimeoutException("Unable to acquire lock after [" + Util.prettyPrintTime(getLockAcquisitionTimeout(ctx)) + "] on key [" + key + "] for requestor [" +
-                  ctx.getLockOwner() + "]! Lock held by [" + owner + "]");
-         }
-      }
-
-      return false;
-   }
-
-   private long getLockAcquisitionTimeout(InvocationContext ctx) {
-      return ctx.hasFlag(Flag.ZERO_LOCK_ACQUISITION_TIMEOUT) ?
-            0 : configuration.getLockAcquisitionTimeout();
-   }
-
-   public final void releaseLock(Object key) {
-      lockManager.unlock(key, lockManager.getOwner(key));
-   }
-}

Added: trunk/core/src/main/java/org/infinispan/container/EntryFactoryImpl.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/container/EntryFactoryImpl.java	                        (rev 0)
+++ trunk/core/src/main/java/org/infinispan/container/EntryFactoryImpl.java	2009-10-23 15:17:11 UTC (rev 991)
@@ -0,0 +1,216 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.infinispan.container;
+
+import org.infinispan.config.Configuration;
+import org.infinispan.container.entries.CacheEntry;
+import org.infinispan.container.entries.MVCCEntry;
+import org.infinispan.container.entries.NullMarkerEntry;
+import org.infinispan.container.entries.NullMarkerEntryForRemoval;
+import org.infinispan.container.entries.ReadCommittedEntry;
+import org.infinispan.container.entries.RepeatableReadEntry;
+import org.infinispan.context.Flag;
+import org.infinispan.context.InvocationContext;
+import org.infinispan.factories.annotations.Inject;
+import org.infinispan.factories.annotations.Start;
+import org.infinispan.notifications.cachelistener.CacheNotifier;
+import org.infinispan.util.Util;
+import org.infinispan.util.concurrent.IsolationLevel;
+import org.infinispan.util.concurrent.TimeoutException;
+import org.infinispan.util.concurrent.locks.LockManager;
+import org.infinispan.util.logging.Log;
+import org.infinispan.util.logging.LogFactory;
+
+public class EntryFactoryImpl implements EntryFactory {
+   private boolean useRepeatableRead;
+   DataContainer container;
+   boolean writeSkewCheck;
+   LockManager lockManager;
+   Configuration configuration;
+   CacheNotifier notifier;
+
+   private static final Log log = LogFactory.getLog(EntryFactoryImpl.class);
+   private static final boolean trace = log.isTraceEnabled();
+
+   @Inject
+   public void injectDependencies(DataContainer dataContainer, LockManager lockManager, Configuration configuration, CacheNotifier notifier) {
+      this.container = dataContainer;
+      this.configuration = configuration;
+      this.lockManager = lockManager;
+      this.notifier = notifier;
+   }
+
+   @Start
+   public void init() {
+      useRepeatableRead = configuration.getIsolationLevel() == IsolationLevel.REPEATABLE_READ;
+      writeSkewCheck = configuration.isWriteSkewCheck();
+   }
+
+   private MVCCEntry createWrappedEntry(Object key, Object value, boolean isForInsert, boolean forRemoval, long lifespan) {
+      if (value == null && !isForInsert) return useRepeatableRead ?
+            forRemoval ? new NullMarkerEntryForRemoval(key) : NullMarkerEntry.getInstance()
+            : null;
+
+      return useRepeatableRead ? new RepeatableReadEntry(key, value, lifespan) : new ReadCommittedEntry(key, value, lifespan);
+   }
+
+   public final CacheEntry wrapEntryForReading(InvocationContext ctx, Object key) throws InterruptedException {
+      CacheEntry cacheEntry;
+      if (ctx.hasFlag(Flag.FORCE_WRITE_LOCK)) {
+         if (trace) log.trace("Forcing lock on reading");
+         return wrapEntryForWriting(ctx, key, false, false, false, false);
+      } else if ((cacheEntry = ctx.lookupEntry(key)) == null) {
+         if (trace) log.trace("Key {0} is not in context, fetching from container.", key);
+         // simple implementation.  Peek the entry, wrap it, put wrapped entry in the context.
+         cacheEntry = container.get(key);
+
+         // do not bother wrapping though if this is not in a tx.  repeatable read etc are all meaningless unless there is a tx.
+         if (useRepeatableRead && ctx.isInTxScope()) {
+            MVCCEntry mvccEntry = cacheEntry == null ?
+                     createWrappedEntry(key, null, false, false, -1) :
+                     createWrappedEntry(key, cacheEntry.getValue(), false, false, cacheEntry.getLifespan());
+               if (mvccEntry != null) ctx.putLookedUpEntry(key, mvccEntry);
+               return mvccEntry;
+         }
+         // if not in transaction and repeatable read, or simply read committed (regardless of whether in TX or not), do not wrap
+         if (cacheEntry != null) ctx.putLookedUpEntry(key, cacheEntry);
+            return cacheEntry;
+      } else {
+         if (trace) log.trace("Key is already in context");
+         return cacheEntry;
+      }
+   }
+
+   public final MVCCEntry wrapEntryForWriting(InvocationContext ctx, Object key, boolean createIfAbsent, boolean forceLockIfAbsent, boolean alreadyLocked, boolean forRemoval) throws InterruptedException {
+      CacheEntry cacheEntry = ctx.lookupEntry(key);
+      MVCCEntry mvccEntry = null;
+      if (createIfAbsent && cacheEntry != null && cacheEntry.isNull()) cacheEntry = null;
+      if (cacheEntry != null) // exists in context!  Just acquire lock if needed, and wrap.
+      {
+         if (trace) log.trace("Exists in context.");
+         // acquire lock if needed
+         if (alreadyLocked || acquireLock(ctx, key)) {
+
+            if (cacheEntry instanceof MVCCEntry && (!forRemoval || !(cacheEntry instanceof NullMarkerEntry))) {
+               mvccEntry = (MVCCEntry) cacheEntry;
+            } else {
+               // this is a read-only entry that needs to be copied to a proper read-write entry!!
+               mvccEntry = createWrappedEntry(key, cacheEntry.getValue(), false, forRemoval, cacheEntry.getLifespan());
+               cacheEntry = mvccEntry;
+               ctx.putLookedUpEntry(key, cacheEntry);
+            }
+
+            // create a copy of the underlying entry
+            mvccEntry.copyForUpdate(container, writeSkewCheck);
+         }
+
+         if (cacheEntry.isRemoved() && createIfAbsent) {
+            if (trace) log.trace("Entry is deleted in current scope.  Need to un-delete.");
+            if (mvccEntry != cacheEntry) mvccEntry = (MVCCEntry) cacheEntry;
+            mvccEntry.setRemoved(false);
+            mvccEntry.setValid(true);
+         }
+
+         return mvccEntry;
+
+      } else {
+         boolean lockAcquired = false;
+         if (!alreadyLocked) {
+            lockAcquired = acquireLock(ctx, key);
+         }
+         // else, fetch from dataContainer.
+         cacheEntry = container.get(key);
+         if (cacheEntry != null) {
+            if (trace) log.trace("Retrieved from container.");
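+            // exists in cache!  Any lock needed was already acquired before the container lookup above, so just wrap.
+            // copy for update if the caller already held the lock, we just acquired it, or locking is being skipped.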
+            boolean needToCopy = alreadyLocked || lockAcquired || ctx.hasFlag(Flag.SKIP_LOCKING); // even if we do not acquire a lock, if skip-locking is enabled we should copy
+            mvccEntry = createWrappedEntry(key, cacheEntry.getValue(), false, false, cacheEntry.getLifespan());
+            ctx.putLookedUpEntry(key, mvccEntry);
+            if (needToCopy) mvccEntry.copyForUpdate(container, writeSkewCheck);
+         } else if (createIfAbsent) {
+            // this is the *only* point where new entries can be created!!
+            if (trace) log.trace("Creating new entry.");
+            // lock acquisition already happened before the container lookup above (to prevent concurrent creation); now create the entry.
+            notifier.notifyCacheEntryCreated(key, true, ctx);
+            mvccEntry = createWrappedEntry(key, null, true, false, -1);
+            mvccEntry.setCreated(true);
+            ctx.putLookedUpEntry(key, mvccEntry);
+            mvccEntry.copyForUpdate(container, writeSkewCheck);
+            notifier.notifyCacheEntryCreated(key, false, ctx);
+         } else {
+            releaseLock(key);
+         }
+      }
+
+      // see if we need to force the lock on nonexistent entries.
+      if (mvccEntry == null && forceLockIfAbsent) {
+         // make sure we record this! Null value since this is a forced lock on the key
+         if (acquireLock(ctx, key)) ctx.putLookedUpEntry(key, null);
+      }
+
+      return mvccEntry;
+   }
+
+   /**
+    * Attempts to lock an entry if the lock isn't already held in the current scope, and records the lock in the
+    * context.
+    *
+    * @param ctx context
+    * @param key Key to lock
+    * @return true if a lock was needed and acquired, false if it didn't need to acquire the lock (i.e., lock was
+    *         already held)
+    * @throws InterruptedException if interrupted
+    * @throws org.infinispan.util.concurrent.TimeoutException
+    *                              if we are unable to acquire the lock after a specified timeout.
+    */
+   public final boolean acquireLock(InvocationContext ctx, Object key) throws InterruptedException, TimeoutException {
+      // don't EVER use lockManager.isLocked() since with lock striping it may be the case that we hold the relevant
+      // lock which may be shared with another key that we have a lock for already.
+      // nothing wrong, just means that we fail to record the lock.  And that is a problem.
+      // Better to check our records and lock again if necessary.
+
+      boolean shouldSkipLocking = ctx.hasFlag(Flag.SKIP_LOCKING);
+
+      if (!ctx.hasLockedKey(key) && !shouldSkipLocking) {
+         if (lockManager.lockAndRecord(key, ctx)) {
+            // successfully locked!
+            return true;
+         } else {
+            Object owner = lockManager.getOwner(key);
+            throw new TimeoutException("Unable to acquire lock after [" + Util.prettyPrintTime(getLockAcquisitionTimeout(ctx)) + "] on key [" + key + "] for requestor [" +
+                  ctx.getLockOwner() + "]! Lock held by [" + owner + "]");
+         }
+      }
+
+      return false;
+   }
+
+   private long getLockAcquisitionTimeout(InvocationContext ctx) {
+      return ctx.hasFlag(Flag.ZERO_LOCK_ACQUISITION_TIMEOUT) ?
+            0 : configuration.getLockAcquisitionTimeout();
+   }
+
+   public final void releaseLock(Object key) {
+      lockManager.unlock(key, lockManager.getOwner(key));
+   }
+}



More information about the infinispan-commits mailing list