Author: shawkins
Date: 2010-07-12 16:59:02 -0400 (Mon, 12 Jul 2010)
New Revision: 2333
Added:
trunk/engine/src/main/java/org/teiid/dqp/internal/process/ThreadReuseExecutor.java
trunk/engine/src/test/java/org/teiid/common/queue/TestThreadReuseExecutor.java
Removed:
trunk/engine/src/main/java/org/teiid/dqp/internal/process/StatsCapturingWorkManager.java
trunk/engine/src/test/java/org/teiid/common/queue/TestStatsCapturingWorkManager.java
Modified:
trunk/build/kits/jboss-container/deploy/teiid/teiid-jboss-beans.xml
trunk/build/kits/jboss-container/deployers/teiid.deployer/teiid-deployer-jboss-beans.xml
trunk/common-core/src/main/resources/org/teiid/core/i18n.properties
trunk/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/CapabilitiesConverter.java
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorManager.java
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorManagerRepository.java
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWork.java
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWorkItem.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPConfiguration.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPWorkContext.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierManagerImpl.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
trunk/engine/src/main/java/org/teiid/dqp/internal/transaction/TransactionServerImpl.java
trunk/engine/src/main/java/org/teiid/query/processor/QueryProcessor.java
trunk/engine/src/main/resources/org/teiid/query/i18n.properties
trunk/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/TestConnectorManager.java
trunk/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/TestConnectorWorkItem.java
trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java
trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCoreRequestHandling.java
trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDataTierManager.java
trunk/engine/src/test/java/org/teiid/dqp/service/AutoGenDataService.java
trunk/engine/src/test/java/org/teiid/query/processor/relational/TestUnionAllNode.java
trunk/jboss-integration/src/main/java/org/teiid/jboss/deployers/ConnectionFactoryDeployer.java
trunk/jboss-integration/src/main/java/org/teiid/jboss/deployers/RuntimeEngineDeployer.java
trunk/test-integration/common/src/test/java/org/teiid/jdbc/FakeServer.java
Log:
TEIID-1151 re-adding support for multi-threaded source queries. also fixing partial
results and issameconnector check
Modified: trunk/build/kits/jboss-container/deploy/teiid/teiid-jboss-beans.xml
===================================================================
--- trunk/build/kits/jboss-container/deploy/teiid/teiid-jboss-beans.xml 2010-07-08
15:38:23 UTC (rev 2332)
+++ trunk/build/kits/jboss-container/deploy/teiid/teiid-jboss-beans.xml 2010-07-12
20:59:02 UTC (rev 2333)
@@ -75,8 +75,10 @@
<property name="securityHelper"><inject
bean="SecurityHelper"/></property>
<property name="VDBRepository"><inject
bean="VDBRepository"/></property>
- <!-- Process pool maximum thread count. (default 32) Increase this value if
the system's available processors is larger than 8. -->
- <property name="maxThreads">32</property>
+ <!-- Process pool maximum thread count. (default 64) -->
+ <property name="maxThreads">64</property>
+ <!-- Max active plans (default 20). Increase this value on highly concurrent
systems - but ensure that the underlying pools can handle the increased load without
timeouts. -->
+ <property name="maxActivePlans">20</property>
<!-- Query processor time slice, in milliseconds. (default 2000) -->
<property name="timeSliceInMilli">2000</property>
<!-- Maximum allowed fetch size, set via JDBC. User requested value ignored
above this value. (default 20480) -->
Modified:
trunk/build/kits/jboss-container/deployers/teiid.deployer/teiid-deployer-jboss-beans.xml
===================================================================
---
trunk/build/kits/jboss-container/deployers/teiid.deployer/teiid-deployer-jboss-beans.xml 2010-07-08
15:38:23 UTC (rev 2332)
+++
trunk/build/kits/jboss-container/deployers/teiid.deployer/teiid-deployer-jboss-beans.xml 2010-07-12
20:59:02 UTC (rev 2333)
@@ -57,7 +57,6 @@
</bean>
<bean name="ConnectionFactoryDeployer"
class="org.teiid.jboss.deployers.ConnectionFactoryDeployer">
- <property name="connectorManagerRepository"><inject
bean="ConnectorManagerRepository"/></property>
<property name="VDBStatusChecker"><inject
bean="VDBStatusChecker"/></property>
</bean>
Modified: trunk/common-core/src/main/resources/org/teiid/core/i18n.properties
===================================================================
--- trunk/common-core/src/main/resources/org/teiid/core/i18n.properties 2010-07-08
15:38:23 UTC (rev 2332)
+++ trunk/common-core/src/main/resources/org/teiid/core/i18n.properties 2010-07-12
20:59:02 UTC (rev 2333)
@@ -328,9 +328,6 @@
Streamable.isNUll=Streamable object argument can not be null
Streamable.InvalidReference=Streamable contents are not available, use the Streaming
interface to get the contents.
-WorkerPool.New_thread=Created worker thread "{0}".
-WorkerPool.uncaughtException=Uncaught exception processing work
-
FloatToBooleanTransform.Failed_transform=Failed to transform Float to Boolean. Expected
0 or 1 for {0}
NullToAnyTransform.Invalid_value=Invalid value for type {0}: {1} of type {2}
ObjectToAnyTransform.Invalid_value=Invalid conversion from type {0} with value
''{2}'' to type {1}
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java 2010-07-08
15:38:23 UTC (rev 2332)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java 2010-07-12
20:59:02 UTC (rev 2333)
@@ -23,12 +23,11 @@
package org.teiid.common.buffer;
import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import org.teiid.core.TeiidComponentException;
@@ -295,7 +294,7 @@
lob.setReferenceStreamId(id);
}
if (this.lobReferences == null) {
- this.lobReferences = Collections.synchronizedMap(new HashMap<String,
Streamable<?>>());
+ this.lobReferences = new ConcurrentHashMap<String,
Streamable<?>>();
}
this.lobReferences.put(id, lob);
if (lob.getReference() == null) {
Modified:
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/CapabilitiesConverter.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/CapabilitiesConverter.java 2010-07-08
15:38:23 UTC (rev 2332)
+++
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/CapabilitiesConverter.java 2010-07-12
20:59:02 UTC (rev 2333)
@@ -44,7 +44,7 @@
return convertCapabilities(srcCaps, null);
}
- public static BasicSourceCapabilities convertCapabilities(ExecutionFactory srcCaps,
String connectorID) {
+ public static BasicSourceCapabilities convertCapabilities(ExecutionFactory srcCaps,
Object connectorID) {
BasicSourceCapabilities tgtCaps = new BasicSourceCapabilities();
tgtCaps.setCapabilitySupport(Capability.QUERY_SELECT_EXPRESSION,
srcCaps.supportsSelectExpression());
Modified:
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorManager.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorManager.java 2010-07-08
15:38:23 UTC (rev 2332)
+++
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorManager.java 2010-07-12
20:59:02 UTC (rev 2333)
@@ -26,21 +26,18 @@
*/
package org.teiid.dqp.internal.datamgr.impl;
-import java.util.LinkedList;
+import java.util.Arrays;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
import javax.naming.InitialContext;
import javax.naming.NamingException;
-import org.teiid.common.buffer.BlockedException;
+import org.teiid.core.TeiidComponentException;
import org.teiid.core.util.Assertion;
import org.teiid.dqp.DQPPlugin;
import org.teiid.dqp.internal.cache.DQPContextCache;
-import org.teiid.dqp.internal.datamgr.impl.ConnectorWorkItem.PermitMode;
-import org.teiid.dqp.internal.process.AbstractWorkItem;
import org.teiid.dqp.message.AtomicRequestID;
import org.teiid.dqp.message.AtomicRequestMessage;
import org.teiid.dqp.service.BufferService;
@@ -69,13 +66,8 @@
private static final String JAVA_CONTEXT = "java:"; //$NON-NLS-1$
- public static final int DEFAULT_MAX_THREADS = 20;
-
- private static AtomicInteger ID_SEQUENCE = new AtomicInteger();
-
private String translatorName;
private String connectionName;
- private String connectorId = String.valueOf(ID_SEQUENCE.getAndIncrement());
//services acquired in start
private BufferService bufferService;
@@ -85,10 +77,6 @@
private SourceCapabilities cachedCapabilities;
- private int currentConnections;
- private int maxConnections = DEFAULT_MAX_THREADS;
- private LinkedList<ConnectorWorkItem> queuedRequests = new
LinkedList<ConnectorWorkItem>();
-
private volatile boolean stopped;
private ExecutionFactory<Object, Object> executionFactory;
@@ -115,24 +103,8 @@
return sb.toString();
}
- public synchronized void acquireConnectionLock(ConnectorWorkItem item) throws
BlockedException {
- switch (item.getPermitMode()) {
- case NOT_ACQUIRED:
- if (currentConnections < maxConnections) {
- currentConnections++;
- item.setPermitMode(PermitMode.ACQUIRED);
- return;
- }
- queuedRequests.add(item);
- item.setPermitMode(PermitMode.BLOCKED);
- case BLOCKED:
- throw BlockedException.INSTANCE;
- }
- }
-
public MetadataStore getMetadata(String modelName, Map<String, Datatype>
datatypes, Properties importProperties) throws TranslatorException {
MetadataFactory factory = new MetadataFactory(modelName, datatypes, importProperties);
- ExecutionFactory<Object, Object> executionFactory = getExecutionFactory();
Object connectionFactory = getConnectionFactory();
Object connection = executionFactory.getConnection(connectionFactory);
try {
@@ -143,26 +115,26 @@
return factory.getMetadataStore();
}
- public SourceCapabilities getCapabilities() throws TranslatorException {
+ public SourceCapabilities getCapabilities() throws TeiidComponentException {
if (cachedCapabilities != null) {
return cachedCapabilities;
}
checkStatus();
ExecutionFactory<Object, Object> translator = getExecutionFactory();
- BasicSourceCapabilities resultCaps =
CapabilitiesConverter.convertCapabilities(translator, this.connectorId);
+ BasicSourceCapabilities resultCaps =
CapabilitiesConverter.convertCapabilities(translator, Arrays.asList(translatorName,
connectionName));
resultCaps.setScope(Scope.SCOPE_GLOBAL);
cachedCapabilities = resultCaps;
return resultCaps;
}
- public ConnectorWork executeRequest(AtomicRequestMessage message, AbstractWorkItem
awi) throws TranslatorException {
+ public ConnectorWork registerRequest(AtomicRequestMessage message) throws
TeiidComponentException {
// Set the connector ID to be used; if not already set.
checkStatus();
AtomicRequestID atomicRequestId = message.getAtomicRequestID();
LogManager.logDetail(LogConstants.CTX_CONNECTOR, new Object[] {atomicRequestId,
"Create State"}); //$NON-NLS-1$
- ConnectorWorkItem item = new ConnectorWorkItem(message, awi, this);
+ ConnectorWorkItem item = new ConnectorWorkItem(message, this);
Assertion.isNull(requestStates.put(atomicRequestId, item), "State already
existed"); //$NON-NLS-1$
return item;
}
@@ -178,17 +150,6 @@
void removeState(AtomicRequestID id) {
LogManager.logDetail(LogConstants.CTX_CONNECTOR, new Object[] {id, "Remove
State"}); //$NON-NLS-1$
ConnectorWorkItem cwi = requestStates.remove(id);
- if (cwi != null && cwi.getPermitMode() == PermitMode.ACQUIRED) {
- synchronized (this) {
- ConnectorWorkItem next = queuedRequests.pollFirst();
- if (next == null) {
- currentConnections--;
- return;
- }
- next.setPermitMode(PermitMode.ACQUIRED);
- next.getParent().moreWork();
- }
- }
}
int size() {
@@ -257,13 +218,6 @@
* @return the <code>ExecutionFactory</code>.
*/
protected ExecutionFactory<Object, Object> getExecutionFactory() {
- if (this.executionFactory == null) {
- try {
- InitialContext ic = new InitialContext();
- return (ExecutionFactory<Object, Object>)ic.lookup(this.translatorName);
- } catch (NamingException e) {
- }
- }
return this.executionFactory;
}
@@ -306,16 +260,12 @@
return null;
}
- private void checkStatus() throws TranslatorException {
+ private void checkStatus() throws TeiidComponentException {
if (stopped) {
- throw new
TranslatorException(DQPPlugin.Util.getString("ConnectorManager.not_in_valid_state",
this.translatorName)); //$NON-NLS-1$
+ throw new
TeiidComponentException(DQPPlugin.Util.getString("ConnectorManager.not_in_valid_state",
this.translatorName)); //$NON-NLS-1$
}
}
- public void setMaxConnections(int value) {
- this.maxConnections = value;
- }
-
public String getTranslatorName() {
return this.translatorName;
}
Modified:
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorManagerRepository.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorManagerRepository.java 2010-07-08
15:38:23 UTC (rev 2332)
+++
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorManagerRepository.java 2010-07-12
20:59:02 UTC (rev 2333)
@@ -43,7 +43,7 @@
}
public List<ConnectorManager> getConnectorManagers() {
- return new ArrayList(this.repo.values());
+ return new ArrayList<ConnectorManager>(this.repo.values());
}
Modified:
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWork.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWork.java 2010-07-08
15:38:23 UTC (rev 2332)
+++
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWork.java 2010-07-12
20:59:02 UTC (rev 2333)
@@ -34,12 +34,10 @@
void cancel();
- AtomicResultsMessage more() throws TranslatorException;
+ AtomicResultsMessage more() throws TranslatorException, BlockedException;
void close();
AtomicResultsMessage execute() throws TranslatorException, BlockedException;
- boolean isQueued();
-
}
\ No newline at end of file
Modified:
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWorkItem.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWorkItem.java 2010-07-08
15:38:23 UTC (rev 2332)
+++
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWorkItem.java 2010-07-12
20:59:02 UTC (rev 2333)
@@ -37,7 +37,6 @@
import org.teiid.dqp.DQPPlugin;
import org.teiid.dqp.internal.datamgr.language.LanguageBridgeFactory;
import org.teiid.dqp.internal.datamgr.metadata.RuntimeMetadataImpl;
-import org.teiid.dqp.internal.process.AbstractWorkItem;
import org.teiid.dqp.message.AtomicRequestID;
import org.teiid.dqp.message.AtomicRequestMessage;
import org.teiid.dqp.message.AtomicResultsMessage;
@@ -64,18 +63,12 @@
public class ConnectorWorkItem implements ConnectorWork {
- enum PermitMode {
- BLOCKED, ACQUIRED, NOT_ACQUIRED
- }
-
/* Permanent state members */
private AtomicRequestID id;
private ConnectorManager manager;
private AtomicRequestMessage requestMsg;
private ExecutionFactory connector;
private QueryMetadataInterface queryMetadata;
- private PermitMode permitMode = PermitMode.NOT_ACQUIRED;
- private AbstractWorkItem awi;
/* Created on new request */
private Object connection;
@@ -95,7 +88,7 @@
private AtomicBoolean isCancelled = new AtomicBoolean();
- ConnectorWorkItem(AtomicRequestMessage message, AbstractWorkItem awi,
ConnectorManager manager) {
+ ConnectorWorkItem(AtomicRequestMessage message, ConnectorManager manager) {
this.id = message.getAtomicRequestID();
this.requestMsg = message;
this.manager = manager;
@@ -118,30 +111,12 @@
this.queryMetadata = vdb.getAttachment(QueryMetadataInterface.class);
this.queryMetadata = new TempMetadataAdapter(this.queryMetadata, new
TempMetadataStore());
this.securityContext.setTransactional(requestMsg.isTransactional());
- this.awi = awi;
}
public AtomicRequestID getId() {
return id;
}
- public PermitMode getPermitMode() {
- return permitMode;
- }
-
- public void setPermitMode(PermitMode permitMode) {
- this.permitMode = permitMode;
- }
-
- public AbstractWorkItem getParent() {
- return awi;
- }
-
- @Override
- public boolean isQueued() {
- return this.permitMode == PermitMode.BLOCKED;
- }
-
public void cancel() {
try {
LogManager.logDetail(LogConstants.CTX_CONNECTOR, new Object[] {this.id,
"Processing CANCEL request"}); //$NON-NLS-1$
@@ -189,8 +164,6 @@
LogManager.logDetail(LogConstants.CTX_CONNECTOR, new Object[] {this.id,
"Closed connection"}); //$NON-NLS-1$
}
}
-
-
private TranslatorException handleError(Throwable t) {
if (t instanceof DataNotAvailableException) {
@@ -224,10 +197,6 @@
throw new TranslatorException("Request canceled"); //$NON-NLS-1$
}
- if (!this.securityContext.isTransactional()) {
- this.manager.acquireConnectionLock(this);
- }
-
LogManager.logDetail(LogConstants.CTX_CONNECTOR, new Object[]
{this.requestMsg.getAtomicRequestID(), "Processing NEW request:",
this.requestMsg.getCommand()}); //$NON-NLS-1$
try {
this.connectionFactory = this.manager.getConnectionFactory();
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPConfiguration.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPConfiguration.java 2010-07-08
15:38:23 UTC (rev 2332)
+++
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPConfiguration.java 2010-07-12
20:59:02 UTC (rev 2333)
@@ -37,8 +37,9 @@
static final int DEFAULT_MAX_RESULTSET_CACHE_ENTRIES = 1024;
static final int DEFAULT_QUERY_THRESHOLD = 600;
static final String PROCESS_PLAN_QUEUE_NAME = "QueryProcessorQueue";
//$NON-NLS-1$
- public static final int DEFAULT_MAX_PROCESS_WORKERS = 32;
+ public static final int DEFAULT_MAX_PROCESS_WORKERS = 64;
public static final int DEFAULT_MAX_SOURCE_ROWS = -1;
+ public static final int DEFAULT_MAX_ACTIVE_PLANS = 20;
private int maxThreads = DEFAULT_MAX_PROCESS_WORKERS;
private int timeSliceInMilli = DEFAULT_PROCESSOR_TIMESLICE;
@@ -55,8 +56,18 @@
private int queryThresholdInSecs = DEFAULT_QUERY_THRESHOLD;
private boolean exceptionOnMaxSourceRows = true;
private int maxSourceRows = -1;
+ private int maxActivePlans = DEFAULT_MAX_ACTIVE_PLANS;
+
+ @ManagementProperty(description="Max active plans (default 20). Increase this
value, and max threads, on highly concurrent systems - but ensure that the underlying
pools can handle the increased load without timeouts.")
+ public int getMaxActivePlans() {
+ return maxActivePlans;
+ }
- @ManagementProperty(description="Process pool maximum thread count. (default 32)
Increase this value if the system's available processors is larger than 8")
+ public void setMaxActivePlans(int maxActivePlans) {
+ this.maxActivePlans = maxActivePlans;
+ }
+
+ @ManagementProperty(description="Process pool maximum thread count. (default
64)")
public int getMaxThreads() {
return maxThreads;
}
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java 2010-07-08
15:38:23 UTC (rev 2332)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java 2010-07-12
20:59:02 UTC (rev 2333)
@@ -35,9 +35,7 @@
import javax.resource.spi.work.Work;
import javax.resource.spi.work.WorkEvent;
-import javax.resource.spi.work.WorkException;
import javax.resource.spi.work.WorkListener;
-import javax.resource.spi.work.WorkManager;
import javax.transaction.xa.Xid;
import org.teiid.adminapi.Admin;
@@ -58,12 +56,11 @@
import org.teiid.common.buffer.BufferManager;
import org.teiid.core.TeiidComponentException;
import org.teiid.core.TeiidProcessingException;
-import org.teiid.core.TeiidRuntimeException;
import org.teiid.core.types.Streamable;
-import org.teiid.core.util.Assertion;
import org.teiid.dqp.DQPPlugin;
import org.teiid.dqp.internal.cache.DQPContextCache;
import org.teiid.dqp.internal.datamgr.impl.ConnectorManagerRepository;
+import org.teiid.dqp.internal.process.ThreadReuseExecutor.PrioritizedRunnable;
import org.teiid.dqp.message.AtomicRequestMessage;
import org.teiid.dqp.message.RequestID;
import org.teiid.dqp.service.BufferService;
@@ -84,15 +81,17 @@
*/
public class DQPCore implements DQP {
- public final static class FutureWork<T> implements Work, WorkListener {
+ //TODO: replace with FutureTask
+ public final static class FutureWork<T> implements Work, WorkListener,
PrioritizedRunnable {
private final Callable<T> toCall;
- private DQPWorkContext workContext;
private ResultsFuture<T> result = new ResultsFuture<T>();
private ResultsReceiver<T> receiver = result.getResultsReceiver();
+ private int priority;
+ private long creationTime = System.currentTimeMillis();
- public FutureWork(Callable<T> processor) {
- this.workContext = DQPWorkContext.getWorkContext();
+ public FutureWork(Callable<T> processor, int priority) {
this.toCall = processor;
+ this.priority = priority;
}
public ResultsFuture<T> getResult() {
@@ -102,7 +101,7 @@
@Override
public void run() {
try {
- receiver.receiveResults(workContext.runInContext(toCall));
+ receiver.receiveResults(toCall.call());
} catch (Throwable t) {
receiver.exceptionOccurred(t);
}
@@ -132,6 +131,16 @@
public void workStarted(WorkEvent arg0) {
}
+
+ @Override
+ public int getPriority() {
+ return priority;
+ }
+
+ @Override
+ public long getCreationTime() {
+ return creationTime;
+ }
}
static class ClientState {
@@ -164,8 +173,7 @@
}
- private WorkManager workManager;
- private StatsCapturingWorkManager processWorkerPool;
+ private ThreadReuseExecutor processWorkerPool;
// System properties for Code Table
private int maxCodeTableRecords = DQPConfiguration.DEFAULT_MAX_CODE_TABLE_RECORDS;
@@ -197,6 +205,11 @@
private Map<String, ClientState> clientState = Collections.synchronizedMap(new
HashMap<String, ClientState>());
private DQPContextCache contextCache;
private boolean useEntitlements = false;
+
+ private int maxActivePlans = DQPConfiguration.DEFAULT_MAX_ACTIVE_PLANS;
+ private int currentlyActivePlans;
+ private LinkedList<RequestWorkItem> waitingPlans = new
LinkedList<RequestWorkItem>();
+
/**
* perform a full shutdown and wait for 10 seconds for all threads to finish
*/
@@ -330,8 +343,13 @@
RequestWorkItem workItem = new RequestWorkItem(this, requestMsg, request,
resultsFuture.getResultsReceiver(), requestID, workContext);
logMMCommand(workItem, Event.NEW, null);
addRequest(requestID, workItem, state);
-
- this.addWork(workItem);
+ synchronized (waitingPlans) {
+ if (currentlyActivePlans < maxActivePlans) {
+ startActivePlan(workItem);
+ } else {
+ waitingPlans.add(workItem);
+ }
+ }
return resultsFuture;
}
@@ -351,8 +369,28 @@
this.requests.put(requestID, workItem);
state.addRequest(requestID);
}
+
+ private void startActivePlan(RequestWorkItem workItem) {
+ workItem.active = true;
+ this.addWork(workItem);
+ this.currentlyActivePlans++;
+ }
+
+ void finishProcessing(final RequestWorkItem workItem) {
+ synchronized (waitingPlans) {
+ if (!workItem.active) {
+ return;
+ }
+ workItem.active = false;
+ currentlyActivePlans--;
+ if (!waitingPlans.isEmpty()) {
+ startActivePlan(waitingPlans.remove());
+ }
+ }
+ }
void removeRequest(final RequestWorkItem workItem) {
+ finishProcessing(workItem);
this.requests.remove(workItem.requestID);
ClientState state = getClientState(workItem.getDqpWorkContext().getSessionId(),
false);
if (state != null) {
@@ -361,38 +399,20 @@
contextCache.removeRequestScopedCache(workItem.requestID.toString());
}
- void addWork(Work work) {
- try {
- this.processWorkerPool.scheduleWork(work);
- } catch (WorkException e) {
- //TODO: cancel? close?
- throw new TeiidRuntimeException(e);
- }
+ void addWork(Runnable work) {
+ this.processWorkerPool.execute(work);
}
- void scheduleWork(final RequestWorkItem work, long delay) {
- try {
- this.processWorkerPool.scheduleWork(new Work() {
-
- @Override
- public void run() {
- work.moreWork();
- }
-
- @Override
- public void release() {
-
- }
- }, null, delay);
- } catch (WorkException e) {
- throw new TeiidRuntimeException(e);
- }
+ void scheduleWork(final Runnable r, int priority, long delay) {
+ this.processWorkerPool.schedule(new FutureWork<Void>(new Callable<Void>()
{
+ @Override
+ public Void call() throws Exception {
+ r.run();
+ return null;
+ }
+ }, priority), delay, TimeUnit.MILLISECONDS);
}
- public void setWorkManager(WorkManager mgr) {
- this.workManager = mgr;
- }
-
public ResultsFuture<?> closeLobChunkStream(int lobRequestId,
long requestId, String streamId)
throws TeiidProcessingException {
@@ -446,13 +466,8 @@
return this.requests.get(processorID);
}
- /**
- * Returns a list of QueueStats objects that represent the queues in
- * this service.
- * If there are no queues, an empty Collection is returned.
- */
public WorkerPoolStatisticsMetadata getWorkManagerStatistics() {
- return processWorkerPool.getStats();
+ return this.processWorkerPool.getStats();
}
public void terminateSession(String sessionId) {
@@ -627,9 +642,6 @@
}
public void start(DQPConfiguration config) {
-
- Assertion.isNotNull(this.workManager);
-
this.processorTimeslice = config.getTimeSliceInMilli();
this.maxFetchSize = config.getMaxRowsFetchSize();
this.processorDebugAllowed = config.isProcessDebugAllowed();
@@ -658,7 +670,7 @@
this.bufferManager = bufferService.getBufferManager();
this.contextCache = bufferService.getContextCache();
- this.processWorkerPool = new
StatsCapturingWorkManager(DQPConfiguration.PROCESS_PLAN_QUEUE_NAME,
config.getMaxThreads(), this.workManager);
+ this.processWorkerPool = new
ThreadReuseExecutor(DQPConfiguration.PROCESS_PLAN_QUEUE_NAME, config.getMaxThreads());
dataTierMgr = new DataTierManagerImpl(this,
this.connectorManagerRepository,
@@ -706,7 +718,7 @@
return null;
}
};
- return addWork(processor);
+ return addWork(processor, 0);
}
// local txn
@@ -719,7 +731,7 @@
return null;
}
};
- return addWork(processor);
+ return addWork(processor, 0);
}
// global txn
@@ -732,7 +744,7 @@
return null;
}
};
- return addWork(processor);
+ return addWork(processor, 0);
}
// global txn
public ResultsFuture<?> end(XidImpl xid, int flags) throws XATransactionException
{
@@ -756,16 +768,12 @@
return getTransactionService().prepare(workContext.getSessionId(),xid,
workContext.getSession().isEmbedded());
}
};
- return addWork(processor);
+ return addWork(processor, 10);
}
- private <T> ResultsFuture<T> addWork(Callable<T> processor) {
- FutureWork<T> work = new FutureWork<T>(processor);
- try {
- this.workManager.scheduleWork(work);
- } catch (WorkException e) {
- throw new TeiidRuntimeException(e);
- }
+ <T> ResultsFuture<T> addWork(Callable<T> processor, int priority) {
+ FutureWork<T> work = new FutureWork<T>(processor, priority);
+ this.addWork(work);
return work.getResult();
}
@@ -786,7 +794,7 @@
return null;
}
};
- return addWork(processor);
+ return addWork(processor, 0);
}
// global txn
public ResultsFuture<?> start(final XidImpl xid, final int flags, final int
timeout)
@@ -799,7 +807,7 @@
return null;
}
};
- return addWork(processor);
+ return addWork(processor, 100);
}
public MetadataResult getMetadata(long requestID)
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPWorkContext.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPWorkContext.java 2010-07-08
15:38:23 UTC (rev 2332)
+++
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPWorkContext.java 2010-07-12
20:59:02 UTC (rev 2333)
@@ -32,6 +32,8 @@
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.FutureTask;
import javax.security.auth.Subject;
@@ -166,14 +168,24 @@
return session.getVdb();
}
- public <V> V runInContext(Callable<V> callable) throws Exception {
+ public <V> V runInContext(Callable<V> callable) throws Throwable {
+ FutureTask<V> task = new FutureTask<V>(callable);
+ runInContext(task);
+ try {
+ return task.get();
+ } catch (ExecutionException e) {
+ throw e.getCause();
+ }
+ }
+
+ public void runInContext(final Runnable runnable) {
DQPWorkContext.setWorkContext(this);
boolean associated = false;
if (securityHelper != null && this.getSubject() != null) {
associated = securityHelper.assosiateSecurityContext(this.getSecurityDomain(),
this.getSecurityContext());
}
try {
- return callable.call();
+ runnable.run();
} finally {
if (associated) {
securityHelper.clearSecurityContext(this.getSecurityDomain());
@@ -181,19 +193,6 @@
DQPWorkContext.releaseWorkContext();
}
}
-
- public void runInContext(final Runnable runnable) {
- try {
- runInContext(new Callable<Void>() {
- @Override
- public Void call() {
- runnable.run();
- return null;
- }
- });
- } catch (Exception e) {
- }
- }
public HashMap<String, DataPolicy> getAllowedDataPolicies() {
if (this.policies == null) {
Modified:
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierManagerImpl.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierManagerImpl.java 2010-07-08
15:38:23 UTC (rev 2332)
+++
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierManagerImpl.java 2010-07-12
20:59:02 UTC (rev 2333)
@@ -30,10 +30,12 @@
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.Callable;
import org.teiid.adminapi.impl.ModelMetaData;
import org.teiid.adminapi.impl.VDBMetaData;
import org.teiid.client.RequestMessage;
+import org.teiid.client.util.ResultsFuture;
import org.teiid.common.buffer.BlockedException;
import org.teiid.common.buffer.TupleBatch;
import org.teiid.common.buffer.TupleSource;
@@ -77,9 +79,11 @@
import org.teiid.query.sql.symbol.Constant;
import org.teiid.query.sql.symbol.GroupSymbol;
import org.teiid.query.util.CommandContext;
-import org.teiid.translator.TranslatorException;
-
+/**
+ * Full {@link ProcessorDataManager} implementation that
+ * controls access to {@link ConnectorManager}s and handles system queries.
+ */
public class DataTierManagerImpl implements ProcessorDataManager {
private enum SystemTables {
@@ -130,7 +134,8 @@
}
AtomicRequestMessage aqr = createRequest(processorId, command, modelName,
connectorBindingId, nodeID);
- return new DataTierTupleSource(aqr.getCommand().getProjectedSymbols(), aqr, this,
aqr.getConnectorName(), workItem);
+ ConnectorWork work = getCM(aqr.getConnectorName()).registerRequest(aqr);
+ return new DataTierTupleSource(aqr, workItem, work, this);
}
/**
@@ -348,10 +353,6 @@
return aqr;
}
- ConnectorWork executeRequest(AtomicRequestMessage aqr, AbstractWorkItem awi, String
connectorName) throws TranslatorException {
- return getCM(connectorName).executeRequest(aqr, awi);
- }
-
/**
* Notify each waiting request that the code table data is now available.
* @param requests
@@ -421,4 +422,12 @@
this.codeTableCache.clearAll();
}
+ <T> ResultsFuture<T> addWork(Callable<T> callable, int priority) {
+ return requestMgr.addWork(callable, priority);
+ }
+
+ void scheduleWork(Runnable r, int priority, long delay) {
+ requestMgr.scheduleWork(r, priority, delay);
+ }
+
}
Modified:
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java 2010-07-08
15:38:23 UTC (rev 2332)
+++
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java 2010-07-12
20:59:02 UTC (rev 2333)
@@ -24,105 +24,178 @@
import java.util.Arrays;
import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
import org.teiid.client.SourceWarning;
+import org.teiid.client.util.ResultsFuture;
+import org.teiid.common.buffer.BlockedException;
import org.teiid.common.buffer.TupleSource;
import org.teiid.core.TeiidComponentException;
import org.teiid.core.TeiidProcessingException;
+import org.teiid.core.TeiidRuntimeException;
import org.teiid.core.util.Assertion;
import org.teiid.dqp.internal.datamgr.impl.ConnectorWork;
import org.teiid.dqp.message.AtomicRequestMessage;
import org.teiid.dqp.message.AtomicResultsMessage;
+import org.teiid.translator.DataNotAvailableException;
import org.teiid.translator.TranslatorException;
/**
* This tuple source impl can only be used once; once it is closed, it
* cannot be reopened and reused.
+ *
+ * TODO: the handling of DataNotAvailable is awkward.
+ * In the multi-threaded case we'd like to not even
+ * notify the parent plan and just schedule the next poll.
*/
public class DataTierTupleSource implements TupleSource {
// Construction state
- private final List schema;
private final AtomicRequestMessage aqr;
- private final DataTierManagerImpl dataMgr;
- private final String connectorName;
private final RequestWorkItem workItem;
+ private final ConnectorWork cwi;
+ private final DataTierManagerImpl dtm;
// Data state
- private ConnectorWork cwi;
private int index;
private int rowsProcessed;
private volatile AtomicResultsMessage arm;
private boolean closed;
private volatile boolean canceled;
+ private boolean executed;
+ private volatile ResultsFuture<AtomicResultsMessage> futureResult;
private volatile boolean running;
- /**
- * Constructor for DataTierTupleSource.
- */
- public DataTierTupleSource(List schema, AtomicRequestMessage aqr, DataTierManagerImpl
dataMgr, String connectorName, RequestWorkItem workItem) {
- this.schema = schema;
+ public DataTierTupleSource(AtomicRequestMessage aqr, RequestWorkItem workItem,
ConnectorWork cwi, DataTierManagerImpl dtm) {
this.aqr = aqr;
- this.dataMgr = dataMgr;
- this.connectorName = connectorName;
this.workItem = workItem;
+ this.cwi = cwi;
+ this.dtm = dtm;
+ Assertion.isNull(workItem.getConnectorRequest(aqr.getAtomicRequestID()));
+ workItem.addConnectorRequest(aqr.getAtomicRequestID(), this);
+ if (!aqr.isTransactional()) {
+ addWork();
+ }
}
- /**
- * @see TupleSource#getSchema()
- */
+ private void addWork() {
+ futureResult = dtm.addWork(new Callable<AtomicResultsMessage>() {
+ @Override
+ public AtomicResultsMessage call() throws Exception {
+ return getResults();
+ }
+ }, 100);
+ futureResult.addCompletionListener(new
ResultsFuture.CompletionListener<AtomicResultsMessage>() {
+ public void onCompletion(ResultsFuture<AtomicResultsMessage> future) {
+ workItem.moreWork();
+ }
+ });
+ }
+
public List getSchema() {
- return this.schema;
+ return this.aqr.getCommand().getProjectedSymbols();
}
- public List nextTuple() throws TeiidComponentException, TeiidProcessingException {
- if (this.arm == null) {
- open();
- }
+ public List<?> nextTuple() throws TeiidComponentException,
TeiidProcessingException {
while (true) {
+ if (arm == null) {
+ AtomicResultsMessage results = null;
+ try {
+ if (futureResult != null || !aqr.isTransactional()) {
+ results = asynchGet();
+ } else {
+ results = getResults();
+ }
+ } catch (TranslatorException e) {
+ exceptionOccurred(e, true);
+ } catch (DataNotAvailableException e) {
+ dtm.scheduleWork(new Runnable() {
+ @Override
+ public void run() {
+ workItem.moreWork();
+ }
+ }, 10, e.getRetryDelay());
+ throw BlockedException.INSTANCE;
+ }
+ receiveResults(results);
+ }
if (index < arm.getResults().length) {
return this.arm.getResults()[index++];
}
+ arm = null;
if (isDone()) {
return null;
}
- try {
- running = true;
- receiveResults(this.cwi.more());
- } catch (TranslatorException e) {
- exceptionOccurred(e, true);
- } finally {
- running = false;
- }
}
}
+
+ private AtomicResultsMessage asynchGet()
+ throws BlockedException, TeiidProcessingException,
+ TeiidComponentException, TranslatorException {
+ if (futureResult == null) {
+ addWork();
+ }
+ if (!futureResult.isDone()) {
+ throw BlockedException.INSTANCE;
+ }
+ ResultsFuture<AtomicResultsMessage> currentResults = futureResult;
+ futureResult = null;
+ AtomicResultsMessage results = null;
+ try {
+ results = currentResults.get();
+ addWork();
+ } catch (InterruptedException e) {
+ throw new TeiidRuntimeException(e);
+ } catch (ExecutionException e) {
+ if (e.getCause() instanceof TeiidProcessingException) {
+ throw (TeiidProcessingException)e.getCause();
+ }
+ if (e.getCause() instanceof TeiidComponentException) {
+ throw (TeiidComponentException)e.getCause();
+ }
+ if (e.getCause() instanceof TranslatorException) {
+ throw (TranslatorException)e.getCause();
+ }
+ if (e.getCause() instanceof RuntimeException) {
+ throw (RuntimeException)e.getCause();
+ }
+ //shouldn't happen
+ throw new RuntimeException(e);
+ }
+ return results;
+ }
+
+ private AtomicResultsMessage getResults()
+ throws BlockedException, TeiidComponentException,
+ TranslatorException {
+ AtomicResultsMessage results = null;
+ try {
+ running = true;
+ if (!executed) {
+ results = cwi.execute();
+ executed = true;
+ } else {
+ results = cwi.more();
+ }
+ } finally {
+ running = false;
+ }
+ return results;
+ }
public boolean isQueued() {
- return this.cwi != null && this.cwi.isQueued();
+ ResultsFuture<AtomicResultsMessage> future = futureResult;
+ return !running && future != null && !future.isDone();
}
public boolean isDone() {
- return this.arm != null && this.arm.getFinalRow() >= 0;
+ AtomicResultsMessage results = this.arm;
+ return results != null && results.getFinalRow() >= 0;
}
- void open() throws TeiidComponentException, TeiidProcessingException {
- try {
- if (this.cwi == null) {
- this.cwi = this.dataMgr.executeRequest(aqr, this.workItem,
this.connectorName);
- Assertion.isNull(workItem.getConnectorRequest(aqr.getAtomicRequestID()));
- workItem.addConnectorRequest(aqr.getAtomicRequestID(), this);
- }
- running = true;
- receiveResults(this.cwi.execute());
- } catch (TranslatorException e) {
- exceptionOccurred(e, true);
- } finally {
- running = false;
- }
- }
-
public boolean isRunning() {
return running;
}
@@ -131,7 +204,27 @@
if (!closed) {
if (cwi != null) {
workItem.closeAtomicRequest(this.aqr.getAtomicRequestID());
- this.cwi.close();
+ if (!aqr.isTransactional()) {
+ if (futureResult != null && !futureResult.isDone()) {
+ futureResult.addCompletionListener(new
ResultsFuture.CompletionListener<AtomicResultsMessage>() {
+ @Override
+ public void onCompletion(
+ ResultsFuture<AtomicResultsMessage> future) {
+ cwi.close(); // there is a small chance that this will be done in the
processing thread
+ }
+ });
+ } else {
+ dtm.addWork(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ cwi.close();
+ return null;
+ }
+ }, 0);
+ }
+ } else {
+ this.cwi.close();
+ }
}
closed = true;
}
@@ -165,7 +258,7 @@
AtomicResultsMessage emptyResults = new AtomicResultsMessage(new List[0], null);
emptyResults.setWarnings(Arrays.asList((Exception)exception));
emptyResults.setFinalRow(this.rowsProcessed);
- receiveResults(arm);
+ receiveResults(emptyResults);
} else {
if (exception.getCause() instanceof TeiidComponentException) {
throw (TeiidComponentException)exception.getCause();
@@ -194,14 +287,11 @@
}
public String getConnectorName() {
- return this.connectorName;
+ return this.aqr.getConnectorName();
}
public boolean isTransactional() {
- if (this.arm == null) {
- return false;
- }
- return this.arm.isTransactional();
+ return this.aqr.isTransactional();
}
@Override
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java 2010-07-08
15:38:23 UTC (rev 2332)
+++
trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java 2010-07-12
20:59:02 UTC (rev 2333)
@@ -47,6 +47,7 @@
import org.teiid.core.types.DataTypeManager;
import org.teiid.dqp.DQPPlugin;
import org.teiid.dqp.internal.process.SessionAwareCache.CacheID;
+import org.teiid.dqp.internal.process.ThreadReuseExecutor.PrioritizedRunnable;
import org.teiid.dqp.message.AtomicRequestID;
import org.teiid.dqp.message.RequestID;
import org.teiid.dqp.service.TransactionContext;
@@ -64,10 +65,8 @@
import org.teiid.query.sql.lang.SPParameter;
import org.teiid.query.sql.lang.StoredProcedure;
import org.teiid.query.sql.symbol.SingleElementSymbol;
-import org.teiid.translator.DataNotAvailableException;
-
-public class RequestWorkItem extends AbstractWorkItem {
+public class RequestWorkItem extends AbstractWorkItem implements PrioritizedRunnable {
private enum ProcessingState {NEW, PROCESSING, CLOSE}
private ProcessingState state = ProcessingState.NEW;
@@ -86,6 +85,7 @@
private CacheID cid;
private final TransactionService transactionService;
private final DQPWorkContext dqpWorkContext;
+ boolean active;
/*
* obtained during new
@@ -105,16 +105,17 @@
private Map<AtomicRequestID, DataTierTupleSource> connectorInfo = new
ConcurrentHashMap<AtomicRequestID, DataTierTupleSource>(4);
// This exception contains details of all the atomic requests that failed when query
is run in partial results mode.
private List<TeiidException> warnings = new
LinkedList<TeiidException>();
- private boolean doneProducingBatches;
+ private volatile boolean doneProducingBatches;
private volatile boolean isClosed;
private volatile boolean isCanceled;
private volatile boolean closeRequested;
+
//results request
private ResultsReceiver<ResultsMessage> resultsReceiver;
private int begin;
private int end;
private TupleBatch savedBatch;
- private Map<Integer, LobWorkItem> lobStreams = new
ConcurrentHashMap<Integer, LobWorkItem>(4);
+ private Map<Integer, LobWorkItem> lobStreams = new
ConcurrentHashMap<Integer, LobWorkItem>(4);
/**The time when command begins processing on the server.*/
private long processingTimestamp = System.currentTimeMillis();
@@ -155,7 +156,11 @@
@Override
protected void resumeProcessing() {
- dqpCore.addWork(this);
+ if (doneProducingBatches && !closeRequested && !isCanceled) {
+ this.run(); // just run in the IO thread
+ } else {
+ dqpCore.addWork(this);
+ }
}
@Override
@@ -184,9 +189,6 @@
} catch (QueryProcessor.ExpiredTimeSliceException e) {
LogManager.logDetail(LogConstants.CTX_DQP, "Request Thread",
requestID, "- time slice expired"); //$NON-NLS-1$ //$NON-NLS-2$
this.moreWork();
- } catch (DataNotAvailableException e) {
- LogManager.logDetail(LogConstants.CTX_DQP, "Request Thread",
requestID, "- data not available"); //$NON-NLS-1$ //$NON-NLS-2$
- this.dqpCore.scheduleWork(this, e.getRetryDelay());
} catch (Throwable e) {
LogManager.logDetail(LogConstants.CTX_DQP, e, "Request Thread",
requestID, "- error occurred"); //$NON-NLS-1$ //$NON-NLS-2$
@@ -227,7 +229,6 @@
private void resume() throws XATransactionException {
if (this.transactionState == TransactionState.ACTIVE &&
this.transactionContext.getTransaction() != null) {
- //there's no need to do this for xa transactions, as that is done by the
workmanager
this.transactionService.resume(this.transactionContext);
}
}
@@ -339,7 +340,7 @@
this.resultsBuffer = cr.getResults();
this.analysisRecord = cr.getAnalysisRecord();
this.originalCommand = cr.getCommand();
- this.doneProducingBatches = true;
+ this.doneProducingBatches();
return;
}
}
@@ -357,7 +358,9 @@
super.flushBatchDirect(batch, add);
added = true;
}
- doneProducingBatches = batch.getTerminationFlag();
+ if (batch.getTerminationFlag()) {
+ doneProducingBatches();
+ }
if (doneProducingBatches && cid != null) {
boolean sessionScope = processor.getContext().isSessionFunctionEvaluated();
CachedResults cr = new CachedResults();
@@ -381,7 +384,7 @@
this.transactionState = TransactionState.ACTIVE;
}
if (requestMsg.isNoExec()) {
- doneProducingBatches = true;
+ doneProducingBatches();
resultsBuffer.close();
this.cid = null;
}
@@ -608,7 +611,9 @@
}
}
this.closeRequested = true;
- this.requestCancel(); //pending work should be canceled for fastest clean up
+ if (!this.doneProducingBatches) {
+ this.requestCancel(); //pending work should be canceled for fastest clean up
+ }
this.moreWork();
}
@@ -692,5 +697,20 @@
LogManager.logWarning(LogConstants.CTX_DQP, e, "Failed to cancel " +
requestID); //$NON-NLS-1$
}
}
+
+ private void doneProducingBatches() {
+ this.doneProducingBatches = true;
+ dqpCore.finishProcessing(this);
+ }
+ @Override
+ public int getPriority() {
+ return (closeRequested || isCanceled) ? 0 : 1000;
+ }
+
+ @Override
+ public long getCreationTime() {
+ return processingTimestamp;
+ }
+
}
\ No newline at end of file
Deleted:
trunk/engine/src/main/java/org/teiid/dqp/internal/process/StatsCapturingWorkManager.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/dqp/internal/process/StatsCapturingWorkManager.java 2010-07-08
15:38:23 UTC (rev 2332)
+++
trunk/engine/src/main/java/org/teiid/dqp/internal/process/StatsCapturingWorkManager.java 2010-07-12
20:59:02 UTC (rev 2333)
@@ -1,294 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * See the COPYRIGHT.txt file distributed with this work for information
- * regarding copyright ownership. Some portions may be licensed
- * to Red Hat, Inc. under one or more contributor license agreements.
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
- * 02110-1301 USA.
- */
-
-package org.teiid.dqp.internal.process;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.IdentityHashMap;
-import java.util.LinkedList;
-import java.util.Map;
-import java.util.Queue;
-import java.util.Set;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import javax.resource.spi.work.ExecutionContext;
-import javax.resource.spi.work.Work;
-import javax.resource.spi.work.WorkEvent;
-import javax.resource.spi.work.WorkException;
-import javax.resource.spi.work.WorkListener;
-import javax.resource.spi.work.WorkManager;
-import javax.resource.spi.work.WorkRejectedException;
-
-import org.teiid.adminapi.impl.WorkerPoolStatisticsMetadata;
-import org.teiid.core.util.NamedThreadFactory;
-import org.teiid.logging.LogConstants;
-import org.teiid.logging.LogManager;
-import org.teiid.logging.MessageLevel;
-import org.teiid.query.QueryPlugin;
-
-
-/**
- * StatsCapturingWorkManager acts as a wrapper to the passed in {@link WorkManager} to
- * capture statistics and implement an unbounded queue of work.
- */
-public class StatsCapturingWorkManager {
-
- private static class WorkContext {
- ExecutionContext context;
- long startTimeout;
- long submitted = System.currentTimeMillis();
-
- public WorkContext(ExecutionContext context, long startTimeout) {
- this.context = context;
- this.startTimeout = startTimeout;
- }
-
- long getStartTimeout() {
- if (startTimeout == 0) {
- return 0;
- }
- return Math.max(1, startTimeout + submitted - System.currentTimeMillis());
- }
-
- }
-
- private final class WorkWrapper implements Work {
- private final Work work;
- private final WorkContext workContext;
- private final DQPWorkContext dqpWorkContext;
-
- private WorkWrapper(Work work, WorkContext workContext, DQPWorkContext dqpWorkContext)
{
- this.work = work;
- this.workContext = workContext;
- this.dqpWorkContext = dqpWorkContext;
- }
-
- @Override
- public void run() {
- Thread t = Thread.currentThread();
- synchronized (poolLock) {
- threads.add(t);
- }
- String name = t.getName();
- t.setName(name + "_" + poolName + threadCounter.getAndIncrement());
//$NON-NLS-1$
- if (LogManager.isMessageToBeRecorded(LogConstants.CTX_RUNTIME, MessageLevel.TRACE)) {
- LogManager.logTrace(LogConstants.CTX_RUNTIME, "Beginning work with virtual
worker", t.getName()); //$NON-NLS-1$
- }
- boolean success = false;
- try {
- dqpWorkContext.runInContext(work);
- success = true;
- } finally {
- synchronized (poolLock) {
- WorkWrapper next = null;
- if (success) {
- completedCount++;
- next = queue.poll();
- }
- threads.remove(t);
- if (next == null) {
- activeCount--;
- if (activeCount == 0 && terminated) {
- poolLock.notifyAll();
- }
- } else {
- try {
- if (next.workContext == null) {
- delegate.scheduleWork(next);
- } else {
- delegate.scheduleWork(next, next.workContext.getStartTimeout(),
next.workContext.context, next.work instanceof
WorkListener?(WorkListener)next.work:null);
- }
- } catch (WorkException e) {
- handleException(next.work, e);
- }
- }
- }
- t.setName(name);
- }
- }
-
- @Override
- public void release() {
- this.work.release();
- }
- }
-
- private static void handleException(Work work, WorkException e) {
- if (work instanceof WorkListener) {
- ((WorkListener)work).workRejected(new WorkEvent(work, WorkEvent.WORK_REJECTED, work,
new WorkRejectedException(e)));
- } else if (LogManager.isMessageToBeRecorded(LogConstants.CTX_RUNTIME,
MessageLevel.DETAIL)) {
- LogManager.logDetail(LogConstants.CTX_RUNTIME, e, "Exception adding work to the
WorkManager"); //$NON-NLS-1$
- }
- }
-
- private static ScheduledThreadPoolExecutor stpe = new ScheduledThreadPoolExecutor(1, new
NamedThreadFactory("Scheduler")); //$NON-NLS-1$
-
- private volatile int activeCount;
- private volatile int highestActiveCount;
- private volatile int highestQueueSize;
- private volatile boolean terminated;
- private volatile int submittedCount;
- private volatile int completedCount;
-
- private Object poolLock = new Object();
- private AtomicInteger threadCounter = new AtomicInteger();
-
- private String poolName;
- private int maximumPoolSize;
- private WorkManager delegate;
-
- private Queue<WorkWrapper> queue = new LinkedList<WorkWrapper>();
- private Set<Thread> threads =
Collections.synchronizedSet(Collections.newSetFromMap(new IdentityHashMap<Thread,
Boolean>()));
- private Map<Integer, ScheduledFuture<?>> futures = new HashMap<Integer,
ScheduledFuture<?>>();
- private int idCounter;
-
- public StatsCapturingWorkManager(String name, int maximumPoolSize, WorkManager delegate)
{
- this.maximumPoolSize = maximumPoolSize;
- this.poolName = name;
- this.delegate = delegate;
- }
-
- public void scheduleWork(final Work arg0) throws WorkException {
- scheduleWork(arg0, (WorkContext)null, DQPWorkContext.getWorkContext());
- }
-
- private void scheduleWork(final Work work, WorkContext workContext, DQPWorkContext
dqpWorkContext)
- throws WorkRejectedException, WorkException {
- boolean atMaxThreads = false;
- boolean newMaxQueueSize = false;
- synchronized (poolLock) {
- checkForTermination();
- submittedCount++;
- atMaxThreads = activeCount == maximumPoolSize;
- if (atMaxThreads) {
- queue.add(new WorkWrapper(work, workContext, dqpWorkContext));
- int queueSize = queue.size();
- if (queueSize > highestQueueSize) {
- newMaxQueueSize = true;
- highestQueueSize = queueSize;
- }
- } else {
- activeCount++;
- highestActiveCount = Math.max(activeCount, highestActiveCount);
- }
- }
- if (atMaxThreads) {
- if (newMaxQueueSize && maximumPoolSize > 1) {
- LogManager.logWarning(LogConstants.CTX_RUNTIME,
QueryPlugin.Util.getString("WorkerPool.Max_thread", maximumPoolSize, poolName,
highestQueueSize)); //$NON-NLS-1$
- }
- return;
- }
- if (workContext == null) {
- delegate.scheduleWork(new WorkWrapper(work, null, dqpWorkContext));
- } else {
- delegate.scheduleWork(new WorkWrapper(work, null, dqpWorkContext),
workContext.getStartTimeout(), workContext.context, work instanceof
WorkListener?(WorkListener)work:null);
- }
- }
-
- public void scheduleWork(final Work arg0, final ExecutionContext arg2, long delay)
throws WorkException {
- if (delay < 1) {
- scheduleWork(arg0, new WorkContext(arg2, WorkManager.INDEFINITE),
DQPWorkContext.getWorkContext());
- } else {
- synchronized (futures) {
- final int id = idCounter++;
- final DQPWorkContext dqpWorkContext = DQPWorkContext.getWorkContext();
- ScheduledFuture<?> sf = stpe.schedule(new Runnable() {
-
- @Override
- public void run() {
- try {
- futures.remove(id);
- scheduleWork(arg0, new WorkContext(arg2, WorkManager.INDEFINITE),
dqpWorkContext);
- } catch (WorkException e) {
- handleException(arg0, e);
- }
- }
- }, delay, TimeUnit.MILLISECONDS);
- this.futures.put(id, sf);
- }
- }
- }
-
- private void checkForTermination() throws WorkRejectedException {
- if (terminated) {
- throw new WorkRejectedException("Queue has been terminated"); //$NON-NLS-1$
- }
- }
-
- public WorkerPoolStatisticsMetadata getStats() {
- WorkerPoolStatisticsMetadata stats = new WorkerPoolStatisticsMetadata();
- stats.setName(poolName);
- stats.setQueued(queue.size());
- stats.setHighestQueued(highestQueueSize);
- stats.setActiveThreads(this.activeCount);
- stats.setMaxThreads(this.maximumPoolSize);
- stats.setTotalSubmitted(this.submittedCount);
- stats.setHighestActiveThreads(this.highestActiveCount);
- stats.setTotalCompleted(this.completedCount);
- return stats;
- }
-
- public void shutdown() {
- this.terminated = true;
- }
-
- public void shutdownNow() {
- this.shutdown();
- synchronized (poolLock) {
- for (Thread t : threads) {
- t.interrupt();
- }
- queue.clear();
- }
- synchronized (futures) {
- for (ScheduledFuture<?> future : futures.values()) {
- future.cancel(true);
- }
- futures.clear();
- }
- }
-
- public boolean isTerminated() {
- return terminated;
- }
-
- public boolean awaitTermination(long timeout, TimeUnit unit)
- throws InterruptedException {
- long timeoutMillis = unit.toMillis(timeout);
- long finalMillis = System.currentTimeMillis() + timeoutMillis;
- synchronized (poolLock) {
- while (this.activeCount > 0 || !terminated) {
- if (timeoutMillis < 1) {
- return false;
- }
- poolLock.wait(timeoutMillis);
- timeoutMillis = finalMillis - System.currentTimeMillis();
- }
- }
- return true;
- }
-
-}
Added: trunk/engine/src/main/java/org/teiid/dqp/internal/process/ThreadReuseExecutor.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/ThreadReuseExecutor.java
(rev 0)
+++
trunk/engine/src/main/java/org/teiid/dqp/internal/process/ThreadReuseExecutor.java 2010-07-12
20:59:02 UTC (rev 2333)
@@ -0,0 +1,412 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.dqp.internal.process;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.Executor;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.teiid.adminapi.impl.WorkerPoolStatisticsMetadata;
+import org.teiid.core.util.NamedThreadFactory;
+import org.teiid.logging.LogConstants;
+import org.teiid.logging.LogManager;
+import org.teiid.logging.MessageLevel;
+import org.teiid.query.QueryPlugin;
+
+/**
+ * An Executor that:
+ * <ol>
+ * <li>minimizes thread creation</li>
+ * <li>allows for proper timeout of idle threads</li>
+ * <li>allows for queuing</li>
+ * </ol>
+ * <br/>
+ * A non-fifo (lifo) {@link SynchronousQueue} based {@link ThreadPoolExecutor} satisfies
1 and 2, but not 3.
+ * A bounded or unbound queue based {@link ThreadPoolExecutor} allows for 3, but will
tend to create
+ * up to the maximum number of threads and makes no guarantee on thread scheduling.
+ * <br/>
+ * So the approach here is to use a virtual thread pool off of a {@link
SynchronousQueue}
+ * backed {@link ThreadPoolExecutor}.
+ * <br/>
+ * There is also only a single master scheduling thread with actual executions deferred.
+ *
+ * TODO: there is a race condition between retiring threads and adding work, which may
create extra threads.
+ * That is a flaw with attempting to reuse, rather than create threads.
+ * TODO: bounded queuing - we never bothered bounding in the past with our worker pools,
but reasonable
+ * defaults would be a good idea.
+ */
+public class ThreadReuseExecutor implements Executor {
+
+ public interface PrioritizedRunnable extends Runnable {
+
+ int getPriority();
+
+ long getCreationTime();
+
+ }
+
+ static class RunnableWrapper implements PrioritizedRunnable {
+ Runnable r;
+ DQPWorkContext workContext = DQPWorkContext.getWorkContext();
+ long creationTime;
+ int priority;
+
+ public RunnableWrapper(Runnable r) {
+ if (r instanceof PrioritizedRunnable) {
+ PrioritizedRunnable pr = (PrioritizedRunnable)r;
+ creationTime = pr.getCreationTime();
+ priority = pr.getPriority();
+ } else {
+ creationTime = System.currentTimeMillis();
+ priority = Integer.MAX_VALUE;
+ }
+ this.r = r;
+ }
+
+ @Override
+ public long getCreationTime() {
+ return creationTime;
+ }
+
+ @Override
+ public int getPriority() {
+ return priority;
+ }
+
+ @Override
+ public void run() {
+ workContext.runInContext(r);
+ }
+
+ }
+
+ private final ThreadPoolExecutor tpe;
+
+ private ScheduledThreadPoolExecutor stpe = new ScheduledThreadPoolExecutor(1, new
NamedThreadFactory("Scheduler")); //$NON-NLS-1$
+
+ class ScheduledFutureTask extends FutureTask<Void> implements
ScheduledFuture<Void>, PrioritizedRunnable {
+ private ScheduledFuture<?> scheduledFuture;
+ private boolean periodic;
+ private volatile boolean running;
+ private PrioritizedRunnable runnable;
+
+ public ScheduledFutureTask(PrioritizedRunnable runnable, boolean periodic) {
+ super(runnable, null);
+ this.periodic = periodic;
+ this.runnable = runnable;
+ }
+
+ public void setScheduledFuture(ScheduledFuture<?> scheduledFuture) {
+ scheduledTasks.add(this);
+ this.scheduledFuture = scheduledFuture;
+ }
+
+ @Override
+ public long getDelay(TimeUnit unit) {
+ return this.scheduledFuture.getDelay(unit);
+ }
+
+ @Override
+ public int compareTo(Delayed o) {
+ return this.scheduledFuture.compareTo(o);
+ }
+
+ @Override
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ this.scheduledFuture.cancel(false);
+ scheduledTasks.remove(this);
+ return super.cancel(mayInterruptIfRunning);
+ }
+
+ public Runnable getParent() {
+ return new Runnable() {
+ @Override
+ public void run() {
+ if (running || terminated) {
+ return;
+ }
+ running = periodic;
+ executeDirect(ScheduledFutureTask.this);
+ }
+ };
+ }
+
+ @Override
+ public void run() {
+ if (periodic) {
+ if (!this.runAndReset()) {
+ this.scheduledFuture.cancel(false);
+ scheduledTasks.remove(this);
+ }
+ running = false;
+ } else {
+ scheduledTasks.remove(this);
+ super.run();
+ }
+ }
+
+ @Override
+ public long getCreationTime() {
+ return runnable.getCreationTime();
+ }
+
+ @Override
+ public int getPriority() {
+ return runnable.getPriority();
+ }
+ }
+
+ private volatile int activeCount;
+ private volatile int highestActiveCount;
+ private volatile int highestQueueSize;
+ private volatile boolean terminated;
+ private volatile int submittedCount;
+ private volatile int completedCount;
+ private Object poolLock = new Object();
+ private AtomicInteger threadCounter = new AtomicInteger();
+ private Set<Thread> threads =
Collections.synchronizedSet(Collections.newSetFromMap(new IdentityHashMap<Thread,
Boolean>()));
+ private Set<ScheduledFutureTask> scheduledTasks =
Collections.synchronizedSet(Collections.newSetFromMap(new
IdentityHashMap<ScheduledFutureTask, Boolean>()));
+
+ private String poolName;
+ private int maximumPoolSize;
+ private Queue<PrioritizedRunnable> queue = new
PriorityQueue<PrioritizedRunnable>(11, new Comparator<PrioritizedRunnable>()
{
+ @Override
+ public int compare(PrioritizedRunnable pr1, PrioritizedRunnable pr2) {
+ int result = pr1.getPriority() - pr2.getPriority();
+ if (result == 0) {
+ return Long.signum(pr1.getCreationTime() - pr2.getCreationTime());
+ }
+ return result;
+ }
+ });
+
+ public ThreadReuseExecutor(String name, int maximumPoolSize) {
+ this.maximumPoolSize = maximumPoolSize;
+ this.poolName = name;
+
+ tpe = new ThreadPoolExecutor(0,
+ maximumPoolSize, 2, TimeUnit.MINUTES,
+ new SynchronousQueue<Runnable>(), new NamedThreadFactory("Worker")) {
//$NON-NLS-1$
+ @Override
+ protected void afterExecute(Runnable r, Throwable t) {
+ if (t != null) {
+ LogManager.logError(LogConstants.CTX_RUNTIME, t,
QueryPlugin.Util.getString("WorkerPool.uncaughtException")); //$NON-NLS-1$
+ }
+ }
+
+ };
+ }
+
+ public void execute(final Runnable command) {
+ executeDirect(new RunnableWrapper(command));
+ }
+
+ private void executeDirect(final PrioritizedRunnable command) {
+ boolean atMaxThreads = false;
+ boolean newMaxQueueSize = false;
+ synchronized (poolLock) {
+ checkForTermination();
+ submittedCount++;
+ atMaxThreads = activeCount == maximumPoolSize;
+ if (atMaxThreads) {
+ queue.add(command);
+ int queueSize = queue.size();
+ if (queueSize > highestQueueSize) {
+ newMaxQueueSize = true;
+ highestQueueSize = queueSize;
+ }
+ } else {
+ activeCount++;
+ highestActiveCount = Math.max(activeCount, highestActiveCount);
+ }
+ }
+ if (atMaxThreads) {
+ if (newMaxQueueSize && maximumPoolSize > 1) {
+ LogManager.logWarning(LogConstants.CTX_RUNTIME,
QueryPlugin.Util.getString("WorkerPool.Max_thread", maximumPoolSize, poolName,
highestQueueSize)); //$NON-NLS-1$
+ }
+ return;
+ }
+ tpe.execute(new Runnable() {
+ @Override
+ public void run() {
+ Thread t = Thread.currentThread();
+ threads.add(t);
+ String name = t.getName();
+ t.setName(name + "_" + poolName + threadCounter.getAndIncrement());
//$NON-NLS-1$
+ if (LogManager.isMessageToBeRecorded(LogConstants.CTX_RUNTIME, MessageLevel.TRACE))
{
+ LogManager.logTrace(LogConstants.CTX_RUNTIME, "Beginning work with virtual
worker", t.getName()); //$NON-NLS-1$
+ }
+ Runnable r = command;
+ while (r != null) {
+ boolean success = false;
+ try {
+ r.run();
+ success = true;
+ } finally {
+ synchronized (poolLock) {
+ if (success) {
+ completedCount++;
+ r = queue.poll();
+ }
+ if (!success || r == null) {
+ threads.remove(t);
+ activeCount--;
+ if (activeCount == 0 && terminated) {
+ poolLock.notifyAll();
+ }
+ }
+ }
+ t.setName(name);
+ }
+ }
+ };
+ });
+ }
+
+ private void checkForTermination() {
+ if (terminated) {
+ throw new RejectedExecutionException();
+ }
+ }
+
+ public int getActiveCount() {
+ return activeCount;
+ }
+
+ public int getSubmittedCount() {
+ return submittedCount;
+ }
+
+ public int getCompletedCount() {
+ return completedCount;
+ }
+
+ public int getPoolSize() {
+ return activeCount;
+ }
+
+ public boolean isTerminated() {
+ return terminated;
+ }
+
+ public void shutdown() {
+ this.terminated = true;
+ synchronized (scheduledTasks) {
+ for (ScheduledFuture<?> future : new
ArrayList<ScheduledFuture<?>>(scheduledTasks)) {
+ future.cancel(false);
+ }
+ scheduledTasks.clear();
+ }
+ }
+
+ public int getLargestPoolSize() {
+ return this.highestActiveCount;
+ }
+
+ public WorkerPoolStatisticsMetadata getStats() {
+ WorkerPoolStatisticsMetadata stats = new WorkerPoolStatisticsMetadata();
+ stats.setName(poolName);
+ stats.setQueued(queue.size());
+ stats.setHighestQueued(highestQueueSize);
+ stats.setActiveThreads(getActiveCount());
+ stats.setMaxThreads(this.maximumPoolSize);
+ stats.setTotalSubmitted(getSubmittedCount());
+ stats.setHighestActiveThreads(getLargestPoolSize());
+ stats.setTotalCompleted(getCompletedCount());
+ return stats;
+ }
+
+ public boolean hasWork() {
+ synchronized (poolLock) {
+ return this.getSubmittedCount() - this.getCompletedCount() > 0 &&
!this.isTerminated();
+ }
+ }
+
+ public List<Runnable> shutdownNow() {
+ this.shutdown();
+ synchronized (poolLock) {
+ synchronized (threads) {
+ for (Thread t : threads) {
+ t.interrupt();
+ }
+ }
+ List<Runnable> result = new ArrayList<Runnable>(queue);
+ queue.clear();
+ return result;
+ }
+ }
+
+ public boolean awaitTermination(long timeout, TimeUnit unit)
+ throws InterruptedException {
+ long timeoutMillis = unit.toMillis(timeout);
+ long finalMillis = System.currentTimeMillis() + timeoutMillis;
+ synchronized (poolLock) {
+ while (this.activeCount > 0 || !terminated) {
+ if (timeoutMillis < 1) {
+ return false;
+ }
+ poolLock.wait(timeoutMillis);
+ timeoutMillis = finalMillis - System.currentTimeMillis();
+ }
+ }
+ return true;
+ }
+
+ public ScheduledFuture<?> schedule(final Runnable command, long delay,
+ TimeUnit unit) {
+ checkForTermination();
+ ScheduledFutureTask sft = new ScheduledFutureTask(new RunnableWrapper(command),
false);
+ synchronized (scheduledTasks) {
+ ScheduledFuture<?> future = stpe.schedule(sft.getParent(), delay, unit);
+ sft.setScheduledFuture(future);
+ return sft;
+ }
+ }
+
+ public ScheduledFuture<?> scheduleAtFixedRate(final Runnable command,
+ long initialDelay, long period, TimeUnit unit) {
+ checkForTermination();
+ ScheduledFutureTask sft = new ScheduledFutureTask(new RunnableWrapper(command), true);
+ synchronized (scheduledTasks) {
+ ScheduledFuture<?> future = stpe.scheduleAtFixedRate(sft.getParent(),
initialDelay, period, unit);
+ sft.setScheduledFuture(future);
+ return sft;
+ }
+ }
+
+}
Property changes on:
trunk/engine/src/main/java/org/teiid/dqp/internal/process/ThreadReuseExecutor.java
___________________________________________________________________
Name: svn:mime-type
+ text/plain
Modified:
trunk/engine/src/main/java/org/teiid/dqp/internal/transaction/TransactionServerImpl.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/dqp/internal/transaction/TransactionServerImpl.java 2010-07-08
15:38:23 UTC (rev 2332)
+++
trunk/engine/src/main/java/org/teiid/dqp/internal/transaction/TransactionServerImpl.java 2010-07-12
20:59:02 UTC (rev 2333)
@@ -246,7 +246,7 @@
public Transaction call() throws Exception {
return transactionManager.getTransaction();
}
- });
+ }, 0);
workManager.doWork(work, WorkManager.INDEFINITE, tc, null);
tc.setTransaction(work.getResult().get());
}
Modified: trunk/engine/src/main/java/org/teiid/query/processor/QueryProcessor.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/QueryProcessor.java 2010-07-08
15:38:23 UTC (rev 2332)
+++ trunk/engine/src/main/java/org/teiid/query/processor/QueryProcessor.java 2010-07-12
20:59:02 UTC (rev 2333)
@@ -31,8 +31,8 @@
import org.teiid.common.buffer.BufferManager.BufferReserveMode;
import org.teiid.common.buffer.BufferManager.TupleSourceType;
import org.teiid.core.TeiidComponentException;
-import org.teiid.core.TeiidProcessingException;
import org.teiid.core.TeiidException;
+import org.teiid.core.TeiidProcessingException;
import org.teiid.core.TeiidRuntimeException;
import org.teiid.core.util.Assertion;
import org.teiid.logging.LogConstants;
@@ -41,9 +41,10 @@
import org.teiid.query.execution.QueryExecPlugin;
import org.teiid.query.processor.BatchCollector.BatchProducer;
import org.teiid.query.util.CommandContext;
-import org.teiid.translator.DataNotAvailableException;
-
+/**
+ * Driver for plan processing.
+ */
public class QueryProcessor implements BatchProducer {
public static class ExpiredTimeSliceException extends TeiidRuntimeException {
@@ -110,11 +111,6 @@
throw e;
}
continue;
- } catch (DataNotAvailableException e) {
- if (!nonBlocking) {
- throw e;
- }
- wait = e.getRetryDelay();
} catch (BlockedException e) {
if (!nonBlocking) {
throw e;
@@ -147,6 +143,9 @@
long currentTime = System.currentTimeMillis();
Assertion.assertTrue(!processorClosed);
+
+ //TODO: see if there is pending work before preempting
+
while(currentTime < context.getTimeSliceEnd()) {
if (requestCanceled) {
throw new
TeiidProcessingException(QueryExecPlugin.Util.getString("QueryProcessor.request_cancelled",
getProcessID())); //$NON-NLS-1$
Modified: trunk/engine/src/main/resources/org/teiid/query/i18n.properties
===================================================================
--- trunk/engine/src/main/resources/org/teiid/query/i18n.properties 2010-07-08 15:38:23
UTC (rev 2332)
+++ trunk/engine/src/main/resources/org/teiid/query/i18n.properties 2010-07-12 20:59:02
UTC (rev 2333)
@@ -941,4 +941,7 @@
NewCalculateCostUtil.badCost=Unexpected format encountered for max or min value
WorkerPool.Max_thread=Reached maximum thread count "{0}" for worker pool
"{1}" with a queue size of "{2}".
+WorkerPool.New_thread=Created worker thread "{0}".
+WorkerPool.uncaughtException=Uncaught exception processing work
+
XMLSystemFunctions.invalid_namespaces=Invalid namespaces supplied for XPath expression -
''{0}''
\ No newline at end of file
Deleted:
trunk/engine/src/test/java/org/teiid/common/queue/TestStatsCapturingWorkManager.java
===================================================================
---
trunk/engine/src/test/java/org/teiid/common/queue/TestStatsCapturingWorkManager.java 2010-07-08
15:38:23 UTC (rev 2332)
+++
trunk/engine/src/test/java/org/teiid/common/queue/TestStatsCapturingWorkManager.java 2010-07-12
20:59:02 UTC (rev 2333)
@@ -1,179 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * See the COPYRIGHT.txt file distributed with this work for information
- * regarding copyright ownership. Some portions may be licensed
- * to Red Hat, Inc. under one or more contributor license agreements.
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
- * 02110-1301 USA.
- */
-
-package org.teiid.common.queue;
-
-import static org.junit.Assert.*;
-
-import java.util.ArrayList;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import javax.resource.spi.work.Work;
-import javax.resource.spi.work.WorkManager;
-import javax.resource.spi.work.WorkRejectedException;
-
-import org.junit.Ignore;
-import org.junit.Test;
-import org.teiid.adminapi.impl.WorkerPoolStatisticsMetadata;
-import org.teiid.dqp.internal.process.StatsCapturingWorkManager;
-
-/**
- */
-public class TestStatsCapturingWorkManager {
-
- private WorkManager manager = new FakeWorkManager();
-
- @Test public void testQueuing() throws Exception {
- final long SINGLE_WAIT = 50;
- final int WORK_ITEMS = 10;
- final int MAX_THREADS = 5;
-
- final StatsCapturingWorkManager pool = new
StatsCapturingWorkManager("test", MAX_THREADS, manager); //$NON-NLS-1$
-
- for(int i=0; i<WORK_ITEMS; i++) {
- pool.scheduleWork(new FakeWorkItem(SINGLE_WAIT));
- }
-
- pool.shutdown();
- pool.awaitTermination(1000, TimeUnit.MILLISECONDS);
- assertTrue(pool.isTerminated());
- WorkerPoolStatisticsMetadata stats = pool.getStats();
- assertEquals(10, stats.getTotalCompleted());
- assertEquals("Expected threads to be maxed out", MAX_THREADS,
stats.getHighestActiveThreads()); //$NON-NLS-1$
- }
-
- @Ignore
- @Test public void testThreadReuse() throws Exception {
- final long SINGLE_WAIT = 50;
- final long NUM_THREADS = 5;
-
- StatsCapturingWorkManager pool = new StatsCapturingWorkManager("test",
5, manager); //$NON-NLS-1$
-
- for(int i=0; i<NUM_THREADS; i++) {
- pool.scheduleWork(new FakeWorkItem(SINGLE_WAIT));
-
- try {
- Thread.sleep(SINGLE_WAIT*2);
- } catch(InterruptedException e) {
- }
- }
-
- pool.shutdown();
-
- WorkerPoolStatisticsMetadata stats = pool.getStats();
- assertEquals("Expected 1 thread for serial execution", 1,
stats.getHighestActiveThreads()); //$NON-NLS-1$
-
- pool.awaitTermination(1000, TimeUnit.MILLISECONDS);
- }
-
- @Test(expected=WorkRejectedException.class) public void testShutdown() throws
Exception {
- StatsCapturingWorkManager pool = new StatsCapturingWorkManager("test", 5,
manager); //$NON-NLS-1$
- pool.shutdown();
- pool.scheduleWork(new FakeWorkItem(1));
- }
-
- /*@Test public void testScheduleCancel() throws Exception {
- StatsCapturingWorkManager pool = new StatsCapturingWorkManager("test", 5);
//$NON-NLS-1$
- ScheduledFuture<?> future = pool.scheduleAtFixedRate(new Runnable() {
- @Override
- public void run() {
- }
- }, 0, 5, TimeUnit.MILLISECONDS);
- future.cancel(true);
- assertFalse(future.cancel(true));
- }*/
-
- @Test public void testSchedule() throws Exception {
- StatsCapturingWorkManager pool = new StatsCapturingWorkManager("test", 5,
manager); //$NON-NLS-1$
- final ArrayList<String> result = new ArrayList<String>();
- pool.scheduleWork(new Work() {
-
- @Override
- public void run() {
- result.add("hello"); //$NON-NLS-1$
- }
-
- @Override
- public void release() {
-
- }
- }, null, 5);
- Thread.sleep(100);
- pool.shutdown();
- pool.awaitTermination(1000, TimeUnit.MILLISECONDS);
- assertEquals(1, result.size());
- }
-
- /*(a)Test(expected=ExecutionException.class) public void testScheduleException() throws
Exception {
- StatsCapturingWorkManager pool = new StatsCapturingWorkManager("test", 5);
//$NON-NLS-1$
- ScheduledFuture<?> future = pool.schedule(new Runnable() {
- @Override
- public void run() {
- throw new RuntimeException();
- }
- }, 0, TimeUnit.MILLISECONDS);
- future.get();
- }*/
-
- /**
- * Here each execution exceeds the period
- */
- /*@Test public void testScheduleRepeated() throws Exception {
- StatsCapturingWorkManager pool = new StatsCapturingWorkManager("test", 5);
//$NON-NLS-1$
- final ArrayList<String> result = new ArrayList<String>();
- ScheduledFuture<?> future = pool.scheduleAtFixedRate(new Runnable() {
- @Override
- public void run() {
- result.add("hello"); //$NON-NLS-1$
- try {
- Thread.sleep(75);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- }
- }, 0, 10, TimeUnit.MILLISECONDS);
- Thread.sleep(100);
- future.cancel(true);
- assertEquals(2, result.size());
- }*/
-
- @Test public void testFailingWork() throws Exception {
- StatsCapturingWorkManager pool = new StatsCapturingWorkManager("test", 5,
manager); //$NON-NLS-1$
- final AtomicInteger count = new AtomicInteger();
- pool.scheduleWork(new Work() {
- @Override
- public void run() {
- count.getAndIncrement();
- throw new RuntimeException();
- }
-
- @Override
- public void release() {
-
- }
- });
- Thread.sleep(100);
- assertEquals(1, count.get());
- }
-
-}
Copied: trunk/engine/src/test/java/org/teiid/common/queue/TestThreadReuseExecutor.java
(from rev 2323,
trunk/engine/src/test/java/org/teiid/common/queue/TestStatsCapturingWorkManager.java)
===================================================================
--- trunk/engine/src/test/java/org/teiid/common/queue/TestThreadReuseExecutor.java
(rev 0)
+++
trunk/engine/src/test/java/org/teiid/common/queue/TestThreadReuseExecutor.java 2010-07-12
20:59:02 UTC (rev 2333)
@@ -0,0 +1,226 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.common.queue;
+
+import static org.junit.Assert.*;
+
+import java.util.ArrayList;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.resource.spi.work.Work;
+
+import org.junit.Test;
+import org.teiid.adminapi.impl.WorkerPoolStatisticsMetadata;
+import org.teiid.dqp.internal.process.ThreadReuseExecutor;
+import org.teiid.dqp.internal.process.DQPCore.FutureWork;
+
+/**
+ */
+public class TestThreadReuseExecutor {
+
+ @Test public void testQueuing() throws Exception {
+ final long SINGLE_WAIT = 50;
+ final int WORK_ITEMS = 10;
+ final int MAX_THREADS = 5;
+
+ final ThreadReuseExecutor pool = new ThreadReuseExecutor("test",
MAX_THREADS); //$NON-NLS-1$
+
+ for(int i=0; i<WORK_ITEMS; i++) {
+ pool.execute(new FakeWorkItem(SINGLE_WAIT));
+ }
+
+ pool.shutdown();
+ pool.awaitTermination(1000, TimeUnit.MILLISECONDS);
+ assertTrue(pool.isTerminated());
+ WorkerPoolStatisticsMetadata stats = pool.getStats();
+ assertEquals(10, stats.getTotalCompleted());
+ assertEquals("Expected threads to be maxed out", MAX_THREADS,
stats.getHighestActiveThreads()); //$NON-NLS-1$
+ }
+
+ @Test public void testThreadReuse() throws Exception {
+ final long SINGLE_WAIT = 50;
+ final long NUM_THREADS = 5;
+
+ ThreadReuseExecutor pool = new ThreadReuseExecutor("test", 5);
//$NON-NLS-1$
+
+ for(int i=0; i<NUM_THREADS; i++) {
+ pool.execute(new FakeWorkItem(SINGLE_WAIT));
+
+ try {
+ Thread.sleep(SINGLE_WAIT*2);
+ } catch(InterruptedException e) {
+ }
+ }
+
+ pool.shutdown();
+
+ WorkerPoolStatisticsMetadata stats = pool.getStats();
+ assertEquals("Expected 1 thread for serial execution", 1,
stats.getHighestActiveThreads()); //$NON-NLS-1$
+
+ pool.awaitTermination(1000, TimeUnit.MILLISECONDS);
+ }
+
+ @Test(expected=RejectedExecutionException.class) public void testShutdown() throws
Exception {
+ ThreadReuseExecutor pool = new ThreadReuseExecutor("test", 5);
//$NON-NLS-1$
+ pool.shutdown();
+ pool.execute(new FakeWorkItem(1));
+ }
+
+ @Test public void testScheduleCancel() throws Exception {
+ ThreadReuseExecutor pool = new ThreadReuseExecutor("test", 5);
//$NON-NLS-1$
+ ScheduledFuture<?> future = pool.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ }
+ }, 0, 5, TimeUnit.MILLISECONDS);
+ future.cancel(true);
+ assertFalse(future.cancel(true));
+ }
+
+ @Test public void testSchedule() throws Exception {
+ ThreadReuseExecutor pool = new ThreadReuseExecutor("test", 5);
//$NON-NLS-1$
+ final ArrayList<String> result = new ArrayList<String>();
+ pool.schedule(new Work() {
+
+ @Override
+ public void run() {
+ result.add("hello"); //$NON-NLS-1$
+ }
+
+ @Override
+ public void release() {
+
+ }
+ }, 5, TimeUnit.MILLISECONDS);
+ Thread.sleep(100);
+ pool.shutdown();
+ pool.awaitTermination(1000, TimeUnit.MILLISECONDS);
+ assertEquals(1, result.size());
+ }
+
+ @Test(expected=ExecutionException.class) public void testScheduleException() throws
Exception {
+ ThreadReuseExecutor pool = new ThreadReuseExecutor("test", 5);
//$NON-NLS-1$
+ ScheduledFuture<?> future = pool.schedule(new Runnable() {
+ @Override
+ public void run() {
+ throw new RuntimeException();
+ }
+ }, 0, TimeUnit.MILLISECONDS);
+ future.get();
+ }
+
+ /**
+ * Here each execution exceeds the period
+ */
+ @Test public void testScheduleRepeated() throws Exception {
+ ThreadReuseExecutor pool = new ThreadReuseExecutor("test", 5);
//$NON-NLS-1$
+ final ArrayList<String> result = new ArrayList<String>();
+ ScheduledFuture<?> future = pool.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ result.add("hello"); //$NON-NLS-1$
+ try {
+ Thread.sleep(75);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }, 0, 30, TimeUnit.MILLISECONDS);
+ Thread.sleep(140);
+ future.cancel(true);
+ assertEquals(2, result.size());
+ }
+
+ @Test public void testFailingWork() throws Exception {
+ ThreadReuseExecutor pool = new ThreadReuseExecutor("test", 5);
//$NON-NLS-1$
+ final AtomicInteger count = new AtomicInteger();
+ pool.execute(new Work() {
+ @Override
+ public void run() {
+ count.getAndIncrement();
+ throw new RuntimeException();
+ }
+
+ @Override
+ public void release() {
+
+ }
+ });
+ Thread.sleep(100);
+ assertEquals(1, count.get());
+ }
+
+ @Test public void testPriorities() throws Exception {
+ final ThreadReuseExecutor pool = new ThreadReuseExecutor("test", 1);
//$NON-NLS-1$
+ FutureWork<Boolean> work1 = new FutureWork<Boolean>(new
Callable<Boolean>() {
+ public Boolean call() throws Exception {
+ synchronized (pool) {
+ while (pool.getSubmittedCount() < 4) {
+ pool.wait();
+ }
+ }
+ return true;
+ }
+ }, 0);
+ final ConcurrentLinkedQueue<Integer> order = new
ConcurrentLinkedQueue<Integer>();
+ FutureWork<Boolean> work2 = new FutureWork<Boolean>(new
Callable<Boolean>() {
+ public Boolean call() throws Exception {
+ order.add(2);
+ return true;
+ }
+ }, 2);
+ FutureWork<Boolean> work3 = new FutureWork<Boolean>(new
Callable<Boolean>() {
+ public Boolean call() throws Exception {
+ order.add(3);
+ return false;
+ }
+ }, 1);
+ FutureWork<Boolean> work4 = new FutureWork<Boolean>(new
Callable<Boolean>() {
+ public Boolean call() throws Exception {
+ order.add(4);
+ return false;
+ }
+ }, 2);
+ pool.execute(work1);
+ pool.execute(work2);
+ pool.execute(work3);
+ pool.execute(work4);
+ synchronized (pool) {
+ pool.notifyAll();
+ }
+ work1.getResult().get();
+ work2.getResult().get();
+ work3.getResult().get();
+ work4.getResult().get();
+ assertEquals(Integer.valueOf(3), order.remove());
+ assertEquals(Integer.valueOf(2), order.remove());
+ assertEquals(Integer.valueOf(4), order.remove());
+ }
+
+}
Modified:
trunk/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/TestConnectorManager.java
===================================================================
---
trunk/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/TestConnectorManager.java 2010-07-08
15:38:23 UTC (rev 2332)
+++
trunk/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/TestConnectorManager.java 2010-07-12
20:59:02 UTC (rev 2333)
@@ -28,9 +28,6 @@
import junit.framework.TestCase;
-import org.mockito.Mockito;
-import org.teiid.common.buffer.BlockedException;
-import org.teiid.dqp.internal.process.AbstractWorkItem;
import org.teiid.dqp.message.AtomicRequestID;
import org.teiid.dqp.message.AtomicRequestMessage;
import org.teiid.dqp.message.RequestID;
@@ -54,7 +51,6 @@
return c.getConnection();
}
};
- cm.setMaxConnections(1);
cm.start();
return cm;
}
@@ -74,7 +70,7 @@
}
void helpAssureOneState() throws Exception {
- csm.executeRequest(request, Mockito.mock(AbstractWorkItem.class));
+ csm.registerRequest(request);
ConnectorWork state = csm.getState(request.getAtomicRequestID());
assertEquals(state, csm.getState(request.getAtomicRequestID()));
}
@@ -106,34 +102,5 @@
assertEquals("Expected size of 1", 1, csm.size()); //$NON-NLS-1$
}
-
- public void testQueuing() throws Exception {
- ConnectorWork workItem = csm.executeRequest(request,
Mockito.mock(AbstractWorkItem.class));
- workItem.execute();
-
- AbstractWorkItem awi1 = Mockito.mock(AbstractWorkItem.class);
- ConnectorWork workItem1 =
csm.executeRequest(TestConnectorWorkItem.createNewAtomicRequestMessage(2, 1), awi1);
-
- AbstractWorkItem awi2 = Mockito.mock(AbstractWorkItem.class);
- ConnectorWork workItem2 =
csm.executeRequest(TestConnectorWorkItem.createNewAtomicRequestMessage(3, 1), awi2);
-
- try {
- workItem1.execute();
- fail("expected exception"); //$NON-NLS-1$
- } catch (BlockedException e) {
-
- }
- workItem.close();
-
- try {
- workItem2.execute(); //ensure that another item cannot jump in the queue
- fail("expected exception"); //$NON-NLS-1$
- } catch (BlockedException e) {
-
- }
-
- Mockito.verify(awi1).moreWork();
- workItem1.execute();
- }
-
+
}
\ No newline at end of file
Modified:
trunk/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/TestConnectorWorkItem.java
===================================================================
---
trunk/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/TestConnectorWorkItem.java 2010-07-08
15:38:23 UTC (rev 2332)
+++
trunk/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/TestConnectorWorkItem.java 2010-07-12
20:59:02 UTC (rev 2333)
@@ -22,8 +22,7 @@
package org.teiid.dqp.internal.datamgr.impl;
-import static junit.framework.Assert.assertEquals;
-import static junit.framework.Assert.fail;
+import static junit.framework.Assert.*;
import java.util.Arrays;
import java.util.List;
@@ -35,7 +34,6 @@
import org.mockito.Mockito;
import org.teiid.client.RequestMessage;
import org.teiid.dqp.internal.datamgr.language.LanguageBridgeFactory;
-import org.teiid.dqp.internal.process.AbstractWorkItem;
import org.teiid.dqp.internal.process.DQPWorkContext;
import org.teiid.dqp.message.AtomicRequestMessage;
import org.teiid.dqp.message.AtomicResultsMessage;
@@ -49,8 +47,8 @@
import org.teiid.query.sql.lang.StoredProcedure;
import org.teiid.query.sql.symbol.Constant;
import org.teiid.query.unittest.FakeMetadataFactory;
-import org.teiid.translator.TranslatorException;
import org.teiid.translator.ProcedureExecution;
+import org.teiid.translator.TranslatorException;
public class TestConnectorWorkItem {
@@ -117,8 +115,7 @@
Command command = helpGetCommand("update bqt1.smalla set stringkey = 1 where
stringkey = 2", EXAMPLE_BQT); //$NON-NLS-1$
AtomicRequestMessage arm = createNewAtomicRequestMessage(1, 1);
arm.setCommand(command);
- ConnectorWorkItem synchConnectorWorkItem = new ConnectorWorkItem(arm,
Mockito.mock(AbstractWorkItem.class),
- TestConnectorManager.getConnectorManager());
+ ConnectorWorkItem synchConnectorWorkItem = new ConnectorWorkItem(arm,
TestConnectorManager.getConnectorManager());
return synchConnectorWorkItem.execute();
}
@@ -152,7 +149,7 @@
return Mockito.mock(Xid.class);
}} );
- new ConnectorWorkItem(requestMsg, Mockito.mock(AbstractWorkItem.class), cm);
+ new ConnectorWorkItem(requestMsg, cm);
}
@Ignore
@@ -180,7 +177,7 @@
return Mockito.mock(Xid.class);
}} );
- new ConnectorWorkItem(requestMsg, Mockito.mock(AbstractWorkItem.class), cm);
+ new ConnectorWorkItem(requestMsg, cm);
}
}
Modified: trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java 2010-07-08
15:38:23 UTC (rev 2332)
+++ trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java 2010-07-12
20:59:02 UTC (rev 2333)
@@ -56,7 +56,6 @@
Mockito.stub(repo.getConnectorManager(Mockito.anyString())).toReturn(new
AutoGenDataService());
core = new DQPCore();
- core.setWorkManager(new FakeWorkManager());
core.setBufferService(new FakeBufferService());
core.setConnectorManagerRepository(repo);
core.setTransactionService(new FakeTransactionService());
Modified:
trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCoreRequestHandling.java
===================================================================
---
trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCoreRequestHandling.java 2010-07-08
15:38:23 UTC (rev 2332)
+++
trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCoreRequestHandling.java 2010-07-12
20:59:02 UTC (rev 2333)
@@ -23,11 +23,13 @@
package org.teiid.dqp.internal.process;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import junit.framework.TestCase;
+import org.mockito.Mockito;
import org.teiid.adminapi.impl.RequestMetadata;
import org.teiid.client.RequestMessage;
import org.teiid.client.SourceWarning;
@@ -50,12 +52,12 @@
}
private void compareReqInfos(Collection<RequestID> reqs1,
Collection<RequestMetadata> reqs2) {
- Set reqIDs2 = new HashSet();
+ Set<RequestID> reqIDs2 = new HashSet<RequestID>();
for (RequestMetadata requestInfo : reqs2) {
reqIDs2.add(new RequestID(requestInfo.getSessionId(),
requestInfo.getExecutionId()));
}
- assertEquals("Collections of request infos are not the same: ", new
HashSet(reqs1), reqIDs2); //$NON-NLS-1$
+ assertEquals("Collections of request infos are not the same: ", new
HashSet<RequestID>(reqs1), reqIDs2); //$NON-NLS-1$
}
/**
@@ -63,8 +65,8 @@
*/
public void testGetRequestsSessionToken1() {
DQPCore rm = new DQPCore();
- Set reqs = new HashSet();
- Collection actualReqs = rm.getRequestsForSession(SESSION_STRING);
+ Set<RequestID> reqs = Collections.emptySet();
+ Collection<RequestMetadata> actualReqs =
rm.getRequestsForSession(SESSION_STRING);
compareReqInfos(reqs, actualReqs);
}
@@ -74,7 +76,7 @@
public void testGetRequestsSessionToken2() {
DQPCore rm = new DQPCore();
rm.setTransactionService(new FakeTransactionService());
- Set reqs = new HashSet();
+ Set<RequestID> reqs = new HashSet<RequestID>();
RequestID id = addRequest(rm, SESSION_STRING, 1);
reqs.add(id);
@@ -95,13 +97,13 @@
public void testGetRequestsSessionToken3() {
DQPCore rm = new DQPCore();
rm.setTransactionService(new FakeTransactionService());
- Set reqs = new HashSet();
+ Set<RequestID> reqs = new HashSet<RequestID>();
reqs.add(addRequest(rm, SESSION_STRING, 0));
reqs.add(addRequest(rm, SESSION_STRING, 1));
reqs.add(addRequest(rm, SESSION_STRING, 2));
- Collection actualReqs = rm.getRequestsForSession(SESSION_STRING);
+ Collection<RequestMetadata> actualReqs =
rm.getRequestsForSession(SESSION_STRING);
compareReqInfos(reqs, actualReqs);
}
@@ -158,7 +160,7 @@
RequestWorkItem workItem = addRequest(rm, r0, requestID, null, null);
AtomicRequestMessage atomicReq = new AtomicRequestMessage(workItem.requestMsg,
workItem.getDqpWorkContext(), 1);
- DataTierTupleSource info = new DataTierTupleSource(null, atomicReq, null,
"connID", workItem); //$NON-NLS-1$
+ DataTierTupleSource info = Mockito.mock(DataTierTupleSource.class);
workItem.addConnectorRequest(atomicReq.getAtomicRequestID(), info);
DataTierTupleSource arInfo =
workItem.getConnectorRequest(atomicReq.getAtomicRequestID());
@@ -173,7 +175,7 @@
RequestWorkItem workItem = addRequest(rm, r0, requestID, null, null);
AtomicRequestMessage atomicReq = new AtomicRequestMessage(workItem.requestMsg,
workItem.getDqpWorkContext(), 1);
- DataTierTupleSource info = new DataTierTupleSource(null, atomicReq,
null,"connID", workItem); //$NON-NLS-1$
+ DataTierTupleSource info = Mockito.mock(DataTierTupleSource.class);
workItem.addConnectorRequest(atomicReq.getAtomicRequestID(), info);
workItem.closeAtomicRequest(atomicReq.getAtomicRequestID());
Modified:
trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDataTierManager.java
===================================================================
---
trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDataTierManager.java 2010-07-08
15:38:23 UTC (rev 2332)
+++
trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDataTierManager.java 2010-07-12
20:59:02 UTC (rev 2333)
@@ -22,8 +22,9 @@
package org.teiid.dqp.internal.process;
-import junit.framework.TestCase;
+import static org.junit.Assert.*;
+import org.junit.Test;
import org.mockito.Mockito;
import org.teiid.client.RequestMessage;
import org.teiid.core.TeiidException;
@@ -33,6 +34,8 @@
import org.teiid.dqp.message.RequestID;
import org.teiid.dqp.service.AutoGenDataService;
import org.teiid.dqp.service.FakeBufferService;
+import org.teiid.dqp.service.TransactionContext;
+import org.teiid.dqp.service.TransactionContext.Scope;
import org.teiid.query.metadata.QueryMetadataInterface;
import org.teiid.query.optimizer.capabilities.DefaultCapabilitiesFinder;
import org.teiid.query.parser.QueryParser;
@@ -40,10 +43,9 @@
import org.teiid.query.sql.lang.Command;
import org.teiid.query.unittest.FakeMetadataFactory;
import org.teiid.query.util.CommandContext;
+import org.teiid.translator.TranslatorException;
-
-
-public class TestDataTierManager extends TestCase {
+public class TestDataTierManager {
private DQPCore rm;
private DataTierManagerImpl dtm;
@@ -54,10 +56,6 @@
private AutoGenDataService connectorManager;
private RequestWorkItem workItem;
- public TestDataTierManager(String name) {
- super(name);
- }
-
private static Command helpGetCommand(String sql, QueryMetadataInterface metadata)
throws Exception {
Command command = QueryParser.getQueryParser().parseCommand(sql);
QueryResolver.resolveCommand(command, metadata);
@@ -75,7 +73,8 @@
connectorManager = new AutoGenDataService();
rm = new DQPCore();
rm.setTransactionService(new FakeTransactionService());
-
+ rm.setBufferService(new FakeBufferService());
+ rm.start(new DQPConfiguration());
FakeBufferService bs = new FakeBufferService();
ConnectorManagerRepository repo =
Mockito.mock(ConnectorManagerRepository.class);
@@ -92,7 +91,7 @@
RequestMessage original = new RequestMessage();
original.setExecutionId(1);
-
+ original.setPartialResults(true);
RequestID requestID = workContext.getRequestID(original.getExecutionId());
context = new CommandContext();
@@ -105,11 +104,13 @@
request = new AtomicRequestMessage(original, workContext, nodeId);
request.setCommand(command);
request.setConnectorName("FakeConnectorID"); //$NON-NLS-1$
-
- info = new DataTierTupleSource(command.getProjectedSymbols(), request, dtm,
request.getConnectorName(), workItem);
+ TransactionContext tc = new TransactionContext();
+ tc.setTransactionType(Scope.GLOBAL);
+ request.setTransactionContext(tc);
+ info = new DataTierTupleSource(request, workItem,
connectorManager.registerRequest(request), dtm);
}
- public void testDataTierTupleSource() throws Exception {
+ @Test public void testDataTierTupleSource() throws Exception {
helpSetup(1);
info.nextTuple();
assertNotNull(workItem.getConnectorRequest(request.getAtomicRequestID()));
@@ -117,7 +118,13 @@
assertNull(workItem.getConnectorRequest(request.getAtomicRequestID()));
}
- public void testCodeTableResponseException() throws Exception {
+ @Test public void testPartialResults() throws Exception {
+ helpSetup(1);
+ info.exceptionOccurred(new TranslatorException(), true);
+ assertNull(info.nextTuple());
+ }
+
+ @Test public void testCodeTableResponseException() throws Exception {
helpSetup(3);
this.connectorManager.throwExceptionOnExecute = true;
@@ -129,13 +136,13 @@
}
}
- public void testNoRowsException() throws Exception {
+ @Test public void testNoRowsException() throws Exception {
helpSetup(3);
this.connectorManager.setRows(0);
assertNull(info.nextTuple());
}
- public void testCodeTableResponseDataNotAvailable() throws Exception {
+ @Test public void testCodeTableResponseDataNotAvailable() throws Exception {
helpSetup(3);
this.connectorManager.dataNotAvailable = 5;
Modified: trunk/engine/src/test/java/org/teiid/dqp/service/AutoGenDataService.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/dqp/service/AutoGenDataService.java 2010-07-08
15:38:23 UTC (rev 2332)
+++ trunk/engine/src/test/java/org/teiid/dqp/service/AutoGenDataService.java 2010-07-12
20:59:02 UTC (rev 2333)
@@ -28,11 +28,11 @@
import java.util.Iterator;
import java.util.List;
+import org.teiid.core.TeiidComponentException;
import org.teiid.core.types.DataTypeManager;
import org.teiid.dqp.internal.datamgr.impl.ConnectorManager;
import org.teiid.dqp.internal.datamgr.impl.ConnectorWork;
import org.teiid.dqp.internal.datamgr.impl.ConnectorWorkItem;
-import org.teiid.dqp.internal.process.AbstractWorkItem;
import org.teiid.dqp.message.AtomicRequestMessage;
import org.teiid.dqp.message.AtomicResultsMessage;
import org.teiid.query.optimizer.capabilities.BasicSourceCapabilities;
@@ -68,10 +68,10 @@
}
@Override
- public ConnectorWork executeRequest(AtomicRequestMessage message, AbstractWorkItem
awi)
- throws TranslatorException {
+ public ConnectorWork registerRequest(AtomicRequestMessage message)
+ throws TeiidComponentException {
if (throwExceptionOnExecute) {
- throw new TranslatorException("Connector Exception"); //$NON-NLS-1$
+ throw new TeiidComponentException("Connector Exception"); //$NON-NLS-1$
}
List projectedSymbols = (message.getCommand()).getProjectedSymbols();
List[] results = createResults(projectedSymbols);
@@ -105,10 +105,6 @@
}
- @Override
- public boolean isQueued() {
- return false;
- }
};
}
Modified:
trunk/engine/src/test/java/org/teiid/query/processor/relational/TestUnionAllNode.java
===================================================================
---
trunk/engine/src/test/java/org/teiid/query/processor/relational/TestUnionAllNode.java 2010-07-08
15:38:23 UTC (rev 2332)
+++
trunk/engine/src/test/java/org/teiid/query/processor/relational/TestUnionAllNode.java 2010-07-12
20:59:02 UTC (rev 2333)
@@ -22,46 +22,38 @@
package org.teiid.query.processor.relational;
+import static org.junit.Assert.*;
+
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import org.junit.Test;
import org.teiid.common.buffer.BlockedException;
import org.teiid.common.buffer.BufferManager;
import org.teiid.common.buffer.TupleBatch;
import org.teiid.core.TeiidComponentException;
import org.teiid.core.TeiidProcessingException;
import org.teiid.core.types.DataTypeManager;
-import org.teiid.query.processor.relational.RelationalNode;
-import org.teiid.query.processor.relational.UnionAllNode;
+import org.teiid.query.processor.FakeDataManager;
import org.teiid.query.sql.symbol.ElementSymbol;
import org.teiid.query.util.CommandContext;
-import junit.framework.TestCase;
-
/**
*/
-public class TestUnionAllNode extends TestCase {
+public class TestUnionAllNode {
- /**
- * Constructor for TestUnionAllNode.
- * @param arg0
- */
- public TestUnionAllNode(String arg0) {
- super(arg0);
- }
-
public void helpTestUnion(RelationalNode[] children, RelationalNode union, List[]
expected) throws TeiidComponentException, TeiidProcessingException {
BufferManager mgr = NodeTestUtil.getTestBufferManager(1, 2);
CommandContext context = new CommandContext("pid", "test",
null, null, 1); //$NON-NLS-1$ //$NON-NLS-2$
-
+ FakeDataManager fdm = new FakeDataManager();
for(int i=0; i<children.length; i++) {
union.addChild(children[i]);
- children[i].initialize(context, mgr, null);
+ children[i].initialize(context, mgr, fdm);
}
- union.initialize(context, mgr, null);
+ union.initialize(context, mgr, fdm);
union.open();
@@ -90,7 +82,7 @@
assertEquals("Didn't match expected counts", expected.length,
currentRow-1); //$NON-NLS-1$
}
- public void testNoRows() throws TeiidComponentException, TeiidProcessingException {
+ @Test public void testNoRows() throws TeiidComponentException,
TeiidProcessingException {
ElementSymbol es1 = new ElementSymbol("e1"); //$NON-NLS-1$
es1.setType(DataTypeManager.DefaultDataClasses.INTEGER);
@@ -151,7 +143,7 @@
helpTestUnion(nodes, union, expected);
}
- public void testBasicUnion() throws TeiidComponentException, TeiidProcessingException
{
+ @Test public void testBasicUnion() throws TeiidComponentException,
TeiidProcessingException {
List expected[] = new List[] {
Arrays.asList(new Object[] { new Integer(0) }),
Arrays.asList(new Object[] { new Integer(0) }),
@@ -164,7 +156,7 @@
}
- public void testBasicUnionMultipleSources() throws TeiidComponentException,
TeiidProcessingException {
+ @Test public void testBasicUnionMultipleSources() throws TeiidComponentException,
TeiidProcessingException {
List expected[] = new List[] {
Arrays.asList(new Object[] { new Integer(0) }),
Arrays.asList(new Object[] { new Integer(0) }),
@@ -181,7 +173,7 @@
helpTestUnionConfigs(5, -1, 2, 50, expected);
}
- public void testMultipleSourcesHalfBlockingNodes() throws TeiidComponentException,
TeiidProcessingException {
+ @Test public void testMultipleSourcesHalfBlockingNodes() throws
TeiidComponentException, TeiidProcessingException {
List expected[] = new List[] {
Arrays.asList(new Object[] { new Integer(1) }),
Arrays.asList(new Object[] { new Integer(0) }),
@@ -193,7 +185,7 @@
helpTestUnionConfigs(5, 2, 1, 50, expected);
}
- public void testMultipleSourcesAllBlockingNodes() throws TeiidComponentException,
TeiidProcessingException {
+ @Test public void testMultipleSourcesAllBlockingNodes() throws
TeiidComponentException, TeiidProcessingException {
List expected[] = new List[] {
Arrays.asList(new Object[] { new Integer(0) }),
Arrays.asList(new Object[] { new Integer(1) }),
@@ -205,7 +197,7 @@
helpTestUnionConfigs(5, 1, 1, 50, expected);
}
- public void testMultipleSourceMultiBatchAllBlocking() throws TeiidComponentException,
TeiidProcessingException {
+ @Test public void testMultipleSourceMultiBatchAllBlocking() throws
TeiidComponentException, TeiidProcessingException {
List expected[] = new List[] {
Arrays.asList(new Object[] { new Integer(0) }),
Arrays.asList(new Object[] { new Integer(1) }),
Modified:
trunk/jboss-integration/src/main/java/org/teiid/jboss/deployers/ConnectionFactoryDeployer.java
===================================================================
---
trunk/jboss-integration/src/main/java/org/teiid/jboss/deployers/ConnectionFactoryDeployer.java 2010-07-08
15:38:23 UTC (rev 2332)
+++
trunk/jboss-integration/src/main/java/org/teiid/jboss/deployers/ConnectionFactoryDeployer.java 2010-07-12
20:59:02 UTC (rev 2333)
@@ -29,8 +29,6 @@
import org.jboss.resource.metadata.mcf.ManagedConnectionFactoryDeploymentGroup;
import org.jboss.resource.metadata.mcf.ManagedConnectionFactoryDeploymentMetaData;
import org.teiid.deployers.VDBStatusChecker;
-import org.teiid.dqp.internal.datamgr.impl.ConnectorManager;
-import org.teiid.dqp.internal.datamgr.impl.ConnectorManagerRepository;
/**
* This deployer listens to the data source load and unload events and manages the
connectionManager status based
@@ -38,7 +36,6 @@
*/
public class ConnectionFactoryDeployer extends
AbstractSimpleRealDeployer<ManagedConnectionFactoryDeploymentGroup> {
- private ConnectorManagerRepository connectorManagerRepository;
private VDBStatusChecker vdbChecker;
public ConnectionFactoryDeployer() {
@@ -52,13 +49,6 @@
for (ManagedConnectionFactoryDeploymentMetaData data : deployments) {
this.vdbChecker.dataSourceAdded(data.getJndiName());
-
- // set the number of available connections on the cm
- for (ConnectorManager
cm:this.connectorManagerRepository.getConnectorManagers()) {
- if (cm.getConnectionName().equals(data.getJndiName())) {
- cm.setMaxConnections(data.getMaxSize());
- }
- }
}
}
@@ -72,10 +62,6 @@
}
}
- public void setConnectorManagerRepository(ConnectorManagerRepository repo) {
- this.connectorManagerRepository = repo;
- }
-
public void setVDBStatusChecker(VDBStatusChecker checker) {
this.vdbChecker = checker;
}
Modified:
trunk/jboss-integration/src/main/java/org/teiid/jboss/deployers/RuntimeEngineDeployer.java
===================================================================
---
trunk/jboss-integration/src/main/java/org/teiid/jboss/deployers/RuntimeEngineDeployer.java 2010-07-08
15:38:23 UTC (rev 2332)
+++
trunk/jboss-integration/src/main/java/org/teiid/jboss/deployers/RuntimeEngineDeployer.java 2010-07-12
20:59:02 UTC (rev 2333)
@@ -210,7 +210,6 @@
}
public void setWorkManager(WorkManager mgr) {
- this.dqpCore.setWorkManager(mgr);
this.transactionServerImpl.setWorkManager(mgr);
}
Modified: trunk/test-integration/common/src/test/java/org/teiid/jdbc/FakeServer.java
===================================================================
--- trunk/test-integration/common/src/test/java/org/teiid/jdbc/FakeServer.java 2010-07-08
15:38:23 UTC (rev 2332)
+++ trunk/test-integration/common/src/test/java/org/teiid/jdbc/FakeServer.java 2010-07-12
20:59:02 UTC (rev 2333)
@@ -32,7 +32,6 @@
import org.teiid.adminapi.impl.VDBMetaData;
import org.teiid.client.DQP;
import org.teiid.client.security.ILogon;
-import org.teiid.common.queue.FakeWorkManager;
import org.teiid.deployers.MetadataStoreGroup;
import org.teiid.deployers.VDBRepository;
import org.teiid.dqp.internal.datamgr.impl.ConnectorManager;
@@ -48,7 +47,6 @@
import org.teiid.query.optimizer.capabilities.BasicSourceCapabilities;
import org.teiid.query.optimizer.capabilities.SourceCapabilities;
import org.teiid.services.SessionServiceImpl;
-import org.teiid.translator.TranslatorException;
import org.teiid.transport.ClientServiceRegistry;
import org.teiid.transport.ClientServiceRegistryImpl;
import org.teiid.transport.LocalServerConnection;
@@ -70,15 +68,13 @@
this.repo.setSystemStore(systemStore);
this.sessionService.setVDBRepository(repo);
- this.dqp.setWorkManager(new FakeWorkManager());
this.dqp.setBufferService(new FakeBufferService());
this.dqp.setTransactionService(new FakeTransactionService());
ConnectorManagerRepository cmr = Mockito.mock(ConnectorManagerRepository.class);
Mockito.stub(cmr.getConnectorManager("source")).toReturn(new
ConnectorManager("x", "x") {
@Override
- public SourceCapabilities getCapabilities()
- throws TranslatorException {
+ public SourceCapabilities getCapabilities() {
return new BasicSourceCapabilities();
}
});