[jbosscache-commits] JBoss Cache SVN: r7275 - in core/branches/flat/src: main/java/org/jboss/starobrno/commands/write and 3 other directories.

jbosscache-commits at lists.jboss.org jbosscache-commits at lists.jboss.org
Wed Dec 10 10:10:12 EST 2008


Author: manik.surtani at jboss.com
Date: 2008-12-10 10:10:12 -0500 (Wed, 10 Dec 2008)
New Revision: 7275

Modified:
   core/branches/flat/src/main/java/org/jboss/starobrno/Cache.java
   core/branches/flat/src/main/java/org/jboss/starobrno/CacheDelegate.java
   core/branches/flat/src/main/java/org/jboss/starobrno/commands/write/PutKeyValueCommand.java
   core/branches/flat/src/main/java/org/jboss/starobrno/commands/write/ReplaceCommand.java
   core/branches/flat/src/test/java/org/jboss/starobrno/api/mvcc/read_committed/ReadCommittedLockTest.java
   core/branches/flat/src/test/java/org/jboss/starobrno/profiling/ProfileTest.java
   core/branches/flat/src/test/java/org/jboss/starobrno/profiling/TreeProfileTest.java
   core/branches/flat/src/test/java/org/jboss/starobrno/profiling/testinternals/Generator.java
Log:
- Implemented putForExternalRead
- Fixed a bug in replace()
- Added better test for replace()
- Improved profiler tests

Modified: core/branches/flat/src/main/java/org/jboss/starobrno/Cache.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/Cache.java	2008-12-10 12:51:42 UTC (rev 7274)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/Cache.java	2008-12-10 15:10:12 UTC (rev 7275)
@@ -37,6 +37,33 @@
  */
 public interface Cache<K, V> extends ConcurrentMap<K, V>, Lifecycle
 {
+   /**
+    * Under special operating behavior, associates the value with the specified key.
+    * <ul>
+    * <li> Only goes through if the key specified does not exist; no-op otherwise (similar to {@link java.util.concurrent.ConcurrentMap#putIfAbsent(Object, Object)})</li>
+    * <li> Force asynchronous mode for replication to prevent any blocking.</li>
+    * <li> invalidation does not take place. </li>
+    * <li> 0ms lock timeout to prevent any blocking here either. If the lock is not acquired, this method is a no-op, and swallows the timeout exception.</li>
+    * <li> Ongoing transactions are suspended before this call, so failures here will not affect any ongoing transactions.</li>
+    * <li> Errors and exceptions are 'silent' - logged at a much lower level than normal, and this method does not throw exceptions</li>
+    * </ul>
+    * This method is for caching data that has an external representation in storage, where concurrent modification and
+    * transactions are not a consideration, and failure to put the data in the cache should be treated as a 'suboptimal outcome'
+    * rather than a 'failing outcome'.
+    * <p/>
+    * An example of when this method is useful is when data is read from, for example, a legacy datastore, and is cached before
+    * returning the data to the caller.  Subsequent calls would prefer to get the data from the cache and if the data doesn't exist
+    * in the cache, fetch again from the legacy datastore.
+    * <p/>
+    * See <a href="http://jira.jboss.com/jira/browse/JBCACHE-848">JBCACHE-848</a> for details around this feature.
+    * <p/>
+    *
+    * @param key   key with which the specified value is to be associated.
+    * @param value value to be associated with the specified key.
+    * @throws IllegalStateException if {@link #getCacheStatus()} would not return {@link org.jboss.cache.CacheStatus#STARTED}.
+    */
+   void putForExternalRead(K key, V value);
+
    void evict(K key);
 
    Configuration getConfiguration();

Modified: core/branches/flat/src/main/java/org/jboss/starobrno/CacheDelegate.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/CacheDelegate.java	2008-12-10 12:51:42 UTC (rev 7274)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/CacheDelegate.java	2008-12-10 15:10:12 UTC (rev 7275)
@@ -21,6 +21,8 @@
  */
 package org.jboss.starobrno;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.jboss.cache.CacheStatus;
 import org.jboss.cache.Version;
 import org.jboss.cache.buddyreplication.BuddyManager;
@@ -79,6 +81,7 @@
    private EvictionManager evictionManager;
    private DataContainer dataContainer;
    private LockManager lockManager;
+   private static final Log log = LogFactory.getLog(CacheDelegate.class);
 
 
    @Inject
@@ -131,7 +134,7 @@
 
    public V replace(K key, V value)
    {
-      ReplaceCommand command = commandsFactory.buildReplaceCommand(key, value, null);
+      ReplaceCommand command = commandsFactory.buildReplaceCommand(key, null, value);
       return (V) invoker.invoke(buildCtx(), command);
    }
 
@@ -206,6 +209,40 @@
       throw new UnsupportedOperationException("Go away");
    }
 
