teiid SVN: r2333 - in trunk: build/kits/jboss-container/deployers/teiid.deployer and 14 other directories.
by teiid-commits@lists.jboss.org
Author: shawkins
Date: 2010-07-12 16:59:02 -0400 (Mon, 12 Jul 2010)
New Revision: 2333
Added:
trunk/engine/src/main/java/org/teiid/dqp/internal/process/ThreadReuseExecutor.java
trunk/engine/src/test/java/org/teiid/common/queue/TestThreadReuseExecutor.java
Removed:
trunk/engine/src/main/java/org/teiid/dqp/internal/process/StatsCapturingWorkManager.java
trunk/engine/src/test/java/org/teiid/common/queue/TestStatsCapturingWorkManager.java
Modified:
trunk/build/kits/jboss-container/deploy/teiid/teiid-jboss-beans.xml
trunk/build/kits/jboss-container/deployers/teiid.deployer/teiid-deployer-jboss-beans.xml
trunk/common-core/src/main/resources/org/teiid/core/i18n.properties
trunk/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/CapabilitiesConverter.java
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorManager.java
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorManagerRepository.java
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWork.java
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWorkItem.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPConfiguration.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPWorkContext.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierManagerImpl.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
trunk/engine/src/main/java/org/teiid/dqp/internal/transaction/TransactionServerImpl.java
trunk/engine/src/main/java/org/teiid/query/processor/QueryProcessor.java
trunk/engine/src/main/resources/org/teiid/query/i18n.properties
trunk/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/TestConnectorManager.java
trunk/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/TestConnectorWorkItem.java
trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java
trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCoreRequestHandling.java
trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDataTierManager.java
trunk/engine/src/test/java/org/teiid/dqp/service/AutoGenDataService.java
trunk/engine/src/test/java/org/teiid/query/processor/relational/TestUnionAllNode.java
trunk/jboss-integration/src/main/java/org/teiid/jboss/deployers/ConnectionFactoryDeployer.java
trunk/jboss-integration/src/main/java/org/teiid/jboss/deployers/RuntimeEngineDeployer.java
trunk/test-integration/common/src/test/java/org/teiid/jdbc/FakeServer.java
Log:
TEIID-1151 re-adding support for multi-threaded source queries. also fixing partial results and issameconnector check
Modified: trunk/build/kits/jboss-container/deploy/teiid/teiid-jboss-beans.xml
===================================================================
--- trunk/build/kits/jboss-container/deploy/teiid/teiid-jboss-beans.xml 2010-07-08 15:38:23 UTC (rev 2332)
+++ trunk/build/kits/jboss-container/deploy/teiid/teiid-jboss-beans.xml 2010-07-12 20:59:02 UTC (rev 2333)
@@ -75,8 +75,10 @@
<property name="securityHelper"><inject bean="SecurityHelper"/></property>
<property name="VDBRepository"><inject bean="VDBRepository"/></property>
- <!-- Process pool maximum thread count. (default 32) Increase this value if the system's available processors is larger than 8. -->
- <property name="maxThreads">32</property>
+ <!-- Process pool maximum thread count. (default 64) -->
+ <property name="maxThreads">64</property>
+ <!-- Max active plans (default 20). Increase this value on highly concurrent systems - but ensure that the underlying pools can handle the increased load without timeouts. -->
+ <property name="maxActivePlans">20</property>
<!-- Query processor time slice, in milliseconds. (default 2000) -->
<property name="timeSliceInMilli">2000</property>
<!-- Maximum allowed fetch size, set via JDBC. User requested value ignored above this value. (default 20480) -->
Modified: trunk/build/kits/jboss-container/deployers/teiid.deployer/teiid-deployer-jboss-beans.xml
===================================================================
--- trunk/build/kits/jboss-container/deployers/teiid.deployer/teiid-deployer-jboss-beans.xml 2010-07-08 15:38:23 UTC (rev 2332)
+++ trunk/build/kits/jboss-container/deployers/teiid.deployer/teiid-deployer-jboss-beans.xml 2010-07-12 20:59:02 UTC (rev 2333)
@@ -57,7 +57,6 @@
</bean>
<bean name="ConnectionFactoryDeployer" class="org.teiid.jboss.deployers.ConnectionFactoryDeployer">
- <property name="connectorManagerRepository"><inject bean="ConnectorManagerRepository"/></property>
<property name="VDBStatusChecker"><inject bean="VDBStatusChecker"/></property>
</bean>
Modified: trunk/common-core/src/main/resources/org/teiid/core/i18n.properties
===================================================================
--- trunk/common-core/src/main/resources/org/teiid/core/i18n.properties 2010-07-08 15:38:23 UTC (rev 2332)
+++ trunk/common-core/src/main/resources/org/teiid/core/i18n.properties 2010-07-12 20:59:02 UTC (rev 2333)
@@ -328,9 +328,6 @@
Streamable.isNUll=Streamable object argument can not be null
Streamable.InvalidReference=Streamable contents are not available, use the Streaming interface to get the contents.
-WorkerPool.New_thread=Created worker thread "{0}".
-WorkerPool.uncaughtException=Uncaught exception processing work
-
FloatToBooleanTransform.Failed_transform=Failed to transform Float to Boolean. Expected 0 or 1 for {0}
NullToAnyTransform.Invalid_value=Invalid value for type {0}: {1} of type {2}
ObjectToAnyTransform.Invalid_value=Invalid conversion from type {0} with value ''{2}'' to type {1}
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java 2010-07-08 15:38:23 UTC (rev 2332)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java 2010-07-12 20:59:02 UTC (rev 2333)
@@ -23,12 +23,11 @@
package org.teiid.common.buffer;
import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import org.teiid.core.TeiidComponentException;
@@ -295,7 +294,7 @@
lob.setReferenceStreamId(id);
}
if (this.lobReferences == null) {
- this.lobReferences = Collections.synchronizedMap(new HashMap<String, Streamable<?>>());
+ this.lobReferences = new ConcurrentHashMap<String, Streamable<?>>();
}
this.lobReferences.put(id, lob);
if (lob.getReference() == null) {
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/CapabilitiesConverter.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/CapabilitiesConverter.java 2010-07-08 15:38:23 UTC (rev 2332)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/CapabilitiesConverter.java 2010-07-12 20:59:02 UTC (rev 2333)
@@ -44,7 +44,7 @@
return convertCapabilities(srcCaps, null);
}
- public static BasicSourceCapabilities convertCapabilities(ExecutionFactory srcCaps, String connectorID) {
+ public static BasicSourceCapabilities convertCapabilities(ExecutionFactory srcCaps, Object connectorID) {
BasicSourceCapabilities tgtCaps = new BasicSourceCapabilities();
tgtCaps.setCapabilitySupport(Capability.QUERY_SELECT_EXPRESSION, srcCaps.supportsSelectExpression());
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorManager.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorManager.java 2010-07-08 15:38:23 UTC (rev 2332)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorManager.java 2010-07-12 20:59:02 UTC (rev 2333)
@@ -26,21 +26,18 @@
*/
package org.teiid.dqp.internal.datamgr.impl;
-import java.util.LinkedList;
+import java.util.Arrays;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
import javax.naming.InitialContext;
import javax.naming.NamingException;
-import org.teiid.common.buffer.BlockedException;
+import org.teiid.core.TeiidComponentException;
import org.teiid.core.util.Assertion;
import org.teiid.dqp.DQPPlugin;
import org.teiid.dqp.internal.cache.DQPContextCache;
-import org.teiid.dqp.internal.datamgr.impl.ConnectorWorkItem.PermitMode;
-import org.teiid.dqp.internal.process.AbstractWorkItem;
import org.teiid.dqp.message.AtomicRequestID;
import org.teiid.dqp.message.AtomicRequestMessage;
import org.teiid.dqp.service.BufferService;
@@ -69,13 +66,8 @@
private static final String JAVA_CONTEXT = "java:"; //$NON-NLS-1$
- public static final int DEFAULT_MAX_THREADS = 20;
-
- private static AtomicInteger ID_SEQUENCE = new AtomicInteger();
-
private String translatorName;
private String connectionName;
- private String connectorId = String.valueOf(ID_SEQUENCE.getAndIncrement());
//services acquired in start
private BufferService bufferService;
@@ -85,10 +77,6 @@
private SourceCapabilities cachedCapabilities;
- private int currentConnections;
- private int maxConnections = DEFAULT_MAX_THREADS;
- private LinkedList<ConnectorWorkItem> queuedRequests = new LinkedList<ConnectorWorkItem>();
-
private volatile boolean stopped;
private ExecutionFactory<Object, Object> executionFactory;
@@ -115,24 +103,8 @@
return sb.toString();
}
- public synchronized void acquireConnectionLock(ConnectorWorkItem item) throws BlockedException {
- switch (item.getPermitMode()) {
- case NOT_ACQUIRED:
- if (currentConnections < maxConnections) {
- currentConnections++;
- item.setPermitMode(PermitMode.ACQUIRED);
- return;
- }
- queuedRequests.add(item);
- item.setPermitMode(PermitMode.BLOCKED);
- case BLOCKED:
- throw BlockedException.INSTANCE;
- }
- }
-
public MetadataStore getMetadata(String modelName, Map<String, Datatype> datatypes, Properties importProperties) throws TranslatorException {
MetadataFactory factory = new MetadataFactory(modelName, datatypes, importProperties);
- ExecutionFactory<Object, Object> executionFactory = getExecutionFactory();
Object connectionFactory = getConnectionFactory();
Object connection = executionFactory.getConnection(connectionFactory);
try {
@@ -143,26 +115,26 @@
return factory.getMetadataStore();
}
- public SourceCapabilities getCapabilities() throws TranslatorException {
+ public SourceCapabilities getCapabilities() throws TeiidComponentException {
if (cachedCapabilities != null) {
return cachedCapabilities;
}
checkStatus();
ExecutionFactory<Object, Object> translator = getExecutionFactory();
- BasicSourceCapabilities resultCaps = CapabilitiesConverter.convertCapabilities(translator, this.connectorId);
+ BasicSourceCapabilities resultCaps = CapabilitiesConverter.convertCapabilities(translator, Arrays.asList(translatorName, connectionName));
resultCaps.setScope(Scope.SCOPE_GLOBAL);
cachedCapabilities = resultCaps;
return resultCaps;
}
- public ConnectorWork executeRequest(AtomicRequestMessage message, AbstractWorkItem awi) throws TranslatorException {
+ public ConnectorWork registerRequest(AtomicRequestMessage message) throws TeiidComponentException {
// Set the connector ID to be used; if not already set.
checkStatus();
AtomicRequestID atomicRequestId = message.getAtomicRequestID();
LogManager.logDetail(LogConstants.CTX_CONNECTOR, new Object[] {atomicRequestId, "Create State"}); //$NON-NLS-1$
- ConnectorWorkItem item = new ConnectorWorkItem(message, awi, this);
+ ConnectorWorkItem item = new ConnectorWorkItem(message, this);
Assertion.isNull(requestStates.put(atomicRequestId, item), "State already existed"); //$NON-NLS-1$
return item;
}
@@ -178,17 +150,6 @@
void removeState(AtomicRequestID id) {
LogManager.logDetail(LogConstants.CTX_CONNECTOR, new Object[] {id, "Remove State"}); //$NON-NLS-1$
ConnectorWorkItem cwi = requestStates.remove(id);
- if (cwi != null && cwi.getPermitMode() == PermitMode.ACQUIRED) {
- synchronized (this) {
- ConnectorWorkItem next = queuedRequests.pollFirst();
- if (next == null) {
- currentConnections--;
- return;
- }
- next.setPermitMode(PermitMode.ACQUIRED);
- next.getParent().moreWork();
- }
- }
}
int size() {
@@ -257,13 +218,6 @@
* @return the <code>ExecutionFactory</code>.
*/
protected ExecutionFactory<Object, Object> getExecutionFactory() {
- if (this.executionFactory == null) {
- try {
- InitialContext ic = new InitialContext();
- return (ExecutionFactory<Object, Object>)ic.lookup(this.translatorName);
- } catch (NamingException e) {
- }
- }
return this.executionFactory;
}
@@ -306,16 +260,12 @@
return null;
}
- private void checkStatus() throws TranslatorException {
+ private void checkStatus() throws TeiidComponentException {
if (stopped) {
- throw new TranslatorException(DQPPlugin.Util.getString("ConnectorManager.not_in_valid_state", this.translatorName)); //$NON-NLS-1$
+ throw new TeiidComponentException(DQPPlugin.Util.getString("ConnectorManager.not_in_valid_state", this.translatorName)); //$NON-NLS-1$
}
}
- public void setMaxConnections(int value) {
- this.maxConnections = value;
- }
-
public String getTranslatorName() {
return this.translatorName;
}
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorManagerRepository.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorManagerRepository.java 2010-07-08 15:38:23 UTC (rev 2332)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorManagerRepository.java 2010-07-12 20:59:02 UTC (rev 2333)
@@ -43,7 +43,7 @@
}
public List<ConnectorManager> getConnectorManagers() {
- return new ArrayList(this.repo.values());
+ return new ArrayList<ConnectorManager>(this.repo.values());
}
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWork.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWork.java 2010-07-08 15:38:23 UTC (rev 2332)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWork.java 2010-07-12 20:59:02 UTC (rev 2333)
@@ -34,12 +34,10 @@
void cancel();
- AtomicResultsMessage more() throws TranslatorException;
+ AtomicResultsMessage more() throws TranslatorException, BlockedException;
void close();
AtomicResultsMessage execute() throws TranslatorException, BlockedException;
- boolean isQueued();
-
}
\ No newline at end of file
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWorkItem.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWorkItem.java 2010-07-08 15:38:23 UTC (rev 2332)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWorkItem.java 2010-07-12 20:59:02 UTC (rev 2333)
@@ -37,7 +37,6 @@
import org.teiid.dqp.DQPPlugin;
import org.teiid.dqp.internal.datamgr.language.LanguageBridgeFactory;
import org.teiid.dqp.internal.datamgr.metadata.RuntimeMetadataImpl;
-import org.teiid.dqp.internal.process.AbstractWorkItem;
import org.teiid.dqp.message.AtomicRequestID;
import org.teiid.dqp.message.AtomicRequestMessage;
import org.teiid.dqp.message.AtomicResultsMessage;
@@ -64,18 +63,12 @@
public class ConnectorWorkItem implements ConnectorWork {
- enum PermitMode {
- BLOCKED, ACQUIRED, NOT_ACQUIRED
- }
-
/* Permanent state members */
private AtomicRequestID id;
private ConnectorManager manager;
private AtomicRequestMessage requestMsg;
private ExecutionFactory connector;
private QueryMetadataInterface queryMetadata;
- private PermitMode permitMode = PermitMode.NOT_ACQUIRED;
- private AbstractWorkItem awi;
/* Created on new request */
private Object connection;
@@ -95,7 +88,7 @@
private AtomicBoolean isCancelled = new AtomicBoolean();
- ConnectorWorkItem(AtomicRequestMessage message, AbstractWorkItem awi, ConnectorManager manager) {
+ ConnectorWorkItem(AtomicRequestMessage message, ConnectorManager manager) {
this.id = message.getAtomicRequestID();
this.requestMsg = message;
this.manager = manager;
@@ -118,30 +111,12 @@
this.queryMetadata = vdb.getAttachment(QueryMetadataInterface.class);
this.queryMetadata = new TempMetadataAdapter(this.queryMetadata, new TempMetadataStore());
this.securityContext.setTransactional(requestMsg.isTransactional());
- this.awi = awi;
}
public AtomicRequestID getId() {
return id;
}
- public PermitMode getPermitMode() {
- return permitMode;
- }
-
- public void setPermitMode(PermitMode permitMode) {
- this.permitMode = permitMode;
- }
-
- public AbstractWorkItem getParent() {
- return awi;
- }
-
- @Override
- public boolean isQueued() {
- return this.permitMode == PermitMode.BLOCKED;
- }
-
public void cancel() {
try {
LogManager.logDetail(LogConstants.CTX_CONNECTOR, new Object[] {this.id, "Processing CANCEL request"}); //$NON-NLS-1$
@@ -189,8 +164,6 @@
LogManager.logDetail(LogConstants.CTX_CONNECTOR, new Object[] {this.id, "Closed connection"}); //$NON-NLS-1$
}
}
-
-
private TranslatorException handleError(Throwable t) {
if (t instanceof DataNotAvailableException) {
@@ -224,10 +197,6 @@
throw new TranslatorException("Request canceled"); //$NON-NLS-1$
}
- if (!this.securityContext.isTransactional()) {
- this.manager.acquireConnectionLock(this);
- }
-
LogManager.logDetail(LogConstants.CTX_CONNECTOR, new Object[] {this.requestMsg.getAtomicRequestID(), "Processing NEW request:", this.requestMsg.getCommand()}); //$NON-NLS-1$
try {
this.connectionFactory = this.manager.getConnectionFactory();
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPConfiguration.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPConfiguration.java 2010-07-08 15:38:23 UTC (rev 2332)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPConfiguration.java 2010-07-12 20:59:02 UTC (rev 2333)
@@ -37,8 +37,9 @@
static final int DEFAULT_MAX_RESULTSET_CACHE_ENTRIES = 1024;
static final int DEFAULT_QUERY_THRESHOLD = 600;
static final String PROCESS_PLAN_QUEUE_NAME = "QueryProcessorQueue"; //$NON-NLS-1$
- public static final int DEFAULT_MAX_PROCESS_WORKERS = 32;
+ public static final int DEFAULT_MAX_PROCESS_WORKERS = 64;
public static final int DEFAULT_MAX_SOURCE_ROWS = -1;
+ public static final int DEFAULT_MAX_ACTIVE_PLANS = 20;
private int maxThreads = DEFAULT_MAX_PROCESS_WORKERS;
private int timeSliceInMilli = DEFAULT_PROCESSOR_TIMESLICE;
@@ -55,8 +56,18 @@
private int queryThresholdInSecs = DEFAULT_QUERY_THRESHOLD;
private boolean exceptionOnMaxSourceRows = true;
private int maxSourceRows = -1;
+ private int maxActivePlans = DEFAULT_MAX_ACTIVE_PLANS;
+
+ @ManagementProperty(description="Max active plans (default 20). Increase this value, and max threads, on highly concurrent systems - but ensure that the underlying pools can handle the increased load without timeouts.")
+ public int getMaxActivePlans() {
+ return maxActivePlans;
+ }
- @ManagementProperty(description="Process pool maximum thread count. (default 32) Increase this value if the system's available processors is larger than 8")
+ public void setMaxActivePlans(int maxActivePlans) {
+ this.maxActivePlans = maxActivePlans;
+ }
+
+ @ManagementProperty(description="Process pool maximum thread count. (default 64)")
public int getMaxThreads() {
return maxThreads;
}
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java 2010-07-08 15:38:23 UTC (rev 2332)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java 2010-07-12 20:59:02 UTC (rev 2333)
@@ -35,9 +35,7 @@
import javax.resource.spi.work.Work;
import javax.resource.spi.work.WorkEvent;
-import javax.resource.spi.work.WorkException;
import javax.resource.spi.work.WorkListener;
-import javax.resource.spi.work.WorkManager;
import javax.transaction.xa.Xid;
import org.teiid.adminapi.Admin;
@@ -58,12 +56,11 @@
import org.teiid.common.buffer.BufferManager;
import org.teiid.core.TeiidComponentException;
import org.teiid.core.TeiidProcessingException;
-import org.teiid.core.TeiidRuntimeException;
import org.teiid.core.types.Streamable;
-import org.teiid.core.util.Assertion;
import org.teiid.dqp.DQPPlugin;
import org.teiid.dqp.internal.cache.DQPContextCache;
import org.teiid.dqp.internal.datamgr.impl.ConnectorManagerRepository;
+import org.teiid.dqp.internal.process.ThreadReuseExecutor.PrioritizedRunnable;
import org.teiid.dqp.message.AtomicRequestMessage;
import org.teiid.dqp.message.RequestID;
import org.teiid.dqp.service.BufferService;
@@ -84,15 +81,17 @@
*/
public class DQPCore implements DQP {
- public final static class FutureWork<T> implements Work, WorkListener {
+ //TODO: replace with FutureTask
+ public final static class FutureWork<T> implements Work, WorkListener, PrioritizedRunnable {
private final Callable<T> toCall;
- private DQPWorkContext workContext;
private ResultsFuture<T> result = new ResultsFuture<T>();
private ResultsReceiver<T> receiver = result.getResultsReceiver();
+ private int priority;
+ private long creationTime = System.currentTimeMillis();
- public FutureWork(Callable<T> processor) {
- this.workContext = DQPWorkContext.getWorkContext();
+ public FutureWork(Callable<T> processor, int priority) {
this.toCall = processor;
+ this.priority = priority;
}
public ResultsFuture<T> getResult() {
@@ -102,7 +101,7 @@
@Override
public void run() {
try {
- receiver.receiveResults(workContext.runInContext(toCall));
+ receiver.receiveResults(toCall.call());
} catch (Throwable t) {
receiver.exceptionOccurred(t);
}
@@ -132,6 +131,16 @@
public void workStarted(WorkEvent arg0) {
}
+
+ @Override
+ public int getPriority() {
+ return priority;
+ }
+
+ @Override
+ public long getCreationTime() {
+ return creationTime;
+ }
}
static class ClientState {
@@ -164,8 +173,7 @@
}
- private WorkManager workManager;
- private StatsCapturingWorkManager processWorkerPool;
+ private ThreadReuseExecutor processWorkerPool;
// System properties for Code Table
private int maxCodeTableRecords = DQPConfiguration.DEFAULT_MAX_CODE_TABLE_RECORDS;
@@ -197,6 +205,11 @@
private Map<String, ClientState> clientState = Collections.synchronizedMap(new HashMap<String, ClientState>());
private DQPContextCache contextCache;
private boolean useEntitlements = false;
+
+ private int maxActivePlans = DQPConfiguration.DEFAULT_MAX_ACTIVE_PLANS;
+ private int currentlyActivePlans;
+ private LinkedList<RequestWorkItem> waitingPlans = new LinkedList<RequestWorkItem>();
+
/**
* perform a full shutdown and wait for 10 seconds for all threads to finish
*/
@@ -330,8 +343,13 @@
RequestWorkItem workItem = new RequestWorkItem(this, requestMsg, request, resultsFuture.getResultsReceiver(), requestID, workContext);
logMMCommand(workItem, Event.NEW, null);
addRequest(requestID, workItem, state);
-
- this.addWork(workItem);
+ synchronized (waitingPlans) {
+ if (currentlyActivePlans < maxActivePlans) {
+ startActivePlan(workItem);
+ } else {
+ waitingPlans.add(workItem);
+ }
+ }
return resultsFuture;
}
@@ -351,8 +369,28 @@
this.requests.put(requestID, workItem);
state.addRequest(requestID);
}
+
+ private void startActivePlan(RequestWorkItem workItem) {
+ workItem.active = true;
+ this.addWork(workItem);
+ this.currentlyActivePlans++;
+ }
+
+ void finishProcessing(final RequestWorkItem workItem) {
+ synchronized (waitingPlans) {
+ if (!workItem.active) {
+ return;
+ }
+ workItem.active = false;
+ currentlyActivePlans--;
+ if (!waitingPlans.isEmpty()) {
+ startActivePlan(waitingPlans.remove());
+ }
+ }
+ }
void removeRequest(final RequestWorkItem workItem) {
+ finishProcessing(workItem);
this.requests.remove(workItem.requestID);
ClientState state = getClientState(workItem.getDqpWorkContext().getSessionId(), false);
if (state != null) {
@@ -361,38 +399,20 @@
contextCache.removeRequestScopedCache(workItem.requestID.toString());
}
- void addWork(Work work) {
- try {
- this.processWorkerPool.scheduleWork(work);
- } catch (WorkException e) {
- //TODO: cancel? close?
- throw new TeiidRuntimeException(e);
- }
+ void addWork(Runnable work) {
+ this.processWorkerPool.execute(work);
}
- void scheduleWork(final RequestWorkItem work, long delay) {
- try {
- this.processWorkerPool.scheduleWork(new Work() {
-
- @Override
- public void run() {
- work.moreWork();
- }
-
- @Override
- public void release() {
-
- }
- }, null, delay);
- } catch (WorkException e) {
- throw new TeiidRuntimeException(e);
- }
+ void scheduleWork(final Runnable r, int priority, long delay) {
+ this.processWorkerPool.schedule(new FutureWork<Void>(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ r.run();
+ return null;
+ }
+ }, priority), delay, TimeUnit.MILLISECONDS);
}
- public void setWorkManager(WorkManager mgr) {
- this.workManager = mgr;
- }
-
public ResultsFuture<?> closeLobChunkStream(int lobRequestId,
long requestId, String streamId)
throws TeiidProcessingException {
@@ -446,13 +466,8 @@
return this.requests.get(processorID);
}
- /**
- * Returns a list of QueueStats objects that represent the queues in
- * this service.
- * If there are no queues, an empty Collection is returned.
- */
public WorkerPoolStatisticsMetadata getWorkManagerStatistics() {
- return processWorkerPool.getStats();
+ return this.processWorkerPool.getStats();
}
public void terminateSession(String sessionId) {
@@ -627,9 +642,6 @@
}
public void start(DQPConfiguration config) {
-
- Assertion.isNotNull(this.workManager);
-
this.processorTimeslice = config.getTimeSliceInMilli();
this.maxFetchSize = config.getMaxRowsFetchSize();
this.processorDebugAllowed = config.isProcessDebugAllowed();
@@ -658,7 +670,7 @@
this.bufferManager = bufferService.getBufferManager();
this.contextCache = bufferService.getContextCache();
- this.processWorkerPool = new StatsCapturingWorkManager(DQPConfiguration.PROCESS_PLAN_QUEUE_NAME, config.getMaxThreads(), this.workManager);
+ this.processWorkerPool = new ThreadReuseExecutor(DQPConfiguration.PROCESS_PLAN_QUEUE_NAME, config.getMaxThreads());
dataTierMgr = new DataTierManagerImpl(this,
this.connectorManagerRepository,
@@ -706,7 +718,7 @@
return null;
}
};
- return addWork(processor);
+ return addWork(processor, 0);
}
// local txn
@@ -719,7 +731,7 @@
return null;
}
};
- return addWork(processor);
+ return addWork(processor, 0);
}
// global txn
@@ -732,7 +744,7 @@
return null;
}
};
- return addWork(processor);
+ return addWork(processor, 0);
}
// global txn
public ResultsFuture<?> end(XidImpl xid, int flags) throws XATransactionException {
@@ -756,16 +768,12 @@
return getTransactionService().prepare(workContext.getSessionId(),xid, workContext.getSession().isEmbedded());
}
};
- return addWork(processor);
+ return addWork(processor, 10);
}
- private <T> ResultsFuture<T> addWork(Callable<T> processor) {
- FutureWork<T> work = new FutureWork<T>(processor);
- try {
- this.workManager.scheduleWork(work);
- } catch (WorkException e) {
- throw new TeiidRuntimeException(e);
- }
+ <T> ResultsFuture<T> addWork(Callable<T> processor, int priority) {
+ FutureWork<T> work = new FutureWork<T>(processor, priority);
+ this.addWork(work);
return work.getResult();
}
@@ -786,7 +794,7 @@
return null;
}
};
- return addWork(processor);
+ return addWork(processor, 0);
}
// global txn
public ResultsFuture<?> start(final XidImpl xid, final int flags, final int timeout)
@@ -799,7 +807,7 @@
return null;
}
};
- return addWork(processor);
+ return addWork(processor, 100);
}
public MetadataResult getMetadata(long requestID)
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPWorkContext.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPWorkContext.java 2010-07-08 15:38:23 UTC (rev 2332)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPWorkContext.java 2010-07-12 20:59:02 UTC (rev 2333)
@@ -32,6 +32,8 @@
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.FutureTask;
import javax.security.auth.Subject;
@@ -166,14 +168,24 @@
return session.getVdb();
}
- public <V> V runInContext(Callable<V> callable) throws Exception {
+ public <V> V runInContext(Callable<V> callable) throws Throwable {
+ FutureTask<V> task = new FutureTask<V>(callable);
+ runInContext(task);
+ try {
+ return task.get();
+ } catch (ExecutionException e) {
+ throw e.getCause();
+ }
+ }
+
+ public void runInContext(final Runnable runnable) {
DQPWorkContext.setWorkContext(this);
boolean associated = false;
if (securityHelper != null && this.getSubject() != null) {
associated = securityHelper.assosiateSecurityContext(this.getSecurityDomain(), this.getSecurityContext());
}
try {
- return callable.call();
+ runnable.run();
} finally {
if (associated) {
securityHelper.clearSecurityContext(this.getSecurityDomain());
@@ -181,19 +193,6 @@
DQPWorkContext.releaseWorkContext();
}
}
-
- public void runInContext(final Runnable runnable) {
- try {
- runInContext(new Callable<Void>() {
- @Override
- public Void call() {
- runnable.run();
- return null;
- }
- });
- } catch (Exception e) {
- }
- }
public HashMap<String, DataPolicy> getAllowedDataPolicies() {
if (this.policies == null) {
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierManagerImpl.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierManagerImpl.java 2010-07-08 15:38:23 UTC (rev 2332)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierManagerImpl.java 2010-07-12 20:59:02 UTC (rev 2333)
@@ -30,10 +30,12 @@
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.Callable;
import org.teiid.adminapi.impl.ModelMetaData;
import org.teiid.adminapi.impl.VDBMetaData;
import org.teiid.client.RequestMessage;
+import org.teiid.client.util.ResultsFuture;
import org.teiid.common.buffer.BlockedException;
import org.teiid.common.buffer.TupleBatch;
import org.teiid.common.buffer.TupleSource;
@@ -77,9 +79,11 @@
import org.teiid.query.sql.symbol.Constant;
import org.teiid.query.sql.symbol.GroupSymbol;
import org.teiid.query.util.CommandContext;
-import org.teiid.translator.TranslatorException;
-
+/**
+ * Full {@link ProcessorDataManager} implementation that
+ * controls access to {@link ConnectorManager}s and handles system queries.
+ */
public class DataTierManagerImpl implements ProcessorDataManager {
private enum SystemTables {
@@ -130,7 +134,8 @@
}
AtomicRequestMessage aqr = createRequest(processorId, command, modelName, connectorBindingId, nodeID);
- return new DataTierTupleSource(aqr.getCommand().getProjectedSymbols(), aqr, this, aqr.getConnectorName(), workItem);
+ ConnectorWork work = getCM(aqr.getConnectorName()).registerRequest(aqr);
+ return new DataTierTupleSource(aqr, workItem, work, this);
}
/**
@@ -348,10 +353,6 @@
return aqr;
}
- ConnectorWork executeRequest(AtomicRequestMessage aqr, AbstractWorkItem awi, String connectorName) throws TranslatorException {
- return getCM(connectorName).executeRequest(aqr, awi);
- }
-
/**
* Notify each waiting request that the code table data is now available.
* @param requests
@@ -421,4 +422,12 @@
this.codeTableCache.clearAll();
}
+ <T> ResultsFuture<T> addWork(Callable<T> callable, int priority) {
+ return requestMgr.addWork(callable, priority);
+ }
+
+ void scheduleWork(Runnable r, int priority, long delay) {
+ requestMgr.scheduleWork(r, priority, delay);
+ }
+
}
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java 2010-07-08 15:38:23 UTC (rev 2332)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java 2010-07-12 20:59:02 UTC (rev 2333)
@@ -24,105 +24,178 @@
import java.util.Arrays;
import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
import org.teiid.client.SourceWarning;
+import org.teiid.client.util.ResultsFuture;
+import org.teiid.common.buffer.BlockedException;
import org.teiid.common.buffer.TupleSource;
import org.teiid.core.TeiidComponentException;
import org.teiid.core.TeiidProcessingException;
+import org.teiid.core.TeiidRuntimeException;
import org.teiid.core.util.Assertion;
import org.teiid.dqp.internal.datamgr.impl.ConnectorWork;
import org.teiid.dqp.message.AtomicRequestMessage;
import org.teiid.dqp.message.AtomicResultsMessage;
+import org.teiid.translator.DataNotAvailableException;
import org.teiid.translator.TranslatorException;
/**
* This tuple source impl can only be used once; once it is closed, it
* cannot be reopened and reused.
+ *
+ * TODO: the handling of DataNotAvailable is awkward.
+ * In the multi-threaded case we'd like to not even
+ * notify the parent plan and just schedule the next poll.
*/
public class DataTierTupleSource implements TupleSource {
// Construction state
- private final List schema;
private final AtomicRequestMessage aqr;
- private final DataTierManagerImpl dataMgr;
- private final String connectorName;
private final RequestWorkItem workItem;
+ private final ConnectorWork cwi;
+ private final DataTierManagerImpl dtm;
// Data state
- private ConnectorWork cwi;
private int index;
private int rowsProcessed;
private volatile AtomicResultsMessage arm;
private boolean closed;
private volatile boolean canceled;
+ private boolean executed;
+ private volatile ResultsFuture<AtomicResultsMessage> futureResult;
private volatile boolean running;
- /**
- * Constructor for DataTierTupleSource.
- */
- public DataTierTupleSource(List schema, AtomicRequestMessage aqr, DataTierManagerImpl dataMgr, String connectorName, RequestWorkItem workItem) {
- this.schema = schema;
+ public DataTierTupleSource(AtomicRequestMessage aqr, RequestWorkItem workItem, ConnectorWork cwi, DataTierManagerImpl dtm) {
this.aqr = aqr;
- this.dataMgr = dataMgr;
- this.connectorName = connectorName;
this.workItem = workItem;
+ this.cwi = cwi;
+ this.dtm = dtm;
+ Assertion.isNull(workItem.getConnectorRequest(aqr.getAtomicRequestID()));
+ workItem.addConnectorRequest(aqr.getAtomicRequestID(), this);
+ if (!aqr.isTransactional()) {
+ addWork();
+ }
}
- /**
- * @see TupleSource#getSchema()
- */
+ private void addWork() {
+ futureResult = dtm.addWork(new Callable<AtomicResultsMessage>() {
+ @Override
+ public AtomicResultsMessage call() throws Exception {
+ return getResults();
+ }
+ }, 100);
+ futureResult.addCompletionListener(new ResultsFuture.CompletionListener<AtomicResultsMessage>() {
+ public void onCompletion(ResultsFuture<AtomicResultsMessage> future) {
+ workItem.moreWork();
+ }
+ });
+ }
+
public List getSchema() {
- return this.schema;
+ return this.aqr.getCommand().getProjectedSymbols();
}
- public List nextTuple() throws TeiidComponentException, TeiidProcessingException {
- if (this.arm == null) {
- open();
- }
+ public List<?> nextTuple() throws TeiidComponentException, TeiidProcessingException {
while (true) {
+ if (arm == null) {
+ AtomicResultsMessage results = null;
+ try {
+ if (futureResult != null || !aqr.isTransactional()) {
+ results = asynchGet();
+ } else {
+ results = getResults();
+ }
+ } catch (TranslatorException e) {
+ exceptionOccurred(e, true);
+ } catch (DataNotAvailableException e) {
+ dtm.scheduleWork(new Runnable() {
+ @Override
+ public void run() {
+ workItem.moreWork();
+ }
+ }, 10, e.getRetryDelay());
+ throw BlockedException.INSTANCE;
+ }
+ receiveResults(results);
+ }
if (index < arm.getResults().length) {
return this.arm.getResults()[index++];
}
+ arm = null;
if (isDone()) {
return null;
}
- try {
- running = true;
- receiveResults(this.cwi.more());
- } catch (TranslatorException e) {
- exceptionOccurred(e, true);
- } finally {
- running = false;
- }
}
}
+
+ private AtomicResultsMessage asynchGet()
+ throws BlockedException, TeiidProcessingException,
+ TeiidComponentException, TranslatorException {
+ if (futureResult == null) {
+ addWork();
+ }
+ if (!futureResult.isDone()) {
+ throw BlockedException.INSTANCE;
+ }
+ ResultsFuture<AtomicResultsMessage> currentResults = futureResult;
+ futureResult = null;
+ AtomicResultsMessage results = null;
+ try {
+ results = currentResults.get();
+ addWork();
+ } catch (InterruptedException e) {
+ throw new TeiidRuntimeException(e);
+ } catch (ExecutionException e) {
+ if (e.getCause() instanceof TeiidProcessingException) {
+ throw (TeiidProcessingException)e.getCause();
+ }
+ if (e.getCause() instanceof TeiidComponentException) {
+ throw (TeiidComponentException)e.getCause();
+ }
+ if (e.getCause() instanceof TranslatorException) {
+ throw (TranslatorException)e.getCause();
+ }
+ if (e.getCause() instanceof RuntimeException) {
+ throw (RuntimeException)e.getCause();
+ }
+ //shouldn't happen
+ throw new RuntimeException(e);
+ }
+ return results;
+ }
+
+ private AtomicResultsMessage getResults()
+ throws BlockedException, TeiidComponentException,
+ TranslatorException {
+ AtomicResultsMessage results = null;
+ try {
+ running = true;
+ if (!executed) {
+ results = cwi.execute();
+ executed = true;
+ } else {
+ results = cwi.more();
+ }
+ } finally {
+ running = false;
+ }
+ return results;
+ }
public boolean isQueued() {
- return this.cwi != null && this.cwi.isQueued();
+ ResultsFuture<AtomicResultsMessage> future = futureResult;
+ return !running && future != null && !future.isDone();
}
public boolean isDone() {
- return this.arm != null && this.arm.getFinalRow() >= 0;
+ AtomicResultsMessage results = this.arm;
+ return results != null && results.getFinalRow() >= 0;
}
- void open() throws TeiidComponentException, TeiidProcessingException {
- try {
- if (this.cwi == null) {
- this.cwi = this.dataMgr.executeRequest(aqr, this.workItem, this.connectorName);
- Assertion.isNull(workItem.getConnectorRequest(aqr.getAtomicRequestID()));
- workItem.addConnectorRequest(aqr.getAtomicRequestID(), this);
- }
- running = true;
- receiveResults(this.cwi.execute());
- } catch (TranslatorException e) {
- exceptionOccurred(e, true);
- } finally {
- running = false;
- }
- }
-
public boolean isRunning() {
return running;
}
@@ -131,7 +204,27 @@
if (!closed) {
if (cwi != null) {
workItem.closeAtomicRequest(this.aqr.getAtomicRequestID());
- this.cwi.close();
+ if (!aqr.isTransactional()) {
+ if (futureResult != null && !futureResult.isDone()) {
+ futureResult.addCompletionListener(new ResultsFuture.CompletionListener<AtomicResultsMessage>() {
+ @Override
+ public void onCompletion(
+ ResultsFuture<AtomicResultsMessage> future) {
+ cwi.close(); // there is a small chance that this will be done in the processing thread
+ }
+ });
+ } else {
+ dtm.addWork(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ cwi.close();
+ return null;
+ }
+ }, 0);
+ }
+ } else {
+ this.cwi.close();
+ }
}
closed = true;
}
@@ -165,7 +258,7 @@
AtomicResultsMessage emptyResults = new AtomicResultsMessage(new List[0], null);
emptyResults.setWarnings(Arrays.asList((Exception)exception));
emptyResults.setFinalRow(this.rowsProcessed);
- receiveResults(arm);
+ receiveResults(emptyResults);
} else {
if (exception.getCause() instanceof TeiidComponentException) {
throw (TeiidComponentException)exception.getCause();
@@ -194,14 +287,11 @@
}
public String getConnectorName() {
- return this.connectorName;
+ return this.aqr.getConnectorName();
}
public boolean isTransactional() {
- if (this.arm == null) {
- return false;
- }
- return this.arm.isTransactional();
+ return this.aqr.isTransactional();
}
@Override
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java 2010-07-08 15:38:23 UTC (rev 2332)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java 2010-07-12 20:59:02 UTC (rev 2333)
@@ -47,6 +47,7 @@
import org.teiid.core.types.DataTypeManager;
import org.teiid.dqp.DQPPlugin;
import org.teiid.dqp.internal.process.SessionAwareCache.CacheID;
+import org.teiid.dqp.internal.process.ThreadReuseExecutor.PrioritizedRunnable;
import org.teiid.dqp.message.AtomicRequestID;
import org.teiid.dqp.message.RequestID;
import org.teiid.dqp.service.TransactionContext;
@@ -64,10 +65,8 @@
import org.teiid.query.sql.lang.SPParameter;
import org.teiid.query.sql.lang.StoredProcedure;
import org.teiid.query.sql.symbol.SingleElementSymbol;
-import org.teiid.translator.DataNotAvailableException;
-
-public class RequestWorkItem extends AbstractWorkItem {
+public class RequestWorkItem extends AbstractWorkItem implements PrioritizedRunnable {
private enum ProcessingState {NEW, PROCESSING, CLOSE}
private ProcessingState state = ProcessingState.NEW;
@@ -86,6 +85,7 @@
private CacheID cid;
private final TransactionService transactionService;
private final DQPWorkContext dqpWorkContext;
+ boolean active;
/*
* obtained during new
@@ -105,16 +105,17 @@
private Map<AtomicRequestID, DataTierTupleSource> connectorInfo = new ConcurrentHashMap<AtomicRequestID, DataTierTupleSource>(4);
// This exception contains details of all the atomic requests that failed when query is run in partial results mode.
private List<TeiidException> warnings = new LinkedList<TeiidException>();
- private boolean doneProducingBatches;
+ private volatile boolean doneProducingBatches;
private volatile boolean isClosed;
private volatile boolean isCanceled;
private volatile boolean closeRequested;
+
//results request
private ResultsReceiver<ResultsMessage> resultsReceiver;
private int begin;
private int end;
private TupleBatch savedBatch;
- private Map<Integer, LobWorkItem> lobStreams = new ConcurrentHashMap<Integer, LobWorkItem>(4);
+ private Map<Integer, LobWorkItem> lobStreams = new ConcurrentHashMap<Integer, LobWorkItem>(4);
/**The time when command begins processing on the server.*/
private long processingTimestamp = System.currentTimeMillis();
@@ -155,7 +156,11 @@
@Override
protected void resumeProcessing() {
- dqpCore.addWork(this);
+ if (doneProducingBatches && !closeRequested && !isCanceled) {
+ this.run(); // just run in the IO thread
+ } else {
+ dqpCore.addWork(this);
+ }
}
@Override
@@ -184,9 +189,6 @@
} catch (QueryProcessor.ExpiredTimeSliceException e) {
LogManager.logDetail(LogConstants.CTX_DQP, "Request Thread", requestID, "- time slice expired"); //$NON-NLS-1$ //$NON-NLS-2$
this.moreWork();
- } catch (DataNotAvailableException e) {
- LogManager.logDetail(LogConstants.CTX_DQP, "Request Thread", requestID, "- data not available"); //$NON-NLS-1$ //$NON-NLS-2$
- this.dqpCore.scheduleWork(this, e.getRetryDelay());
} catch (Throwable e) {
LogManager.logDetail(LogConstants.CTX_DQP, e, "Request Thread", requestID, "- error occurred"); //$NON-NLS-1$ //$NON-NLS-2$
@@ -227,7 +229,6 @@
private void resume() throws XATransactionException {
if (this.transactionState == TransactionState.ACTIVE && this.transactionContext.getTransaction() != null) {
- //there's no need to do this for xa transactions, as that is done by the workmanager
this.transactionService.resume(this.transactionContext);
}
}
@@ -339,7 +340,7 @@
this.resultsBuffer = cr.getResults();
this.analysisRecord = cr.getAnalysisRecord();
this.originalCommand = cr.getCommand();
- this.doneProducingBatches = true;
+ this.doneProducingBatches();
return;
}
}
@@ -357,7 +358,9 @@
super.flushBatchDirect(batch, add);
added = true;
}
- doneProducingBatches = batch.getTerminationFlag();
+ if (batch.getTerminationFlag()) {
+ doneProducingBatches();
+ }
if (doneProducingBatches && cid != null) {
boolean sessionScope = processor.getContext().isSessionFunctionEvaluated();
CachedResults cr = new CachedResults();
@@ -381,7 +384,7 @@
this.transactionState = TransactionState.ACTIVE;
}
if (requestMsg.isNoExec()) {
- doneProducingBatches = true;
+ doneProducingBatches();
resultsBuffer.close();
this.cid = null;
}
@@ -608,7 +611,9 @@
}
}
this.closeRequested = true;
- this.requestCancel(); //pending work should be canceled for fastest clean up
+ if (!this.doneProducingBatches) {
+ this.requestCancel(); //pending work should be canceled for fastest clean up
+ }
this.moreWork();
}
@@ -692,5 +697,20 @@
LogManager.logWarning(LogConstants.CTX_DQP, e, "Failed to cancel " + requestID); //$NON-NLS-1$
}
}
+
+ private void doneProducingBatches() {
+ this.doneProducingBatches = true;
+ dqpCore.finishProcessing(this);
+ }
+ @Override
+ public int getPriority() {
+ return (closeRequested || isCanceled) ? 0 : 1000;
+ }
+
+ @Override
+ public long getCreationTime() {
+ return processingTimestamp;
+ }
+
}
\ No newline at end of file
Deleted: trunk/engine/src/main/java/org/teiid/dqp/internal/process/StatsCapturingWorkManager.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/StatsCapturingWorkManager.java 2010-07-08 15:38:23 UTC (rev 2332)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/StatsCapturingWorkManager.java 2010-07-12 20:59:02 UTC (rev 2333)
@@ -1,294 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * See the COPYRIGHT.txt file distributed with this work for information
- * regarding copyright ownership. Some portions may be licensed
- * to Red Hat, Inc. under one or more contributor license agreements.
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
- * 02110-1301 USA.
- */
-
-package org.teiid.dqp.internal.process;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.IdentityHashMap;
-import java.util.LinkedList;
-import java.util.Map;
-import java.util.Queue;
-import java.util.Set;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import javax.resource.spi.work.ExecutionContext;
-import javax.resource.spi.work.Work;
-import javax.resource.spi.work.WorkEvent;
-import javax.resource.spi.work.WorkException;
-import javax.resource.spi.work.WorkListener;
-import javax.resource.spi.work.WorkManager;
-import javax.resource.spi.work.WorkRejectedException;
-
-import org.teiid.adminapi.impl.WorkerPoolStatisticsMetadata;
-import org.teiid.core.util.NamedThreadFactory;
-import org.teiid.logging.LogConstants;
-import org.teiid.logging.LogManager;
-import org.teiid.logging.MessageLevel;
-import org.teiid.query.QueryPlugin;
-
-
-/**
- * StatsCapturingWorkManager acts as a wrapper to the passed in {@link WorkManager} to
- * capture statistics and implement an unbounded queue of work.
- */
-public class StatsCapturingWorkManager {
-
- private static class WorkContext {
- ExecutionContext context;
- long startTimeout;
- long submitted = System.currentTimeMillis();
-
- public WorkContext(ExecutionContext context, long startTimeout) {
- this.context = context;
- this.startTimeout = startTimeout;
- }
-
- long getStartTimeout() {
- if (startTimeout == 0) {
- return 0;
- }
- return Math.max(1, startTimeout + submitted - System.currentTimeMillis());
- }
-
- }
-
- private final class WorkWrapper implements Work {
- private final Work work;
- private final WorkContext workContext;
- private final DQPWorkContext dqpWorkContext;
-
- private WorkWrapper(Work work, WorkContext workContext, DQPWorkContext dqpWorkContext) {
- this.work = work;
- this.workContext = workContext;
- this.dqpWorkContext = dqpWorkContext;
- }
-
- @Override
- public void run() {
- Thread t = Thread.currentThread();
- synchronized (poolLock) {
- threads.add(t);
- }
- String name = t.getName();
- t.setName(name + "_" + poolName + threadCounter.getAndIncrement()); //$NON-NLS-1$
- if (LogManager.isMessageToBeRecorded(LogConstants.CTX_RUNTIME, MessageLevel.TRACE)) {
- LogManager.logTrace(LogConstants.CTX_RUNTIME, "Beginning work with virtual worker", t.getName()); //$NON-NLS-1$
- }
- boolean success = false;
- try {
- dqpWorkContext.runInContext(work);
- success = true;
- } finally {
- synchronized (poolLock) {
- WorkWrapper next = null;
- if (success) {
- completedCount++;
- next = queue.poll();
- }
- threads.remove(t);
- if (next == null) {
- activeCount--;
- if (activeCount == 0 && terminated) {
- poolLock.notifyAll();
- }
- } else {
- try {
- if (next.workContext == null) {
- delegate.scheduleWork(next);
- } else {
- delegate.scheduleWork(next, next.workContext.getStartTimeout(), next.workContext.context, next.work instanceof WorkListener?(WorkListener)next.work:null);
- }
- } catch (WorkException e) {
- handleException(next.work, e);
- }
- }
- }
- t.setName(name);
- }
- }
-
- @Override
- public void release() {
- this.work.release();
- }
- }
-
- private static void handleException(Work work, WorkException e) {
- if (work instanceof WorkListener) {
- ((WorkListener)work).workRejected(new WorkEvent(work, WorkEvent.WORK_REJECTED, work, new WorkRejectedException(e)));
- } else if (LogManager.isMessageToBeRecorded(LogConstants.CTX_RUNTIME, MessageLevel.DETAIL)) {
- LogManager.logDetail(LogConstants.CTX_RUNTIME, e, "Exception adding work to the WorkManager"); //$NON-NLS-1$
- }
- }
-
- private static ScheduledThreadPoolExecutor stpe = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("Scheduler")); //$NON-NLS-1$
-
- private volatile int activeCount;
- private volatile int highestActiveCount;
- private volatile int highestQueueSize;
- private volatile boolean terminated;
- private volatile int submittedCount;
- private volatile int completedCount;
-
- private Object poolLock = new Object();
- private AtomicInteger threadCounter = new AtomicInteger();
-
- private String poolName;
- private int maximumPoolSize;
- private WorkManager delegate;
-
- private Queue<WorkWrapper> queue = new LinkedList<WorkWrapper>();
- private Set<Thread> threads = Collections.synchronizedSet(Collections.newSetFromMap(new IdentityHashMap<Thread, Boolean>()));
- private Map<Integer, ScheduledFuture<?>> futures = new HashMap<Integer, ScheduledFuture<?>>();
- private int idCounter;
-
- public StatsCapturingWorkManager(String name, int maximumPoolSize, WorkManager delegate) {
- this.maximumPoolSize = maximumPoolSize;
- this.poolName = name;
- this.delegate = delegate;
- }
-
- public void scheduleWork(final Work arg0) throws WorkException {
- scheduleWork(arg0, (WorkContext)null, DQPWorkContext.getWorkContext());
- }
-
- private void scheduleWork(final Work work, WorkContext workContext, DQPWorkContext dqpWorkContext)
- throws WorkRejectedException, WorkException {
- boolean atMaxThreads = false;
- boolean newMaxQueueSize = false;
- synchronized (poolLock) {
- checkForTermination();
- submittedCount++;
- atMaxThreads = activeCount == maximumPoolSize;
- if (atMaxThreads) {
- queue.add(new WorkWrapper(work, workContext, dqpWorkContext));
- int queueSize = queue.size();
- if (queueSize > highestQueueSize) {
- newMaxQueueSize = true;
- highestQueueSize = queueSize;
- }
- } else {
- activeCount++;
- highestActiveCount = Math.max(activeCount, highestActiveCount);
- }
- }
- if (atMaxThreads) {
- if (newMaxQueueSize && maximumPoolSize > 1) {
- LogManager.logWarning(LogConstants.CTX_RUNTIME, QueryPlugin.Util.getString("WorkerPool.Max_thread", maximumPoolSize, poolName, highestQueueSize)); //$NON-NLS-1$
- }
- return;
- }
- if (workContext == null) {
- delegate.scheduleWork(new WorkWrapper(work, null, dqpWorkContext));
- } else {
- delegate.scheduleWork(new WorkWrapper(work, null, dqpWorkContext), workContext.getStartTimeout(), workContext.context, work instanceof WorkListener?(WorkListener)work:null);
- }
- }
-
- public void scheduleWork(final Work arg0, final ExecutionContext arg2, long delay) throws WorkException {
- if (delay < 1) {
- scheduleWork(arg0, new WorkContext(arg2, WorkManager.INDEFINITE), DQPWorkContext.getWorkContext());
- } else {
- synchronized (futures) {
- final int id = idCounter++;
- final DQPWorkContext dqpWorkContext = DQPWorkContext.getWorkContext();
- ScheduledFuture<?> sf = stpe.schedule(new Runnable() {
-
- @Override
- public void run() {
- try {
- futures.remove(id);
- scheduleWork(arg0, new WorkContext(arg2, WorkManager.INDEFINITE), dqpWorkContext);
- } catch (WorkException e) {
- handleException(arg0, e);
- }
- }
- }, delay, TimeUnit.MILLISECONDS);
- this.futures.put(id, sf);
- }
- }
- }
-
- private void checkForTermination() throws WorkRejectedException {
- if (terminated) {
- throw new WorkRejectedException("Queue has been terminated"); //$NON-NLS-1$
- }
- }
-
- public WorkerPoolStatisticsMetadata getStats() {
- WorkerPoolStatisticsMetadata stats = new WorkerPoolStatisticsMetadata();
- stats.setName(poolName);
- stats.setQueued(queue.size());
- stats.setHighestQueued(highestQueueSize);
- stats.setActiveThreads(this.activeCount);
- stats.setMaxThreads(this.maximumPoolSize);
- stats.setTotalSubmitted(this.submittedCount);
- stats.setHighestActiveThreads(this.highestActiveCount);
- stats.setTotalCompleted(this.completedCount);
- return stats;
- }
-
- public void shutdown() {
- this.terminated = true;
- }
-
- public void shutdownNow() {
- this.shutdown();
- synchronized (poolLock) {
- for (Thread t : threads) {
- t.interrupt();
- }
- queue.clear();
- }
- synchronized (futures) {
- for (ScheduledFuture<?> future : futures.values()) {
- future.cancel(true);
- }
- futures.clear();
- }
- }
-
- public boolean isTerminated() {
- return terminated;
- }
-
- public boolean awaitTermination(long timeout, TimeUnit unit)
- throws InterruptedException {
- long timeoutMillis = unit.toMillis(timeout);
- long finalMillis = System.currentTimeMillis() + timeoutMillis;
- synchronized (poolLock) {
- while (this.activeCount > 0 || !terminated) {
- if (timeoutMillis < 1) {
- return false;
- }
- poolLock.wait(timeoutMillis);
- timeoutMillis = finalMillis - System.currentTimeMillis();
- }
- }
- return true;
- }
-
-}
Added: trunk/engine/src/main/java/org/teiid/dqp/internal/process/ThreadReuseExecutor.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/ThreadReuseExecutor.java (rev 0)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/ThreadReuseExecutor.java 2010-07-12 20:59:02 UTC (rev 2333)
@@ -0,0 +1,412 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.dqp.internal.process;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.Executor;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.teiid.adminapi.impl.WorkerPoolStatisticsMetadata;
+import org.teiid.core.util.NamedThreadFactory;
+import org.teiid.logging.LogConstants;
+import org.teiid.logging.LogManager;
+import org.teiid.logging.MessageLevel;
+import org.teiid.query.QueryPlugin;
+
+/**
+ * An Executor that:
+ * <ol>
+ * <li>minimizes thread creation</li>
+ * <li>allows for proper timeout of idle threads</li>
+ * <li>allows for queuing</li>
+ * </ol>
+ * <br/>
+ * A non-fifo (lifo) {@link SynchronousQueue} based {@link ThreadPoolExecutor} satisfies 1 and 2, but not 3.
+ * A bounded or unbound queue based {@link ThreadPoolExecutor} allows for 3, but will tend to create
+ * up to the maximum number of threads and makes no guarantee on thread scheduling.
+ * <br/>
+ * So the approach here is to use a virtual thread pool off of a {@link SynchronousQueue}
+ * backed {@link ThreadPoolExecutor}.
+ * <br/>
+ * There is also only a single master scheduling thread with actual executions deferred.
+ *
+ * TODO: there is a race condition between retiring threads and adding work, which may create extra threads.
+ * That is a flaw with attempting to reuse, rather than create threads.
+ * TODO: bounded queuing - we never bothered bounding in the past with our worker pools, but reasonable
+ * defaults would be a good idea.
+ */
+public class ThreadReuseExecutor implements Executor {
+
+ public interface PrioritizedRunnable extends Runnable {
+
+ int getPriority();
+
+ long getCreationTime();
+
+ }
+
+ static class RunnableWrapper implements PrioritizedRunnable {
+ Runnable r;
+ DQPWorkContext workContext = DQPWorkContext.getWorkContext();
+ long creationTime;
+ int priority;
+
+ public RunnableWrapper(Runnable r) {
+ if (r instanceof PrioritizedRunnable) {
+ PrioritizedRunnable pr = (PrioritizedRunnable)r;
+ creationTime = pr.getCreationTime();
+ priority = pr.getPriority();
+ } else {
+ creationTime = System.currentTimeMillis();
+ priority = Integer.MAX_VALUE;
+ }
+ this.r = r;
+ }
+
+ @Override
+ public long getCreationTime() {
+ return creationTime;
+ }
+
+ @Override
+ public int getPriority() {
+ return priority;
+ }
+
+ @Override
+ public void run() {
+ workContext.runInContext(r);
+ }
+
+ }
+
+ private final ThreadPoolExecutor tpe;
+
+ private ScheduledThreadPoolExecutor stpe = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("Scheduler")); //$NON-NLS-1$
+
+ class ScheduledFutureTask extends FutureTask<Void> implements ScheduledFuture<Void>, PrioritizedRunnable {
+ private ScheduledFuture<?> scheduledFuture;
+ private boolean periodic;
+ private volatile boolean running;
+ private PrioritizedRunnable runnable;
+
+ public ScheduledFutureTask(PrioritizedRunnable runnable, boolean periodic) {
+ super(runnable, null);
+ this.periodic = periodic;
+ this.runnable = runnable;
+ }
+
+ public void setScheduledFuture(ScheduledFuture<?> scheduledFuture) {
+ scheduledTasks.add(this);
+ this.scheduledFuture = scheduledFuture;
+ }
+
+ @Override
+ public long getDelay(TimeUnit unit) {
+ return this.scheduledFuture.getDelay(unit);
+ }
+
+ @Override
+ public int compareTo(Delayed o) {
+ return this.scheduledFuture.compareTo(o);
+ }
+
+ @Override
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ this.scheduledFuture.cancel(false);
+ scheduledTasks.remove(this);
+ return super.cancel(mayInterruptIfRunning);
+ }
+
+ public Runnable getParent() {
+ return new Runnable() {
+ @Override
+ public void run() {
+ if (running || terminated) {
+ return;
+ }
+ running = periodic;
+ executeDirect(ScheduledFutureTask.this);
+ }
+ };
+ }
+
+ @Override
+ public void run() {
+ if (periodic) {
+ if (!this.runAndReset()) {
+ this.scheduledFuture.cancel(false);
+ scheduledTasks.remove(this);
+ }
+ running = false;
+ } else {
+ scheduledTasks.remove(this);
+ super.run();
+ }
+ }
+
+ @Override
+ public long getCreationTime() {
+ return runnable.getCreationTime();
+ }
+
+ @Override
+ public int getPriority() {
+ return runnable.getPriority();
+ }
+ }
+
+ private volatile int activeCount;
+ private volatile int highestActiveCount;
+ private volatile int highestQueueSize;
+ private volatile boolean terminated;
+ private volatile int submittedCount;
+ private volatile int completedCount;
+ private Object poolLock = new Object();
+ private AtomicInteger threadCounter = new AtomicInteger();
+ private Set<Thread> threads = Collections.synchronizedSet(Collections.newSetFromMap(new IdentityHashMap<Thread, Boolean>()));
+ private Set<ScheduledFutureTask> scheduledTasks = Collections.synchronizedSet(Collections.newSetFromMap(new IdentityHashMap<ScheduledFutureTask, Boolean>()));
+
+ private String poolName;
+ private int maximumPoolSize;
+ private Queue<PrioritizedRunnable> queue = new PriorityQueue<PrioritizedRunnable>(11, new Comparator<PrioritizedRunnable>() {
+ @Override
+ public int compare(PrioritizedRunnable pr1, PrioritizedRunnable pr2) {
+ int result = pr1.getPriority() - pr2.getPriority();
+ if (result == 0) {
+ return Long.signum(pr1.getCreationTime() - pr2.getCreationTime());
+ }
+ return result;
+ }
+ });
+
+ public ThreadReuseExecutor(String name, int maximumPoolSize) {
+ this.maximumPoolSize = maximumPoolSize;
+ this.poolName = name;
+
+ tpe = new ThreadPoolExecutor(0,
+ maximumPoolSize, 2, TimeUnit.MINUTES,
+ new SynchronousQueue<Runnable>(), new NamedThreadFactory("Worker")) { //$NON-NLS-1$
+ @Override
+ protected void afterExecute(Runnable r, Throwable t) {
+ if (t != null) {
+ LogManager.logError(LogConstants.CTX_RUNTIME, t, QueryPlugin.Util.getString("WorkerPool.uncaughtException")); //$NON-NLS-1$
+ }
+ }
+
+ };
+ }
+
+ public void execute(final Runnable command) {
+ executeDirect(new RunnableWrapper(command));
+ }
+
+ private void executeDirect(final PrioritizedRunnable command) {
+ boolean atMaxThreads = false;
+ boolean newMaxQueueSize = false;
+ synchronized (poolLock) {
+ checkForTermination();
+ submittedCount++;
+ atMaxThreads = activeCount == maximumPoolSize;
+ if (atMaxThreads) {
+ queue.add(command);
+ int queueSize = queue.size();
+ if (queueSize > highestQueueSize) {
+ newMaxQueueSize = true;
+ highestQueueSize = queueSize;
+ }
+ } else {
+ activeCount++;
+ highestActiveCount = Math.max(activeCount, highestActiveCount);
+ }
+ }
+ if (atMaxThreads) {
+ if (newMaxQueueSize && maximumPoolSize > 1) {
+ LogManager.logWarning(LogConstants.CTX_RUNTIME, QueryPlugin.Util.getString("WorkerPool.Max_thread", maximumPoolSize, poolName, highestQueueSize)); //$NON-NLS-1$
+ }
+ return;
+ }
+ tpe.execute(new Runnable() {
+ @Override
+ public void run() {
+ Thread t = Thread.currentThread();
+ threads.add(t);
+ String name = t.getName();
+ t.setName(name + "_" + poolName + threadCounter.getAndIncrement()); //$NON-NLS-1$
+ if (LogManager.isMessageToBeRecorded(LogConstants.CTX_RUNTIME, MessageLevel.TRACE)) {
+ LogManager.logTrace(LogConstants.CTX_RUNTIME, "Beginning work with virtual worker", t.getName()); //$NON-NLS-1$
+ }
+ Runnable r = command;
+ while (r != null) {
+ boolean success = false;
+ try {
+ r.run();
+ success = true;
+ } finally {
+ synchronized (poolLock) {
+ if (success) {
+ completedCount++;
+ r = queue.poll();
+ }
+ if (!success || r == null) {
+ threads.remove(t);
+ activeCount--;
+ if (activeCount == 0 && terminated) {
+ poolLock.notifyAll();
+ }
+ }
+ }
+ t.setName(name);
+ }
+ }
+ };
+ });
+ }
+
+ private void checkForTermination() {
+ if (terminated) {
+ throw new RejectedExecutionException();
+ }
+ }
+
+ public int getActiveCount() {
+ return activeCount;
+ }
+
+ public int getSubmittedCount() {
+ return submittedCount;
+ }
+
+ public int getCompletedCount() {
+ return completedCount;
+ }
+
+ public int getPoolSize() {
+ return activeCount;
+ }
+
+ public boolean isTerminated() {
+ return terminated;
+ }
+
+ public void shutdown() {
+ this.terminated = true;
+ synchronized (scheduledTasks) {
+ for (ScheduledFuture<?> future : new ArrayList<ScheduledFuture<?>>(scheduledTasks)) {
+ future.cancel(false);
+ }
+ scheduledTasks.clear();
+ }
+ }
+
+ public int getLargestPoolSize() {
+ return this.highestActiveCount;
+ }
+
+ public WorkerPoolStatisticsMetadata getStats() {
+ WorkerPoolStatisticsMetadata stats = new WorkerPoolStatisticsMetadata();
+ stats.setName(poolName);
+ stats.setQueued(queue.size());
+ stats.setHighestQueued(highestQueueSize);
+ stats.setActiveThreads(getActiveCount());
+ stats.setMaxThreads(this.maximumPoolSize);
+ stats.setTotalSubmitted(getSubmittedCount());
+ stats.setHighestActiveThreads(getLargestPoolSize());
+ stats.setTotalCompleted(getCompletedCount());
+ return stats;
+ }
+
+ public boolean hasWork() {
+ synchronized (poolLock) {
+ return this.getSubmittedCount() - this.getCompletedCount() > 0 && !this.isTerminated();
+ }
+ }
+
+ public List<Runnable> shutdownNow() {
+ this.shutdown();
+ synchronized (poolLock) {
+ synchronized (threads) {
+ for (Thread t : threads) {
+ t.interrupt();
+ }
+ }
+ List<Runnable> result = new ArrayList<Runnable>(queue);
+ queue.clear();
+ return result;
+ }
+ }
+
+ public boolean awaitTermination(long timeout, TimeUnit unit)
+ throws InterruptedException {
+ long timeoutMillis = unit.toMillis(timeout);
+ long finalMillis = System.currentTimeMillis() + timeoutMillis;
+ synchronized (poolLock) {
+ while (this.activeCount > 0 || !terminated) {
+ if (timeoutMillis < 1) {
+ return false;
+ }
+ poolLock.wait(timeoutMillis);
+ timeoutMillis = finalMillis - System.currentTimeMillis();
+ }
+ }
+ return true;
+ }
+
+ public ScheduledFuture<?> schedule(final Runnable command, long delay,
+ TimeUnit unit) {
+ checkForTermination();
+ ScheduledFutureTask sft = new ScheduledFutureTask(new RunnableWrapper(command), false);
+ synchronized (scheduledTasks) {
+ ScheduledFuture<?> future = stpe.schedule(sft.getParent(), delay, unit);
+ sft.setScheduledFuture(future);
+ return sft;
+ }
+ }
+
+ public ScheduledFuture<?> scheduleAtFixedRate(final Runnable command,
+ long initialDelay, long period, TimeUnit unit) {
+ checkForTermination();
+ ScheduledFutureTask sft = new ScheduledFutureTask(new RunnableWrapper(command), true);
+ synchronized (scheduledTasks) {
+ ScheduledFuture<?> future = stpe.scheduleAtFixedRate(sft.getParent(), initialDelay, period, unit);
+ sft.setScheduledFuture(future);
+ return sft;
+ }
+ }
+
+}
Property changes on: trunk/engine/src/main/java/org/teiid/dqp/internal/process/ThreadReuseExecutor.java
___________________________________________________________________
Name: svn:mime-type
+ text/plain
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/transaction/TransactionServerImpl.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/transaction/TransactionServerImpl.java 2010-07-08 15:38:23 UTC (rev 2332)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/transaction/TransactionServerImpl.java 2010-07-12 20:59:02 UTC (rev 2333)
@@ -246,7 +246,7 @@
public Transaction call() throws Exception {
return transactionManager.getTransaction();
}
- });
+ }, 0);
workManager.doWork(work, WorkManager.INDEFINITE, tc, null);
tc.setTransaction(work.getResult().get());
}
Modified: trunk/engine/src/main/java/org/teiid/query/processor/QueryProcessor.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/QueryProcessor.java 2010-07-08 15:38:23 UTC (rev 2332)
+++ trunk/engine/src/main/java/org/teiid/query/processor/QueryProcessor.java 2010-07-12 20:59:02 UTC (rev 2333)
@@ -31,8 +31,8 @@
import org.teiid.common.buffer.BufferManager.BufferReserveMode;
import org.teiid.common.buffer.BufferManager.TupleSourceType;
import org.teiid.core.TeiidComponentException;
-import org.teiid.core.TeiidProcessingException;
import org.teiid.core.TeiidException;
+import org.teiid.core.TeiidProcessingException;
import org.teiid.core.TeiidRuntimeException;
import org.teiid.core.util.Assertion;
import org.teiid.logging.LogConstants;
@@ -41,9 +41,10 @@
import org.teiid.query.execution.QueryExecPlugin;
import org.teiid.query.processor.BatchCollector.BatchProducer;
import org.teiid.query.util.CommandContext;
-import org.teiid.translator.DataNotAvailableException;
-
+/**
+ * Driver for plan processing.
+ */
public class QueryProcessor implements BatchProducer {
public static class ExpiredTimeSliceException extends TeiidRuntimeException {
@@ -110,11 +111,6 @@
throw e;
}
continue;
- } catch (DataNotAvailableException e) {
- if (!nonBlocking) {
- throw e;
- }
- wait = e.getRetryDelay();
} catch (BlockedException e) {
if (!nonBlocking) {
throw e;
@@ -147,6 +143,9 @@
long currentTime = System.currentTimeMillis();
Assertion.assertTrue(!processorClosed);
+
+ //TODO: see if there is pending work before preempting
+
while(currentTime < context.getTimeSliceEnd()) {
if (requestCanceled) {
throw new TeiidProcessingException(QueryExecPlugin.Util.getString("QueryProcessor.request_cancelled", getProcessID())); //$NON-NLS-1$
Modified: trunk/engine/src/main/resources/org/teiid/query/i18n.properties
===================================================================
--- trunk/engine/src/main/resources/org/teiid/query/i18n.properties 2010-07-08 15:38:23 UTC (rev 2332)
+++ trunk/engine/src/main/resources/org/teiid/query/i18n.properties 2010-07-12 20:59:02 UTC (rev 2333)
@@ -941,4 +941,7 @@
NewCalculateCostUtil.badCost=Unexpected format encountered for max or min value
WorkerPool.Max_thread=Reached maximum thread count "{0}" for worker pool "{1}" with a queue size of "{2}".
+WorkerPool.New_thread=Created worker thread "{0}".
+WorkerPool.uncaughtException=Uncaught exception processing work
+
XMLSystemFunctions.invalid_namespaces=Invalid namespaces supplied for XPath expression - ''{0}''
\ No newline at end of file
Deleted: trunk/engine/src/test/java/org/teiid/common/queue/TestStatsCapturingWorkManager.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/common/queue/TestStatsCapturingWorkManager.java 2010-07-08 15:38:23 UTC (rev 2332)
+++ trunk/engine/src/test/java/org/teiid/common/queue/TestStatsCapturingWorkManager.java 2010-07-12 20:59:02 UTC (rev 2333)
@@ -1,179 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * See the COPYRIGHT.txt file distributed with this work for information
- * regarding copyright ownership. Some portions may be licensed
- * to Red Hat, Inc. under one or more contributor license agreements.
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
- * 02110-1301 USA.
- */
-
-package org.teiid.common.queue;
-
-import static org.junit.Assert.*;
-
-import java.util.ArrayList;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import javax.resource.spi.work.Work;
-import javax.resource.spi.work.WorkManager;
-import javax.resource.spi.work.WorkRejectedException;
-
-import org.junit.Ignore;
-import org.junit.Test;
-import org.teiid.adminapi.impl.WorkerPoolStatisticsMetadata;
-import org.teiid.dqp.internal.process.StatsCapturingWorkManager;
-
-/**
- */
-public class TestStatsCapturingWorkManager {
-
- private WorkManager manager = new FakeWorkManager();
-
- @Test public void testQueuing() throws Exception {
- final long SINGLE_WAIT = 50;
- final int WORK_ITEMS = 10;
- final int MAX_THREADS = 5;
-
- final StatsCapturingWorkManager pool = new StatsCapturingWorkManager("test", MAX_THREADS, manager); //$NON-NLS-1$
-
- for(int i=0; i<WORK_ITEMS; i++) {
- pool.scheduleWork(new FakeWorkItem(SINGLE_WAIT));
- }
-
- pool.shutdown();
- pool.awaitTermination(1000, TimeUnit.MILLISECONDS);
- assertTrue(pool.isTerminated());
- WorkerPoolStatisticsMetadata stats = pool.getStats();
- assertEquals(10, stats.getTotalCompleted());
- assertEquals("Expected threads to be maxed out", MAX_THREADS, stats.getHighestActiveThreads()); //$NON-NLS-1$
- }
-
- @Ignore
- @Test public void testThreadReuse() throws Exception {
- final long SINGLE_WAIT = 50;
- final long NUM_THREADS = 5;
-
- StatsCapturingWorkManager pool = new StatsCapturingWorkManager("test", 5, manager); //$NON-NLS-1$
-
- for(int i=0; i<NUM_THREADS; i++) {
- pool.scheduleWork(new FakeWorkItem(SINGLE_WAIT));
-
- try {
- Thread.sleep(SINGLE_WAIT*2);
- } catch(InterruptedException e) {
- }
- }
-
- pool.shutdown();
-
- WorkerPoolStatisticsMetadata stats = pool.getStats();
- assertEquals("Expected 1 thread for serial execution", 1, stats.getHighestActiveThreads()); //$NON-NLS-1$
-
- pool.awaitTermination(1000, TimeUnit.MILLISECONDS);
- }
-
- @Test(expected=WorkRejectedException.class) public void testShutdown() throws Exception {
- StatsCapturingWorkManager pool = new StatsCapturingWorkManager("test", 5, manager); //$NON-NLS-1$
- pool.shutdown();
- pool.scheduleWork(new FakeWorkItem(1));
- }
-
- /*@Test public void testScheduleCancel() throws Exception {
- StatsCapturingWorkManager pool = new StatsCapturingWorkManager("test", 5); //$NON-NLS-1$
- ScheduledFuture<?> future = pool.scheduleAtFixedRate(new Runnable() {
- @Override
- public void run() {
- }
- }, 0, 5, TimeUnit.MILLISECONDS);
- future.cancel(true);
- assertFalse(future.cancel(true));
- }*/
-
- @Test public void testSchedule() throws Exception {
- StatsCapturingWorkManager pool = new StatsCapturingWorkManager("test", 5, manager); //$NON-NLS-1$
- final ArrayList<String> result = new ArrayList<String>();
- pool.scheduleWork(new Work() {
-
- @Override
- public void run() {
- result.add("hello"); //$NON-NLS-1$
- }
-
- @Override
- public void release() {
-
- }
- }, null, 5);
- Thread.sleep(100);
- pool.shutdown();
- pool.awaitTermination(1000, TimeUnit.MILLISECONDS);
- assertEquals(1, result.size());
- }
-
- /*(a)Test(expected=ExecutionException.class) public void testScheduleException() throws Exception {
- StatsCapturingWorkManager pool = new StatsCapturingWorkManager("test", 5); //$NON-NLS-1$
- ScheduledFuture<?> future = pool.schedule(new Runnable() {
- @Override
- public void run() {
- throw new RuntimeException();
- }
- }, 0, TimeUnit.MILLISECONDS);
- future.get();
- }*/
-
- /**
- * Here each execution exceeds the period
- */
- /*@Test public void testScheduleRepeated() throws Exception {
- StatsCapturingWorkManager pool = new StatsCapturingWorkManager("test", 5); //$NON-NLS-1$
- final ArrayList<String> result = new ArrayList<String>();
- ScheduledFuture<?> future = pool.scheduleAtFixedRate(new Runnable() {
- @Override
- public void run() {
- result.add("hello"); //$NON-NLS-1$
- try {
- Thread.sleep(75);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- }
- }, 0, 10, TimeUnit.MILLISECONDS);
- Thread.sleep(100);
- future.cancel(true);
- assertEquals(2, result.size());
- }*/
-
- @Test public void testFailingWork() throws Exception {
- StatsCapturingWorkManager pool = new StatsCapturingWorkManager("test", 5, manager); //$NON-NLS-1$
- final AtomicInteger count = new AtomicInteger();
- pool.scheduleWork(new Work() {
- @Override
- public void run() {
- count.getAndIncrement();
- throw new RuntimeException();
- }
-
- @Override
- public void release() {
-
- }
- });
- Thread.sleep(100);
- assertEquals(1, count.get());
- }
-
-}
Copied: trunk/engine/src/test/java/org/teiid/common/queue/TestThreadReuseExecutor.java (from rev 2323, trunk/engine/src/test/java/org/teiid/common/queue/TestStatsCapturingWorkManager.java)
===================================================================
--- trunk/engine/src/test/java/org/teiid/common/queue/TestThreadReuseExecutor.java (rev 0)
+++ trunk/engine/src/test/java/org/teiid/common/queue/TestThreadReuseExecutor.java 2010-07-12 20:59:02 UTC (rev 2333)
@@ -0,0 +1,226 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.common.queue;
+
+import static org.junit.Assert.*;
+
+import java.util.ArrayList;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.resource.spi.work.Work;
+
+import org.junit.Test;
+import org.teiid.adminapi.impl.WorkerPoolStatisticsMetadata;
+import org.teiid.dqp.internal.process.ThreadReuseExecutor;
+import org.teiid.dqp.internal.process.DQPCore.FutureWork;
+
+/**
+ */
+public class TestThreadReuseExecutor {
+
+ @Test public void testQueuing() throws Exception {
+ final long SINGLE_WAIT = 50;
+ final int WORK_ITEMS = 10;
+ final int MAX_THREADS = 5;
+
+ final ThreadReuseExecutor pool = new ThreadReuseExecutor("test", MAX_THREADS); //$NON-NLS-1$
+
+ for(int i=0; i<WORK_ITEMS; i++) {
+ pool.execute(new FakeWorkItem(SINGLE_WAIT));
+ }
+
+ pool.shutdown();
+ pool.awaitTermination(1000, TimeUnit.MILLISECONDS);
+ assertTrue(pool.isTerminated());
+ WorkerPoolStatisticsMetadata stats = pool.getStats();
+ assertEquals(10, stats.getTotalCompleted());
+ assertEquals("Expected threads to be maxed out", MAX_THREADS, stats.getHighestActiveThreads()); //$NON-NLS-1$
+ }
+
+ @Test public void testThreadReuse() throws Exception {
+ final long SINGLE_WAIT = 50;
+ final long NUM_THREADS = 5;
+
+ ThreadReuseExecutor pool = new ThreadReuseExecutor("test", 5); //$NON-NLS-1$
+
+ for(int i=0; i<NUM_THREADS; i++) {
+ pool.execute(new FakeWorkItem(SINGLE_WAIT));
+
+ try {
+ Thread.sleep(SINGLE_WAIT*2);
+ } catch(InterruptedException e) {
+ }
+ }
+
+ pool.shutdown();
+
+ WorkerPoolStatisticsMetadata stats = pool.getStats();
+ assertEquals("Expected 1 thread for serial execution", 1, stats.getHighestActiveThreads()); //$NON-NLS-1$
+
+ pool.awaitTermination(1000, TimeUnit.MILLISECONDS);
+ }
+
+ @Test(expected=RejectedExecutionException.class) public void testShutdown() throws Exception {
+ ThreadReuseExecutor pool = new ThreadReuseExecutor("test", 5); //$NON-NLS-1$
+ pool.shutdown();
+ pool.execute(new FakeWorkItem(1));
+ }
+
+ @Test public void testScheduleCancel() throws Exception {
+ ThreadReuseExecutor pool = new ThreadReuseExecutor("test", 5); //$NON-NLS-1$
+ ScheduledFuture<?> future = pool.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ }
+ }, 0, 5, TimeUnit.MILLISECONDS);
+ future.cancel(true);
+ assertFalse(future.cancel(true));
+ }
+
+ @Test public void testSchedule() throws Exception {
+ ThreadReuseExecutor pool = new ThreadReuseExecutor("test", 5); //$NON-NLS-1$
+ final ArrayList<String> result = new ArrayList<String>();
+ pool.schedule(new Work() {
+
+ @Override
+ public void run() {
+ result.add("hello"); //$NON-NLS-1$
+ }
+
+ @Override
+ public void release() {
+
+ }
+ }, 5, TimeUnit.MILLISECONDS);
+ Thread.sleep(100);
+ pool.shutdown();
+ pool.awaitTermination(1000, TimeUnit.MILLISECONDS);
+ assertEquals(1, result.size());
+ }
+
+ @Test(expected=ExecutionException.class) public void testScheduleException() throws Exception {
+ ThreadReuseExecutor pool = new ThreadReuseExecutor("test", 5); //$NON-NLS-1$
+ ScheduledFuture<?> future = pool.schedule(new Runnable() {
+ @Override
+ public void run() {
+ throw new RuntimeException();
+ }
+ }, 0, TimeUnit.MILLISECONDS);
+ future.get();
+ }
+
+ /**
+ * Here each execution exceeds the period
+ */
+ @Test public void testScheduleRepeated() throws Exception {
+ ThreadReuseExecutor pool = new ThreadReuseExecutor("test", 5); //$NON-NLS-1$
+ final ArrayList<String> result = new ArrayList<String>();
+ ScheduledFuture<?> future = pool.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ result.add("hello"); //$NON-NLS-1$
+ try {
+ Thread.sleep(75);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }, 0, 30, TimeUnit.MILLISECONDS);
+ Thread.sleep(140);
+ future.cancel(true);
+ assertEquals(2, result.size());
+ }
+
+ @Test public void testFailingWork() throws Exception {
+ ThreadReuseExecutor pool = new ThreadReuseExecutor("test", 5); //$NON-NLS-1$
+ final AtomicInteger count = new AtomicInteger();
+ pool.execute(new Work() {
+ @Override
+ public void run() {
+ count.getAndIncrement();
+ throw new RuntimeException();
+ }
+
+ @Override
+ public void release() {
+
+ }
+ });
+ Thread.sleep(100);
+ assertEquals(1, count.get());
+ }
+
+ @Test public void testPriorities() throws Exception {
+ final ThreadReuseExecutor pool = new ThreadReuseExecutor("test", 1); //$NON-NLS-1$
+ FutureWork<Boolean> work1 = new FutureWork<Boolean>(new Callable<Boolean>() {
+ public Boolean call() throws Exception {
+ synchronized (pool) {
+ while (pool.getSubmittedCount() < 4) {
+ pool.wait();
+ }
+ }
+ return true;
+ }
+ }, 0);
+ final ConcurrentLinkedQueue<Integer> order = new ConcurrentLinkedQueue<Integer>();
+ FutureWork<Boolean> work2 = new FutureWork<Boolean>(new Callable<Boolean>() {
+ public Boolean call() throws Exception {
+ order.add(2);
+ return true;
+ }
+ }, 2);
+ FutureWork<Boolean> work3 = new FutureWork<Boolean>(new Callable<Boolean>() {
+ public Boolean call() throws Exception {
+ order.add(3);
+ return false;
+ }
+ }, 1);
+ FutureWork<Boolean> work4 = new FutureWork<Boolean>(new Callable<Boolean>() {
+ public Boolean call() throws Exception {
+ order.add(4);
+ return false;
+ }
+ }, 2);
+ pool.execute(work1);
+ pool.execute(work2);
+ pool.execute(work3);
+ pool.execute(work4);
+ synchronized (pool) {
+ pool.notifyAll();
+ }
+ work1.getResult().get();
+ work2.getResult().get();
+ work3.getResult().get();
+ work4.getResult().get();
+ assertEquals(Integer.valueOf(3), order.remove());
+ assertEquals(Integer.valueOf(2), order.remove());
+ assertEquals(Integer.valueOf(4), order.remove());
+ }
+
+}
Modified: trunk/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/TestConnectorManager.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/TestConnectorManager.java 2010-07-08 15:38:23 UTC (rev 2332)
+++ trunk/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/TestConnectorManager.java 2010-07-12 20:59:02 UTC (rev 2333)
@@ -28,9 +28,6 @@
import junit.framework.TestCase;
-import org.mockito.Mockito;
-import org.teiid.common.buffer.BlockedException;
-import org.teiid.dqp.internal.process.AbstractWorkItem;
import org.teiid.dqp.message.AtomicRequestID;
import org.teiid.dqp.message.AtomicRequestMessage;
import org.teiid.dqp.message.RequestID;
@@ -54,7 +51,6 @@
return c.getConnection();
}
};
- cm.setMaxConnections(1);
cm.start();
return cm;
}
@@ -74,7 +70,7 @@
}
void helpAssureOneState() throws Exception {
- csm.executeRequest(request, Mockito.mock(AbstractWorkItem.class));
+ csm.registerRequest(request);
ConnectorWork state = csm.getState(request.getAtomicRequestID());
assertEquals(state, csm.getState(request.getAtomicRequestID()));
}
@@ -106,34 +102,5 @@
assertEquals("Expected size of 1", 1, csm.size()); //$NON-NLS-1$
}
-
- public void testQueuing() throws Exception {
- ConnectorWork workItem = csm.executeRequest(request, Mockito.mock(AbstractWorkItem.class));
- workItem.execute();
-
- AbstractWorkItem awi1 = Mockito.mock(AbstractWorkItem.class);
- ConnectorWork workItem1 = csm.executeRequest(TestConnectorWorkItem.createNewAtomicRequestMessage(2, 1), awi1);
-
- AbstractWorkItem awi2 = Mockito.mock(AbstractWorkItem.class);
- ConnectorWork workItem2 = csm.executeRequest(TestConnectorWorkItem.createNewAtomicRequestMessage(3, 1), awi2);
-
- try {
- workItem1.execute();
- fail("expected exception"); //$NON-NLS-1$
- } catch (BlockedException e) {
-
- }
- workItem.close();
-
- try {
- workItem2.execute(); //ensure that another item cannot jump in the queue
- fail("expected exception"); //$NON-NLS-1$
- } catch (BlockedException e) {
-
- }
-
- Mockito.verify(awi1).moreWork();
- workItem1.execute();
- }
-
+
}
\ No newline at end of file
Modified: trunk/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/TestConnectorWorkItem.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/TestConnectorWorkItem.java 2010-07-08 15:38:23 UTC (rev 2332)
+++ trunk/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/TestConnectorWorkItem.java 2010-07-12 20:59:02 UTC (rev 2333)
@@ -22,8 +22,7 @@
package org.teiid.dqp.internal.datamgr.impl;
-import static junit.framework.Assert.assertEquals;
-import static junit.framework.Assert.fail;
+import static junit.framework.Assert.*;
import java.util.Arrays;
import java.util.List;
@@ -35,7 +34,6 @@
import org.mockito.Mockito;
import org.teiid.client.RequestMessage;
import org.teiid.dqp.internal.datamgr.language.LanguageBridgeFactory;
-import org.teiid.dqp.internal.process.AbstractWorkItem;
import org.teiid.dqp.internal.process.DQPWorkContext;
import org.teiid.dqp.message.AtomicRequestMessage;
import org.teiid.dqp.message.AtomicResultsMessage;
@@ -49,8 +47,8 @@
import org.teiid.query.sql.lang.StoredProcedure;
import org.teiid.query.sql.symbol.Constant;
import org.teiid.query.unittest.FakeMetadataFactory;
-import org.teiid.translator.TranslatorException;
import org.teiid.translator.ProcedureExecution;
+import org.teiid.translator.TranslatorException;
public class TestConnectorWorkItem {
@@ -117,8 +115,7 @@
Command command = helpGetCommand("update bqt1.smalla set stringkey = 1 where stringkey = 2", EXAMPLE_BQT); //$NON-NLS-1$
AtomicRequestMessage arm = createNewAtomicRequestMessage(1, 1);
arm.setCommand(command);
- ConnectorWorkItem synchConnectorWorkItem = new ConnectorWorkItem(arm, Mockito.mock(AbstractWorkItem.class),
- TestConnectorManager.getConnectorManager());
+ ConnectorWorkItem synchConnectorWorkItem = new ConnectorWorkItem(arm, TestConnectorManager.getConnectorManager());
return synchConnectorWorkItem.execute();
}
@@ -152,7 +149,7 @@
return Mockito.mock(Xid.class);
}} );
- new ConnectorWorkItem(requestMsg, Mockito.mock(AbstractWorkItem.class), cm);
+ new ConnectorWorkItem(requestMsg, cm);
}
@Ignore
@@ -180,7 +177,7 @@
return Mockito.mock(Xid.class);
}} );
- new ConnectorWorkItem(requestMsg, Mockito.mock(AbstractWorkItem.class), cm);
+ new ConnectorWorkItem(requestMsg, cm);
}
}
Modified: trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java 2010-07-08 15:38:23 UTC (rev 2332)
+++ trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java 2010-07-12 20:59:02 UTC (rev 2333)
@@ -56,7 +56,6 @@
Mockito.stub(repo.getConnectorManager(Mockito.anyString())).toReturn(new AutoGenDataService());
core = new DQPCore();
- core.setWorkManager(new FakeWorkManager());
core.setBufferService(new FakeBufferService());
core.setConnectorManagerRepository(repo);
core.setTransactionService(new FakeTransactionService());
Modified: trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCoreRequestHandling.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCoreRequestHandling.java 2010-07-08 15:38:23 UTC (rev 2332)
+++ trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCoreRequestHandling.java 2010-07-12 20:59:02 UTC (rev 2333)
@@ -23,11 +23,13 @@
package org.teiid.dqp.internal.process;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import junit.framework.TestCase;
+import org.mockito.Mockito;
import org.teiid.adminapi.impl.RequestMetadata;
import org.teiid.client.RequestMessage;
import org.teiid.client.SourceWarning;
@@ -50,12 +52,12 @@
}
private void compareReqInfos(Collection<RequestID> reqs1, Collection<RequestMetadata> reqs2) {
- Set reqIDs2 = new HashSet();
+ Set<RequestID> reqIDs2 = new HashSet<RequestID>();
for (RequestMetadata requestInfo : reqs2) {
reqIDs2.add(new RequestID(requestInfo.getSessionId(), requestInfo.getExecutionId()));
}
- assertEquals("Collections of request infos are not the same: ", new HashSet(reqs1), reqIDs2); //$NON-NLS-1$
+ assertEquals("Collections of request infos are not the same: ", new HashSet<RequestID>(reqs1), reqIDs2); //$NON-NLS-1$
}
/**
@@ -63,8 +65,8 @@
*/
public void testGetRequestsSessionToken1() {
DQPCore rm = new DQPCore();
- Set reqs = new HashSet();
- Collection actualReqs = rm.getRequestsForSession(SESSION_STRING);
+ Set<RequestID> reqs = Collections.emptySet();
+ Collection<RequestMetadata> actualReqs = rm.getRequestsForSession(SESSION_STRING);
compareReqInfos(reqs, actualReqs);
}
@@ -74,7 +76,7 @@
public void testGetRequestsSessionToken2() {
DQPCore rm = new DQPCore();
rm.setTransactionService(new FakeTransactionService());
- Set reqs = new HashSet();
+ Set<RequestID> reqs = new HashSet<RequestID>();
RequestID id = addRequest(rm, SESSION_STRING, 1);
reqs.add(id);
@@ -95,13 +97,13 @@
public void testGetRequestsSessionToken3() {
DQPCore rm = new DQPCore();
rm.setTransactionService(new FakeTransactionService());
- Set reqs = new HashSet();
+ Set<RequestID> reqs = new HashSet<RequestID>();
reqs.add(addRequest(rm, SESSION_STRING, 0));
reqs.add(addRequest(rm, SESSION_STRING, 1));
reqs.add(addRequest(rm, SESSION_STRING, 2));
- Collection actualReqs = rm.getRequestsForSession(SESSION_STRING);
+ Collection<RequestMetadata> actualReqs = rm.getRequestsForSession(SESSION_STRING);
compareReqInfos(reqs, actualReqs);
}
@@ -158,7 +160,7 @@
RequestWorkItem workItem = addRequest(rm, r0, requestID, null, null);
AtomicRequestMessage atomicReq = new AtomicRequestMessage(workItem.requestMsg, workItem.getDqpWorkContext(), 1);
- DataTierTupleSource info = new DataTierTupleSource(null, atomicReq, null, "connID", workItem); //$NON-NLS-1$
+ DataTierTupleSource info = Mockito.mock(DataTierTupleSource.class);
workItem.addConnectorRequest(atomicReq.getAtomicRequestID(), info);
DataTierTupleSource arInfo = workItem.getConnectorRequest(atomicReq.getAtomicRequestID());
@@ -173,7 +175,7 @@
RequestWorkItem workItem = addRequest(rm, r0, requestID, null, null);
AtomicRequestMessage atomicReq = new AtomicRequestMessage(workItem.requestMsg, workItem.getDqpWorkContext(), 1);
- DataTierTupleSource info = new DataTierTupleSource(null, atomicReq, null,"connID", workItem); //$NON-NLS-1$
+ DataTierTupleSource info = Mockito.mock(DataTierTupleSource.class);
workItem.addConnectorRequest(atomicReq.getAtomicRequestID(), info);
workItem.closeAtomicRequest(atomicReq.getAtomicRequestID());
Modified: trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDataTierManager.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDataTierManager.java 2010-07-08 15:38:23 UTC (rev 2332)
+++ trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDataTierManager.java 2010-07-12 20:59:02 UTC (rev 2333)
@@ -22,8 +22,9 @@
package org.teiid.dqp.internal.process;
-import junit.framework.TestCase;
+import static org.junit.Assert.*;
+import org.junit.Test;
import org.mockito.Mockito;
import org.teiid.client.RequestMessage;
import org.teiid.core.TeiidException;
@@ -33,6 +34,8 @@
import org.teiid.dqp.message.RequestID;
import org.teiid.dqp.service.AutoGenDataService;
import org.teiid.dqp.service.FakeBufferService;
+import org.teiid.dqp.service.TransactionContext;
+import org.teiid.dqp.service.TransactionContext.Scope;
import org.teiid.query.metadata.QueryMetadataInterface;
import org.teiid.query.optimizer.capabilities.DefaultCapabilitiesFinder;
import org.teiid.query.parser.QueryParser;
@@ -40,10 +43,9 @@
import org.teiid.query.sql.lang.Command;
import org.teiid.query.unittest.FakeMetadataFactory;
import org.teiid.query.util.CommandContext;
+import org.teiid.translator.TranslatorException;
-
-
-public class TestDataTierManager extends TestCase {
+public class TestDataTierManager {
private DQPCore rm;
private DataTierManagerImpl dtm;
@@ -54,10 +56,6 @@
private AutoGenDataService connectorManager;
private RequestWorkItem workItem;
- public TestDataTierManager(String name) {
- super(name);
- }
-
private static Command helpGetCommand(String sql, QueryMetadataInterface metadata) throws Exception {
Command command = QueryParser.getQueryParser().parseCommand(sql);
QueryResolver.resolveCommand(command, metadata);
@@ -75,7 +73,8 @@
connectorManager = new AutoGenDataService();
rm = new DQPCore();
rm.setTransactionService(new FakeTransactionService());
-
+ rm.setBufferService(new FakeBufferService());
+ rm.start(new DQPConfiguration());
FakeBufferService bs = new FakeBufferService();
ConnectorManagerRepository repo = Mockito.mock(ConnectorManagerRepository.class);
@@ -92,7 +91,7 @@
RequestMessage original = new RequestMessage();
original.setExecutionId(1);
-
+ original.setPartialResults(true);
RequestID requestID = workContext.getRequestID(original.getExecutionId());
context = new CommandContext();
@@ -105,11 +104,13 @@
request = new AtomicRequestMessage(original, workContext, nodeId);
request.setCommand(command);
request.setConnectorName("FakeConnectorID"); //$NON-NLS-1$
-
- info = new DataTierTupleSource(command.getProjectedSymbols(), request, dtm, request.getConnectorName(), workItem);
+ TransactionContext tc = new TransactionContext();
+ tc.setTransactionType(Scope.GLOBAL);
+ request.setTransactionContext(tc);
+ info = new DataTierTupleSource(request, workItem, connectorManager.registerRequest(request), dtm);
}
- public void testDataTierTupleSource() throws Exception {
+ @Test public void testDataTierTupleSource() throws Exception {
helpSetup(1);
info.nextTuple();
assertNotNull(workItem.getConnectorRequest(request.getAtomicRequestID()));
@@ -117,7 +118,13 @@
assertNull(workItem.getConnectorRequest(request.getAtomicRequestID()));
}
- public void testCodeTableResponseException() throws Exception {
+ @Test public void testPartialResults() throws Exception {
+ helpSetup(1);
+ info.exceptionOccurred(new TranslatorException(), true);
+ assertNull(info.nextTuple());
+ }
+
+ @Test public void testCodeTableResponseException() throws Exception {
helpSetup(3);
this.connectorManager.throwExceptionOnExecute = true;
@@ -129,13 +136,13 @@
}
}
- public void testNoRowsException() throws Exception {
+ @Test public void testNoRowsException() throws Exception {
helpSetup(3);
this.connectorManager.setRows(0);
assertNull(info.nextTuple());
}
- public void testCodeTableResponseDataNotAvailable() throws Exception {
+ @Test public void testCodeTableResponseDataNotAvailable() throws Exception {
helpSetup(3);
this.connectorManager.dataNotAvailable = 5;
Modified: trunk/engine/src/test/java/org/teiid/dqp/service/AutoGenDataService.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/dqp/service/AutoGenDataService.java 2010-07-08 15:38:23 UTC (rev 2332)
+++ trunk/engine/src/test/java/org/teiid/dqp/service/AutoGenDataService.java 2010-07-12 20:59:02 UTC (rev 2333)
@@ -28,11 +28,11 @@
import java.util.Iterator;
import java.util.List;
+import org.teiid.core.TeiidComponentException;
import org.teiid.core.types.DataTypeManager;
import org.teiid.dqp.internal.datamgr.impl.ConnectorManager;
import org.teiid.dqp.internal.datamgr.impl.ConnectorWork;
import org.teiid.dqp.internal.datamgr.impl.ConnectorWorkItem;
-import org.teiid.dqp.internal.process.AbstractWorkItem;
import org.teiid.dqp.message.AtomicRequestMessage;
import org.teiid.dqp.message.AtomicResultsMessage;
import org.teiid.query.optimizer.capabilities.BasicSourceCapabilities;
@@ -68,10 +68,10 @@
}
@Override
- public ConnectorWork executeRequest(AtomicRequestMessage message, AbstractWorkItem awi)
- throws TranslatorException {
+ public ConnectorWork registerRequest(AtomicRequestMessage message)
+ throws TeiidComponentException {
if (throwExceptionOnExecute) {
- throw new TranslatorException("Connector Exception"); //$NON-NLS-1$
+ throw new TeiidComponentException("Connector Exception"); //$NON-NLS-1$
}
List projectedSymbols = (message.getCommand()).getProjectedSymbols();
List[] results = createResults(projectedSymbols);
@@ -105,10 +105,6 @@
}
- @Override
- public boolean isQueued() {
- return false;
- }
};
}
Modified: trunk/engine/src/test/java/org/teiid/query/processor/relational/TestUnionAllNode.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/query/processor/relational/TestUnionAllNode.java 2010-07-08 15:38:23 UTC (rev 2332)
+++ trunk/engine/src/test/java/org/teiid/query/processor/relational/TestUnionAllNode.java 2010-07-12 20:59:02 UTC (rev 2333)
@@ -22,46 +22,38 @@
package org.teiid.query.processor.relational;
+import static org.junit.Assert.*;
+
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import org.junit.Test;
import org.teiid.common.buffer.BlockedException;
import org.teiid.common.buffer.BufferManager;
import org.teiid.common.buffer.TupleBatch;
import org.teiid.core.TeiidComponentException;
import org.teiid.core.TeiidProcessingException;
import org.teiid.core.types.DataTypeManager;
-import org.teiid.query.processor.relational.RelationalNode;
-import org.teiid.query.processor.relational.UnionAllNode;
+import org.teiid.query.processor.FakeDataManager;
import org.teiid.query.sql.symbol.ElementSymbol;
import org.teiid.query.util.CommandContext;
-import junit.framework.TestCase;
-
/**
*/
-public class TestUnionAllNode extends TestCase {
+public class TestUnionAllNode {
- /**
- * Constructor for TestUnionAllNode.
- * @param arg0
- */
- public TestUnionAllNode(String arg0) {
- super(arg0);
- }
-
public void helpTestUnion(RelationalNode[] children, RelationalNode union, List[] expected) throws TeiidComponentException, TeiidProcessingException {
BufferManager mgr = NodeTestUtil.getTestBufferManager(1, 2);
CommandContext context = new CommandContext("pid", "test", null, null, 1); //$NON-NLS-1$ //$NON-NLS-2$
-
+ FakeDataManager fdm = new FakeDataManager();
for(int i=0; i<children.length; i++) {
union.addChild(children[i]);
- children[i].initialize(context, mgr, null);
+ children[i].initialize(context, mgr, fdm);
}
- union.initialize(context, mgr, null);
+ union.initialize(context, mgr, fdm);
union.open();
@@ -90,7 +82,7 @@
assertEquals("Didn't match expected counts", expected.length, currentRow-1); //$NON-NLS-1$
}
- public void testNoRows() throws TeiidComponentException, TeiidProcessingException {
+ @Test public void testNoRows() throws TeiidComponentException, TeiidProcessingException {
ElementSymbol es1 = new ElementSymbol("e1"); //$NON-NLS-1$
es1.setType(DataTypeManager.DefaultDataClasses.INTEGER);
@@ -151,7 +143,7 @@
helpTestUnion(nodes, union, expected);
}
- public void testBasicUnion() throws TeiidComponentException, TeiidProcessingException {
+ @Test public void testBasicUnion() throws TeiidComponentException, TeiidProcessingException {
List expected[] = new List[] {
Arrays.asList(new Object[] { new Integer(0) }),
Arrays.asList(new Object[] { new Integer(0) }),
@@ -164,7 +156,7 @@
}
- public void testBasicUnionMultipleSources() throws TeiidComponentException, TeiidProcessingException {
+ @Test public void testBasicUnionMultipleSources() throws TeiidComponentException, TeiidProcessingException {
List expected[] = new List[] {
Arrays.asList(new Object[] { new Integer(0) }),
Arrays.asList(new Object[] { new Integer(0) }),
@@ -181,7 +173,7 @@
helpTestUnionConfigs(5, -1, 2, 50, expected);
}
- public void testMultipleSourcesHalfBlockingNodes() throws TeiidComponentException, TeiidProcessingException {
+ @Test public void testMultipleSourcesHalfBlockingNodes() throws TeiidComponentException, TeiidProcessingException {
List expected[] = new List[] {
Arrays.asList(new Object[] { new Integer(1) }),
Arrays.asList(new Object[] { new Integer(0) }),
@@ -193,7 +185,7 @@
helpTestUnionConfigs(5, 2, 1, 50, expected);
}
- public void testMultipleSourcesAllBlockingNodes() throws TeiidComponentException, TeiidProcessingException {
+ @Test public void testMultipleSourcesAllBlockingNodes() throws TeiidComponentException, TeiidProcessingException {
List expected[] = new List[] {
Arrays.asList(new Object[] { new Integer(0) }),
Arrays.asList(new Object[] { new Integer(1) }),
@@ -205,7 +197,7 @@
helpTestUnionConfigs(5, 1, 1, 50, expected);
}
- public void testMultipleSourceMultiBatchAllBlocking() throws TeiidComponentException, TeiidProcessingException {
+ @Test public void testMultipleSourceMultiBatchAllBlocking() throws TeiidComponentException, TeiidProcessingException {
List expected[] = new List[] {
Arrays.asList(new Object[] { new Integer(0) }),
Arrays.asList(new Object[] { new Integer(1) }),
Modified: trunk/jboss-integration/src/main/java/org/teiid/jboss/deployers/ConnectionFactoryDeployer.java
===================================================================
--- trunk/jboss-integration/src/main/java/org/teiid/jboss/deployers/ConnectionFactoryDeployer.java 2010-07-08 15:38:23 UTC (rev 2332)
+++ trunk/jboss-integration/src/main/java/org/teiid/jboss/deployers/ConnectionFactoryDeployer.java 2010-07-12 20:59:02 UTC (rev 2333)
@@ -29,8 +29,6 @@
import org.jboss.resource.metadata.mcf.ManagedConnectionFactoryDeploymentGroup;
import org.jboss.resource.metadata.mcf.ManagedConnectionFactoryDeploymentMetaData;
import org.teiid.deployers.VDBStatusChecker;
-import org.teiid.dqp.internal.datamgr.impl.ConnectorManager;
-import org.teiid.dqp.internal.datamgr.impl.ConnectorManagerRepository;
/**
* This deployer listens to the data source load and unload events and manages the connectionManager status based
@@ -38,7 +36,6 @@
*/
public class ConnectionFactoryDeployer extends AbstractSimpleRealDeployer<ManagedConnectionFactoryDeploymentGroup> {
- private ConnectorManagerRepository connectorManagerRepository;
private VDBStatusChecker vdbChecker;
public ConnectionFactoryDeployer() {
@@ -52,13 +49,6 @@
for (ManagedConnectionFactoryDeploymentMetaData data : deployments) {
this.vdbChecker.dataSourceAdded(data.getJndiName());
-
- // set the number of available connections on the cm
- for (ConnectorManager cm:this.connectorManagerRepository.getConnectorManagers()) {
- if (cm.getConnectionName().equals(data.getJndiName())) {
- cm.setMaxConnections(data.getMaxSize());
- }
- }
}
}
@@ -72,10 +62,6 @@
}
}
- public void setConnectorManagerRepository(ConnectorManagerRepository repo) {
- this.connectorManagerRepository = repo;
- }
-
public void setVDBStatusChecker(VDBStatusChecker checker) {
this.vdbChecker = checker;
}
Modified: trunk/jboss-integration/src/main/java/org/teiid/jboss/deployers/RuntimeEngineDeployer.java
===================================================================
--- trunk/jboss-integration/src/main/java/org/teiid/jboss/deployers/RuntimeEngineDeployer.java 2010-07-08 15:38:23 UTC (rev 2332)
+++ trunk/jboss-integration/src/main/java/org/teiid/jboss/deployers/RuntimeEngineDeployer.java 2010-07-12 20:59:02 UTC (rev 2333)
@@ -210,7 +210,6 @@
}
public void setWorkManager(WorkManager mgr) {
- this.dqpCore.setWorkManager(mgr);
this.transactionServerImpl.setWorkManager(mgr);
}
Modified: trunk/test-integration/common/src/test/java/org/teiid/jdbc/FakeServer.java
===================================================================
--- trunk/test-integration/common/src/test/java/org/teiid/jdbc/FakeServer.java 2010-07-08 15:38:23 UTC (rev 2332)
+++ trunk/test-integration/common/src/test/java/org/teiid/jdbc/FakeServer.java 2010-07-12 20:59:02 UTC (rev 2333)
@@ -32,7 +32,6 @@
import org.teiid.adminapi.impl.VDBMetaData;
import org.teiid.client.DQP;
import org.teiid.client.security.ILogon;
-import org.teiid.common.queue.FakeWorkManager;
import org.teiid.deployers.MetadataStoreGroup;
import org.teiid.deployers.VDBRepository;
import org.teiid.dqp.internal.datamgr.impl.ConnectorManager;
@@ -48,7 +47,6 @@
import org.teiid.query.optimizer.capabilities.BasicSourceCapabilities;
import org.teiid.query.optimizer.capabilities.SourceCapabilities;
import org.teiid.services.SessionServiceImpl;
-import org.teiid.translator.TranslatorException;
import org.teiid.transport.ClientServiceRegistry;
import org.teiid.transport.ClientServiceRegistryImpl;
import org.teiid.transport.LocalServerConnection;
@@ -70,15 +68,13 @@
this.repo.setSystemStore(systemStore);
this.sessionService.setVDBRepository(repo);
- this.dqp.setWorkManager(new FakeWorkManager());
this.dqp.setBufferService(new FakeBufferService());
this.dqp.setTransactionService(new FakeTransactionService());
ConnectorManagerRepository cmr = Mockito.mock(ConnectorManagerRepository.class);
Mockito.stub(cmr.getConnectorManager("source")).toReturn(new ConnectorManager("x", "x") {
@Override
- public SourceCapabilities getCapabilities()
- throws TranslatorException {
+ public SourceCapabilities getCapabilities() {
return new BasicSourceCapabilities();
}
});
15 years, 9 months
teiid SVN: r2332 - trunk/build/kits/jboss-container/teiid-examples/dynamicvdb-ws-weather.
by teiid-commits@lists.jboss.org
Author: shawkins
Date: 2010-07-08 11:38:23 -0400 (Thu, 08 Jul 2010)
New Revision: 2332
Modified:
trunk/build/kits/jboss-container/teiid-examples/dynamicvdb-ws-weather/weather-ds.xml
Log:
temporary fix for incorrect version, will be fixed fully by removing versions from rars
Modified: trunk/build/kits/jboss-container/teiid-examples/dynamicvdb-ws-weather/weather-ds.xml
===================================================================
--- trunk/build/kits/jboss-container/teiid-examples/dynamicvdb-ws-weather/weather-ds.xml 2010-07-08 01:49:34 UTC (rev 2331)
+++ trunk/build/kits/jboss-container/teiid-examples/dynamicvdb-ws-weather/weather-ds.xml 2010-07-08 15:38:23 UTC (rev 2332)
@@ -4,7 +4,7 @@
<no-tx-connection-factory>
<jndi-name>WeatherDS</jndi-name>
- <rar-name>connector-ws-7.0.0-SNAPSHOT.rar</rar-name>
+ <rar-name>connector-ws-${project.version}.rar</rar-name>
<connection-definition>javax.resource.cci.ConnectionFactory</connection-definition>
<config-property name="EndPoint">http://www.weather.gov/forecasts/xml/sample_products/browser_interface/nd...</config-property>
15 years, 9 months
teiid SVN: r2331 - in trunk/connectors/translator-jdbc/src: main/java/org/teiid/translator/jdbc/sqlserver and 2 other directories.
by teiid-commits@lists.jboss.org
Author: shawkins
Date: 2010-07-07 21:49:34 -0400 (Wed, 07 Jul 2010)
New Revision: 2331
Modified:
trunk/connectors/translator-jdbc/src/main/java/org/teiid/translator/jdbc/oracle/ConcatFunctionModifier.java
trunk/connectors/translator-jdbc/src/main/java/org/teiid/translator/jdbc/sqlserver/SQLServerExecutionFactory.java
trunk/connectors/translator-jdbc/src/main/java/org/teiid/translator/jdbc/sybase/SybaseExecutionFactory.java
trunk/connectors/translator-jdbc/src/test/java/org/teiid/translator/jdbc/sybase/TestSybaseSQLConversionVisitor.java
Log:
TEIID-1147 fix for sybase concat
Modified: trunk/connectors/translator-jdbc/src/main/java/org/teiid/translator/jdbc/oracle/ConcatFunctionModifier.java
===================================================================
--- trunk/connectors/translator-jdbc/src/main/java/org/teiid/translator/jdbc/oracle/ConcatFunctionModifier.java 2010-07-07 17:10:31 UTC (rev 2330)
+++ trunk/connectors/translator-jdbc/src/main/java/org/teiid/translator/jdbc/oracle/ConcatFunctionModifier.java 2010-07-08 01:49:34 UTC (rev 2331)
@@ -86,7 +86,7 @@
return Arrays.asList(langFactory.createSearchedCaseExpression(cases, function, TypeFacility.RUNTIME_TYPES.STRING));
}
- private boolean isNotNull(Expression expr) {
+ public static boolean isNotNull(Expression expr) {
if (expr instanceof Literal) {
Literal literal = (Literal)expr;
return literal.getValue() != null;
Modified: trunk/connectors/translator-jdbc/src/main/java/org/teiid/translator/jdbc/sqlserver/SQLServerExecutionFactory.java
===================================================================
--- trunk/connectors/translator-jdbc/src/main/java/org/teiid/translator/jdbc/sqlserver/SQLServerExecutionFactory.java 2010-07-07 17:10:31 UTC (rev 2330)
+++ trunk/connectors/translator-jdbc/src/main/java/org/teiid/translator/jdbc/sqlserver/SQLServerExecutionFactory.java 2010-07-08 01:49:34 UTC (rev 2331)
@@ -188,5 +188,10 @@
public boolean supportsAggregatesEnhancedNumeric() {
return true;
}
-
+
+ @Override
+ public boolean nullPlusNonNullIsNull() {
+ return true;
+ }
+
}
Modified: trunk/connectors/translator-jdbc/src/main/java/org/teiid/translator/jdbc/sybase/SybaseExecutionFactory.java
===================================================================
--- trunk/connectors/translator-jdbc/src/main/java/org/teiid/translator/jdbc/sybase/SybaseExecutionFactory.java 2010-07-07 17:10:31 UTC (rev 2330)
+++ trunk/connectors/translator-jdbc/src/main/java/org/teiid/translator/jdbc/sybase/SybaseExecutionFactory.java 2010-07-08 01:49:34 UTC (rev 2331)
@@ -43,6 +43,7 @@
import org.teiid.translator.jdbc.FunctionModifier;
import org.teiid.translator.jdbc.JDBCExecutionFactory;
import org.teiid.translator.jdbc.ModFunctionModifier;
+import org.teiid.translator.jdbc.oracle.ConcatFunctionModifier;
@Translator(name="sybase")
@@ -59,8 +60,18 @@
public void start() throws TranslatorException {
super.start();
- registerFunctionModifier(SourceSystemFunctions.MOD, new ModFunctionModifier("%", getLanguageFactory())); //$NON-NLS-1$
- registerFunctionModifier(SourceSystemFunctions.CONCAT, new AliasModifier("+")); //$NON-NLS-1$
+ registerFunctionModifier(SourceSystemFunctions.MOD, new ModFunctionModifier("%", getLanguageFactory())); //$NON-NLS-1$
+ if (nullPlusNonNullIsNull()) {
+ registerFunctionModifier(SourceSystemFunctions.CONCAT, new AliasModifier("+")); //$NON-NLS-1$
+ } else {
+ registerFunctionModifier(SourceSystemFunctions.CONCAT, new ConcatFunctionModifier(getLanguageFactory()) {
+ @Override
+ public List<?> translate(Function function) {
+ function.setName("+"); //$NON-NLS-1$
+ return super.translate(function);
+ }
+ });
+ }
registerFunctionModifier(SourceSystemFunctions.LCASE, new AliasModifier("lower")); //$NON-NLS-1$
registerFunctionModifier(SourceSystemFunctions.IFNULL, new AliasModifier("isnull")); //$NON-NLS-1$
registerFunctionModifier(SourceSystemFunctions.UCASE, new AliasModifier("upper")); //$NON-NLS-1$
@@ -100,8 +111,18 @@
public List<?> translate(Function function) {
List<Object> result = new ArrayList<Object>();
result.add("cast("); //$NON-NLS-1$
+ boolean needsEnd = false;
+ if (!nullPlusNonNullIsNull() && !ConcatFunctionModifier.isNotNull(function.getParameters().get(0))) {
+ result.add("CASE WHEN "); //$NON-NLS-1$
+ result.add(function.getParameters().get(0));
+ result.add(" IS NOT NULL THEN "); //$NON-NLS-1$
+ needsEnd = true;
+ }
result.add("'1970-01-01 ' + "); //$NON-NLS-1$
result.addAll(convertTimeToString(function));
+ if (needsEnd) {
+ result.add(" END"); //$NON-NLS-1$
+ }
result.add(" AS datetime)"); //$NON-NLS-1$
return result;
}
@@ -302,4 +323,8 @@
public boolean supportsAggregatesEnhancedNumeric() {
return getDatabaseVersion().compareTo(FIFTEEN_0_2) >= 0;
}
+
+ public boolean nullPlusNonNullIsNull() {
+ return false;
+ }
}
Modified: trunk/connectors/translator-jdbc/src/test/java/org/teiid/translator/jdbc/sybase/TestSybaseSQLConversionVisitor.java
===================================================================
--- trunk/connectors/translator-jdbc/src/test/java/org/teiid/translator/jdbc/sybase/TestSybaseSQLConversionVisitor.java 2010-07-07 17:10:31 UTC (rev 2330)
+++ trunk/connectors/translator-jdbc/src/test/java/org/teiid/translator/jdbc/sybase/TestSybaseSQLConversionVisitor.java 2010-07-08 01:49:34 UTC (rev 2331)
@@ -80,7 +80,7 @@
@Test
public void testConcatFunction() {
String input = "SELECT concat(part_name, 'b') FROM PARTS"; //$NON-NLS-1$
- String output = "SELECT (PARTS.PART_NAME + 'b') FROM PARTS"; //$NON-NLS-1$
+ String output = "SELECT CASE WHEN PARTS.PART_NAME IS NULL THEN NULL ELSE (PARTS.PART_NAME + 'b') END FROM PARTS"; //$NON-NLS-1$
helpTestVisitor(getTestVDB(),
input,
@@ -146,6 +146,15 @@
output);
}
+ @Test public void testConvertTimestampTime() {
+ String input = "SELECT convert(TIMESTAMPVALUE, time) FROM BQT1.SMALLA"; //$NON-NLS-1$
+ String output = "SELECT cast(CASE WHEN SmallA.TimestampValue IS NOT NULL THEN '1970-01-01 ' + convert(varchar, SmallA.TimestampValue, 8) END AS datetime) FROM SmallA"; //$NON-NLS-1$
+
+ helpTestVisitor(getBQTVDB(),
+ input,
+ output);
+ }
+
@Test
public void testConvertFunctionChar() {
String input = "SELECT convert(PART_NAME, char) FROM PARTS"; //$NON-NLS-1$
15 years, 9 months
teiid SVN: r2330 - in trunk: build/assembly/adminshell and 3 other directories.
by teiid-commits@lists.jboss.org
Author: shawkins
Date: 2010-07-07 13:10:31 -0400 (Wed, 07 Jul 2010)
New Revision: 2330
Modified:
trunk/adminshell/src/main/java/org/teiid/adminshell/AdminShell.java
trunk/build/assembly/adminshell/adminshell-dist.xml
trunk/client/src/main/java/org/teiid/netty/handler/codec/serialization/ObjectEncoderOutputStream.java
trunk/runtime/src/main/java/org/teiid/transport/ObjectEncoder.java
trunk/runtime/src/test/java/org/teiid/transport/TestCommSockets.java
Log:
TEIID-1145 fix for vdb truncation.
Modified: trunk/adminshell/src/main/java/org/teiid/adminshell/AdminShell.java
===================================================================
--- trunk/adminshell/src/main/java/org/teiid/adminshell/AdminShell.java 2010-07-07 01:34:45 UTC (rev 2329)
+++ trunk/adminshell/src/main/java/org/teiid/adminshell/AdminShell.java 2010-07-07 17:10:31 UTC (rev 2330)
@@ -26,7 +26,6 @@
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
-import java.io.InputStream;
import java.util.Collection;
import java.util.HashMap;
import java.util.Properties;
@@ -37,7 +36,6 @@
import org.teiid.adminapi.Admin;
import org.teiid.adminapi.AdminException;
import org.teiid.adminapi.AdminFactory;
-import org.teiid.adminapi.AdminProcessingException;
import org.teiid.adminapi.ProcessObject;
import org.teiid.adminapi.PropertyDefinition;
import org.teiid.adminapi.Request;
@@ -47,7 +45,6 @@
import org.teiid.adminapi.VDB;
import org.teiid.adminapi.WorkerPoolStatistics;
import org.teiid.adminshell.Help.Doc;
-import org.teiid.core.util.ObjectConverterUtil;
/**
@@ -302,13 +299,13 @@
return false;
}
- private static void writeFile(String deployedName, String fileName,
+ /*private static void writeFile(String deployedName, String fileName,
InputStream contents) throws IOException, AdminProcessingException {
if (contents == null) {
throw new AdminProcessingException(deployedName + " not found for exporting");//$NON-NLS-1$
}
ObjectConverterUtil.write(contents, fileName);
- }
+ }*/
@Doc(text = "Deploy a VDB from file")
public static void deployVDB(
Modified: trunk/build/assembly/adminshell/adminshell-dist.xml
===================================================================
--- trunk/build/assembly/adminshell/adminshell-dist.xml 2010-07-07 01:34:45 UTC (rev 2329)
+++ trunk/build/assembly/adminshell/adminshell-dist.xml 2010-07-07 17:10:31 UTC (rev 2330)
@@ -1,4 +1,3 @@
-<!--This script builds a JAR for the CDK -->
<assembly>
<id>adminshell-dist</id>
Modified: trunk/client/src/main/java/org/teiid/netty/handler/codec/serialization/ObjectEncoderOutputStream.java
===================================================================
--- trunk/client/src/main/java/org/teiid/netty/handler/codec/serialization/ObjectEncoderOutputStream.java 2010-07-07 01:34:45 UTC (rev 2329)
+++ trunk/client/src/main/java/org/teiid/netty/handler/codec/serialization/ObjectEncoderOutputStream.java 2010-07-07 17:10:31 UTC (rev 2330)
@@ -63,7 +63,7 @@
out.writeInt(baos.getCount()); //includes the lob references
out.write(baos.getBuffer(), 0, baos.getCount());
- byte[] chunk = new byte[1 << 16];
+ byte[] chunk = new byte[(1 << 16) - 1];
for (InputStream is : oout.getStreams()) {
while (true) {
int bytes = is.read(chunk);
Modified: trunk/runtime/src/main/java/org/teiid/transport/ObjectEncoder.java
===================================================================
--- trunk/runtime/src/main/java/org/teiid/transport/ObjectEncoder.java 2010-07-07 01:34:45 UTC (rev 2329)
+++ trunk/runtime/src/main/java/org/teiid/transport/ObjectEncoder.java 2010-07-07 17:10:31 UTC (rev 2330)
@@ -63,7 +63,7 @@
@ChannelPipelineCoverage("all")
public class ObjectEncoder implements ChannelDownstreamHandler {
private static final byte[] LENGTH_PLACEHOLDER = new byte[4];
- private static final int CHUNK_SIZE = 1 << 16;
+ private static final int CHUNK_SIZE = (1 << 16) - 1;
private final int estimatedLength;
Modified: trunk/runtime/src/test/java/org/teiid/transport/TestCommSockets.java
===================================================================
--- trunk/runtime/src/test/java/org/teiid/transport/TestCommSockets.java 2010-07-07 01:34:45 UTC (rev 2329)
+++ trunk/runtime/src/test/java/org/teiid/transport/TestCommSockets.java 2010-07-07 17:10:31 UTC (rev 2330)
@@ -128,6 +128,7 @@
FakeService fs = conn.getService(FakeService.class);
assertEquals(150, fs.lobMethod(new ByteArrayInputStream(new byte[100]), new StringReader(new String(new char[50]))));
assertEquals(0, fs.lobMethod(new ByteArrayInputStream(new byte[0]), new StringReader(new String(new char[0]))));
+ assertEquals((1 << 17) + 50, fs.lobMethod(new ByteArrayInputStream(new byte[1 << 17]), new StringReader(new String(new char[50]))));
}
@Test public void testServerRemoteStreaming() throws Exception {
15 years, 9 months
teiid SVN: r2329 - in trunk: engine/src/main/java/org/teiid/query/processor/relational and 2 other directories.
by teiid-commits@lists.jboss.org
Author: shawkins
Date: 2010-07-06 21:34:45 -0400 (Tue, 06 Jul 2010)
New Revision: 2329
Modified:
trunk/client/src/main/java/org/teiid/net/socket/SocketServerConnection.java
trunk/engine/src/main/java/org/teiid/query/processor/relational/DependentProcedureExecutionNode.java
trunk/engine/src/main/java/org/teiid/query/processor/relational/ProjectNode.java
trunk/engine/src/main/java/org/teiid/query/processor/relational/RelationalNode.java
trunk/engine/src/main/java/org/teiid/query/processor/relational/SelectNode.java
trunk/engine/src/main/java/org/teiid/query/processor/xml/IfInstruction.java
trunk/engine/src/test/java/org/teiid/query/processor/relational/TestSelectNode.java
Log:
TEIID-1144 fix for blockedexception handling that causes problems with timeslicing. also updating the ping logic.
Modified: trunk/client/src/main/java/org/teiid/net/socket/SocketServerConnection.java
===================================================================
--- trunk/client/src/main/java/org/teiid/net/socket/SocketServerConnection.java 2010-07-06 18:36:27 UTC (rev 2328)
+++ trunk/client/src/main/java/org/teiid/net/socket/SocketServerConnection.java 2010-07-07 01:34:45 UTC (rev 2329)
@@ -107,13 +107,16 @@
@Override
public void run() {
- if (ping == null || !ping.isDone()) {
+ if (ping == null) {
ping = isOpen();
}
- if (ping != null && ping.isDone()) {
+ if (ping != null) {
try {
- ping.get();
+ ping.get(1, TimeUnit.SECONDS);
+ ping = null;
return;
+ } catch (TimeoutException e) {
+ return;
} catch (Throwable e) {
handlePingError(e);
}
Modified: trunk/engine/src/main/java/org/teiid/query/processor/relational/DependentProcedureExecutionNode.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/relational/DependentProcedureExecutionNode.java 2010-07-06 18:36:27 UTC (rev 2328)
+++ trunk/engine/src/main/java/org/teiid/query/processor/relational/DependentProcedureExecutionNode.java 2010-07-07 01:34:45 UTC (rev 2329)
@@ -23,7 +23,6 @@
package org.teiid.query.processor.relational;
import java.util.List;
-import java.util.Map;
import org.teiid.common.buffer.BlockedException;
import org.teiid.core.TeiidComponentException;
Modified: trunk/engine/src/main/java/org/teiid/query/processor/relational/ProjectNode.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/relational/ProjectNode.java 2010-07-06 18:36:27 UTC (rev 2328)
+++ trunk/engine/src/main/java/org/teiid/query/processor/relational/ProjectNode.java 2010-07-07 01:34:45 UTC (rev 2329)
@@ -61,7 +61,7 @@
// Saved state when blocked on evaluating a row - must be reset
private TupleBatch currentBatch;
- private int currentRow;
+ private int currentRow = 1;
public ProjectNode(int nodeID) {
super(nodeID);
@@ -71,7 +71,7 @@
super.reset();
currentBatch = null;
- currentRow = 0;
+ currentRow = 1;
}
/**
@@ -147,70 +147,50 @@
public TupleBatch nextBatchDirect()
throws BlockedException, TeiidComponentException, TeiidProcessingException {
-
- // currentBatch and currentRow hold temporary state saved in the case
- // of a BlockedException while evaluating an expression. If that has
- // not occurred, currentBatch will be null and currentRow will be < 0.
- // blockedOnPrepare indicates that the BlockedException happened
- // during the call to prepareToProcessTuple
-
- TupleBatch batch = this.currentBatch;
- int beginRow = this.currentRow;
-
- if(batch == null) {
+
+ if(currentBatch == null) {
// There was no saved batch, so get a new one
//in the case of select with no from, should return only
//one batch with one row
if(this.getChildren()[0] == null){
- batch = new TupleBatch(0, new List[]{Arrays.asList(new Object[] {})});
- batch.setTerminationFlag(true);
+ currentBatch = new TupleBatch(1, new List[]{Arrays.asList(new Object[] {})});
+ currentBatch.setTerminationFlag(true);
}else{
- batch = this.getChildren()[0].nextBatch();
+ currentBatch = this.getChildren()[0].nextBatch();
}
// Check for no project needed and pass through
- if(batch.getRowCount() == 0 || !needsProject) {
- // Just pass the batch through without processing
- return batch;
+ if(!needsProject) {
+ TupleBatch result = currentBatch;
+ currentBatch = null;
+ return result;
}
-
- // Set the beginRow based on beginning row of the batch
- beginRow = batch.getBeginRow();
-
- } else {
- // There was a saved batch, but we grabbed the state so it can now be removed
- this.currentBatch = null;
- this.currentRow = 0;
}
- for(int row = beginRow; row <= batch.getEndRow(); row++) {
- List tuple = batch.getTuple(row);
+ while (currentRow <= currentBatch.getEndRow() && !isBatchFull()) {
+ List tuple = currentBatch.getTuple(currentRow);
List projectedTuple = new ArrayList(selectSymbols.size());
// Walk through symbols
- try {
- for(int i=0; i<selectSymbols.size(); i++) {
- SelectSymbol symbol = (SelectSymbol) selectSymbols.get(i);
- updateTuple(symbol, tuple, projectedTuple);
- }
- } catch(BlockedException e) {
- // Expression blocked, so save state and rethrow
- this.currentBatch = batch;
- this.currentRow = row;
- throw e;
- }
+ for(int i=0; i<selectSymbols.size(); i++) {
+ SelectSymbol symbol = (SelectSymbol) selectSymbols.get(i);
+ updateTuple(symbol, tuple, projectedTuple);
+ }
// Add to batch
addBatchRow(projectedTuple);
+ currentRow++;
}
-
- // Check for termination tuple
- if(batch.getTerminationFlag()) {
- terminateBatches();
+
+ if (currentRow > currentBatch.getEndRow()) {
+ if(currentBatch.getTerminationFlag()) {
+ terminateBatches();
+ }
+ currentBatch = null;
}
-
- return pullBatch();
+
+ return pullBatch();
}
private void updateTuple(SelectSymbol symbol, List values, List tuple)
Modified: trunk/engine/src/main/java/org/teiid/query/processor/relational/RelationalNode.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/relational/RelationalNode.java 2010-07-06 18:36:27 UTC (rev 2328)
+++ trunk/engine/src/main/java/org/teiid/query/processor/relational/RelationalNode.java 2010-07-07 01:34:45 UTC (rev 2329)
@@ -41,6 +41,7 @@
import org.teiid.logging.MessageLevel;
import org.teiid.query.analysis.AnalysisRecord;
import org.teiid.query.processor.ProcessorDataManager;
+import org.teiid.query.processor.QueryProcessor;
import org.teiid.query.processor.BatchCollector.BatchProducer;
import org.teiid.query.sql.symbol.AliasSymbol;
import org.teiid.query.sql.symbol.SingleElementSymbol;
@@ -287,6 +288,11 @@
this.nodeStatistics.collectCumulativeNodeStats(null, RelationalNodeStatistics.BLOCKEDEXCEPTION_STOP);
}
throw e;
+ } catch (QueryProcessor.ExpiredTimeSliceException e) {
+ if(recordStats && this.context.getCollectNodeStatistics()) {
+ this.nodeStatistics.stopBatchTimer();
+ }
+ throw e;
} catch (TeiidComponentException e) {
// stop timer for this batch (MetaMatrixComponentException)
if(recordStats && this.context.getCollectNodeStatistics()) {
Modified: trunk/engine/src/main/java/org/teiid/query/processor/relational/SelectNode.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/relational/SelectNode.java 2010-07-06 18:36:27 UTC (rev 2328)
+++ trunk/engine/src/main/java/org/teiid/query/processor/relational/SelectNode.java 2010-07-07 01:34:45 UTC (rev 2329)
@@ -49,10 +49,8 @@
private int[] projectionIndexes;
// State if blocked on evaluating a criteria
- private boolean blockedOnCriteria = false;
- private boolean blockedOnPrepare = false;
- private TupleBatch blockedBatch = null;
- private int blockedRow = 0;
+ private TupleBatch currentBatch;
+ private int currentRow = 1;
public SelectNode(int nodeID) {
super(nodeID);
@@ -61,10 +59,8 @@
public void reset() {
super.reset();
- blockedOnCriteria = false;
- blockedOnPrepare = false;
- blockedBatch = null;
- blockedRow = 0;
+ currentBatch = null;
+ currentRow = 1;
}
public void setCriteria(Criteria criteria) {
@@ -91,45 +87,28 @@
*/
public TupleBatch nextBatchDirect()
throws BlockedException, TeiidComponentException, TeiidProcessingException {
-
- TupleBatch batch = blockedBatch;
- if(! blockedOnCriteria && ! blockedOnPrepare) {
- batch = this.getChildren()[0].nextBatch();
+
+ if(currentBatch == null) {
+ currentBatch = this.getChildren()[0].nextBatch();
}
-
- int row = blockedRow;
- if(! blockedOnCriteria && ! blockedOnPrepare) {
- row = batch.getBeginRow();
- } else {
- // Reset blocked state
- blockedOnCriteria = false;
- blockedOnPrepare = false;
- blockedBatch = null;
- blockedRow = 0;
- }
-
- for(; row <= batch.getEndRow(); row++) {
- List tuple = batch.getTuple(row);
-
- // Evaluate criteria with tuple
- try {
- if(getEvaluator(this.elementMap).evaluate(this.criteria, tuple)) {
- addBatchRow(projectTuple(this.projectionIndexes, tuple));
- }
- } catch(BlockedException e) {
- // Save state and rethrow
- blockedOnCriteria = true;
- blockedBatch = batch;
- blockedRow = row;
- throw e;
- }
- }
- if(batch.getTerminationFlag()) {
- terminateBatches();
- }
+ while (currentRow <= currentBatch.getEndRow() && !isBatchFull()) {
+ List tuple = currentBatch.getTuple(currentRow);
- return pullBatch();
+ if(getEvaluator(this.elementMap).evaluate(this.criteria, tuple)) {
+ addBatchRow(projectTuple(this.projectionIndexes, tuple));
+ }
+ currentRow++;
+ }
+
+ if (currentRow > currentBatch.getEndRow()) {
+ if(currentBatch.getTerminationFlag()) {
+ terminateBatches();
+ }
+ currentBatch = null;
+ }
+
+ return pullBatch();
}
protected void getNodeString(StringBuffer str) {
Modified: trunk/engine/src/main/java/org/teiid/query/processor/xml/IfInstruction.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/xml/IfInstruction.java 2010-07-06 18:36:27 UTC (rev 2328)
+++ trunk/engine/src/main/java/org/teiid/query/processor/xml/IfInstruction.java 2010-07-07 01:34:45 UTC (rev 2329)
@@ -54,8 +54,7 @@
private DefaultCondition defaultCondition;
// State if condition evaluation blocked
- private boolean blockedOnCondition = false;
- private int blockedConditionIndex = 0;
+ private int conditionIndex = 0;
/**
* Constructor for IfInstruction.
@@ -148,37 +147,22 @@
thens.add(defaultCondition);
}
- int conditionIndex = this.blockedConditionIndex;
- if(blockedOnCondition) {
- // Remove state - we have recovered and will reset if necessary
- this.blockedOnCondition = false;
- this.blockedConditionIndex = 0;
- } else{
- conditionIndex = 0;
- }
-
Condition condition = null;
boolean foundTrueCondition = false;
for(; conditionIndex < thens.size(); conditionIndex++){
condition = (Condition)thens.get(conditionIndex);
- // evaluate may block if criteria evaluation blocks
- try {
- if(condition.evaluate(env, context)) {
- foundTrueCondition = true;
- //break from the loop; only the first "then" Program
- //whose criteria evaluates to true will be executed
- break;
- }
- } catch(BlockedException e) {
- // Save state and rethrow
- this.blockedOnCondition = true;
- this.blockedConditionIndex = conditionIndex;
- throw e;
+ // evaluate may block if criteria evaluation blocks
+ if(condition.evaluate(env, context)) {
+ foundTrueCondition = true;
+ //break from the loop; only the first "then" Program
+ //whose criteria evaluates to true will be executed
+ break;
}
}
+ conditionIndex = 0;
// This IF instruction should be processed exactly once, so the
// program containing the IF instruction needs to have it's
Modified: trunk/engine/src/test/java/org/teiid/query/processor/relational/TestSelectNode.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/query/processor/relational/TestSelectNode.java 2010-07-06 18:36:27 UTC (rev 2328)
+++ trunk/engine/src/test/java/org/teiid/query/processor/relational/TestSelectNode.java 2010-07-07 01:34:45 UTC (rev 2329)
@@ -22,12 +22,15 @@
package org.teiid.query.processor.relational;
+import static org.junit.Assert.*;
+
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.junit.Test;
import org.teiid.common.buffer.BlockedException;
import org.teiid.common.buffer.BufferManager;
import org.teiid.common.buffer.BufferManagerFactory;
@@ -35,13 +38,13 @@
import org.teiid.core.TeiidComponentException;
import org.teiid.core.TeiidProcessingException;
import org.teiid.core.types.DataTypeManager;
+import org.teiid.query.eval.Evaluator;
import org.teiid.query.function.FunctionDescriptor;
import org.teiid.query.function.SystemFunctionManager;
import org.teiid.query.processor.BatchIterator;
import org.teiid.query.processor.FakeDataManager;
import org.teiid.query.processor.ProcessorDataManager;
-import org.teiid.query.processor.relational.RelationalNode;
-import org.teiid.query.processor.relational.SelectNode;
+import org.teiid.query.processor.QueryProcessor;
import org.teiid.query.sql.lang.CompareCriteria;
import org.teiid.query.sql.lang.Criteria;
import org.teiid.query.sql.symbol.Constant;
@@ -50,34 +53,28 @@
import org.teiid.query.sql.symbol.Function;
import org.teiid.query.util.CommandContext;
-import junit.framework.TestCase;
+public class TestSelectNode {
-
-/**
- */
-public class TestSelectNode extends TestCase {
-
- /**
- * Constructor for TestSelectNode.
- * @param arg0
- */
- public TestSelectNode(String arg0) {
- super(arg0);
- }
-
public void helpTestSelect(List elements, Criteria criteria, List[] data, List childElements, ProcessorDataManager dataMgr, List[] expected) throws TeiidComponentException, TeiidProcessingException {
helpTestSelect(elements, criteria, childElements, dataMgr, expected, new FakeRelationalNode(2, data));
}
public void helpTestSelect(List elements, Criteria criteria, List childElements, ProcessorDataManager dataMgr, List[] expected, RelationalNode child) throws TeiidComponentException, TeiidProcessingException {
- BufferManager mgr = BufferManagerFactory.getStandaloneBufferManager();
+ SelectNode selectNode = new SelectNode(1);
+ helpTestSelect(elements, criteria, childElements, dataMgr, expected, child, selectNode);
+ }
+
+ private void helpTestSelect(List elements, Criteria criteria, List childElements,
+ ProcessorDataManager dataMgr, List[] expected,
+ RelationalNode child,
+ SelectNode selectNode) throws TeiidComponentException,
+ TeiidProcessingException {
+ BufferManager mgr = BufferManagerFactory.getStandaloneBufferManager();
CommandContext context = new CommandContext("pid", "test", null, null, 1); //$NON-NLS-1$ //$NON-NLS-2$
child.setElements(childElements);
- child.initialize(context, mgr, dataMgr);
-
- SelectNode selectNode = new SelectNode(1);
- selectNode.setCriteria(criteria);
+ child.initialize(context, mgr, dataMgr);
+ selectNode.setCriteria(criteria);
selectNode.setElements(elements);
selectNode.addChild(child);
selectNode.initialize(context, mgr, dataMgr);
@@ -93,16 +90,18 @@
break;
} catch (BlockedException e) {
continue;
+ } catch (QueryProcessor.ExpiredTimeSliceException e) {
+ continue;
}
}
}
assertFalse(iterator.hasNext());
- }
+ }
/**
* Ensures that a final empty batch is reindexed so that the batch iterator works correctly
*/
- public void testEmptyBatchIndexing() throws TeiidComponentException, TeiidProcessingException {
+ @Test public void testEmptyBatchIndexing() throws TeiidComponentException, TeiidProcessingException {
ElementSymbol es1 = new ElementSymbol("e1"); //$NON-NLS-1$
es1.setType(DataTypeManager.DefaultDataClasses.INTEGER);
@@ -138,10 +137,41 @@
helpTestSelect(elements, crit, childElements, null, new List[0], child);
}
- public void testNoRows() throws TeiidComponentException, TeiidProcessingException {
+ @Test public void testTimeslicing() throws TeiidComponentException, TeiidProcessingException {
ElementSymbol es1 = new ElementSymbol("e1"); //$NON-NLS-1$
es1.setType(DataTypeManager.DefaultDataClasses.INTEGER);
+ List elements = new ArrayList();
+ elements.add(es1);
+
+ CompareCriteria crit = new CompareCriteria(es1, CompareCriteria.EQ, new Constant(new Integer(1)));
+
+ List[] data = new List[] {
+ Arrays.asList(1),
+ Arrays.asList(1),
+ Arrays.asList(1)
+ };
+
+ List childElements = new ArrayList();
+ childElements.add(es1);
+
+ helpTestSelect(elements, crit, childElements, null, data, new FakeRelationalNode(2, data), new SelectNode(3) {
+ int i = 0;
+
+ @Override
+ protected Evaluator getEvaluator(Map elementMap) {
+ if (i++ == 1) {
+ throw new QueryProcessor.ExpiredTimeSliceException();
+ }
+ return super.getEvaluator(elementMap);
+ }
+ });
+ }
+
+ @Test public void testNoRows() throws TeiidComponentException, TeiidProcessingException {
+ ElementSymbol es1 = new ElementSymbol("e1"); //$NON-NLS-1$
+ es1.setType(DataTypeManager.DefaultDataClasses.INTEGER);
+
ElementSymbol es2 = new ElementSymbol("e2"); //$NON-NLS-1$
es2.setType(DataTypeManager.DefaultDataClasses.STRING);
@@ -160,7 +190,7 @@
}
- public void testSimpleSelect() throws TeiidComponentException, TeiidProcessingException {
+ @Test public void testSimpleSelect() throws TeiidComponentException, TeiidProcessingException {
ElementSymbol es1 = new ElementSymbol("e1"); //$NON-NLS-1$
es1.setType(DataTypeManager.DefaultDataClasses.INTEGER);
@@ -195,7 +225,7 @@
}
- public void testSelectWithLookup() throws TeiidComponentException, TeiidProcessingException {
+ @Test public void testSelectWithLookup() throws TeiidComponentException, TeiidProcessingException {
ElementSymbol es1 = new ElementSymbol("e1"); //$NON-NLS-1$
es1.setType(DataTypeManager.DefaultDataClasses.INTEGER);
15 years, 9 months
teiid SVN: r2328 - in trunk: test-integration/common/src/test/resources/TestMMDatabaseMetaData and 1 other directories.
by teiid-commits@lists.jboss.org
Author: shawkins
Date: 2010-07-06 14:36:27 -0400 (Tue, 06 Jul 2010)
New Revision: 2328
Modified:
trunk/client/src/main/java/org/teiid/jdbc/DatabaseMetaDataImpl.java
trunk/test-integration/common/src/test/resources/TestMMDatabaseMetaData/testGetCatalogs.expected
trunk/test-integration/common/src/test/resources/TestPartsDatabaseMetadata/testCatalogs.expected
Log:
TEIID-1141 adding support for getCatalogs
Modified: trunk/client/src/main/java/org/teiid/jdbc/DatabaseMetaDataImpl.java
===================================================================
--- trunk/client/src/main/java/org/teiid/jdbc/DatabaseMetaDataImpl.java 2010-07-01 14:26:20 UTC (rev 2327)
+++ trunk/client/src/main/java/org/teiid/jdbc/DatabaseMetaDataImpl.java 2010-07-06 18:36:27 UTC (rev 2328)
@@ -433,7 +433,8 @@
public ResultSet getCatalogs() throws SQLException {
// list containing records/rows in the ResultSet
- List records = new ArrayList (0);
+ List<List<String>> records = new ArrayList<List<String>> (1);
+ records.add(Arrays.asList(this.driverConnection.getCatalog()));
/***********************************************************************
* Hardcoding JDBC column names for the columns returned in results object
Modified: trunk/test-integration/common/src/test/resources/TestMMDatabaseMetaData/testGetCatalogs.expected
===================================================================
--- trunk/test-integration/common/src/test/resources/TestMMDatabaseMetaData/testGetCatalogs.expected 2010-07-01 14:26:20 UTC (rev 2327)
+++ trunk/test-integration/common/src/test/resources/TestMMDatabaseMetaData/testGetCatalogs.expected 2010-07-06 18:36:27 UTC (rev 2328)
@@ -1,5 +1,6 @@
string
TABLE_CAT
-Row Count : 0
+QT_Ora9DS
+Row Count : 1
getColumnName getColumnType getCatalogName getColumnClassName getColumnLabel getColumnTypeName getSchemaName getTableName getColumnDisplaySize getPrecision getScale isAutoIncrement isCaseSensitive isCurrency isDefinitelyWritable isNullable isReadOnly isSearchable isSigned isWritable
TABLE_CAT 12 QT_Ora9DS java.lang.String <null> string <null> <null> 4000 4000 0 false false false false 1 true false false false
Modified: trunk/test-integration/common/src/test/resources/TestPartsDatabaseMetadata/testCatalogs.expected
===================================================================
--- trunk/test-integration/common/src/test/resources/TestPartsDatabaseMetadata/testCatalogs.expected 2010-07-01 14:26:20 UTC (rev 2327)
+++ trunk/test-integration/common/src/test/resources/TestPartsDatabaseMetadata/testCatalogs.expected 2010-07-06 18:36:27 UTC (rev 2328)
@@ -1,5 +1,6 @@
string
TABLE_CAT
-Row Count : 0
+PartsSupplier
+Row Count : 1
getColumnName getColumnType getCatalogName getColumnClassName getColumnLabel getColumnTypeName getSchemaName getTableName getColumnDisplaySize getPrecision getScale isAutoIncrement isCaseSensitive isCurrency isDefinitelyWritable isNullable isReadOnly isSearchable isSigned isWritable
TABLE_CAT 12 PartsSupplier java.lang.String <null> string <null> <null> 4000 4000 0 false false false false 1 true false false false
15 years, 9 months
teiid SVN: r2327 - in trunk: client/src/main/java/org/teiid/client/util and 11 other directories.
by teiid-commits@lists.jboss.org
Author: shawkins
Date: 2010-07-01 10:26:20 -0400 (Thu, 01 Jul 2010)
New Revision: 2327
Modified:
trunk/client/src/main/java/org/teiid/adminapi/AdminFactory.java
trunk/client/src/main/java/org/teiid/client/util/ExceptionHolder.java
trunk/client/src/main/java/org/teiid/jdbc/CallableStatementImpl.java
trunk/client/src/main/java/org/teiid/jdbc/ConnectionImpl.java
trunk/client/src/main/java/org/teiid/jdbc/DataTypeTransformer.java
trunk/client/src/main/java/org/teiid/jdbc/DatabaseMetaDataImpl.java
trunk/client/src/main/java/org/teiid/jdbc/PreparedStatementImpl.java
trunk/client/src/main/java/org/teiid/jdbc/ResultSetImpl.java
trunk/client/src/main/java/org/teiid/jdbc/StatementImpl.java
trunk/client/src/main/java/org/teiid/jdbc/WarningUtil.java
trunk/client/src/main/java/org/teiid/jdbc/WrapperImpl.java
trunk/client/src/main/java/org/teiid/jdbc/XAConnectionImpl.java
trunk/client/src/main/java/org/teiid/net/socket/OioOjbectChannelFactory.java
trunk/client/src/main/java/org/teiid/net/socket/SocketServerConnection.java
trunk/client/src/main/java/org/teiid/net/socket/SocketServerConnectionFactory.java
trunk/client/src/main/java/org/teiid/net/socket/SocketServerInstanceImpl.java
trunk/client/src/main/java/org/teiid/net/socket/SocketUtil.java
trunk/client/src/main/java/org/teiid/net/socket/UrlServerDiscovery.java
trunk/client/src/test/java/org/teiid/client/util/TestExceptionHolder.java
trunk/client/src/test/java/org/teiid/jdbc/TestWrapperImpl.java
trunk/client/src/test/java/org/teiid/jdbc/TestXAConnection.java
trunk/client/src/test/java/org/teiid/net/socket/TestSocketServerConnection.java
trunk/client/src/test/java/org/teiid/net/socket/TestSocketServerInstanceImpl.java
trunk/common-core/src/main/java/org/teiid/core/types/BlobType.java
trunk/common-core/src/main/java/org/teiid/core/types/ClobType.java
trunk/common-core/src/main/java/org/teiid/core/types/DataTypeManager.java
trunk/common-core/src/main/java/org/teiid/core/types/InputStreamFactory.java
trunk/common-core/src/main/java/org/teiid/core/types/JDBCSQLTypeInfo.java
trunk/common-core/src/main/java/org/teiid/core/util/SqlUtil.java
trunk/common-core/src/test/java/org/teiid/core/types/TestDataTypeManager.java
trunk/common-core/src/test/java/org/teiid/core/types/TestSQLXMLImpl.java
trunk/common-core/src/test/java/org/teiid/core/types/basic/TestStringToXmlTransform.java
trunk/common-core/src/test/java/org/teiid/core/util/UnitTestUtil.java
trunk/engine/src/main/java/org/teiid/query/optimizer/relational/RelationalPlanner.java
Log:
TEIID-177 removing comment based approach to backwards compatibility and changing the isvalid check to use a ping
Modified: trunk/client/src/main/java/org/teiid/adminapi/AdminFactory.java
===================================================================
--- trunk/client/src/main/java/org/teiid/adminapi/AdminFactory.java 2010-07-01 14:23:31 UTC (rev 2326)
+++ trunk/client/src/main/java/org/teiid/adminapi/AdminFactory.java 2010-07-01 14:26:20 UTC (rev 2327)
@@ -90,9 +90,7 @@
return target;
}
- //## JDBC4.0-begin ##
@Override
- //## JDBC4.0-end ##
public Object invoke(Object proxy, Method method, Object[] args)
throws Throwable {
if (method.getName().equals("close")) { //$NON-NLS-1$
Modified: trunk/client/src/main/java/org/teiid/client/util/ExceptionHolder.java
===================================================================
--- trunk/client/src/main/java/org/teiid/client/util/ExceptionHolder.java 2010-07-01 14:23:31 UTC (rev 2326)
+++ trunk/client/src/main/java/org/teiid/client/util/ExceptionHolder.java 2010-07-01 14:26:20 UTC (rev 2327)
@@ -59,9 +59,7 @@
this.nested = nested;
}
- //## JDBC4.0-begin ##
@Override
- //## JDBC4.0-end ##
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
List<String> classNames = ExternalizeUtil.readList(in, String.class);
String message = (String)in.readObject();
@@ -93,9 +91,7 @@
}
}
- //## JDBC4.0-begin ##
@Override
- //## JDBC4.0-end ##
public void writeExternal(ObjectOutput out) throws IOException {
List<String> classNames = new ArrayList<String>();
Class<?> clazz = exception.getClass();
Modified: trunk/client/src/main/java/org/teiid/jdbc/CallableStatementImpl.java
===================================================================
--- trunk/client/src/main/java/org/teiid/jdbc/CallableStatementImpl.java 2010-07-01 14:23:31 UTC (rev 2326)
+++ trunk/client/src/main/java/org/teiid/jdbc/CallableStatementImpl.java 2010-07-01 14:26:20 UTC (rev 2327)
@@ -33,16 +33,10 @@
import java.sql.Date;
import java.sql.Ref;
import java.sql.SQLException;
-//## JDBC4.0-begin ##
import java.sql.SQLXML;
import java.sql.NClob;
import java.sql.RowId;
-//## JDBC4.0-end ##
-/*## JDBC3.0-JDK1.5-begin ##
-import com.metamatrix.core.jdbc.SQLXML;
-## JDBC3.0-JDK1.5-end ##*/
-
import java.sql.Time;
import java.sql.Timestamp;
import java.util.Calendar;
@@ -468,17 +462,13 @@
throw SqlUtil.createFeatureNotSupportedException();
}
- //## JDBC4.0-begin ##
public NClob getNClob(int parameterIndex) throws SQLException {
throw SqlUtil.createFeatureNotSupportedException();
}
- //## JDBC4.0-end ##
- //## JDBC4.0-begin ##
public NClob getNClob(String parameterName) throws SQLException {
throw SqlUtil.createFeatureNotSupportedException();
}
- //## JDBC4.0-end ##
public String getNString(int parameterIndex) throws SQLException {
throw SqlUtil.createFeatureNotSupportedException();
@@ -508,17 +498,13 @@
throw SqlUtil.createFeatureNotSupportedException();
}
- //## JDBC4.0-begin ##
public RowId getRowId(int parameterIndex) throws SQLException {
throw SqlUtil.createFeatureNotSupportedException();
}
- //## JDBC4.0-end ##
- //## JDBC4.0-begin ##
public RowId getRowId(String parameterName) throws SQLException {
throw SqlUtil.createFeatureNotSupportedException();
}
- //## JDBC4.0-end ##
public SQLXML getSQLXML(String parameterName) throws SQLException {
throw SqlUtil.createFeatureNotSupportedException();
@@ -687,11 +673,9 @@
throw SqlUtil.createFeatureNotSupportedException();
}
- //## JDBC4.0-begin ##
public void setNClob(String parameterName, NClob value) throws SQLException {
throw SqlUtil.createFeatureNotSupportedException();
}
- //## JDBC4.0-end ##
public void setNClob(String parameterName, Reader reader)
throws SQLException {
@@ -731,11 +715,9 @@
throw SqlUtil.createFeatureNotSupportedException();
}
- //## JDBC4.0-begin ##
public void setRowId(String parameterName, RowId x) throws SQLException {
throw SqlUtil.createFeatureNotSupportedException();
}
- //## JDBC4.0-end ##
public void setSQLXML(String parameterName, SQLXML xmlObject)
throws SQLException {
Modified: trunk/client/src/main/java/org/teiid/jdbc/ConnectionImpl.java
===================================================================
--- trunk/client/src/main/java/org/teiid/jdbc/ConnectionImpl.java 2010-07-01 14:23:31 UTC (rev 2326)
+++ trunk/client/src/main/java/org/teiid/jdbc/ConnectionImpl.java 2010-07-01 14:26:20 UTC (rev 2327)
@@ -32,15 +32,10 @@
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.SQLWarning;
-//## JDBC4.0-begin ##
import java.sql.SQLClientInfoException;
import java.sql.NClob;
import java.sql.SQLXML;
-//## JDBC4.0-end ##
-/*## JDBC3.0-JDK1.5-begin ##
-import com.metamatrix.core.jdbc.SQLXML;
-## JDBC3.0-JDK1.5-end ##*/
import java.sql.Savepoint;
import java.sql.Statement;
import java.sql.Struct;
@@ -52,6 +47,9 @@
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -840,22 +838,20 @@
return transactionXid;
}
- public boolean isValid(int timeout) throws SQLException {
- Statement statement = null;
- try {
- statement = createStatement();
- statement.setQueryTimeout(timeout);
- statement.execute("select 1"); //$NON-NLS-1$
- return true;
- } catch (SQLException e) {
- return false;
- } finally {
- if (statement != null) {
- try {
- statement.close();
- } catch (SQLException e) {
- }
- }
+ public boolean isValid(int timeout) throws SQLException {
+ ResultsFuture<?> future = this.getServerConnection().isOpen();
+ if (future == null) {
+ return false;
+ }
+ try {
+ future.get(timeout, TimeUnit.SECONDS);
+ return true;
+ } catch (InterruptedException e) {
+ return false;
+ } catch (ExecutionException e) {
+ return false;
+ } catch (TimeoutException e) {
+ return false;
}
}
@@ -891,7 +887,6 @@
return this.serverConn.isSameInstance(conn.serverConn);
}
- //## JDBC4.0-begin ##
public void setClientInfo(Properties properties)
throws SQLClientInfoException {
}
@@ -899,7 +894,6 @@
public void setClientInfo(String name, String value)
throws SQLClientInfoException {
}
- //## JDBC4.0-end ##
public Properties getClientInfo() throws SQLException {
throw SqlUtil.createFeatureNotSupportedException();
@@ -922,11 +916,9 @@
throw SqlUtil.createFeatureNotSupportedException();
}
- //## JDBC4.0-begin ##
public NClob createNClob() throws SQLException {
throw SqlUtil.createFeatureNotSupportedException();
}
- //## JDBC4.0-end ##
public SQLXML createSQLXML() throws SQLException {
throw SqlUtil.createFeatureNotSupportedException();
Modified: trunk/client/src/main/java/org/teiid/jdbc/DataTypeTransformer.java
===================================================================
--- trunk/client/src/main/java/org/teiid/jdbc/DataTypeTransformer.java 2010-07-01 14:23:31 UTC (rev 2326)
+++ trunk/client/src/main/java/org/teiid/jdbc/DataTypeTransformer.java 2010-07-01 14:26:20 UTC (rev 2327)
@@ -29,12 +29,7 @@
import java.sql.Clob;
import java.sql.Date;
import java.sql.SQLException;
-//## JDBC4.0-begin ##
import java.sql.SQLXML;
-//## JDBC4.0-end ##
-/*## JDBC3.0-JDK1.5-begin ##
-import com.metamatrix.core.jdbc.SQLXML;
-## JDBC3.0-JDK1.5-end ##*/
import java.sql.Time;
import java.sql.Timestamp;
Modified: trunk/client/src/main/java/org/teiid/jdbc/DatabaseMetaDataImpl.java
===================================================================
--- trunk/client/src/main/java/org/teiid/jdbc/DatabaseMetaDataImpl.java 2010-07-01 14:23:31 UTC (rev 2326)
+++ trunk/client/src/main/java/org/teiid/jdbc/DatabaseMetaDataImpl.java 2010-07-01 14:26:20 UTC (rev 2327)
@@ -2382,11 +2382,9 @@
throw SqlUtil.createFeatureNotSupportedException();
}
- //## JDBC4.0-begin ##
public RowIdLifetime getRowIdLifetime() throws SQLException {
throw SqlUtil.createFeatureNotSupportedException();
}
- //## JDBC4.0-end ##
public ResultSet getSchemas(String catalog, String schemaPattern)
throws SQLException {
Modified: trunk/client/src/main/java/org/teiid/jdbc/PreparedStatementImpl.java
===================================================================
--- trunk/client/src/main/java/org/teiid/jdbc/PreparedStatementImpl.java 2010-07-01 14:23:31 UTC (rev 2326)
+++ trunk/client/src/main/java/org/teiid/jdbc/PreparedStatementImpl.java 2010-07-01 14:26:20 UTC (rev 2327)
@@ -190,9 +190,7 @@
throw new TeiidSQLException(msg);
}
- //## JDBC4.0-begin ##
@Override
- //## JDBC4.0-end ##
public boolean execute() throws SQLException {
executeSql(new String[] {this.prepareSql}, false, ResultsMode.EITHER);
return hasResultSet();
@@ -211,17 +209,13 @@
return this.updateCounts;
}
- //## JDBC4.0-begin ##
@Override
- //## JDBC4.0-end ##
public ResultSet executeQuery() throws SQLException {
executeSql(new String[] {this.prepareSql}, false, ResultsMode.RESULTSET);
return resultSet;
}
- //## JDBC4.0-begin ##
@Override
- //## JDBC4.0-end ##
public int executeUpdate() throws SQLException {
executeSql(new String[] {this.prepareSql}, false, ResultsMode.UPDATECOUNT);
return this.updateCounts[0];
@@ -750,11 +744,9 @@
throw SqlUtil.createFeatureNotSupportedException();
}
- //## JDBC4.0-begin ##
public void setNClob(int parameterIndex, NClob value) throws SQLException {
throw SqlUtil.createFeatureNotSupportedException();
}
- //## JDBC4.0-end ##
public void setNClob(int parameterIndex, Reader reader) throws SQLException {
throw SqlUtil.createFeatureNotSupportedException();
@@ -774,11 +766,9 @@
throw SqlUtil.createFeatureNotSupportedException();
}
- //## JDBC4.0-begin ##
public void setRowId(int parameterIndex, RowId x) throws SQLException {
throw SqlUtil.createFeatureNotSupportedException();
}
- //## JDBC4.0-end ##
public void setUnicodeStream(int parameterIndex, InputStream x, int length)
throws SQLException {
Modified: trunk/client/src/main/java/org/teiid/jdbc/ResultSetImpl.java
===================================================================
--- trunk/client/src/main/java/org/teiid/jdbc/ResultSetImpl.java 2010-07-01 14:23:31 UTC (rev 2326)
+++ trunk/client/src/main/java/org/teiid/jdbc/ResultSetImpl.java 2010-07-01 14:26:20 UTC (rev 2327)
@@ -406,9 +406,7 @@
checkClosed(); // check to see if the ResultSet is closed
}
- //## JDBC4.0-begin ##
@Override
- //## JDBC4.0-end ##
public void clearWarnings() throws SQLException {
// do nothing
checkClosed(); // check to see if the ResultSet is closed
@@ -1263,7 +1261,6 @@
throw SqlUtil.createFeatureNotSupportedException();
}
- //## JDBC4.0-begin ##
public NClob getNClob(int columnIndex) throws SQLException {
throw SqlUtil.createFeatureNotSupportedException();
}
@@ -1271,7 +1268,6 @@
public NClob getNClob(String columnLabel) throws SQLException {
throw SqlUtil.createFeatureNotSupportedException();
}
- //## JDBC4.0-end ##
public String getNString(int columnIndex) throws SQLException {
throw SqlUtil.createFeatureNotSupportedException();
@@ -1299,17 +1295,13 @@
throw SqlUtil.createFeatureNotSupportedException();
}
- //## JDBC4.0-begin ##
public RowId getRowId(int columnIndex) throws SQLException {
throw SqlUtil.createFeatureNotSupportedException();
}
- //## JDBC4.0-end ##
- //## JDBC4.0-begin ##
public RowId getRowId(String columnLabel) throws SQLException {
throw SqlUtil.createFeatureNotSupportedException();
}
- //## JDBC4.0-end ##
public SQLXML getSQLXML(String columnLabel) throws SQLException {
throw SqlUtil.createFeatureNotSupportedException();
@@ -1625,11 +1617,9 @@
}
- //## JDBC4.0-begin ##
public void updateNClob(int columnIndex, NClob clob) throws SQLException {
throw SqlUtil.createFeatureNotSupportedException();
}
- //## JDBC4.0-end ##
public void updateNClob(int columnIndex, Reader reader, long length)
throws SQLException {
@@ -1640,11 +1630,9 @@
throw SqlUtil.createFeatureNotSupportedException();
}
- //## JDBC4.0-begin ##
public void updateNClob(String columnLabel, NClob clob) throws SQLException {
throw SqlUtil.createFeatureNotSupportedException();
}
- //## JDBC4.0-end ##
public void updateNClob(String columnLabel, Reader reader, long length)
throws SQLException {
@@ -1704,17 +1692,13 @@
throw SqlUtil.createFeatureNotSupportedException();
}
- //## JDBC4.0-begin ##
public void updateRowId(int columnIndex, RowId x) throws SQLException {
throw SqlUtil.createFeatureNotSupportedException();
}
- //## JDBC4.0-end ##
- //## JDBC4.0-begin ##
public void updateRowId(String columnLabel, RowId x) throws SQLException {
throw SqlUtil.createFeatureNotSupportedException();
}
- //## JDBC4.0-end ##
public void updateShort(int columnIndex, short x) throws SQLException {
throw SqlUtil.createFeatureNotSupportedException();
Modified: trunk/client/src/main/java/org/teiid/jdbc/StatementImpl.java
===================================================================
--- trunk/client/src/main/java/org/teiid/jdbc/StatementImpl.java 2010-07-01 14:23:31 UTC (rev 2326)
+++ trunk/client/src/main/java/org/teiid/jdbc/StatementImpl.java 2010-07-01 14:26:20 UTC (rev 2327)
@@ -22,8 +22,6 @@
package org.teiid.jdbc;
-import java.io.IOException;
-import java.io.Reader;
import java.io.Serializable;
import java.sql.Connection;
import java.sql.ResultSet;
@@ -56,9 +54,8 @@
import org.teiid.client.plan.Annotation;
import org.teiid.client.plan.PlanNode;
import org.teiid.core.TeiidComponentException;
-import org.teiid.core.TeiidProcessingException;
import org.teiid.core.TeiidException;
-import org.teiid.core.util.ObjectConverterUtil;
+import org.teiid.core.TeiidProcessingException;
import org.teiid.core.util.SqlUtil;
@@ -299,17 +296,13 @@
}
}
- //## JDBC4.0-begin ##
@Override
- //## JDBC4.0-end ##
public boolean execute(String sql) throws SQLException {
executeSql(new String[] {sql}, false, ResultsMode.EITHER);
return hasResultSet();
}
- //## JDBC4.0-begin ##
@Override
- //## JDBC4.0-end ##
public int[] executeBatch() throws SQLException {
if (batchedUpdates == null || batchedUpdates.isEmpty()) {
return new int[0];
@@ -319,17 +312,13 @@
return updateCounts;
}
- //## JDBC4.0-begin ##
@Override
- //## JDBC4.0-end ##
public ResultSet executeQuery(String sql) throws SQLException {
executeSql(new String[] {sql}, false, ResultsMode.RESULTSET);
return resultSet;
}
- //## JDBC4.0-begin ##
@Override
- //## JDBC4.0-end ##
public int executeUpdate(String sql) throws SQLException {
String[] commands = new String[] {sql};
executeSql(commands, false, ResultsMode.UPDATECOUNT);
Modified: trunk/client/src/main/java/org/teiid/jdbc/WarningUtil.java
===================================================================
--- trunk/client/src/main/java/org/teiid/jdbc/WarningUtil.java 2010-07-01 14:23:31 UTC (rev 2326)
+++ trunk/client/src/main/java/org/teiid/jdbc/WarningUtil.java 2010-07-01 14:26:20 UTC (rev 2327)
@@ -52,13 +52,7 @@
return warning;
}
}
- //## JDBC4.0-begin ##
return new SQLWarning(ex);
- //## JDBC4.0-end ##
- /*## JDBC3.0-JDK1.5-begin ##
- return new SQLWarning(ex.getMessage());
- ## JDBC3.0-JDK1.5-end ##*/
-
}
/**
Modified: trunk/client/src/main/java/org/teiid/jdbc/WrapperImpl.java
===================================================================
--- trunk/client/src/main/java/org/teiid/jdbc/WrapperImpl.java 2010-07-01 14:23:31 UTC (rev 2326)
+++ trunk/client/src/main/java/org/teiid/jdbc/WrapperImpl.java 2010-07-01 14:26:20 UTC (rev 2327)
@@ -23,17 +23,12 @@
package org.teiid.jdbc;
import java.sql.SQLException;
-//## JDBC4.0-begin ##
import java.sql.Wrapper;
import org.teiid.core.util.ArgCheck;
-public class WrapperImpl
- //## JDBC4.0-begin ##
- implements Wrapper
- //## JDBC4.0-end ##
- {
+public class WrapperImpl implements Wrapper {
public boolean isWrapperFor(Class<?> iface) throws SQLException {
ArgCheck.isNotNull(iface);
Modified: trunk/client/src/main/java/org/teiid/jdbc/XAConnectionImpl.java
===================================================================
--- trunk/client/src/main/java/org/teiid/jdbc/XAConnectionImpl.java 2010-07-01 14:23:31 UTC (rev 2326)
+++ trunk/client/src/main/java/org/teiid/jdbc/XAConnectionImpl.java 2010-07-01 14:26:20 UTC (rev 2327)
@@ -33,9 +33,7 @@
import javax.sql.ConnectionEvent;
import javax.sql.ConnectionEventListener;
-//## JDBC4.0-begin ##
import javax.sql.StatementEventListener;
-//## JDBC4.0-end ##
import javax.sql.XAConnection;
import javax.transaction.xa.XAResource;
@@ -190,11 +188,9 @@
}
}
- //## JDBC4.0-begin ##
public void addStatementEventListener(StatementEventListener arg0) {
}
public void removeStatementEventListener(StatementEventListener arg0) {
}
- //## JDBC4.0-end ##
}
Modified: trunk/client/src/main/java/org/teiid/net/socket/OioOjbectChannelFactory.java
===================================================================
--- trunk/client/src/main/java/org/teiid/net/socket/OioOjbectChannelFactory.java 2010-07-01 14:23:31 UTC (rev 2326)
+++ trunk/client/src/main/java/org/teiid/net/socket/OioOjbectChannelFactory.java 2010-07-01 14:26:20 UTC (rev 2327)
@@ -71,9 +71,7 @@
inputStream = new ObjectDecoderInputStream(new DataInputStream(bis), cl, MAX_OBJECT_SIZE);
}
- //## JDBC4.0-begin ##
@Override
- //## JDBC4.0-end ##
public void close() {
log.finer("closing socket"); //$NON-NLS-1$
try {
@@ -98,23 +96,17 @@
}
}
- //## JDBC4.0-begin ##
@Override
- //## JDBC4.0-end ##
public SocketAddress getRemoteAddress() {
return socket.getRemoteSocketAddress();
}
- //## JDBC4.0-begin ##
@Override
- //## JDBC4.0-end ##
public boolean isOpen() {
return !socket.isClosed();
}
- //## JDBC4.0-begin ##
@Override
- //## JDBC4.0-end ##
public Object read() throws IOException, ClassNotFoundException {
log.finer("reading message from socket"); //$NON-NLS-1$
synchronized (readLock) {
@@ -129,9 +121,7 @@
}
}
- //## JDBC4.0-begin ##
@Override
- //## JDBC4.0-end ##
public synchronized Future<?> write(Object msg) {
log.finer("writing message to socket"); //$NON-NLS-1$
ResultsFuture<Void> result = new ResultsFuture<Void>();
@@ -160,9 +150,7 @@
PropertiesUtils.setBeanProperties(this, props, "org.teiid.sockets"); //$NON-NLS-1$
}
- //## JDBC4.0-begin ##
@Override
- //## JDBC4.0-end ##
public ObjectChannel createObjectChannel(SocketAddress address, boolean ssl) throws IOException,
CommunicationException {
final Socket socket;
Modified: trunk/client/src/main/java/org/teiid/net/socket/SocketServerConnection.java
===================================================================
--- trunk/client/src/main/java/org/teiid/net/socket/SocketServerConnection.java 2010-07-01 14:23:31 UTC (rev 2326)
+++ trunk/client/src/main/java/org/teiid/net/socket/SocketServerConnection.java 2010-07-01 14:26:20 UTC (rev 2327)
@@ -33,6 +33,7 @@
import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Timer;
@@ -181,7 +182,8 @@
// Log on to server
try {
this.logonResult = logon.logon(connProps);
- if (this.serverDiscovery.getKnownHosts(logonResult, this.serverInstance).size() > 1) {
+ List<HostInfo> knownHosts = this.serverDiscovery.getKnownHosts(logonResult, this.serverInstance);
+ if (knownHosts.size() > 1 && !new HashSet<HostInfo>(knownHosts).equals(new HashSet<HostInfo>(this.serverDiscovery.getKnownHosts(logonResult, null)))) {
//if there are multiple instances, allow for load-balancing
closeServerInstance();
}
Modified: trunk/client/src/main/java/org/teiid/net/socket/SocketServerConnectionFactory.java
===================================================================
--- trunk/client/src/main/java/org/teiid/net/socket/SocketServerConnectionFactory.java 2010-07-01 14:23:31 UTC (rev 2326)
+++ trunk/client/src/main/java/org/teiid/net/socket/SocketServerConnectionFactory.java 2010-07-01 14:26:20 UTC (rev 2327)
@@ -74,9 +74,7 @@
this.key = key;
}
- //## JDBC4.0-begin ##
@Override
- //## JDBC4.0-end ##
public Object invoke(Object arg0, Method arg1, Object[] arg2)
throws Throwable {
if (arg1.getName().equals("shutdown")) { //$NON-NLS-1$
Modified: trunk/client/src/main/java/org/teiid/net/socket/SocketServerInstanceImpl.java
===================================================================
--- trunk/client/src/main/java/org/teiid/net/socket/SocketServerInstanceImpl.java 2010-07-01 14:23:31 UTC (rev 2326)
+++ trunk/client/src/main/java/org/teiid/net/socket/SocketServerInstanceImpl.java 2010-07-01 14:26:20 UTC (rev 2327)
@@ -105,16 +105,12 @@
}
}
- //## JDBC4.0-begin ##
@Override
- //## JDBC4.0-end ##
public HostInfo getHostInfo() {
return this.hostInfo;
}
- //## JDBC4.0-begin ##
@Override
- //## JDBC4.0-end ##
public SocketAddress getRemoteAddress() {
return this.socketChannel.getRemoteAddress();
}
@@ -286,9 +282,7 @@
}
@SuppressWarnings("unchecked")
- //## JDBC4.0-begin ##
@Override
- //## JDBC4.0-end ##
public <T> T getService(Class<T> iface) {
return (T)Proxy.newProxyInstance(this.getClass().getClassLoader(), new Class[] {iface}, new RemoteInvocationHandler(iface));
}
@@ -303,9 +297,7 @@
this.secure = ILogon.class.isAssignableFrom(targetClass);
}
- //## JDBC4.0-begin ##
@Override
- //## JDBC4.0-end ##
public Object invoke(Object proxy, Method method, Object[] args)
throws Throwable {
Throwable t = null;
Modified: trunk/client/src/main/java/org/teiid/net/socket/SocketUtil.java
===================================================================
--- trunk/client/src/main/java/org/teiid/net/socket/SocketUtil.java 2010-07-01 14:23:31 UTC (rev 2326)
+++ trunk/client/src/main/java/org/teiid/net/socket/SocketUtil.java 2010-07-01 14:26:20 UTC (rev 2327)
@@ -116,12 +116,7 @@
// One way SSL with custom properties defined
result = getClientSSLContext(null, null, truststore, truststorePassword, keystoreAlgorithm, keystoreType, keystoreProtocol);
} else {
- //## JDBC4.0-begin ##
result = SSLContext.getDefault();
- //## JDBC4.0-end ##
- /*## JDBC3.0-JDK1.5-begin ##
- result = SSLContext.getInstance("Default");
- ## JDBC3.0-JDK1.5-end ##*/
}
return new SSLSocketFactory(result, anon);
}
Modified: trunk/client/src/main/java/org/teiid/net/socket/UrlServerDiscovery.java
===================================================================
--- trunk/client/src/main/java/org/teiid/net/socket/UrlServerDiscovery.java 2010-07-01 14:23:31 UTC (rev 2326)
+++ trunk/client/src/main/java/org/teiid/net/socket/UrlServerDiscovery.java 2010-07-01 14:26:20 UTC (rev 2327)
@@ -44,38 +44,28 @@
this.url = url;
}
- //## JDBC4.0-begin ##
@Override
- //## JDBC4.0-end ##
public List<HostInfo> getKnownHosts(LogonResult result,
SocketServerInstance instance) {
return url.getHostInfo();
}
- //## JDBC4.0-begin ##
@Override
- //## JDBC4.0-end ##
public void init(TeiidURL url, Properties p) {
this.url = url;
}
- //## JDBC4.0-begin ##
@Override
- //## JDBC4.0-end ##
public void connectionSuccessful(HostInfo info) {
}
- //## JDBC4.0-begin ##
@Override
- //## JDBC4.0-end ##
public void markInstanceAsBad(HostInfo info) {
}
- //## JDBC4.0-begin ##
@Override
- //## JDBC4.0-end ##
public void shutdown() {
}
Modified: trunk/client/src/test/java/org/teiid/client/util/TestExceptionHolder.java
===================================================================
--- trunk/client/src/test/java/org/teiid/client/util/TestExceptionHolder.java 2010-07-01 14:23:31 UTC (rev 2326)
+++ trunk/client/src/test/java/org/teiid/client/util/TestExceptionHolder.java 2010-07-01 14:26:20 UTC (rev 2327)
@@ -21,7 +21,6 @@
public class TestExceptionHolder {
- //## JDBC4.0-begin ##
@SuppressWarnings("all")
public static class BadException extends TeiidProcessingException {
private Object obj;
@@ -116,11 +115,4 @@
Throwable e = holder.getException();
assertTrue(e instanceof TeiidException);
}
- //## JDBC4.0-end ##
-
- /*## JDBC3.0-JDK1.5-begin ##
- public void testPass(){
- // since the jar files required are built with 1.6, it will always fail, so just comment the test for 1.5
- }
- ## JDBC3.0-JDK1.5-end ##*/
}
Modified: trunk/client/src/test/java/org/teiid/jdbc/TestWrapperImpl.java
===================================================================
--- trunk/client/src/test/java/org/teiid/jdbc/TestWrapperImpl.java 2010-07-01 14:23:31 UTC (rev 2326)
+++ trunk/client/src/test/java/org/teiid/jdbc/TestWrapperImpl.java 2010-07-01 14:26:20 UTC (rev 2327)
@@ -27,16 +27,14 @@
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.sql.SQLException;
-//## JDBC4.0-begin ##
import java.sql.Wrapper;
import org.teiid.jdbc.WrapperImpl;
-//## JDBC4.0-end ##
import junit.framework.TestCase;
public class TestWrapperImpl extends TestCase {
- //## JDBC4.0-begin ##
+
interface Foo extends Wrapper {
void callMe();
}
@@ -50,9 +48,9 @@
}
}
- //## JDBC4.0-end ##
+
public void testProxy() throws SQLException {
- //## JDBC4.0-begin ##
+
final FooImpl fooImpl = new FooImpl();
Foo proxy = (Foo)Proxy.newProxyInstance(TestWrapperImpl.class.getClassLoader(), new Class[] {Foo.class}, new InvocationHandler() {
@@ -85,7 +83,6 @@
} catch (SQLException e) {
assertEquals("Wrapped object is not an instance of class java.lang.String", e.getMessage());
}
- //## JDBC4.0-end ##
}
}
Modified: trunk/client/src/test/java/org/teiid/jdbc/TestXAConnection.java
===================================================================
--- trunk/client/src/test/java/org/teiid/jdbc/TestXAConnection.java 2010-07-01 14:23:31 UTC (rev 2326)
+++ trunk/client/src/test/java/org/teiid/jdbc/TestXAConnection.java 2010-07-01 14:26:20 UTC (rev 2327)
@@ -43,9 +43,7 @@
final ConnectionImpl mmConn = TestConnection.getMMConnection();
XAConnectionImpl xaConn = new XAConnectionImpl(new XAConnectionImpl.ConnectionSource() {
- //## JDBC4.0-begin ##
@Override
- //## JDBC4.0-end ##
public ConnectionImpl createConnection() throws SQLException {
return mmConn;
}
@@ -71,9 +69,7 @@
@Test public void testNotification() throws Exception {
XAConnectionImpl xaConn = new XAConnectionImpl(new XAConnectionImpl.ConnectionSource() {
- //## JDBC4.0-begin ##
@Override
- //## JDBC4.0-end ##
public ConnectionImpl createConnection() throws SQLException {
ConnectionImpl c = Mockito.mock(ConnectionImpl.class);
Mockito.doThrow(new SQLException(new InvalidSessionException())).when(c).commit();
Modified: trunk/client/src/test/java/org/teiid/net/socket/TestSocketServerConnection.java
===================================================================
--- trunk/client/src/test/java/org/teiid/net/socket/TestSocketServerConnection.java 2010-07-01 14:23:31 UTC (rev 2326)
+++ trunk/client/src/test/java/org/teiid/net/socket/TestSocketServerConnection.java 2010-07-01 14:26:20 UTC (rev 2327)
@@ -61,25 +61,19 @@
Throwable t;
- //## JDBC4.0-begin ##
@Override
- //## JDBC4.0-end ##
public void assertIdentity(SessionToken sessionId)
throws InvalidSessionException, TeiidComponentException {
}
- //## JDBC4.0-begin ##
@Override
- //## JDBC4.0-end ##
public ResultsFuture<?> logoff()
throws InvalidSessionException {
return null;
}
- //## JDBC4.0-begin ##
@Override
- //## JDBC4.0-end ##
public LogonResult logon(
Properties connectionProperties)
throws LogonException,
@@ -87,9 +81,7 @@
return new LogonResult(new SessionToken(1, "fooUser"), "foo", 1, "fake"); //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$
}
- //## JDBC4.0-begin ##
@Override
- //## JDBC4.0-end ##
public ResultsFuture<?> ping()
throws InvalidSessionException,
TeiidComponentException {
@@ -129,9 +121,7 @@
public void testLogonFailsWithMultipleHosts() throws Exception {
Properties p = new Properties();
SocketServerInstanceFactory instanceFactory = new SocketServerInstanceFactory() {
- //## JDBC4.0-begin ##
@Override
- //## JDBC4.0-end ##
public SocketServerInstance getServerInstance(HostInfo info,
boolean ssl) throws CommunicationException, IOException {
throw new SingleInstanceCommunicationException();
@@ -193,9 +183,7 @@
Properties p = new Properties();
ServerDiscovery discovery = new UrlServerDiscovery(new TeiidURL(hostInfo.getHostName(), hostInfo.getPortNumber(), false));
SocketServerInstanceFactory instanceFactory = new SocketServerInstanceFactory() {
- //## JDBC4.0-begin ##
@Override
- //## JDBC4.0-end ##
public SocketServerInstance getServerInstance(final HostInfo info,
boolean ssl) throws CommunicationException, IOException {
SocketServerInstance instance = Mockito.mock(SocketServerInstance.class);
Modified: trunk/client/src/test/java/org/teiid/net/socket/TestSocketServerInstanceImpl.java
===================================================================
--- trunk/client/src/test/java/org/teiid/net/socket/TestSocketServerInstanceImpl.java 2010-07-01 14:23:31 UTC (rev 2326)
+++ trunk/client/src/test/java/org/teiid/net/socket/TestSocketServerInstanceImpl.java 2010-07-01 14:26:20 UTC (rev 2327)
@@ -56,23 +56,17 @@
this.readMsgs = readMsgs;
}
- //## JDBC4.0-begin ##
@Override
- //## JDBC4.0-end ##
public void close() {
}
- //## JDBC4.0-begin ##
@Override
- //## JDBC4.0-end ##
public boolean isOpen() {
return true;
}
- //## JDBC4.0-begin ##
@Override
- //## JDBC4.0-end ##
public Future<?> write(Object msg) {
msgs.add(msg);
ResultsFuture<?> result = new ResultsFuture<Void>();
@@ -80,9 +74,7 @@
return result;
}
- //## JDBC4.0-begin ##
@Override
- //## JDBC4.0-end ##
public Object read() throws IOException,
ClassNotFoundException {
if (readCount >= readMsgs.size()) {
@@ -102,24 +94,18 @@
return msg;
}
- //## JDBC4.0-begin ##
@Override
- //## JDBC4.0-end ##
public SocketAddress getRemoteAddress() {
return null;
}
- //## JDBC4.0-begin ##
@Override
- //## JDBC4.0-end ##
public ObjectChannel createObjectChannel(SocketAddress address,
boolean ssl) throws IOException, CommunicationException {
return this;
}
- //## JDBC4.0-begin ##
@Override
- //## JDBC4.0-end ##
public int getSoTimeout() {
return 1;
}
Modified: trunk/common-core/src/main/java/org/teiid/core/types/BlobType.java
===================================================================
--- trunk/common-core/src/main/java/org/teiid/core/types/BlobType.java 2010-07-01 14:23:31 UTC (rev 2326)
+++ trunk/common-core/src/main/java/org/teiid/core/types/BlobType.java 2010-07-01 14:26:20 UTC (rev 2327)
@@ -121,7 +121,6 @@
this.reference.truncate(len);
}
- //## JDBC4.0-begin ##
public void free() throws SQLException {
this.reference.free();
}
@@ -130,7 +129,6 @@
throws SQLException {
return this.reference.getBinaryStream(pos, length);
}
- //## JDBC4.0-end ##
public static SerialBlob createBlob(byte[] bytes) {
if (bytes == null) {
Modified: trunk/common-core/src/main/java/org/teiid/core/types/ClobType.java
===================================================================
--- trunk/common-core/src/main/java/org/teiid/core/types/ClobType.java 2010-07-01 14:23:31 UTC (rev 2326)
+++ trunk/common-core/src/main/java/org/teiid/core/types/ClobType.java 2010-07-01 14:26:20 UTC (rev 2327)
@@ -200,15 +200,14 @@
};
}
- //## JDBC4.0-begin ##
- public void free() throws SQLException {
+
+ public void free() throws SQLException {
this.reference.free();
}
public Reader getCharacterStream(long pos, long length) throws SQLException {
return this.reference.getCharacterStream(pos, length);
}
- //## JDBC4.0-end ##
public static SerialClob createClob(char[] chars) {
try {
Modified: trunk/common-core/src/main/java/org/teiid/core/types/DataTypeManager.java
===================================================================
--- trunk/common-core/src/main/java/org/teiid/core/types/DataTypeManager.java 2010-07-01 14:23:31 UTC (rev 2326)
+++ trunk/common-core/src/main/java/org/teiid/core/types/DataTypeManager.java 2010-07-01 14:26:20 UTC (rev 2327)
@@ -704,50 +704,38 @@
static void loadSourceConversions() {
sourceConverters.put(Clob.class, new SourceTransform<Clob, ClobType>() {
- //## JDBC4.0-begin ##
@Override
- //## JDBC4.0-end ##
public ClobType transform(Clob value) {
return new ClobType(value);
}
});
sourceConverters.put(char[].class, new SourceTransform<char[], ClobType>() {
- //## JDBC4.0-begin ##
@Override
- //## JDBC4.0-end ##
public ClobType transform(char[] value) {
return new ClobType(ClobType.createClob(value));
}
});
sourceConverters.put(Blob.class, new SourceTransform<Blob, BlobType>() {
- //## JDBC4.0-begin ##
@Override
- //## JDBC4.0-end ##
public BlobType transform(Blob value) {
return new BlobType(value);
}
});
addSourceTransform(byte[].class, new SourceTransform<byte[], BlobType>() {
- //## JDBC4.0-begin ##
@Override
- //## JDBC4.0-end ##
public BlobType transform(byte[] value) {
return new BlobType(BlobType.createBlob(value));
}
});
addSourceTransform(SQLXML.class, new SourceTransform<SQLXML, XMLType>() {
- //## JDBC4.0-begin ##
@Override
- //## JDBC4.0-end ##
public XMLType transform(SQLXML value) {
return new XMLType(value);
}
});
//Note: the default transform from non-InputStreamFactory source is a fully materialized string
addSourceTransform(Source.class, new SourceTransform<Source, XMLType>() {
- //## JDBC4.0-begin ##
@Override
- //## JDBC4.0-end ##
public XMLType transform(Source value) {
if (value instanceof InputStreamFactory) {
return new XMLType(new SQLXMLImpl((InputStreamFactory)value));
@@ -761,9 +749,7 @@
}
});
addSourceTransform(Date.class, new SourceTransform<Date, Timestamp>() {
- //## JDBC4.0-begin ##
@Override
- //## JDBC4.0-end ##
public Timestamp transform(Date value) {
return new Timestamp(value.getTime());
}
Modified: trunk/common-core/src/main/java/org/teiid/core/types/InputStreamFactory.java
===================================================================
--- trunk/common-core/src/main/java/org/teiid/core/types/InputStreamFactory.java 2010-07-01 14:23:31 UTC (rev 2326)
+++ trunk/common-core/src/main/java/org/teiid/core/types/InputStreamFactory.java 2010-07-01 14:26:20 UTC (rev 2327)
@@ -27,6 +27,7 @@
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.io.OutputStream;
import java.io.Reader;
import java.nio.charset.Charset;
import java.sql.Blob;
@@ -34,6 +35,7 @@
import java.sql.SQLException;
import java.sql.SQLXML;
+import javax.activation.DataSource;
import javax.xml.transform.Source;
import org.teiid.core.util.ReaderInputStream;
@@ -102,7 +104,7 @@
}
- public static class ClobInputStreamFactory extends InputStreamFactory {
+ public static class ClobInputStreamFactory extends InputStreamFactory implements DataSource {
private Clob clob;
private Charset charset = Charset.forName(Streamable.ENCODING);
@@ -136,10 +138,25 @@
throw new IOException(e);
}
}
+
+ @Override
+ public String getContentType() {
+ return "text/plain"; //$NON-NLS-1$
+ }
+
+ @Override
+ public String getName() {
+ return "clob"; //$NON-NLS-1$
+ }
+
+ @Override
+ public OutputStream getOutputStream() throws IOException {
+ throw new UnsupportedOperationException();
+ }
}
- public static class BlobInputStreamFactory extends InputStreamFactory {
+ public static class BlobInputStreamFactory extends InputStreamFactory implements DataSource {
private Blob blob;
@@ -165,9 +182,24 @@
}
}
+ @Override
+ public String getContentType() {
+ return "application/octet-stream"; //$NON-NLS-1$
+ }
+
+ @Override
+ public String getName() {
+ return "blob"; //$NON-NLS-1$
+ }
+
+ @Override
+ public OutputStream getOutputStream() throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
}
- public static class SQLXMLInputStreamFactory extends InputStreamFactory {
+ public static class SQLXMLInputStreamFactory extends InputStreamFactory implements DataSource {
private SQLXML sqlxml;
@@ -193,6 +225,21 @@
}
}
+ @Override
+ public String getContentType() {
+ return "application/xml"; //$NON-NLS-1$
+ }
+
+ @Override
+ public String getName() {
+ return "sqlxml"; //$NON-NLS-1$
+ }
+
+ @Override
+ public OutputStream getOutputStream() throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
}
}
Modified: trunk/common-core/src/main/java/org/teiid/core/types/JDBCSQLTypeInfo.java
===================================================================
--- trunk/common-core/src/main/java/org/teiid/core/types/JDBCSQLTypeInfo.java 2010-07-01 14:23:31 UTC (rev 2326)
+++ trunk/common-core/src/main/java/org/teiid/core/types/JDBCSQLTypeInfo.java 2010-07-01 14:26:20 UTC (rev 2327)
@@ -24,13 +24,8 @@
import java.sql.Blob;
import java.sql.Clob;
-//## JDBC4.0-begin ##
import java.sql.SQLXML;
-//## JDBC4.0-end ##
-/*## JDBC3.0-JDK1.5-begin ##
-import com.metamatrix.core.jdbc.SQLXML;
-## JDBC3.0-JDK1.5-end ##*/
import java.sql.Types;
import java.util.HashMap;
import java.util.Map;
@@ -86,12 +81,7 @@
public static final String OBJECT_CLASS = DataTypeManager.DefaultDataClasses.OBJECT.getName();
public static final String CLOB_CLASS = Clob.class.getName();
public static final String BLOB_CLASS = Blob.class.getName();
- //## JDBC4.0-begin ##
public static final String XML_CLASS = SQLXML.class.getName();
- //## JDBC4.0-end ##
- /*## JDBC3.0-JDK1.5-begin ##
- public static final String XML_CLASS = DataTypeManager.DefaultDataClasses.OBJECT.getName();
- ## JDBC3.0-JDK1.5-end ##*/
private static Map<String, Integer> NAME_TO_TYPE_MAP = new HashMap<String, Integer>();
private static Map<Integer, String> TYPE_TO_NAME_MAP = new HashMap<Integer, String>();
@@ -119,17 +109,11 @@
addTypeMapping(NULL, null, Types.NULL);
- //## JDBC4.0-begin ##
addTypeMapping(XML, XML_CLASS, Types.SQLXML);
TYPE_TO_NAME_MAP.put(Types.NVARCHAR, STRING);
TYPE_TO_NAME_MAP.put(Types.LONGNVARCHAR, STRING);
TYPE_TO_NAME_MAP.put(Types.NCHAR, CHAR);
TYPE_TO_NAME_MAP.put(Types.NCLOB, CLOB);
- //## JDBC4.0-end ##
-
- /*## JDBC3.0-JDK1.5-begin ##
- NAME_TO_CLASSNAME.put(XML, OBJECT_CLASS);
- ## JDBC3.0-JDK1.5-end ##*/
}
private static void addTypeMapping(String typeName, String javaClass, int sqlType, int ... secondaryTypes) {
Modified: trunk/common-core/src/main/java/org/teiid/core/util/SqlUtil.java
===================================================================
--- trunk/common-core/src/main/java/org/teiid/core/util/SqlUtil.java 2010-07-01 14:23:31 UTC (rev 2326)
+++ trunk/common-core/src/main/java/org/teiid/core/util/SqlUtil.java 2010-07-01 14:26:20 UTC (rev 2327)
@@ -24,14 +24,10 @@
import java.sql.SQLException;
-//## JDBC4.0-begin ##
import java.sql.SQLFeatureNotSupportedException;
-//## JDBC4.0-end ##
import java.util.regex.Pattern;
-
-
/**
* Utilities for dealing with SQL strings.
*/
@@ -111,13 +107,6 @@
}
public static SQLException createFeatureNotSupportedException() {
- //## JDBC4.0-begin ##
return new SQLFeatureNotSupportedException();
- //## JDBC4.0-end ##
-
- /*## JDBC3.0-JDK1.5-begin ##
- return new SQLException("unsupported feature");
- ## JDBC3.0-JDK1.5-end ##*/
-
}
}
Modified: trunk/common-core/src/test/java/org/teiid/core/types/TestDataTypeManager.java
===================================================================
--- trunk/common-core/src/test/java/org/teiid/core/types/TestDataTypeManager.java 2010-07-01 14:23:31 UTC (rev 2326)
+++ trunk/common-core/src/test/java/org/teiid/core/types/TestDataTypeManager.java 2010-07-01 14:26:20 UTC (rev 2327)
@@ -172,9 +172,7 @@
}
assertEquals(Types.TIMESTAMP, JDBCSQLTypeInfo.getSQLTypeFromRuntimeType(DataTypeManager.DefaultDataClasses.TIMESTAMP));
- //## JDBC4.0-begin ##
assertEquals(Types.SQLXML, JDBCSQLTypeInfo.getSQLTypeFromRuntimeType(DataTypeManager.DefaultDataClasses.XML));
- //## JDBC4.0-end ##
assertEquals(DataTypeManager.DefaultDataTypes.STRING, JDBCSQLTypeInfo.getTypeName(Types.CHAR));
assertEquals(Types.CHAR, JDBCSQLTypeInfo.getSQLTypeFromRuntimeType(DataTypeManager.DefaultDataClasses.CHAR));
}
Modified: trunk/common-core/src/test/java/org/teiid/core/types/TestSQLXMLImpl.java
===================================================================
--- trunk/common-core/src/test/java/org/teiid/core/types/TestSQLXMLImpl.java 2010-07-01 14:23:31 UTC (rev 2326)
+++ trunk/common-core/src/test/java/org/teiid/core/types/TestSQLXMLImpl.java 2010-07-01 14:26:20 UTC (rev 2327)
@@ -38,7 +38,6 @@
String testStr = "<foo>test</foo>"; //$NON-NLS-1$
- //## JDBC4.0-begin ##
@Test public void testGetSource() throws Exception {
SQLXMLImpl xml = new SQLXMLImpl(testStr);
assertTrue(xml.getSource(null) instanceof StreamSource);
@@ -46,7 +45,6 @@
StreamSource ss = (StreamSource)xml.getSource(null);
assertEquals(testStr, new String(ObjectConverterUtil.convertToByteArray(ss.getInputStream()), Streamable.ENCODING));
}
- //## JDBC4.0-end ##
@Test public void testGetCharacterStream() throws Exception {
SQLXMLImpl xml = new SQLXMLImpl(testStr);
Modified: trunk/common-core/src/test/java/org/teiid/core/types/basic/TestStringToXmlTransform.java
===================================================================
--- trunk/common-core/src/test/java/org/teiid/core/types/basic/TestStringToXmlTransform.java 2010-07-01 14:23:31 UTC (rev 2326)
+++ trunk/common-core/src/test/java/org/teiid/core/types/basic/TestStringToXmlTransform.java 2010-07-01 14:26:20 UTC (rev 2327)
@@ -22,7 +22,6 @@
package org.teiid.core.types.basic;
-//## JDBC4.0-begin ##
import static org.junit.Assert.*;
import java.sql.SQLXML;
Modified: trunk/common-core/src/test/java/org/teiid/core/util/UnitTestUtil.java
===================================================================
--- trunk/common-core/src/test/java/org/teiid/core/util/UnitTestUtil.java 2010-07-01 14:23:31 UTC (rev 2326)
+++ trunk/common-core/src/test/java/org/teiid/core/util/UnitTestUtil.java 2010-07-01 14:26:20 UTC (rev 2327)
@@ -43,13 +43,7 @@
public static final String PATH_SEPARATOR = "/"; //$NON-NLS-1$
- //## JDBC4.0-begin ##
private static final String DEFAULT_TESTDATA_PATH = "src/test/resources"; //$NON-NLS-1$
- //## JDBC4.0-end ##
-
- /*## JDBC3.0-JDK1.5-begin ##
- private static final String DEFAULT_TESTDATA_PATH = "target/generated-sources/test/resources"; //$NON-NLS-1$
- ## JDBC3.0-JDK1.5-end ##*/
private static final String DEFAULT_TEMP_DIR = "target/scratch"; //$NON-NLS-1$
Modified: trunk/engine/src/main/java/org/teiid/query/optimizer/relational/RelationalPlanner.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/optimizer/relational/RelationalPlanner.java 2010-07-01 14:23:31 UTC (rev 2326)
+++ trunk/engine/src/main/java/org/teiid/query/optimizer/relational/RelationalPlanner.java 2010-07-01 14:26:20 UTC (rev 2327)
@@ -88,7 +88,6 @@
import org.teiid.query.sql.lang.SubqueryContainer;
import org.teiid.query.sql.lang.SubqueryFromClause;
import org.teiid.query.sql.lang.TableFunctionReference;
-import org.teiid.query.sql.lang.TextTable;
import org.teiid.query.sql.lang.UnaryFromClause;
import org.teiid.query.sql.proc.CreateUpdateProcedureCommand;
import org.teiid.query.sql.symbol.GroupSymbol;
15 years, 9 months
teiid SVN: r2326 - in trunk/runtime/src/main: resources/org/teiid/runtime and 1 other directory.
by teiid-commits@lists.jboss.org
Author: shawkins
Date: 2010-07-01 10:23:31 -0400 (Thu, 01 Jul 2010)
New Revision: 2326
Modified:
trunk/runtime/src/main/java/org/teiid/deployers/VDBDeployer.java
trunk/runtime/src/main/resources/org/teiid/runtime/i18n.properties
Log:
TEIID-1079 reusing message for translator not found.
Modified: trunk/runtime/src/main/java/org/teiid/deployers/VDBDeployer.java
===================================================================
--- trunk/runtime/src/main/java/org/teiid/deployers/VDBDeployer.java 2010-07-01 12:11:44 UTC (rev 2325)
+++ trunk/runtime/src/main/java/org/teiid/deployers/VDBDeployer.java 2010-07-01 14:23:31 UTC (rev 2326)
@@ -180,7 +180,7 @@
String name = model.getSourceTranslatorName(source);
Translator translator = VDBDeployer.this.translatorRepository.getTranslatorMetaData(new VDBKey(deployment.getName(), deployment.getVersion()), name);
if (translator == null) {
- throw new DeploymentException(RuntimePlugin.Util.getString("translator_not_found", name)); //$NON-NLS-1$
+ throw new DeploymentException(RuntimePlugin.Util.getString("translator_not_found", deployment.getName(), deployment.getVersion(), name)); //$NON-NLS-1$
}
ExecutionFactory<Object, Object> ef = map.get(translator);
Modified: trunk/runtime/src/main/resources/org/teiid/runtime/i18n.properties
===================================================================
--- trunk/runtime/src/main/resources/org/teiid/runtime/i18n.properties 2010-07-01 12:11:44 UTC (rev 2325)
+++ trunk/runtime/src/main/resources/org/teiid/runtime/i18n.properties 2010-07-01 14:23:31 UTC (rev 2326)
@@ -306,4 +306,3 @@
required_property_not_exists=Required property "{0}" has no value. Deployment is incomplete.
name_not_found=Translator property "name" not defined for the deployment "{0}"
translator_type_not_found=The parent translator defined not found in configuration "{0}"
-translator_not_found=Unknown translator "{0}"
15 years, 9 months