[infinispan-commits] Infinispan SVN: r1159 - in trunk/lucene-directory: src/main/java/org/infinispan/lucene and 2 other directories.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Thu Nov 12 15:36:25 EST 2009


Author: sannegrinovero
Date: 2009-11-12 15:36:24 -0500 (Thu, 12 Nov 2009)
New Revision: 1159

Added:
   trunk/lucene-directory/src/test/java/org/infinispan/lucene/DynamicClusterIndexStressTest.java
   trunk/lucene-directory/src/test/java/org/infinispan/lucene/testutils/
   trunk/lucene-directory/src/test/java/org/infinispan/lucene/testutils/ClusteredCacheFactory.java
Modified:
   trunk/lucene-directory/pom.xml
   trunk/lucene-directory/src/main/java/org/infinispan/lucene/InfinispanDirectory.java
   trunk/lucene-directory/src/test/java/org/infinispan/lucene/CacheTestSupport.java
   trunk/lucene-directory/src/test/java/org/infinispan/lucene/InfinispanDirectoryStressTest.java
Log:
improving tests for Lucene Directory

Modified: trunk/lucene-directory/pom.xml
===================================================================
--- trunk/lucene-directory/pom.xml	2009-11-12 19:35:56 UTC (rev 1158)
+++ trunk/lucene-directory/pom.xml	2009-11-12 20:36:24 UTC (rev 1159)
@@ -1,7 +1,6 @@
 <?xml version="1.0"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 
    <modelVersion>4.0.0</modelVersion>
    <parent>
@@ -41,4 +40,19 @@
       </dependency>
    </dependencies>
 
