[jbosscache-commits] JBoss Cache SVN: r5441 - in core/trunk/src: test/java/org/jboss/cache/optimistic and 1 other directory.

jbosscache-commits at lists.jboss.org jbosscache-commits at lists.jboss.org
Mon Mar 17 14:29:38 EDT 2008


Author: manik.surtani at jboss.com
Date: 2008-03-17 14:29:37 -0400 (Mon, 17 Mar 2008)
New Revision: 5441

Modified:
   core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticCreateIfNotExistsInterceptor.java
   core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticInterceptor.java
   core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticNodeInterceptor.java
   core/trunk/src/test/java/org/jboss/cache/optimistic/AbstractOptimisticTestCase.java
   core/trunk/src/test/java/org/jboss/cache/optimistic/ThreadedCacheAccessTest.java
Log:
JBCACHE-1309 -  Unguarded adding of nodes to workspace

Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticCreateIfNotExistsInterceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticCreateIfNotExistsInterceptor.java	2008-03-17 17:21:12 UTC (rev 5440)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticCreateIfNotExistsInterceptor.java	2008-03-17 18:29:37 UTC (rev 5441)
@@ -11,6 +11,7 @@
 import org.jboss.cache.InvocationContext;
 import org.jboss.cache.NodeFactory;
 import org.jboss.cache.NodeSPI;
+import org.jboss.cache.config.Configuration;
 import org.jboss.cache.factories.annotations.Inject;
 import org.jboss.cache.notifications.Notifier;
 import org.jboss.cache.optimistic.DataVersion;
@@ -39,6 +40,7 @@
     * NodeSPI objects in the underlying data structure.
     */
    private NodeFactory nodeFactory;
+   private long lockAcquisitionTimeout;
 
    public OptimisticCreateIfNotExistsInterceptor()
    {
@@ -46,9 +48,10 @@
    }
 
    @Inject
-   private void injectDependencies(NodeFactory nodeFactory)
+   private void injectDependencies(NodeFactory nodeFactory, Configuration cfg)
    {
       this.nodeFactory = nodeFactory;
+      this.lockAcquisitionTimeout = cfg.getLockAcquisitionTimeout();
    }
 
    @Override
@@ -111,7 +114,7 @@
    protected Object handleMoveMethod(InvocationContext ctx, Fqn from, Fqn to) throws Throwable
    {
       List<Fqn> fqns = new ArrayList<Fqn>();
-      fqns.add((Fqn) to);
+      fqns.add(to);
 
       //  peek into Node and get a hold of all child fqns as these need to be in the workspace.
       NodeSPI node = peekNode(ctx, (Fqn) from, false, true, true);
@@ -165,7 +168,8 @@
       if (workspaceNode == null)
       {
          NodeSPI node = cache.getRoot();
-         workspaceNode = nodeFactory.createWorkspaceNode(node, workspace);
+         workspaceNode = lockAndCreateWorkspaceNode(nodeFactory, node, workspace, gtx, lockAcquisitionTimeout);
+         //workspaceNode = nodeFactory.createWorkspaceNode(node, workspace);
          workspace.addNode(workspaceNode);
          log.debug("Created root node in workspace.");
       }
@@ -227,7 +231,8 @@
             NodeSPI newUnderlyingChildNode = workspaceNode.createChild(childName, workspaceNode.getNode(), cache, versionToPassIn);
 
             // now assign "workspaceNode" to the new child created.
-            workspaceNode = nodeFactory.createWorkspaceNode(newUnderlyingChildNode, workspace);
+//            workspaceNode = nodeFactory.createWorkspaceNode(newUnderlyingChildNode, workspace);
+            workspaceNode = lockAndCreateWorkspaceNode(nodeFactory, newUnderlyingChildNode, workspace, gtx, lockAcquisitionTimeout);
             workspaceNode.setVersioningImplicit(versionToPassIn == null || !isTargetFqn);
             if (trace)
                log.trace("setting versioning of " + workspaceNode.getFqn() + " to be " + (workspaceNode.isVersioningImplicit() ? "implicit" : "explicit"));
@@ -249,7 +254,8 @@
                if (trace)
                   log.trace("Child node " + currentNode.getFqn() + " doesn't exist in workspace or has been deleted.  Adding to workspace in gtx " + gtx);
 
-               workspaceNode = nodeFactory.createWorkspaceNode(currentNode, workspace);
+//               workspaceNode = nodeFactory.createWorkspaceNode(currentNode, workspace);
+               workspaceNode = lockAndCreateWorkspaceNode(nodeFactory, currentNode, workspace, gtx, lockAcquisitionTimeout);
 
                // if the underlying node is a tombstone then mark the workspace node as newly created
                if (!currentNode.isValid()) workspaceNode.markAsCreated();

Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticInterceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticInterceptor.java	2008-03-17 17:21:12 UTC (rev 5440)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticInterceptor.java	2008-03-17 18:29:37 UTC (rev 5441)
@@ -9,8 +9,10 @@
 import org.jboss.cache.CacheException;
 import org.jboss.cache.Fqn;
 import org.jboss.cache.InvocationContext;
+import org.jboss.cache.NodeFactory;
 import org.jboss.cache.NodeSPI;
 import org.jboss.cache.factories.annotations.Inject;
+import org.jboss.cache.lock.TimeoutException;
 import org.jboss.cache.optimistic.TransactionWorkspace;
 import org.jboss.cache.optimistic.WorkspaceNode;
 import org.jboss.cache.transaction.GlobalTransaction;
@@ -102,5 +104,23 @@
       nodeToUndelete.markAsResurrected(true);
    }
 
