[infinispan-commits] Infinispan SVN: r1159 - in trunk/lucene-directory: src/main/java/org/infinispan/lucene and 2 other directories.
infinispan-commits at lists.jboss.org
infinispan-commits at lists.jboss.org
Thu Nov 12 15:36:25 EST 2009
Author: sannegrinovero
Date: 2009-11-12 15:36:24 -0500 (Thu, 12 Nov 2009)
New Revision: 1159
Added:
trunk/lucene-directory/src/test/java/org/infinispan/lucene/DynamicClusterIndexStressTest.java
trunk/lucene-directory/src/test/java/org/infinispan/lucene/testutils/
trunk/lucene-directory/src/test/java/org/infinispan/lucene/testutils/ClusteredCacheFactory.java
Modified:
trunk/lucene-directory/pom.xml
trunk/lucene-directory/src/main/java/org/infinispan/lucene/InfinispanDirectory.java
trunk/lucene-directory/src/test/java/org/infinispan/lucene/CacheTestSupport.java
trunk/lucene-directory/src/test/java/org/infinispan/lucene/InfinispanDirectoryStressTest.java
Log:
improving tests for Lucene Directory
Modified: trunk/lucene-directory/pom.xml
===================================================================
--- trunk/lucene-directory/pom.xml 2009-11-12 19:35:56 UTC (rev 1158)
+++ trunk/lucene-directory/pom.xml 2009-11-12 20:36:24 UTC (rev 1159)
@@ -1,7 +1,6 @@
<?xml version="1.0"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
@@ -41,4 +40,19 @@
</dependency>
</dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>2.4.3</version>
+ <configuration>
+ <excludes>
+ <exclude>**/*StressTest.java</exclude>
+ </excludes>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
</project>
Modified: trunk/lucene-directory/src/main/java/org/infinispan/lucene/InfinispanDirectory.java
===================================================================
--- trunk/lucene-directory/src/main/java/org/infinispan/lucene/InfinispanDirectory.java 2009-11-12 19:35:56 UTC (rev 1158)
+++ trunk/lucene-directory/src/main/java/org/infinispan/lucene/InfinispanDirectory.java 2009-11-12 20:36:24 UTC (rev 1159)
@@ -23,7 +23,6 @@
import java.io.FileNotFoundException;
import java.io.IOException;
-import java.util.HashSet;
import java.util.Set;
import org.apache.lucene.store.AlreadyClosedException;
Modified: trunk/lucene-directory/src/test/java/org/infinispan/lucene/CacheTestSupport.java
===================================================================
--- trunk/lucene-directory/src/test/java/org/infinispan/lucene/CacheTestSupport.java 2009-11-12 19:35:56 UTC (rev 1158)
+++ trunk/lucene-directory/src/test/java/org/infinispan/lucene/CacheTestSupport.java 2009-11-12 20:36:24 UTC (rev 1159)
@@ -53,11 +53,14 @@
public static Configuration createTestConfiguration() {
Configuration c = new Configuration();
- c.setCacheMode(Configuration.CacheMode.REPL_SYNC);
+ c.setCacheMode(Configuration.CacheMode.DIST_SYNC);
c.setSyncReplTimeout(10000);
c.setLockAcquisitionTimeout(10000);
c.setUseLockStriping(false);
c.setSyncCommitPhase(true);
+ c.setL1CacheEnabled(true);
+ c.setExposeJmxStatistics(false);
+ c.setUseEagerLocking(false);
c.setSyncRollbackPhase(true);
c.setTransactionManagerLookupClass(JBossStandaloneJTAManagerLookup.class.getName());
c.setDeadlockDetectionSpinDuration( 10000 );
Added: trunk/lucene-directory/src/test/java/org/infinispan/lucene/DynamicClusterIndexStressTest.java
===================================================================
--- trunk/lucene-directory/src/test/java/org/infinispan/lucene/DynamicClusterIndexStressTest.java (rev 0)
+++ trunk/lucene-directory/src/test/java/org/infinispan/lucene/DynamicClusterIndexStressTest.java 2009-11-12 20:36:24 UTC (rev 1159)
@@ -0,0 +1,254 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.infinispan.lucene;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.analysis.SimpleAnalyzer;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.document.Field.Index;
+import org.apache.lucene.document.Field.Store;
+import org.apache.lucene.index.CorruptIndexException;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.index.IndexWriter.MaxFieldLength;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.Query;
+import org.apache.lucene.search.TermQuery;
+import org.apache.lucene.search.TopDocs;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.LockObtainFailedException;
+import org.infinispan.Cache;
+import org.infinispan.lucene.testutils.ClusteredCacheFactory;
+import org.infinispan.test.TestingUtil;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+/**
+ * <p>
+ * DynamicClusterIndexStressTest verifies the index state is healthy while the cluster topology changes.
+ * While doing rehashing of the cluster in background it makes changes and searches, verifying the
+ * search results.
+ * </p>
+ * Two pools of strings are maintained: one containing strings which are guaranteed to be found in the index,
+ * and one containing the strings which are guaranteed to not be found in the index.
+ * Threads take from these pools, check the index state towards them, then write/delete them from
+ * index, commit and move them to the appropriate pool to check again or have another thread check them.
+ * Different threads own different Directory instances - linked by Infinispan - while the pools are BlockingDeques
+ * shared by reference.
+ * </p>
+ *
+ * Run with -Dbind.address=127.0.0.1 -Djava.net.preferIPv4Stack=true
+ *
+ * @author Sanne Grinovero
+ * @since 4.0
+ */
+ at Test(groups = "profiling", testName = "lucene.DynamicClusterIndexStressTest")
+public class DynamicClusterIndexStressTest {
+
+ private static final int TOTAL_NODES_TO_CREATE = 1000;
+ private static final int NODE_EXISTING_MILLISECONDS = 60000;
+ private static final int TIME_BETWEEN_NODE_CREATIONS = 1000;
+ private static final int CONCURRENCY_LIMIT = 10;
+ private static final int STRING_POOL_SIZE = 1000;
+ private static final Analyzer anyAnalyzer = new SimpleAnalyzer();
+
+ private final BlockingDeque<String> stringsInIndex = new LinkedBlockingDeque<String>();
+ private final BlockingDeque<String> stringsOutOfIndex = new LinkedBlockingDeque<String>();
+ private final ClusteredCacheFactory cacheFactory = new ClusteredCacheFactory(CacheTestSupport.createTestConfiguration());
+
+ private volatile boolean failed = false;
+ private volatile String failureMessage = "";
+
+// @Test
+// public void periodicallyAddingANode() throws InterruptedException, LockObtainFailedException, IOException {
+// for (int i = 0; i < STRING_POOL_SIZE; i++) {
+// stringsOutOfIndex.add(String.valueOf(i));
+// }
+// Cache<CacheKey, Object> cache = cacheFactory.createClusteredCache();
+// try {
+// createIndex(cache);
+// runMoreNodes();
+// } finally {
+// cleanup(cache);
+// }
+// }
+
+ /**
+ * Initialize the empty index
+ *
+ * @param cache to use to contain the index
+ * @throws CorruptIndexException
+ * @throws LockObtainFailedException
+ * @throws IOException
+ */
+ private void createIndex(Cache<CacheKey, Object> cache) throws CorruptIndexException, LockObtainFailedException, IOException {
+ InfinispanDirectory directory = new InfinispanDirectory(cache, "indexName");
+ IndexWriter iwriter = new IndexWriter(directory, anyAnalyzer, true, MaxFieldLength.UNLIMITED);
+ iwriter.commit();
+ iwriter.close();
+ IndexSearcher searcher = new IndexSearcher(directory);
+ searcher.close();
+ System.out.println("Index created by " + buildName(cache));
+ // verify it can be reopened:
+ InfinispanDirectory directory2 = new InfinispanDirectory(cache, "indexName");
+ IndexSearcher searcher2 = new IndexSearcher(directory2);
+ searcher2.close();
+ }
+
+ private void runMoreNodes() throws InterruptedException {
+ ExecutorService executor = Executors.newFixedThreadPool(CONCURRENCY_LIMIT);
+ for (int i = 0; !failed && (i < TOTAL_NODES_TO_CREATE); i++) {
+ executor.execute(new LuceneUserThread());
+ Thread.sleep(TIME_BETWEEN_NODE_CREATIONS);
+ }
+ executor.shutdown();
+ executor.awaitTermination(1, TimeUnit.HOURS);
+ Assert.assertTrue(!failed, failureMessage);
+ }
+
+ @Test
+ public void testWihtoutRehashing() throws InterruptedException{
+ ExecutorService executor = Executors.newFixedThreadPool(CONCURRENCY_LIMIT);
+ Cache[] caches = new Cache[CONCURRENCY_LIMIT];
+ for (int i=0; i<CONCURRENCY_LIMIT; i++){
+ caches[i] = cacheFactory.createClusteredCacheWaitingForNodesView(i+1);
+ }
+ //TODO run the tests
+ //FIXME: some NPEs are logged here during cleanup
+ for (int i=0; i<CONCURRENCY_LIMIT; i++){
+// Thread.sleep(1000); Enabling sleep here "fixes" the NPEs
+ cleanup(caches[i]);
+ }
+ }
+
+ private static String buildName(Cache cache) {
+ return Thread.currentThread().getName() + "[" + cache.getCacheManager().getAddress().toString() + "]";
+ }
+
+ private class LuceneUserThread implements Runnable {
+
+ private Directory directory;
+
+ @Override
+ public void run() {
+ if (failed)
+ return;
+ String name = "";
+ try {
+ Cache<CacheKey, Object> cache = cacheFactory.createClusteredCache();
+ try {
+ name = buildName(cache);
+ directory = new InfinispanDirectory(cache, "indexName");
+ System.out.println("Created Directory in " + name);
+ try {
+ runTest();
+ } catch (Exception e) {
+ System.out.println("Error in " + name);
+ e.printStackTrace();
+ failed = true;
+ failureMessage = e.getMessage();
+ }
+ } finally {
+ cleanup(cache);
+ }
+ } catch (InterruptedException e) {
+ failed = true;
+ failureMessage = e.getMessage();
+ } finally {
+ System.out.println("Leaving thread " + name);
+ }
+ }
+
+ private void runTest() throws CorruptIndexException, IOException {
+ long finishTime = System.currentTimeMillis() + NODE_EXISTING_MILLISECONDS;
+ while (!failed && System.currentTimeMillis() < finishTime) {
+ verifyStringsExistInIndex();
+ // verifyStringsNotExistInIndex(); //TODO
+ addSomeStrings();
+ // deleteSomeStrings(); //TODO
+ }
+ }
+
+ private void addSomeStrings() throws CorruptIndexException, LockObtainFailedException, IOException {
+ Set<String> strings = new HashSet<String>();
+ stringsOutOfIndex.drainTo(strings, 5);
+ IndexWriter iwriter = new IndexWriter(directory, anyAnalyzer, false, MaxFieldLength.UNLIMITED);
+ for (String term : strings) {
+ Document doc = new Document();
+ doc.add(new Field("main", term, Store.NO, Index.NOT_ANALYZED));
+ iwriter.addDocument(doc);
+ }
+ iwriter.commit();
+ stringsInIndex.addAll(strings);
+ iwriter.close();
+ }
+
+ private void verifyStringsExistInIndex() throws CorruptIndexException, IOException {
+ // take ownership of some strings, so that no other thread will change status for them:
+ Set<String> strings = new HashSet<String>();
+ stringsInIndex.drainTo(strings, 50);
+ IndexSearcher searcher = new IndexSearcher(directory);
+ for (String term : strings) {
+ Query query = new TermQuery(new Term("main", term));
+ TopDocs docs = searcher.search(query, null, 1);
+ if (docs.totalHits != 1) {
+ failed = true;
+ failureMessage = "couldn't find expected term in index";
+ }
+ }
+ // put the strings back at their place:
+ stringsInIndex.addAll(strings);
+ }
+
+ }
+
+ @BeforeClass
+ public void beforeTest() {
+ cacheFactory.start();
+ }
+
+ @AfterClass
+ public void afterTest() {
+ cacheFactory.stop();
+ }
+
+ private static void cleanup(Cache<CacheKey, Object> cache) {
+ try {
+ TestingUtil.killCaches(cache);
+ } finally {
+ TestingUtil.killCacheManagers(cache.getCacheManager());
+ }
+ }
+
+}
Property changes on: trunk/lucene-directory/src/test/java/org/infinispan/lucene/DynamicClusterIndexStressTest.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Modified: trunk/lucene-directory/src/test/java/org/infinispan/lucene/InfinispanDirectoryStressTest.java
===================================================================
--- trunk/lucene-directory/src/test/java/org/infinispan/lucene/InfinispanDirectoryStressTest.java 2009-11-12 19:35:56 UTC (rev 1158)
+++ trunk/lucene-directory/src/test/java/org/infinispan/lucene/InfinispanDirectoryStressTest.java 2009-11-12 20:36:24 UTC (rev 1159)
@@ -42,6 +42,7 @@
import org.testng.annotations.Test;
/**
+ * Basic stress test: one thread writes, some other read.
* @author Lukasz Moren
* @author Sanne Grinovero
*/
@@ -82,14 +83,17 @@
final CountDownLatch latch = new CountDownLatch(1);
List<InfinispanDirectoryThread> threads = new ArrayList<InfinispanDirectoryThread>();
Cache<CacheKey, Object> cache = CacheTestSupport.createTestCacheManager().getCache();
- Cache<CacheKey, Object> cache2 = CacheTestSupport.createTestCacheManager().getCache(); // dummy cache, to force replication
Directory directory1 = new InfinispanDirectory(cache, "indexName");
- Directory directory2 = new InfinispanDirectory(cache2, "indexName");
IndexWriter.MaxFieldLength fieldLength = new IndexWriter.MaxFieldLength(IndexWriter.DEFAULT_MAX_FIELD_LENGTH);
IndexWriter iw = new IndexWriter(directory1, new StandardAnalyzer(), true, fieldLength);
iw.close();
+ // second cache joins after index creation: tests proper configuration
+ Cache<CacheKey, Object> cache2 = CacheTestSupport.createTestCacheManager().getCache(); // dummy cache, to force replication
+ Directory directory2 = new InfinispanDirectory(cache2, "indexName");
+ Thread.sleep(3000);
+
// create first writing thread
InfinispanDirectoryThread tr = new InfinispanDirectoryThread(latch, directory1, true);
threads.add(tr);
Added: trunk/lucene-directory/src/test/java/org/infinispan/lucene/testutils/ClusteredCacheFactory.java
===================================================================
--- trunk/lucene-directory/src/test/java/org/infinispan/lucene/testutils/ClusteredCacheFactory.java (rev 0)
+++ trunk/lucene-directory/src/test/java/org/infinispan/lucene/testutils/ClusteredCacheFactory.java 2009-11-12 20:36:24 UTC (rev 1159)
@@ -0,0 +1,116 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.infinispan.lucene.testutils;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.SynchronousQueue;
+
+import org.infinispan.Cache;
+import org.infinispan.config.Configuration;
+import org.infinispan.lucene.CacheKey;
+import org.infinispan.manager.CacheManager;
+import org.infinispan.test.TestingUtil;
+import org.infinispan.test.fwk.TestCacheManagerFactory;
+
+/**
+ * CacheFactory. This is currently needed as a workaround for ISPN-261 : Cachemanager instantiated in different threads in same JVM don't interact
+ *
+ * @author Sanne Grinovero
+ * @since 4.0
+ */
+public class ClusteredCacheFactory {
+
+ private final BlockingQueue<Configuration> requests = new SynchronousQueue<Configuration>();
+ private final BlockingQueue<Cache<CacheKey, Object>> results = new SynchronousQueue<Cache<CacheKey, Object>>();
+ private final ExecutorService executor = Executors.newFixedThreadPool(1);
+ private final Configuration cfg;
+ private boolean started = false;
+ private boolean stopped = false;
+
+ /**
+ * Create a new ClusteredCacheFactory.
+ *
+ * @param cfg defines the configuration used to build the caches
+ */
+ public ClusteredCacheFactory(Configuration cfg) {
+ this.cfg = cfg;
+ }
+
+ /**
+ * Create a cache using default configuration
+ * @return
+ * @throws InterruptedException
+ */
+ public synchronized Cache<CacheKey, Object> createClusteredCache() throws InterruptedException {
+ if (!started)
+ throw new IllegalStateException("was not started");
+ if (stopped)
+ throw new IllegalStateException("was already stopped");
+ requests.put(cfg);
+ return results.take();
+ }
+
+ public Cache<CacheKey, Object> createClusteredCacheWaitingForNodesView(int expectedGroupSize) throws InterruptedException {
+ Cache<CacheKey, Object> cache = createClusteredCache();
+ TestingUtil.blockUntilViewReceived(cache, expectedGroupSize, 10000, false);
+ return cache;
+ }
+
+ public synchronized void start() {
+ if (started)
+ throw new IllegalStateException("was already started");
+ if (stopped)
+ throw new IllegalStateException("was already stopped");
+ started = true;
+ executor.execute(new Worker());
+ }
+
+ public synchronized void stop() {
+ if (stopped)
+ throw new IllegalStateException("was already stopped");
+ if (!started)
+ throw new IllegalStateException("was not started");
+ stopped = true;
+ executor.shutdownNow();
+ }
+
+ private class Worker implements Runnable {
+
+ @Override
+ public void run() {
+ while (true) {
+ try {
+ Configuration configuration = requests.take();
+ CacheManager cacheManager = TestCacheManagerFactory.createClusteredCacheManager(configuration);
+ Cache<CacheKey, Object> cache = cacheManager.getCache();
+ results.put(cache);
+ } catch (InterruptedException e) {
+ return;
+ }
+ }
+ }
+
+ }
+
+}
Property changes on: trunk/lucene-directory/src/test/java/org/infinispan/lucene/testutils/ClusteredCacheFactory.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
More information about the infinispan-commits
mailing list