+   public void putForExternalRead(K key, V value)
+   {
+      InvocationContext ctx = invocationContextContainer.get();
+      Transaction ongoingTransaction = null;
+      try
+      {
+         if (transactionManager != null && (ongoingTransaction = transactionManager.getTransaction()) != null)
+         {
+            transactionManager.suspend();
+         }
+
+         // if the node exists then this should be a no-op.
+         ctx.getOptionOverrides().setFailSilently(true);
+         ctx.getOptionOverrides().setForceAsynchronous(true);
+         ctx.getOptionOverrides().setLockAcquisitionTimeout(0);
+         replace(key, value);
+      }
+      catch (Exception e)
+      {
+         if (log.isDebugEnabled()) log.debug("Caught exception while doing putForExternalRead()", e);
+      }
+      finally
+      {
+         try
+         {
+            if (ongoingTransaction != null) transactionManager.resume(ongoingTransaction);
+         }
+         catch (Exception e)
+         {
+            log.debug("Had problems trying to resume a transaction after putForExternalread()", e);
+         }
+      }
+   }
+
    public void evict(K key)
    {
       EvictCommand command = commandsFactory.buildEvictCommand(key);

Modified: core/branches/flat/src/main/java/org/jboss/starobrno/commands/write/PutKeyValueCommand.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/commands/write/PutKeyValueCommand.java	2008-12-10 12:51:42 UTC (rev 7274)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/commands/write/PutKeyValueCommand.java	2008-12-10 15:10:12 UTC (rev 7275)
@@ -79,7 +79,7 @@
    {
       notifier.notifyCacheEntryModified(key, true, ctx);
       MVCCEntry e = ctx.lookupEntry(key);
-      Object o = null;
+      Object o;
       if (value instanceof Delta)
       {
          // magic

Modified: core/branches/flat/src/main/java/org/jboss/starobrno/commands/write/ReplaceCommand.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/commands/write/ReplaceCommand.java	2008-12-10 12:51:42 UTC (rev 7274)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/commands/write/ReplaceCommand.java	2008-12-10 15:10:12 UTC (rev 7275)
@@ -57,7 +57,7 @@
    {
       MVCCEntry e = ctx.lookupEntry(key);
       if (e == null || e.isNullEntry()) return false;
-      if (oldValue.equals(e.getValue()))
+      if (oldValue == null || oldValue.equals(e.getValue()))
       {
          e.setValue(newValue);
          return true;

Modified: core/branches/flat/src/test/java/org/jboss/starobrno/api/mvcc/read_committed/ReadCommittedLockTest.java
===================================================================
--- core/branches/flat/src/test/java/org/jboss/starobrno/api/mvcc/read_committed/ReadCommittedLockTest.java	2008-12-10 12:51:42 UTC (rev 7274)
+++ core/branches/flat/src/test/java/org/jboss/starobrno/api/mvcc/read_committed/ReadCommittedLockTest.java	2008-12-10 15:10:12 UTC (rev 7275)
@@ -14,7 +14,7 @@
       repeatableRead = false;
    }
 
-   public void testVisibilityOfCommittedData() throws Exception
+   public void testVisibilityOfCommittedDataPut() throws Exception
    {
       Cache c = threadLocal.get().cache;
       c.put("k", "v");
@@ -42,4 +42,33 @@
       assert "v2".equals(c.get("k")) : "Should read committed data";
       threadLocal.get().tm.commit();
    }
+
+   public void testVisibilityOfCommittedDataReplace() throws Exception
+   {
+      Cache c = threadLocal.get().cache;
+      c.put("k", "v");
+
+      assert "v".equals(c.get("k"));
+
+      // start a tx and read K
+      threadLocal.get().tm.begin();
+      assert "v".equals(c.get("k"));
+      assert "v".equals(c.get("k"));
+      Transaction reader = threadLocal.get().tm.suspend();
+
+      threadLocal.get().tm.begin();
+      c.replace("k", "v2");
+      Transaction writer = threadLocal.get().tm.suspend();
+
+      threadLocal.get().tm.resume(reader);
+      assert "v".equals(c.get("k")) : "Should not read uncommitted data";
+      reader = threadLocal.get().tm.suspend();
+
+      threadLocal.get().tm.resume(writer);
+      threadLocal.get().tm.commit();
+
+      threadLocal.get().tm.resume(reader);
+      assert "v2".equals(c.get("k")) : "Should read committed data";
+      threadLocal.get().tm.commit();
+   }
 }

Modified: core/branches/flat/src/test/java/org/jboss/starobrno/profiling/ProfileTest.java
===================================================================
--- core/branches/flat/src/test/java/org/jboss/starobrno/profiling/ProfileTest.java	2008-12-10 12:51:42 UTC (rev 7274)
+++ core/branches/flat/src/test/java/org/jboss/starobrno/profiling/ProfileTest.java	2008-12-10 15:10:12 UTC (rev 7275)
@@ -5,16 +5,14 @@
 import org.jboss.cache.lock.IsolationLevel;
 import org.jboss.starobrno.Cache;
 import org.jboss.starobrno.config.Configuration;
+import org.jboss.starobrno.profiling.testinternals.Generator;
+import org.jboss.starobrno.profiling.testinternals.TaskRunner;
 import org.jboss.starobrno.util.TestingUtil;
 import org.testng.annotations.Test;
 
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.Random;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
@@ -44,7 +42,6 @@
    protected static final boolean USE_SLEEP = false; // throttle generation a bit
 
    private List<Object> keys = new ArrayList<Object>(MAX_OVERALL_KEYS);
-   private Random r = new Random();
 
    Log log = LogFactory.getLog(ProfileTest.class);
 
@@ -54,7 +51,7 @@
       Cache c = (Cache) cache;
       c.getConfiguration().setCacheMode(Configuration.CacheMode.LOCAL);
       c.getConfiguration().setConcurrencyLevel(2000);
-      c.getConfiguration().setIsolationLevel(IsolationLevel.READ_COMMITTED);
+      c.getConfiguration().setIsolationLevel(IsolationLevel.REPEATABLE_READ);
       runCompleteTest();
    }
 
@@ -80,8 +77,13 @@
       keys.clear();
       for (int i = 0; i < MAX_OVERALL_KEYS; i++)
       {
-         Object key = createRandomKey(r);
-         while (keys.contains(key)) key = createRandomKey(r);
+         Object key;
+         do
+         {
+            key = Generator.createRandomKey();
+         }
+         while (keys.contains(key));
+
          if (i % 10 == 0)
          {
             log.warn("Generated " + i + " keys");
@@ -93,12 +95,7 @@
       log.warn("Finished init() phase.  " + printDuration(duration));
    }
 
-   private Object createRandomKey(Random r)
-   {
-      return Integer.toHexString(r.nextInt(Integer.MAX_VALUE));
-   }
 
-
    protected void startup()
    {
       long startTime = System.currentTimeMillis();
@@ -111,7 +108,7 @@
    private void warmup() throws InterruptedException
    {
       long startTime = System.currentTimeMillis();
-      ExecutorService exec = Executors.newFixedThreadPool(NUM_THREADS);
+      TaskRunner exec = new TaskRunner(NUM_THREADS);
       log.warn("Starting warmup");
       // creates all the Fqns since this can be expensive and we don't really want to measure this (for now)
       for (final Object key : keys)
@@ -133,7 +130,7 @@
          {
             public void run()
             {
-               Object key = keys.get(r.nextInt(MAX_OVERALL_KEYS));
+               Object key = Generator.getRandomElement(keys);
                cache.get(key);
                cache.put(key, "Value");
                cache.remove(key);
@@ -141,8 +138,7 @@
          });
       }
 
-      exec.shutdown();
-      exec.awaitTermination(360, TimeUnit.SECONDS);
+      exec.stop();
 
       long duration = System.currentTimeMillis() - startTime;
       log.warn("Finished warmup.  " + printDuration(duration));
@@ -154,7 +150,7 @@
 
    private void doTest() throws Exception
    {
-      ExecutorService exec = Executors.newFixedThreadPool(NUM_THREADS);
+      TaskRunner exec = new TaskRunner(NUM_THREADS);
       log.warn("Starting test");
       int i;
       long print = NUM_OPERATIONS / 10;
@@ -187,11 +183,7 @@
       }
       log.warn("Finished generating runnables; awaiting executor completion");
       // wait for executors to complete!
-      exec.shutdown();
-      while (!exec.awaitTermination(((long) i), TimeUnit.SECONDS))
-      {
-         Thread.sleep(1);
-      }
+      exec.stop();
 
       // wait up to 1 sec for each call?
       long elapsedTimeNanos = System.nanoTime() - stElapsed;
@@ -229,12 +221,12 @@
 
       public void run()
       {
-         Object key = keys.get(r.nextInt(MAX_OVERALL_KEYS));
+         Object key = Generator.getRandomElement(keys);
          long d = 0, st = 0;
          switch (mode)
          {
             case PUT:
-               Object value = getRandomString();
+               Object value = Generator.getRandomString();
                st = System.nanoTime();
                cache.put(key, value);
                d = System.nanoTime() - st;
@@ -284,18 +276,6 @@
       }
    }
 
-   private String getRandomString()
-   {
-      StringBuilder sb = new StringBuilder();
-      int len = r.nextInt(10);
-
-      for (int i = 0; i < len; i++)
-      {
-         sb.append((char) (63 + r.nextInt(26)));
-      }
-      return sb.toString();
-   }
-
    protected String printDuration(long duration)
    {
       if (duration > 2000)

Modified: core/branches/flat/src/test/java/org/jboss/starobrno/profiling/TreeProfileTest.java
===================================================================
--- core/branches/flat/src/test/java/org/jboss/starobrno/profiling/TreeProfileTest.java	2008-12-10 12:51:42 UTC (rev 7274)
+++ core/branches/flat/src/test/java/org/jboss/starobrno/profiling/TreeProfileTest.java	2008-12-10 15:10:12 UTC (rev 7275)
@@ -60,7 +60,7 @@
       cfg.setConcurrencyLevel(2000);
       cfg.setLockAcquisitionTimeout(120000);
       cfg.setLockParentForChildInsertRemove(true);
-      cfg.setIsolationLevel(IsolationLevel.READ_COMMITTED);
+      cfg.setIsolationLevel(IsolationLevel.REPEATABLE_READ);
       Cache c = new UnitTestCacheFactory().createCache(cfg);
       cache = new TreeCacheImpl(c);
    }

Modified: core/branches/flat/src/test/java/org/jboss/starobrno/profiling/testinternals/Generator.java
===================================================================
--- core/branches/flat/src/test/java/org/jboss/starobrno/profiling/testinternals/Generator.java	2008-12-10 12:51:42 UTC (rev 7274)
+++ core/branches/flat/src/test/java/org/jboss/starobrno/profiling/testinternals/Generator.java	2008-12-10 15:10:12 UTC (rev 7275)
@@ -33,4 +33,9 @@
       for (int i = 0; i < depth; i++) fqnElements.add(Integer.toHexString(r.nextInt(Integer.MAX_VALUE)));
       return Fqn.fromList(fqnElements, true);
    }
+
+   public static Object createRandomKey()
+   {
+      return Integer.toHexString(r.nextInt(Integer.MAX_VALUE));
+   }
 }




More information about the jbosscache-commits mailing list