+   protected WorkspaceNode lockAndCreateWorkspaceNode(NodeFactory nodeFactory, NodeSPI node, TransactionWorkspace workspace, GlobalTransaction gtx, long timeout)
+   {
+      boolean locked = false;
+      try
+      {
+         locked = node.getLock().acquireWriteLock(gtx, timeout);
+      }
+      catch (InterruptedException e)
+      {
+         // do nothing
+      }
+      if (!locked)
+         throw new TimeoutException("Unable to lock node " + node.getFqn() + " after timeout " + timeout + " for copying into workspace");
+      WorkspaceNode wn = nodeFactory.createWorkspaceNode(node, workspace);
+      node.getLock().release(gtx);
+      return wn;
+   }
 
+
 }

Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticNodeInterceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticNodeInterceptor.java	2008-03-17 17:21:12 UTC (rev 5440)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticNodeInterceptor.java	2008-03-17 18:29:37 UTC (rev 5441)
@@ -7,6 +7,7 @@
 package org.jboss.cache.interceptors;
 
 import org.jboss.cache.CacheException;
+import org.jboss.cache.CacheSPI;
 import org.jboss.cache.Fqn;
 import org.jboss.cache.InvocationContext;
 import org.jboss.cache.NodeFactory;
@@ -43,6 +44,7 @@
     */
    private NodeFactory nodeFactory;
    private Notifier notifier;
+   private long lockAcquisitionTimeout;
 
    @Inject
    protected void injectDependencies(Notifier notifier, NodeFactory nodeFactory)
@@ -51,6 +53,13 @@
       this.nodeFactory = nodeFactory;
    }
 
+   @Override
+   public void setCache(CacheSPI cache)
+   {
+      super.setCache(cache);
+      lockAcquisitionTimeout = cache.getConfiguration().getLockAcquisitionTimeout();
+   }
+
    public OptimisticNodeInterceptor()
    {
       initLogger();
@@ -529,10 +538,9 @@
       {
          NodeSPI node = peekNode(ctx, fqn, false, true, includeInvalidNodes);
          if (node == null) return null;
+         GlobalTransaction gtx = ctx.getGlobalTransaction();
+         workspaceNode = lockAndCreateWorkspaceNode(nodeFactory, node, workspace, gtx, lockAcquisitionTimeout);
 
-         // create new workspace node based on the node from the underlying data structure
-         workspaceNode = nodeFactory.createWorkspaceNode(node, workspace);
-
          // and add the node to the workspace.
          workspace.addNode(workspaceNode);
       }

Modified: core/trunk/src/test/java/org/jboss/cache/optimistic/AbstractOptimisticTestCase.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/optimistic/AbstractOptimisticTestCase.java	2008-03-17 17:21:12 UTC (rev 5440)
+++ core/trunk/src/test/java/org/jboss/cache/optimistic/AbstractOptimisticTestCase.java	2008-03-17 18:29:37 UTC (rev 5441)
@@ -236,11 +236,10 @@
       return cache;
    }
 