+   <build>
+      <plugins>
+         <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-surefire-plugin</artifactId>
+            <version>2.4.3</version>
+            <configuration>
+               <excludes>
+                  <exclude>**/*StressTest.java</exclude>
+               </excludes>
+            </configuration>
+         </plugin>
+      </plugins>
+   </build>
+
 </project>

Modified: trunk/lucene-directory/src/main/java/org/infinispan/lucene/InfinispanDirectory.java
===================================================================
--- trunk/lucene-directory/src/main/java/org/infinispan/lucene/InfinispanDirectory.java	2009-11-12 19:35:56 UTC (rev 1158)
+++ trunk/lucene-directory/src/main/java/org/infinispan/lucene/InfinispanDirectory.java	2009-11-12 20:36:24 UTC (rev 1159)
@@ -23,7 +23,6 @@
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.util.HashSet;
 import java.util.Set;
 
 import org.apache.lucene.store.AlreadyClosedException;

Modified: trunk/lucene-directory/src/test/java/org/infinispan/lucene/CacheTestSupport.java
===================================================================
--- trunk/lucene-directory/src/test/java/org/infinispan/lucene/CacheTestSupport.java	2009-11-12 19:35:56 UTC (rev 1158)
+++ trunk/lucene-directory/src/test/java/org/infinispan/lucene/CacheTestSupport.java	2009-11-12 20:36:24 UTC (rev 1159)
@@ -53,11 +53,14 @@
    
    public static Configuration createTestConfiguration() {
       Configuration c = new Configuration();
-      c.setCacheMode(Configuration.CacheMode.REPL_SYNC);
+      c.setCacheMode(Configuration.CacheMode.DIST_SYNC);
       c.setSyncReplTimeout(10000);
       c.setLockAcquisitionTimeout(10000);
       c.setUseLockStriping(false);
       c.setSyncCommitPhase(true);
+      c.setL1CacheEnabled(true);
+      c.setExposeJmxStatistics(false);
+      c.setUseEagerLocking(false);
       c.setSyncRollbackPhase(true);
       c.setTransactionManagerLookupClass(JBossStandaloneJTAManagerLookup.class.getName());
       c.setDeadlockDetectionSpinDuration( 10000 );

Added: trunk/lucene-directory/src/test/java/org/infinispan/lucene/DynamicClusterIndexStressTest.java
===================================================================
--- trunk/lucene-directory/src/test/java/org/infinispan/lucene/DynamicClusterIndexStressTest.java	                        (rev 0)
+++ trunk/lucene-directory/src/test/java/org/infinispan/lucene/DynamicClusterIndexStressTest.java	2009-11-12 20:36:24 UTC (rev 1159)
@@ -0,0 +1,254 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.infinispan.lucene;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.analysis.SimpleAnalyzer;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.document.Field.Index;
+import org.apache.lucene.document.Field.Store;
+import org.apache.lucene.index.CorruptIndexException;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.index.IndexWriter.MaxFieldLength;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.Query;
+import org.apache.lucene.search.TermQuery;
+import org.apache.lucene.search.TopDocs;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.LockObtainFailedException;
+import org.infinispan.Cache;
+import org.infinispan.lucene.testutils.ClusteredCacheFactory;
+import org.infinispan.test.TestingUtil;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+/**
+ * <p>
+ * DynamicClusterIndexStressTest verifies the index state is healthy while the cluster topology changes.
+ * While doing rehashing of the cluster in background it makes changes and searches, verifying the
+ * search results.
+ * </p>
+ * <p>Two pools of strings are maintained: one containing strings which are guaranteed to be found in the index,
+ * and one containing the strings which are guaranteed not to be found in the index.
+ * Threads take strings from these pools, check the index state against them, then write/delete them from the
+ * index, commit and move them to the appropriate pool to check again or have another thread check them.
+ * Different threads own different Directory instances - linked by Infinispan - while the pools are BlockingDeques
+ * shared by reference.
+ * </p>
+ * 
+ * Run with -Dbind.address=127.0.0.1 -Djava.net.preferIPv4Stack=true
+ * 
+ * @author Sanne Grinovero
+ * @since 4.0
+ */
+@Test(groups = "profiling", testName = "lucene.DynamicClusterIndexStressTest")
+public class DynamicClusterIndexStressTest {
+
+   private static final int TOTAL_NODES_TO_CREATE = 1000;
+   private static final int NODE_EXISTING_MILLISECONDS = 60000;
+   private static final int TIME_BETWEEN_NODE_CREATIONS = 1000;
+   private static final int CONCURRENCY_LIMIT = 10;
+   private static final int STRING_POOL_SIZE = 1000;
+   private static final Analyzer anyAnalyzer = new SimpleAnalyzer();
+
+   private final BlockingDeque<String> stringsInIndex = new LinkedBlockingDeque<String>();
+   private final BlockingDeque<String> stringsOutOfIndex = new LinkedBlockingDeque<String>();
+   private final ClusteredCacheFactory cacheFactory = new ClusteredCacheFactory(CacheTestSupport.createTestConfiguration());
+
+   private volatile boolean failed = false;
+   private volatile String failureMessage = "";
+
+//   @Test
+//   public void periodicallyAddingANode() throws InterruptedException, LockObtainFailedException, IOException {
+//      for (int i = 0; i < STRING_POOL_SIZE; i++) {
+//         stringsOutOfIndex.add(String.valueOf(i));
+//      }
+//      Cache<CacheKey, Object> cache = cacheFactory.createClusteredCache();
+//      try {
+//         createIndex(cache);
+//         runMoreNodes();
+//      } finally {
+//         cleanup(cache);
+//      }
+//   }
+
+   /**
+    * Initialize the empty index
+    * 
+    * @param cache to use to contain the index
+    * @throws CorruptIndexException
+    * @throws LockObtainFailedException
+    * @throws IOException
+    */
+   private void createIndex(Cache<CacheKey, Object> cache) throws CorruptIndexException, LockObtainFailedException, IOException {
+      InfinispanDirectory directory = new InfinispanDirectory(cache, "indexName");
+      IndexWriter iwriter = new IndexWriter(directory, anyAnalyzer, true, MaxFieldLength.UNLIMITED);
+      iwriter.commit();
+      iwriter.close();
+      IndexSearcher searcher = new IndexSearcher(directory);
+      searcher.close();
+      System.out.println("Index created by " + buildName(cache));
+      // verify it can be reopened:
+      InfinispanDirectory directory2 = new InfinispanDirectory(cache, "indexName");
+      IndexSearcher searcher2 = new IndexSearcher(directory2);
+      searcher2.close();
+   }
+
+   private void runMoreNodes() throws InterruptedException {
+      ExecutorService executor = Executors.newFixedThreadPool(CONCURRENCY_LIMIT);
+      for (int i = 0; !failed && (i < TOTAL_NODES_TO_CREATE); i++) {
+         executor.execute(new LuceneUserThread());
+         Thread.sleep(TIME_BETWEEN_NODE_CREATIONS);
+      }
+      executor.shutdown();
+      executor.awaitTermination(1, TimeUnit.HOURS);
+      Assert.assertTrue(!failed, failureMessage);
+   }
+   
+   @Test
+   public void testWihtoutRehashing() throws InterruptedException{
+      ExecutorService executor = Executors.newFixedThreadPool(CONCURRENCY_LIMIT);
+      Cache[] caches = new Cache[CONCURRENCY_LIMIT];
+      for (int i=0; i<CONCURRENCY_LIMIT; i++){
+         caches[i] = cacheFactory.createClusteredCacheWaitingForNodesView(i+1);
+      }
+      //TODO run the tests
+      //FIXME: some NPEs are logged here during cleanup
+      for (int i=0; i<CONCURRENCY_LIMIT; i++){
+//         Thread.sleep(1000); Enabling sleep here "fixes" the NPEs
+         cleanup(caches[i]);
+      }
+   }
+   
+   private static String buildName(Cache cache) {
+      return Thread.currentThread().getName() +  "[" + cache.getCacheManager().getAddress().toString() + "]";
+   }
+
+   private class LuceneUserThread implements Runnable {
+
+      private Directory directory;
+
+      @Override
+      public void run() {
+         if (failed)
+            return;
+         String name = "";
+         try {
+            Cache<CacheKey, Object> cache = cacheFactory.createClusteredCache();
+            try {
+               name = buildName(cache);
+               directory = new InfinispanDirectory(cache, "indexName");
+               System.out.println("Created Directory in " + name);
+               try {
+                  runTest();
+               } catch (Exception e) {
+                  System.out.println("Error in " + name);
+                  e.printStackTrace();
+                  failed = true;
+                  failureMessage = e.getMessage();
+               }
+            } finally {
+               cleanup(cache);
+            }
+         } catch (InterruptedException e) {
+            failed = true;
+            failureMessage = e.getMessage();
+         } finally {
+            System.out.println("Leaving thread " + name);
+         }
+      }
+
+      private void runTest() throws CorruptIndexException, IOException {
+         long finishTime = System.currentTimeMillis() + NODE_EXISTING_MILLISECONDS;
+         while (!failed && System.currentTimeMillis() < finishTime) {
+            verifyStringsExistInIndex();
+            // verifyStringsNotExistInIndex(); //TODO
+            addSomeStrings();
+            // deleteSomeStrings(); //TODO
+         }
+      }
+
+      private void addSomeStrings() throws CorruptIndexException, LockObtainFailedException, IOException {
+         Set<String> strings = new HashSet<String>();
+         stringsOutOfIndex.drainTo(strings, 5);
+         IndexWriter iwriter = new IndexWriter(directory, anyAnalyzer, false, MaxFieldLength.UNLIMITED);
+         for (String term : strings) {
+            Document doc = new Document();
+            doc.add(new Field("main", term, Store.NO, Index.NOT_ANALYZED));
+            iwriter.addDocument(doc);
+         }
+         iwriter.commit();
+         stringsInIndex.addAll(strings);
+         iwriter.close();
+      }
+
+      private void verifyStringsExistInIndex() throws CorruptIndexException, IOException {
+         // take ownership of some strings, so that no other thread will change status for them:
+         Set<String> strings = new HashSet<String>();
+         stringsInIndex.drainTo(strings, 50);
+         IndexSearcher searcher = new IndexSearcher(directory);
+         for (String term : strings) {
+            Query query = new TermQuery(new Term("main", term));
+            TopDocs docs = searcher.search(query, null, 1);
+            if (docs.totalHits != 1) {
+               failed = true;
+               failureMessage = "couldn't find expected term in index";
+            }
+         }
+         // put the strings back at their place:
+         stringsInIndex.addAll(strings);
+      }
+
+   }
+
+   @BeforeClass
+   public void beforeTest() {
+      cacheFactory.start();
+   }
+
+   @AfterClass
+   public void afterTest() {
+      cacheFactory.stop();
+   }
+   
+   private static void cleanup(Cache<CacheKey, Object> cache) {
+      try {
+         TestingUtil.killCaches(cache);
+      } finally {
+         TestingUtil.killCacheManagers(cache.getCacheManager());
+      }
+   }
+
+}


