[teiid-commits] teiid SVN: r2990 - in trunk: build/kits/jboss-container/deploy/teiid and 8 other directories.
teiid-commits at lists.jboss.org
teiid-commits at lists.jboss.org
Fri Mar 11 12:39:22 EST 2011
Author: shawkins
Date: 2011-03-11 12:39:21 -0500 (Fri, 11 Mar 2011)
New Revision: 2990
Modified:
trunk/build/kits/jboss-container/deploy/teiid/teiid-jboss-beans.xml
trunk/build/kits/jboss-container/teiid-releasenotes.html
trunk/documentation/admin-guide/src/main/docbook/en-US/content/performance.xml
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPConfiguration.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierManagerImpl.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/ThreadReuseExecutor.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/TransactionServerImpl.java
trunk/engine/src/main/java/org/teiid/dqp/message/AtomicRequestMessage.java
trunk/engine/src/main/java/org/teiid/query/eval/Evaluator.java
trunk/engine/src/main/resources/org/teiid/query/i18n.properties
trunk/engine/src/test/java/org/teiid/common/queue/TestThreadReuseExecutor.java
trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java
trunk/engine/src/test/java/org/teiid/dqp/service/AutoGenDataService.java
Log:
TEIID-1474 TEIID-1505 adding a limit to the number of concurrent source requests and ensuring that maxactiveplans is read from the config.
Modified: trunk/build/kits/jboss-container/deploy/teiid/teiid-jboss-beans.xml
===================================================================
--- trunk/build/kits/jboss-container/deploy/teiid/teiid-jboss-beans.xml 2011-03-11 05:26:32 UTC (rev 2989)
+++ trunk/build/kits/jboss-container/deploy/teiid/teiid-jboss-beans.xml 2011-03-11 17:39:21 UTC (rev 2990)
@@ -92,6 +92,11 @@
<property name="maxThreads">64</property>
<!-- Max active plans (default 20). Increase this value on highly concurrent systems - but ensure that the underlying pools can handle the increased load without timeouts. -->
<property name="maxActivePlans">20</property>
+ <!-- Max source query concurrency per user request (default 0).
+ 0 indicates use the default calculated value based on max active plans and max threads - approximately 2*(max threads)/(max active plans).
+ 1 forces serial execution in the processing thread, just as is done for a transactional request.
+ Any number greater than 1 limits the maximum number of concurrently executing source requests accordingly. -->
+ <property name="userRequestSourceConcurrency">0</property>
<!-- Query processor time slice, in milliseconds. (default 2000) -->
<property name="timeSliceInMilli">2000</property>
<!-- Maximum allowed fetch size, set via JDBC. User requested value ignored above this value. (default 20480) -->
Modified: trunk/build/kits/jboss-container/teiid-releasenotes.html
===================================================================
--- trunk/build/kits/jboss-container/teiid-releasenotes.html 2011-03-11 05:26:32 UTC (rev 2989)
+++ trunk/build/kits/jboss-container/teiid-releasenotes.html 2011-03-11 17:39:21 UTC (rev 2990)
@@ -37,7 +37,9 @@
</UL>
<LI><B>ARRAYTABLE</B> - the ARRAYTABLE table function was added to simplify array value extraction into a tabular format.
<LI><B>Ingres</B> - Ingres database translator is now available to use as supported source under Teiid.
+ <LI><B>Optional Join Enhancements</B> - the optional join hint no longer requires the use of ANSI joins and can will not remove optional bridging tables that are used by two other tables that are required.
<LI><B>InterSystems Cache</B> - InterSystems Cache database translator is now available to use as supported source under Teiid.
+ <LI><B>userRequestSourceConcurrency</B> - was added to control the number of concurrent source queries allowed for each user request.
</UL>
<h2><a name="Compatibility">Compatibility Issues</a></h2>
@@ -103,6 +105,11 @@
See the <a href="teiid-docs/teiid_admin_guide.pdf">Admin Guide</a> for more on configuration and installation.
+<h4>from 7.3</h4>
+<ul>
+ <LI>SocketConfiguration.maxSocketThreads will interpret a setting of 0 to mean use the system default of max available processors.
+</ul>
+
<h4>from 7.2</h4>
<ul>
<LI>Temporary tables can now be restricted by data roles. Use the data-role attribute allow-create-temporary-tables to explicitly enable or disable the usage of temporary tables.
Modified: trunk/documentation/admin-guide/src/main/docbook/en-US/content/performance.xml
===================================================================
--- trunk/documentation/admin-guide/src/main/docbook/en-US/content/performance.xml 2011-03-11 05:26:32 UTC (rev 2989)
+++ trunk/documentation/admin-guide/src/main/docbook/en-US/content/performance.xml 2011-03-11 17:39:21 UTC (rev 2990)
@@ -62,13 +62,21 @@
They handle NIO non-blocking IO operations as well as directly servicing any operation that can run without blocking.</para>
<para>For longer running operations, the socket threads queue with work the query engine.
The query engine has two settings that determine its thread utilization.
+
<code>maxThreads</code> sets the total number of threads available for query engine work (processing plans, transaction control operations, processing source queries, etc.).
You should consider increasing the maximum threads on systems with a large number of available processors and/or when it's common to issue non-transactional queries with that
issue a large number of concurrent source requests.
+
<code>maxActivePlans</code>, which should always be smaller than maxThreads, sets the number of the maxThreads
that should be used for user query processing. Increasing the maxActivePlans should be considered for workloads with a high number of long
running queries and/or systems with a large number of available processors. If memory issues arise from increasing the max threads and the
- max active plans, then consider decreasing the processor/connector batch sizes to limit the base number of memory rows consumed by each plan.</para>
+ max active plans, then consider decreasing the processor/connector batch sizes to limit the base number of memory rows consumed by each plan.
+
+ <code>userRequestSourceConcurrency</code>, which should always be smaller than maxThreads, sets the number of concurrently executing source queries per user request.
+ Setting this value to 1 forces serial execution of all source queries by the processing thread. The default value is computed based upon 2*maxThreads/maxActivePlans.
+ Using the respective default values, this means that each user request would be allowed 6 concurrently executing source queries. If the default calculated value is
+ not applicable to your workload, for example if you have queries that generate more concurrent long running source queries, you should adjust this value.
+ </para>
</section>
<section>
<title>Cache Tuning</title>
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPConfiguration.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPConfiguration.java 2011-03-11 05:26:32 UTC (rev 2989)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPConfiguration.java 2011-03-11 17:39:21 UTC (rev 2990)
@@ -38,6 +38,7 @@
public static final int DEFAULT_MAX_PROCESS_WORKERS = 64;
public static final int DEFAULT_MAX_SOURCE_ROWS = -1;
public static final int DEFAULT_MAX_ACTIVE_PLANS = 20;
+ public static final int DEFAULT_USER_REQUEST_SOURCE_CONCURRENCY = 0;
private int maxThreads = DEFAULT_MAX_PROCESS_WORKERS;
private int timeSliceInMilli = DEFAULT_PROCESSOR_TIMESLICE;
@@ -52,6 +53,7 @@
private int maxActivePlans = DEFAULT_MAX_ACTIVE_PLANS;
private CacheConfiguration resultsetCacheConfig;
private int maxODBCLobSizeAllowed = 5*1024*1024; // 5 MB
+ private int userRequestSourceConcurrency = DEFAULT_USER_REQUEST_SOURCE_CONCURRENCY;
@ManagementProperty(description="Max active plans (default 20). Increase this value, and max threads, on highly concurrent systems - but ensure that the underlying pools can handle the increased load without timeouts.")
public int getMaxActivePlans() {
@@ -62,6 +64,18 @@
this.maxActivePlans = maxActivePlans;
}
+ @ManagementProperty(description="Max source query concurrency per user request (default 0). " +
+ "0 indicates use the default calculated value based on max active plans and max threads - approximately 2*(max threads)/(max active plans). " +
+ "1 forces serial execution in the processing thread, just as is done for a transactional request. " +
+ "Any number greater than 1 limits the maximum number of concurrently executing source requests accordingly.")
+ public int getUserRequestSourceConcurrency() {
+ return userRequestSourceConcurrency;
+ }
+
+ public void setUserRequestSourceConcurrency(int userRequestSourceConcurrency) {
+ this.userRequestSourceConcurrency = userRequestSourceConcurrency;
+ }
+
@ManagementProperty(description="Process pool maximum thread count. (default 64)")
public int getMaxThreads() {
return maxThreads;
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java 2011-03-11 05:26:32 UTC (rev 2989)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java 2011-03-11 17:39:21 UTC (rev 2990)
@@ -30,11 +30,10 @@
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import javax.resource.spi.work.Work;
-import javax.resource.spi.work.WorkEvent;
-import javax.resource.spi.work.WorkListener;
import javax.transaction.xa.Xid;
import org.teiid.adminapi.Admin;
@@ -83,72 +82,58 @@
*/
public class DQPCore implements DQP {
- //TODO: replace with FutureTask
- public final static class FutureWork<T> implements Work, WorkListener, PrioritizedRunnable {
- private final Callable<T> toCall;
- private ResultsFuture<T> result = new ResultsFuture<T>();
- private ResultsReceiver<T> receiver = result.getResultsReceiver();
+ public interface CompletionListener<T> {
+ void onCompletion(FutureWork<T> future);
+ }
+
+ public final static class FutureWork<T> extends FutureTask<T> implements PrioritizedRunnable, Work {
private int priority;
private long creationTime = System.currentTimeMillis();
private DQPWorkContext workContext = DQPWorkContext.getWorkContext();
+ private List<CompletionListener<T>> completionListeners = new LinkedList<CompletionListener<T>>();
- public FutureWork(Callable<T> processor, int priority) {
- this.toCall = processor;
+ public FutureWork(final Callable<T> processor, int priority) {
+ super(processor);
this.priority = priority;
}
- public ResultsFuture<T> getResult() {
- return result;
+ public FutureWork(final Runnable processor, T result, int priority) {
+ super(processor, result);
+ this.priority = priority;
}
@Override
- public void run() {
- try {
- receiver.receiveResults(toCall.call());
- } catch (Throwable t) {
- receiver.exceptionOccurred(t);
- }
+ public int getPriority() {
+ return priority;
}
-
+
@Override
- public void release() {
-
+ public long getCreationTime() {
+ return creationTime;
}
@Override
- public void workAccepted(WorkEvent arg0) {
-
+ public DQPWorkContext getDqpWorkContext() {
+ return workContext;
}
@Override
- public void workCompleted(WorkEvent arg0) {
+ public void release() {
}
- @Override
- public void workRejected(WorkEvent arg0) {
- receiver.exceptionOccurred(arg0.getException());
+ void addCompletionListener(CompletionListener<T> completionListener) {
+ this.completionListeners.add(completionListener);
}
@Override
- public void workStarted(WorkEvent arg0) {
-
+ protected void done() {
+ for (CompletionListener<T> listener : this.completionListeners) {
+ listener.onCompletion(this);
+ }
+ completionListeners.clear();
}
-
- @Override
- public int getPriority() {
- return priority;
- }
- @Override
- public long getCreationTime() {
- return creationTime;
- }
-
- @Override
- public DQPWorkContext getDqpWorkContext() {
- return workContext;
- }
}
static class ClientState {
@@ -200,6 +185,7 @@
private int maxActivePlans = DQPConfiguration.DEFAULT_MAX_ACTIVE_PLANS;
private int currentlyActivePlans;
+ private int userRequestSourceConcurrency;
private LinkedList<RequestWorkItem> waitingPlans = new LinkedList<RequestWorkItem>();
private CacheFactory cacheFactory;
@@ -690,9 +676,19 @@
prepPlanCache = new SessionAwareCache<PreparedPlan>(this.cacheFactory, SessionAwareCache.Type.PREPAREDPLAN, new CacheConfiguration(Policy.LRU, 60*60*8, config.getPreparedPlanCacheMaxCount(), "PreparedCache")); //$NON-NLS-1$
prepPlanCache.setBufferManager(this.bufferManager);
-
this.processWorkerPool = new ThreadReuseExecutor(DQPConfiguration.PROCESS_PLAN_QUEUE_NAME, config.getMaxThreads());
+ this.maxActivePlans = config.getMaxActivePlans();
+ if (this.maxActivePlans > config.getMaxThreads()) {
+ LogManager.logWarning(LogConstants.CTX_DQP, QueryPlugin.Util.getString("DQPCore.invalid_max_active_plan", this.maxActivePlans, config.getMaxThreads())); //$NON-NLS-1$
+ this.maxActivePlans = config.getMaxThreads();
+ }
+
+ this.userRequestSourceConcurrency = config.getUserRequestSourceConcurrency();
+ if (this.userRequestSourceConcurrency < 1) {
+ this.userRequestSourceConcurrency = Math.min(config.getMaxThreads(), 2*config.getMaxThreads()/this.maxActivePlans);
+ }
+
if (cacheFactory.isReplicated()) {
matTables = new SessionAwareCache<CachedResults>(this.cacheFactory, SessionAwareCache.Type.RESULTSET, new CacheConfiguration(Policy.EXPIRATION, -1, -1, "MaterilizationTables")); //$NON-NLS-1$
matTables.setBufferManager(this.bufferManager);
@@ -786,10 +782,23 @@
return addWork(processor, 10);
}
- <T> ResultsFuture<T> addWork(Callable<T> processor, int priority) {
- FutureWork<T> work = new FutureWork<T>(processor, priority);
+ <T> ResultsFuture<T> addWork(final Callable<T> processor, int priority) {
+ final ResultsFuture<T> result = new ResultsFuture<T>();
+ final ResultsReceiver<T> receiver = result.getResultsReceiver();
+ Runnable r = new Runnable() {
+
+ @Override
+ public void run() {
+ try {
+ receiver.receiveResults(processor.call());
+ } catch (Throwable t) {
+ receiver.exceptionOccurred(t);
+ }
+ }
+ };
+ FutureWork<T> work = new FutureWork<T>(r, null, priority);
this.addWork(work);
- return work.getResult();
+ return result;
}
// global txn
@@ -852,4 +861,16 @@
this.cacheFactory = factory;
}
+ public int getUserRequestSourceConcurrency() {
+ return userRequestSourceConcurrency;
+ }
+
+ void setUserRequestSourceConcurrency(int userRequestSourceConcurrency) {
+ this.userRequestSourceConcurrency = userRequestSourceConcurrency;
+ }
+
+ public int getMaxActivePlans() {
+ return maxActivePlans;
+ }
+
}
\ No newline at end of file
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierManagerImpl.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierManagerImpl.java 2011-03-11 05:26:32 UTC (rev 2989)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierManagerImpl.java 2011-03-11 17:39:21 UTC (rev 2990)
@@ -30,13 +30,11 @@
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.Callable;
import org.teiid.adminapi.impl.ModelMetaData;
import org.teiid.adminapi.impl.VDBMetaData;
import org.teiid.api.exception.query.QueryMetadataException;
import org.teiid.client.RequestMessage;
-import org.teiid.client.util.ResultsFuture;
import org.teiid.common.buffer.BlockedException;
import org.teiid.common.buffer.BufferManager;
import org.teiid.common.buffer.TupleSource;
@@ -360,6 +358,7 @@
aqr.setMaxResultRows(requestMgr.getMaxSourceRows());
aqr.setExceptionOnMaxRows(requestMgr.isExceptionOnMaxSourceRows());
aqr.setPartialResults(request.supportsPartialResults());
+ aqr.setSerial(requestMgr.getUserRequestSourceConcurrency() == 1);
if (nodeID >= 0) {
aqr.setTransactionContext(workItem.getTransactionContext());
}
@@ -389,14 +388,6 @@
throw new UnsupportedOperationException();
}
- <T> ResultsFuture<T> addWork(Callable<T> callable, int priority) {
- return requestMgr.addWork(callable, priority);
- }
-
- void scheduleWork(Runnable r, int priority, long delay) {
- requestMgr.scheduleWork(r, priority, delay);
- }
-
BufferManager getBufferManager() {
return bufferService.getBufferManager();
}
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java 2011-03-11 05:26:32 UTC (rev 2989)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java 2011-03-11 17:39:21 UTC (rev 2990)
@@ -27,12 +27,12 @@
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
import javax.activation.DataSource;
import javax.xml.transform.Source;
import org.teiid.client.SourceWarning;
-import org.teiid.client.util.ResultsFuture;
import org.teiid.common.buffer.BlockedException;
import org.teiid.common.buffer.FileStore;
import org.teiid.common.buffer.TupleSource;
@@ -51,6 +51,7 @@
import org.teiid.core.util.Assertion;
import org.teiid.core.util.ObjectConverterUtil;
import org.teiid.dqp.internal.datamgr.ConnectorWork;
+import org.teiid.dqp.internal.process.DQPCore.FutureWork;
import org.teiid.dqp.message.AtomicRequestMessage;
import org.teiid.dqp.message.AtomicResultsMessage;
import org.teiid.query.processor.xml.XMLUtil;
@@ -86,13 +87,15 @@
private int index;
private int rowsProcessed;
private AtomicResultsMessage arm;
- private boolean closed;
+ private AtomicBoolean closed = new AtomicBoolean();
+ private volatile boolean canAsynchClose;
private volatile boolean canceled;
+ private volatile boolean cancelAsynch;
private boolean executed;
private volatile boolean done;
private boolean explicitClose;
- private volatile ResultsFuture<AtomicResultsMessage> futureResult;
+ private volatile FutureWork<AtomicResultsMessage> futureResult;
private volatile boolean running;
public DataTierTupleSource(AtomicRequestMessage aqr, RequestWorkItem workItem, ConnectorWork cwi, DataTierManagerImpl dtm, int limit) {
@@ -114,23 +117,19 @@
Assertion.isNull(workItem.getConnectorRequest(aqr.getAtomicRequestID()));
workItem.addConnectorRequest(aqr.getAtomicRequestID(), this);
- if (!aqr.isTransactional()) {
+ if (!aqr.isSerial()) {
addWork();
}
}
private void addWork() {
- futureResult = dtm.addWork(new Callable<AtomicResultsMessage>() {
+ this.canAsynchClose = true;
+ futureResult = workItem.addWork(new Callable<AtomicResultsMessage>() {
@Override
public AtomicResultsMessage call() throws Exception {
return getResults();
}
}, 100);
- futureResult.addCompletionListener(new ResultsFuture.CompletionListener<AtomicResultsMessage>() {
- public void onCompletion(ResultsFuture<AtomicResultsMessage> future) {
- workItem.moreWork();
- }
- });
}
private List correctTypes(List row) throws TransformationException {
@@ -208,7 +207,7 @@
if (arm == null) {
AtomicResultsMessage results = null;
try {
- if (futureResult != null || !aqr.isTransactional()) {
+ if (futureResult != null || !aqr.isSerial()) {
results = asynchGet();
} else {
results = getResults();
@@ -216,7 +215,7 @@
} catch (TranslatorException e) {
results = exceptionOccurred(e, true);
} catch (DataNotAvailableException e) {
- dtm.scheduleWork(new Runnable() {
+ workItem.scheduleWork(new Runnable() {
@Override
public void run() {
workItem.moreWork();
@@ -250,7 +249,7 @@
if (!futureResult.isDone()) {
throw BlockedException.INSTANCE;
}
- ResultsFuture<AtomicResultsMessage> currentResults = futureResult;
+ FutureWork<AtomicResultsMessage> currentResults = futureResult;
futureResult = null;
AtomicResultsMessage results = null;
try {
@@ -284,6 +283,9 @@
TranslatorException {
AtomicResultsMessage results = null;
try {
+ if (cancelAsynch) {
+ return null;
+ }
running = true;
if (!executed) {
results = cwi.execute();
@@ -292,13 +294,20 @@
results = cwi.more();
}
} finally {
+ if (!cancelAsynch) {
+ workItem.moreWork();
+ }
+ canAsynchClose = false;
+ if (closed.get()) {
+ cwi.close();
+ }
running = false;
}
return results;
}
public boolean isQueued() {
- ResultsFuture<AtomicResultsMessage> future = futureResult;
+ FutureWork<AtomicResultsMessage> future = futureResult;
return !running && future != null && !future.isDone();
}
@@ -311,32 +320,20 @@
}
public void fullyCloseSource() {
- if (!closed) {
- if (cwi != null) {
- workItem.closeAtomicRequest(this.aqr.getAtomicRequestID());
- if (!aqr.isTransactional()) {
- if (futureResult != null && !futureResult.isDone()) {
- futureResult.addCompletionListener(new ResultsFuture.CompletionListener<AtomicResultsMessage>() {
- @Override
- public void onCompletion(
- ResultsFuture<AtomicResultsMessage> future) {
- cwi.close(); // there is a small chance that this will be done in the processing thread
- }
- });
- } else {
- dtm.addWork(new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- cwi.close();
- return null;
- }
- }, 0);
- }
- } else {
- this.cwi.close();
- }
- }
- closed = true;
+ cancelAsynch = true;
+ if (closed.compareAndSet(false, true)) {
+ workItem.closeAtomicRequest(this.aqr.getAtomicRequestID());
+ if (aqr.isSerial()) {
+ this.cwi.close();
+ } else if (!canAsynchClose) {
+ workItem.addHighPriorityWork(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ cwi.close();
+ return null;
+ }
+ });
+ }
}
}
@@ -346,15 +343,14 @@
public void cancelRequest() {
this.canceled = true;
- if (this.cwi != null) {
- this.cwi.cancel();
- }
+ this.cwi.cancel();
}
/**
* @see TupleSource#closeSource()
*/
public void closeSource() {
+ cancelAsynch = true;
if (!explicitClose) {
fullyCloseSource();
}
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java 2011-03-11 05:26:32 UTC (rev 2989)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java 2011-03-11 17:39:21 UTC (rev 2990)
@@ -28,6 +28,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import org.teiid.client.RequestMessage;
@@ -46,6 +47,7 @@
import org.teiid.core.TeiidException;
import org.teiid.core.TeiidProcessingException;
import org.teiid.core.types.DataTypeManager;
+import org.teiid.dqp.internal.process.DQPCore.FutureWork;
import org.teiid.dqp.internal.process.SessionAwareCache.CacheID;
import org.teiid.dqp.internal.process.ThreadReuseExecutor.PrioritizedRunnable;
import org.teiid.dqp.message.AtomicRequestID;
@@ -71,12 +73,45 @@
public class RequestWorkItem extends AbstractWorkItem implements PrioritizedRunnable {
+ private final class WorkWrapper<T> implements
+ DQPCore.CompletionListener<T> {
+
+ boolean submitted;
+ FutureWork<T> work;
+
+ public WorkWrapper(FutureWork<T> work) {
+ this.work = work;
+ }
+
+ @Override
+ public void onCompletion(FutureWork<T> future) {
+ WorkWrapper<?> nextWork = null;
+ synchronized (queue) {
+ if (!submitted) {
+ return;
+ }
+ nextWork = queue.pollFirst();
+ if (nextWork == null) {
+ totalThreads--;
+ } else {
+ nextWork.submitted = true;
+ }
+ }
+ if (nextWork != null) {
+ dqpCore.addWork(nextWork.work);
+ }
+ }
+ }
+
private enum ProcessingState {NEW, PROCESSING, CLOSE}
private ProcessingState state = ProcessingState.NEW;
private enum TransactionState {NONE, ACTIVE, DONE}
private TransactionState transactionState = TransactionState.NONE;
+ private int totalThreads;
+ private LinkedList<WorkWrapper<?>> queue = new LinkedList<WorkWrapper<?>>();
+
/*
* Obtained at construction time
*/
@@ -764,6 +799,33 @@
@Override
public long getCreationTime() {
return processingTimestamp;
+ }
+
+ <T> FutureWork<T> addHighPriorityWork(Callable<T> callable) {
+ FutureWork<T> work = new FutureWork<T>(callable, PrioritizedRunnable.NO_WAIT_PRIORITY);
+ dqpCore.addWork(work);
+ return work;
}
+
+ <T> FutureWork<T> addWork(Callable<T> callable, int priority) {
+ FutureWork<T> work = new FutureWork<T>(callable, priority);
+ WorkWrapper<T> wl = new WorkWrapper<T>(work);
+ work.addCompletionListener(wl);
+ synchronized (queue) {
+ if (totalThreads < dqpCore.getUserRequestSourceConcurrency()) {
+ dqpCore.addWork(work);
+ totalThreads++;
+ wl.submitted = true;
+ } else {
+ queue.add(wl);
+ LogManager.logDetail(LogConstants.CTX_DQP, this.requestID, " reached max source concurrency of ", dqpCore.getUserRequestSourceConcurrency()); //$NON-NLS-1$
+ }
+ }
+ return work;
+ }
+
+ void scheduleWork(Runnable r, int priority, long delay) {
+ dqpCore.scheduleWork(r, priority, delay);
+ }
}
\ No newline at end of file
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/ThreadReuseExecutor.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/ThreadReuseExecutor.java 2011-03-11 05:26:32 UTC (rev 2989)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/ThreadReuseExecutor.java 2011-03-11 17:39:21 UTC (rev 2990)
@@ -74,6 +74,8 @@
public interface PrioritizedRunnable extends Runnable {
+ final static int NO_WAIT_PRIORITY = 0;
+
int getPriority();
long getCreationTime();
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/TransactionServerImpl.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/TransactionServerImpl.java 2011-03-11 05:26:32 UTC (rev 2989)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/TransactionServerImpl.java 2011-03-11 17:39:21 UTC (rev 2990)
@@ -248,7 +248,7 @@
}
}, 0);
workManager.doWork(work, WorkManager.INDEFINITE, tc, null);
- tc.setTransaction(work.getResult().get());
+ tc.setTransaction(work.get());
}
} catch (NotSupportedException e) {
throw new XATransactionException(e, XAException.XAER_INVAL);
Modified: trunk/engine/src/main/java/org/teiid/dqp/message/AtomicRequestMessage.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/message/AtomicRequestMessage.java 2011-03-11 05:26:32 UTC (rev 2989)
+++ trunk/engine/src/main/java/org/teiid/dqp/message/AtomicRequestMessage.java 2011-03-11 17:39:21 UTC (rev 2990)
@@ -81,6 +81,8 @@
private boolean exceptionOnMaxRows;
private int maxRows;
+ private boolean serial;
+
private DQPWorkContext workContext;
public AtomicRequestMessage() {
@@ -137,6 +139,14 @@
public void setTransactionContext(TransactionContext context) {
txnContext = context;
}
+
+ public boolean isSerial() {
+ return serial || isTransactional();
+ }
+
+ public void setSerial(boolean serial) {
+ this.serial = serial;
+ }
public boolean isTransactional(){
return this.txnContext != null && this.txnContext.getTransactionType() != Scope.NONE;
Modified: trunk/engine/src/main/java/org/teiid/query/eval/Evaluator.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/eval/Evaluator.java 2011-03-11 05:26:32 UTC (rev 2989)
+++ trunk/engine/src/main/java/org/teiid/query/eval/Evaluator.java 2011-03-11 17:39:21 UTC (rev 2990)
@@ -569,17 +569,7 @@
}
} else { // value is null
-
- switch(criteria.getPredicateQuantifier()) {
- case SubqueryCompareCriteria.ALL:
- // null counts as unknown; one unknown means the whole thing is unknown
- return null;
- case SubqueryCompareCriteria.SOME:
- result = null;
- break;
- default:
- throw new ExpressionEvaluationException("ERR.015.006.0057", QueryPlugin.Util.getString("ERR.015.006.0057", criteria.getPredicateQuantifier())); //$NON-NLS-1$ //$NON-NLS-2$
- }
+ result = null;
}
Modified: trunk/engine/src/main/resources/org/teiid/query/i18n.properties
===================================================================
--- trunk/engine/src/main/resources/org/teiid/query/i18n.properties 2011-03-11 05:26:32 UTC (rev 2989)
+++ trunk/engine/src/main/resources/org/teiid/query/i18n.properties 2011-03-11 17:39:21 UTC (rev 2990)
@@ -839,6 +839,7 @@
DQPCore.The_request_has_been_closed.=The request {0} has been closed.
DQPCore.The_atomic_request_has_been_cancelled=The atomic request {0} has been canceled.
DQPCore.failed_to_cancel=Failed to Cancel request, as request already finished processing
+DQPCore.invalid_max_active_plan=The maxActivePlan {0} setting should never be greater than the max processing threads {1}.
ProcessWorker.failed_rollback=Failed to properly rollback autowrap transaction properly
ProcessWorker.error=Unexpected exception for request {0}
Modified: trunk/engine/src/test/java/org/teiid/common/queue/TestThreadReuseExecutor.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/common/queue/TestThreadReuseExecutor.java 2011-03-11 05:26:32 UTC (rev 2989)
+++ trunk/engine/src/test/java/org/teiid/common/queue/TestThreadReuseExecutor.java 2011-03-11 17:39:21 UTC (rev 2990)
@@ -215,10 +215,10 @@
synchronized (pool) {
pool.notifyAll();
}
- work1.getResult().get();
- work2.getResult().get();
- work3.getResult().get();
- work4.getResult().get();
+ work1.get();
+ work2.get();
+ work3.get();
+ work4.get();
assertEquals(Integer.valueOf(3), order.remove());
assertEquals(Integer.valueOf(2), order.remove());
assertEquals(Integer.valueOf(4), order.remove());
Modified: trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java 2011-03-11 05:26:32 UTC (rev 2989)
+++ trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java 2011-03-11 17:39:21 UTC (rev 2990)
@@ -49,7 +49,7 @@
import org.teiid.query.optimizer.capabilities.SourceCapabilities.Capability;
import org.teiid.query.unittest.FakeMetadataFactory;
-
+ at SuppressWarnings("nls")
public class TestDQPCore {
private DQPCore core;
@@ -70,7 +70,10 @@
core.setCacheFactory(new DefaultCacheFactory());
core.setTransactionService(new FakeTransactionService());
- core.start(new DQPConfiguration());
+ DQPConfiguration config = new DQPConfiguration();
+ config.setMaxActivePlans(1);
+ config.setUserRequestSourceConcurrency(2);
+ core.start(config);
}
@After public void tearDown() throws Exception {
@@ -86,6 +89,11 @@
msg.setExecutionId(100);
return msg;
}
+
+ @Test public void testConfigurationSets() {
+ assertEquals(1, core.getMaxActivePlans());
+ assertEquals(2, core.getUserRequestSourceConcurrency());
+ }
@Test public void testRequest1() throws Exception {
helpExecute("SELECT IntKey FROM BQT1.SmallA", "a"); //$NON-NLS-1$ //$NON-NLS-2$
@@ -272,6 +280,35 @@
assertEquals(100, item.resultsBuffer.getRowCount());
}
+ @Test public void testSourceConcurrency() throws Exception {
+ //setup default of 2
+ agds.setSleep(100);
+ StringBuffer sql = new StringBuffer();
+ int branches = 20;
+ for (int i = 0; i < branches; i++) {
+ if (i > 0) {
+ sql.append(" union all ");
+ }
+ sql.append("select intkey || " + i + " from bqt1.smalla");
+ }
+ sql.append(" limit 2");
+ helpExecute(sql.toString(), "a");
+ //there's isn't a hard guarantee that only two requests will get started
+ assertTrue(agds.getExecuteCount().get() <= 6);
+
+ //20 concurrent
+ core.setUserRequestSourceConcurrency(20);
+ agds.getExecuteCount().set(0);
+ helpExecute(sql.toString(), "a");
+ assertEquals(20, agds.getExecuteCount().get());
+
+ //serial
+ core.setUserRequestSourceConcurrency(1);
+ agds.getExecuteCount().set(0);
+ helpExecute(sql.toString(), "a");
+ assertEquals(1, agds.getExecuteCount().get());
+ }
+
public void helpTestVisibilityFails(String sql) throws Exception {
RequestMessage reqMsg = exampleRequestMessage(sql);
reqMsg.setTxnAutoWrapMode(RequestMessage.TXN_WRAP_OFF);
@@ -295,7 +332,7 @@
Future<ResultsMessage> message = core.executeRequest(reqMsg.getExecutionId(), reqMsg);
assertNotNull(core.getClientState(String.valueOf(sessionid), false));
- ResultsMessage results = message.get(5000, TimeUnit.MILLISECONDS);
+ ResultsMessage results = message.get(500000, TimeUnit.MILLISECONDS);
core.terminateSession(String.valueOf(sessionid));
assertNull(core.getClientState(String.valueOf(sessionid), false));
if (results.getException() != null) {
Modified: trunk/engine/src/test/java/org/teiid/dqp/service/AutoGenDataService.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/dqp/service/AutoGenDataService.java 2011-03-11 05:26:32 UTC (rev 2989)
+++ trunk/engine/src/test/java/org/teiid/dqp/service/AutoGenDataService.java 2011-03-11 17:39:21 UTC (rev 2990)
@@ -27,6 +27,7 @@
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
import org.teiid.core.TeiidComponentException;
import org.teiid.core.types.DataTypeManager;
@@ -53,12 +54,20 @@
private SourceCapabilities caps;
public boolean throwExceptionOnExecute;
public int dataNotAvailable = -1;
+ public int sleep;
+ private final AtomicInteger executeCount = new AtomicInteger();
+ private final AtomicInteger closeCount = new AtomicInteger();
+
public AutoGenDataService() {
super("FakeConnector","FakeConnector"); //$NON-NLS-1$ //$NON-NLS-2$
caps = TestOptimizer.getTypicalCapabilities();
}
+ public void setSleep(int sleep) {
+ this.sleep = sleep;
+ }
+
public void setCaps(SourceCapabilities caps) {
this.caps = caps;
}
@@ -88,6 +97,14 @@
@Override
public AtomicResultsMessage execute() throws TranslatorException {
+ executeCount.incrementAndGet();
+ if (sleep > 0) {
+ try {
+ Thread.sleep(sleep);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
if (throwExceptionOnExecute) {
throw new TranslatorException("Connector Exception"); //$NON-NLS-1$
}
@@ -101,7 +118,7 @@
@Override
public void close() {
-
+ closeCount.incrementAndGet();
}
@Override
@@ -112,6 +129,14 @@
};
}
+ public AtomicInteger getExecuteCount() {
+ return executeCount;
+ }
+
+ public AtomicInteger getCloseCount() {
+ return closeCount;
+ }
+
private List[] createResults(List symbols) {
List[] rows = new List[this.rows];
More information about the teiid-commits
mailing list