-   protected Random random;
+   protected Random random = new Random();
 
    protected void randomSleep(int min, int max)
    {
-      if (random == null) random = new Random();
       long l = -1;
       while (l < min) l = random.nextInt(max);
       TestingUtil.sleepThread(l);

Modified: core/trunk/src/test/java/org/jboss/cache/optimistic/ThreadedCacheAccessTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/optimistic/ThreadedCacheAccessTest.java	2008-03-17 17:21:12 UTC (rev 5440)
+++ core/trunk/src/test/java/org/jboss/cache/optimistic/ThreadedCacheAccessTest.java	2008-03-17 18:29:37 UTC (rev 5441)
@@ -9,8 +9,6 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.jboss.cache.CacheSPI;
-import org.jboss.cache.Fqn;
-import static org.testng.AssertJUnit.assertTrue;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.Test;
 
@@ -26,20 +24,18 @@
 {
    private static final Log log = LogFactory.getLog(ThreadedCacheAccessTest.class);
    // 5 concurrent threads.
-   private final int numThreads = 5;
+   private final int numThreads = 10;
    // how many times each thread loops
-   private final int numLoopsPerThread = 25;
+   private final int numLoopsPerThread = 1000;
    // write frequency.  1 in writeFrequency loops will do a put().
    private final int writeFrequency = 5;
-   // random sleep params (ms)
-   private final int minSleep = 0, maxSleep = 100;
 
-   private final Fqn fqn = Fqn.fromString("/a/b");
    private final String key = "key", value = "value";
 
    private CacheSPI cache;
    private WorkerThread[] threads;
 
+   @Override
    @AfterMethod(alwaysRun = true)
    public void tearDown()
    {
@@ -62,34 +58,38 @@
          threads[i].start();
       }
 
-      for (int i = 0; i < numThreads; i++)
+      for (WorkerThread t : threads)
       {
-         threads[i].join();
+         t.join();
+         if (t.e != null) throw t.e;
       }
-
-      // test results.
-      for (int i = 0; i < numThreads; i++)
-      {
-         assertTrue("Thread threw an exception!", threads[i].success);
-      }
    }
 
    public class WorkerThread extends Thread
    {
-      public boolean success = true;
+      Exception e = null;
 
+      public WorkerThread()
+      {
+         setDaemon(true);
+      }
+
+      @Override
       public void run()
       {
          log.debug(getName() + " starting up ... ");
+
          for (int j = 0; j < numLoopsPerThread; j++)
          {
             TransactionManager tm = cache.getTransactionManager();
+            boolean write = j % writeFrequency == 0;
+
             try
             {
                tm.begin();
                // read something from the cache - it should be in it's own thread.
                cache.get(fqn, key);
-               if (j % writeFrequency == 0)
+               if (write)
                {
                   cache.put(fqn, key, value + j);
                }
@@ -97,20 +97,21 @@
             }
             catch (Exception e)
             {
-               log.error("Caught Exception!", e);
-               assertTrue("Caught Exception!", false);
-               success = false;
+               if (!write) // writes could fail from a perfectly acceptable data versioning exception
+               {
+                  this.e = e;
+               }
+
                try
                {
-                  tm.rollback();
+                  if (tm.getTransaction() != null) tm.rollback();
                }
                catch (Exception e2)
                {
                   log.error("Rollback failed!", e2);
                }
-               break;
+               if (!write) break;
             }
-            randomSleep(minSleep, maxSleep);
          }
       }
    }




More information about the jbosscache-commits mailing list