Property changes on: trunk/lucene-directory/src/test/java/org/infinispan/lucene/DynamicClusterIndexStressTest.java
___________________________________________________________________
Name: svn:keywords
   + Id Revision
Name: svn:eol-style
   + LF

Modified: trunk/lucene-directory/src/test/java/org/infinispan/lucene/InfinispanDirectoryStressTest.java
===================================================================
--- trunk/lucene-directory/src/test/java/org/infinispan/lucene/InfinispanDirectoryStressTest.java	2009-11-12 19:35:56 UTC (rev 1158)
+++ trunk/lucene-directory/src/test/java/org/infinispan/lucene/InfinispanDirectoryStressTest.java	2009-11-12 20:36:24 UTC (rev 1159)
@@ -42,6 +42,7 @@
 import org.testng.annotations.Test;
 
 /**
+ * Basic stress test: one thread writes, some others read.
  * @author Lukasz Moren
  * @author Sanne Grinovero
  */
@@ -82,14 +83,17 @@
       final CountDownLatch latch = new CountDownLatch(1);
       List<InfinispanDirectoryThread> threads = new ArrayList<InfinispanDirectoryThread>();
       Cache<CacheKey, Object> cache = CacheTestSupport.createTestCacheManager().getCache();
-      Cache<CacheKey, Object> cache2 = CacheTestSupport.createTestCacheManager().getCache(); // dummy cache, to force replication
       Directory directory1 = new InfinispanDirectory(cache, "indexName");
-      Directory directory2 = new InfinispanDirectory(cache2, "indexName");
 
       IndexWriter.MaxFieldLength fieldLength = new IndexWriter.MaxFieldLength(IndexWriter.DEFAULT_MAX_FIELD_LENGTH);
       IndexWriter iw = new IndexWriter(directory1, new StandardAnalyzer(), true, fieldLength);
       iw.close();
 
+      // second cache joins after index creation: tests proper configuration
+      Cache<CacheKey, Object> cache2 = CacheTestSupport.createTestCacheManager().getCache(); // dummy cache, to force replication
+      Directory directory2 = new InfinispanDirectory(cache2, "indexName");
+      Thread.sleep(3000);
+
       // create first writing thread
       InfinispanDirectoryThread tr = new InfinispanDirectoryThread(latch, directory1, true);
       threads.add(tr);

Added: trunk/lucene-directory/src/test/java/org/infinispan/lucene/testutils/ClusteredCacheFactory.java
===================================================================
--- trunk/lucene-directory/src/test/java/org/infinispan/lucene/testutils/ClusteredCacheFactory.java	                        (rev 0)
+++ trunk/lucene-directory/src/test/java/org/infinispan/lucene/testutils/ClusteredCacheFactory.java	2009-11-12 20:36:24 UTC (rev 1159)
@@ -0,0 +1,116 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.infinispan.lucene.testutils;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.SynchronousQueue;
+
+import org.infinispan.Cache;
+import org.infinispan.config.Configuration;
+import org.infinispan.lucene.CacheKey;
+import org.infinispan.manager.CacheManager;
+import org.infinispan.test.TestingUtil;
+import org.infinispan.test.fwk.TestCacheManagerFactory;
+
+/**
+ * CacheFactory. This is currently needed as a workaround for ISPN-261: CacheManagers instantiated in different threads in the same JVM don't interact.
+ * 
+ * @author Sanne Grinovero
+ * @since 4.0
+ */
+public class ClusteredCacheFactory {
+
+   private final BlockingQueue<Configuration> requests = new SynchronousQueue<Configuration>();
+   private final BlockingQueue<Cache<CacheKey, Object>> results = new SynchronousQueue<Cache<CacheKey, Object>>();
+   private final ExecutorService executor = Executors.newFixedThreadPool(1);
+   private final Configuration cfg;
+   private boolean started = false;
+   private boolean stopped = false;
+
+   /**
+    * Create a new ClusteredCacheFactory.
+    * 
+    * @param cfg defines the configuration used to build the caches
+    */
+   public ClusteredCacheFactory(Configuration cfg) {
+      this.cfg = cfg;
+   }
+
+   /**
+    * Create a cache using default configuration 
+    * @return
+    * @throws InterruptedException
+    */
+   public synchronized Cache<CacheKey, Object> createClusteredCache() throws InterruptedException {
+      if (!started)
+         throw new IllegalStateException("was not started");
+      if (stopped)
+         throw new IllegalStateException("was already stopped");
+      requests.put(cfg);
+      return results.take();
+   }
+   
+   public Cache<CacheKey, Object> createClusteredCacheWaitingForNodesView(int expectedGroupSize) throws InterruptedException {
+      Cache<CacheKey, Object> cache = createClusteredCache();
+      TestingUtil.blockUntilViewReceived(cache, expectedGroupSize, 10000, false);
+      return cache;
+   }
+   
+   public synchronized void start() {
+      if (started)
+         throw new IllegalStateException("was already started");
+      if (stopped)
+         throw new IllegalStateException("was already stopped");
+      started = true;
+      executor.execute(new Worker());
+   }
+
+   public synchronized void stop() {
+      if (stopped)
+         throw new IllegalStateException("was already stopped");
+      if (!started)
+         throw new IllegalStateException("was not started");
+      stopped = true;
+      executor.shutdownNow();
+   }
+
+   private class Worker implements Runnable {
+
+      @Override
+      public void run() {
+         while (true) {
+            try {
+               Configuration configuration = requests.take();
+               CacheManager cacheManager = TestCacheManagerFactory.createClusteredCacheManager(configuration);
+               Cache<CacheKey, Object> cache = cacheManager.getCache();
+               results.put(cache);
+            } catch (InterruptedException e) {
+               return;
+            }
+         }
+      }
+
+   }
+
+}


Property changes on: trunk/lucene-directory/src/test/java/org/infinispan/lucene/testutils/ClusteredCacheFactory.java
___________________________________________________________________
Name: svn:keywords
   + Id Revision
Name: svn:eol-style
   + LF



More information about the infinispan-commits mailing list