[teiid-commits] teiid SVN: r593 - in trunk: connector-api/src/main/java/org/teiid/connector/internal and 7 other directories.
teiid-commits at lists.jboss.org
teiid-commits at lists.jboss.org
Thu Mar 19 15:11:15 EDT 2009
Author: shawkins
Date: 2009-03-19 15:11:15 -0400 (Thu, 19 Mar 2009)
New Revision: 593
Added:
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWorkItemFactory.java
Modified:
trunk/connector-api/src/main/java/org/teiid/connector/api/ExecutionContext.java
trunk/connector-api/src/main/java/org/teiid/connector/internal/ConnectorPropertyNames.java
trunk/connector-metadata/src/test/java/com/metamatrix/connector/metadata/TestIndexConnector.java
trunk/connector-sdk/src/main/java/com/metamatrix/cdk/api/ConnectorHost.java
trunk/connector-sdk/src/main/java/com/metamatrix/cdk/api/EnvironmentUtility.java
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/AsynchConnectorWorkItem.java
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorManager.java
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWorkItem.java
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ExecutionContextImpl.java
trunk/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/FakeConnector.java
trunk/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/FakeExecutionContextImpl.java
trunk/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/TestConnectorManagerImpl.java
trunk/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/TestConnectorStateManager.java
trunk/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/TestExecutionContextImpl.java
trunk/engine/src/test/java/org/teiid/dqp/internal/pooling/connector/TestConnectionPool.java
trunk/engine/src/test/java/org/teiid/dqp/internal/pooling/connector/TestPerUserPool.java
trunk/test-integration/src/test/java/com/metamatrix/connector/jdbc/extension/TestSQLConversionVisitor.java
trunk/test-integration/src/test/java/com/metamatrix/connector/jdbc/oracle/TestOracleSQLConversionVisitor.java
Log:
TEIID-164 adding back connector resultset caching.
Modified: trunk/connector-api/src/main/java/org/teiid/connector/api/ExecutionContext.java
===================================================================
--- trunk/connector-api/src/main/java/org/teiid/connector/api/ExecutionContext.java 2009-03-19 17:17:58 UTC (rev 592)
+++ trunk/connector-api/src/main/java/org/teiid/connector/api/ExecutionContext.java 2009-03-19 19:11:15 UTC (rev 593)
@@ -130,12 +130,6 @@
String getConnectionIdentifier();
/**
- * Whether to use ResultSet cache if it is enabled.
- * @return True if use ResultSet cache; false otherwise.
- */
- boolean useResultSetCache();
-
- /**
* When the execution is turned on with "alive=true", the execution object will not
* be implicitly closed at the end of the last batch. It will only be closed at end
* of the user query. This is useful in keeping the connection open for
Modified: trunk/connector-api/src/main/java/org/teiid/connector/internal/ConnectorPropertyNames.java
===================================================================
--- trunk/connector-api/src/main/java/org/teiid/connector/internal/ConnectorPropertyNames.java 2009-03-19 17:17:58 UTC (rev 592)
+++ trunk/connector-api/src/main/java/org/teiid/connector/internal/ConnectorPropertyNames.java 2009-03-19 19:11:15 UTC (rev 593)
@@ -104,6 +104,11 @@
public static final String ADMIN_CONNECTIONS_ALLOWED = "AdminConnectionsAllowed"; //$NON-NLS-1$
+ public static final String USE_RESULTSET_CACHE = "ResultSetCacheEnabled"; //$NON-NLS-1$
+ public static final String MAX_RESULTSET_CACHE_SIZE = "ResultSetCacheMaxSize"; //$NON-NLS-1$
+ public static final String MAX_RESULTSET_CACHE_AGE = "ResultSetCacheMaxAge"; //$NON-NLS-1$
+ public static final String RESULTSET_CACHE_SCOPE = "ResultSetCacheScope"; //$NON-NLS-1$
+
/**
* This property can be used to bypass the normal logic that throws an exception when a command
* is about to be executed by a non-XA compatible connector, but there is a global transaction.
Modified: trunk/connector-metadata/src/test/java/com/metamatrix/connector/metadata/TestIndexConnector.java
===================================================================
--- trunk/connector-metadata/src/test/java/com/metamatrix/connector/metadata/TestIndexConnector.java 2009-03-19 17:17:58 UTC (rev 592)
+++ trunk/connector-metadata/src/test/java/com/metamatrix/connector/metadata/TestIndexConnector.java 2009-03-19 19:11:15 UTC (rev 593)
@@ -163,7 +163,7 @@
}
private ExecutionContext helpGetSecurityContext() {
- return new ExecutionContextImpl("testname", "1", null, null, null, null, null, null, null, null, false); //$NON-NLS-1$ //$NON-NLS-2$
+ return new ExecutionContextImpl("testname", "1", null, null, null, null, null, null, null, null); //$NON-NLS-1$ //$NON-NLS-2$
}
/*
* @see junit.framework.TestCase#tearDown()
Modified: trunk/connector-sdk/src/main/java/com/metamatrix/cdk/api/ConnectorHost.java
===================================================================
--- trunk/connector-sdk/src/main/java/com/metamatrix/cdk/api/ConnectorHost.java 2009-03-19 17:17:58 UTC (rev 592)
+++ trunk/connector-sdk/src/main/java/com/metamatrix/cdk/api/ConnectorHost.java 2009-03-19 19:11:15 UTC (rev 593)
@@ -128,7 +128,7 @@
}
public void setSecurityContext(String vdbName, String vdbVersion, String userName, Serializable trustedPayload, Serializable executionPayload) {
- this.executionContext = new ExecutionContextImpl(vdbName, vdbVersion, userName, trustedPayload, executionPayload, "Connection", "Connector<CDK>", "Request", "1", "0", false); //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$ //$NON-NLS-4$ //$NON-NLS-5$
+ this.executionContext = new ExecutionContextImpl(vdbName, vdbVersion, userName, trustedPayload, executionPayload, "Connection", "Connector<CDK>", "Request", "1", "0"); //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$ //$NON-NLS-4$ //$NON-NLS-5$
}
public void setExecutionContext(ExecutionContext context) {
Modified: trunk/connector-sdk/src/main/java/com/metamatrix/cdk/api/EnvironmentUtility.java
===================================================================
--- trunk/connector-sdk/src/main/java/com/metamatrix/cdk/api/EnvironmentUtility.java 2009-03-19 17:17:58 UTC (rev 592)
+++ trunk/connector-sdk/src/main/java/com/metamatrix/cdk/api/EnvironmentUtility.java 2009-03-19 19:11:15 UTC (rev 593)
@@ -98,7 +98,7 @@
* @return A SecurityContext / ExecutionContext instance
*/
public static ExecutionContext createSecurityContext(String user) {
- return new ExecutionContextImpl("vdb", "1", user, null, null, "Connection", "ConnectorID<CDK>", "Request", "1", "0", false); //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$ //$NON-NLS-4$ //$NON-NLS-5$ //$NON-NLS-6$ //$NON-NLS-7$
+ return new ExecutionContextImpl("vdb", "1", user, null, null, "Connection", "ConnectorID<CDK>", "Request", "1", "0"); //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$ //$NON-NLS-4$ //$NON-NLS-5$ //$NON-NLS-6$ //$NON-NLS-7$
}
/**
@@ -111,7 +111,7 @@
* @return A SecurityContext / ExecutionContext instance
*/
public static ExecutionContext createSecurityContext(String vdbName, String vdbVersion, String user, Serializable trustedToken) {
- return new ExecutionContextImpl(vdbName, vdbVersion, user, trustedToken, null, "Connection", "ConnectorID<CDK>", "Request", "1", "0", false); //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$ //$NON-NLS-4$ //$NON-NLS-5$
+ return new ExecutionContextImpl(vdbName, vdbVersion, user, trustedToken, null, "Connection", "ConnectorID<CDK>", "Request", "1", "0"); //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$ //$NON-NLS-4$ //$NON-NLS-5$
}
/**
@@ -122,7 +122,7 @@
* @return A SecurityContext / ExecutionContext instance
*/
public static ExecutionContext createExecutionContext(String requestID, String partID) {
- return new ExecutionContextImpl("vdb", "1", "user", null, null, "Connection", "ConnectorID<CDK>", requestID, partID, "0", false); //$NON-NLS-1$//$NON-NLS-2$//$NON-NLS-3$//$NON-NLS-4$ //$NON-NLS-5$ //$NON-NLS-6$
+ return new ExecutionContextImpl("vdb", "1", "user", null, null, "Connection", "ConnectorID<CDK>", requestID, partID, "0"); //$NON-NLS-1$//$NON-NLS-2$//$NON-NLS-3$//$NON-NLS-4$ //$NON-NLS-5$ //$NON-NLS-6$
}
/**
@@ -142,7 +142,7 @@
public static ExecutionContext createExecutionContext(String vdbName, String vdbVersion, String user,
Serializable trustedToken, Serializable executionPayload,
String connectionID, String connectorID, String requestID, String partID, boolean useResultSetCache) {
- return new ExecutionContextImpl(vdbName, vdbVersion, user, trustedToken, executionPayload, connectionID, connectorID, requestID, partID, "0", useResultSetCache); //$NON-NLS-1$
+ return new ExecutionContextImpl(vdbName, vdbVersion, user, trustedToken, executionPayload, connectionID, connectorID, requestID, partID, "0"); //$NON-NLS-1$
}
}
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/AsynchConnectorWorkItem.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/AsynchConnectorWorkItem.java 2009-03-19 17:17:58 UTC (rev 592)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/AsynchConnectorWorkItem.java 2009-03-19 19:11:15 UTC (rev 593)
@@ -22,15 +22,27 @@
package org.teiid.dqp.internal.datamgr.impl;
+import org.teiid.connector.api.Connector;
+import org.teiid.connector.api.ConnectorException;
+
+import com.metamatrix.api.exception.MetaMatrixComponentException;
import com.metamatrix.common.comm.api.ResultsReceiver;
import com.metamatrix.core.util.Assertion;
import com.metamatrix.dqp.message.AtomicRequestMessage;
import com.metamatrix.dqp.message.AtomicResultsMessage;
+import com.metamatrix.query.metadata.QueryMetadataInterface;
public class AsynchConnectorWorkItem extends ConnectorWorkItem {
AsynchConnectorWorkItem(AtomicRequestMessage message, ConnectorManager manager, ResultsReceiver<AtomicResultsMessage> resultsReceiver) {
super(message, manager, resultsReceiver);
+ }
+
+ @Override
+ protected void createConnection(Connector connector,
+ QueryMetadataInterface queryMetadata) throws ConnectorException,
+ MetaMatrixComponentException {
+ super.createConnection(connector, queryMetadata);
Assertion.assertTrue(!this.isTransactional, "Asynch work items are not suitable for transactions"); //$NON-NLS-1$
}
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorManager.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorManager.java 2009-03-19 17:17:58 UTC (rev 592)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorManager.java 2009-03-19 19:11:15 UTC (rev 593)
@@ -58,6 +58,7 @@
import org.teiid.dqp.internal.process.DQPWorkContext;
import org.teiid.dqp.internal.transaction.TransactionProvider;
+import com.metamatrix.api.exception.MetaMatrixComponentException;
import com.metamatrix.common.application.ApplicationEnvironment;
import com.metamatrix.common.application.ApplicationService;
import com.metamatrix.common.application.exception.ApplicationLifecycleException;
@@ -72,6 +73,7 @@
import com.metamatrix.core.util.ReflectionHelper;
import com.metamatrix.core.util.StringUtil;
import com.metamatrix.dqp.DQPPlugin;
+import com.metamatrix.dqp.ResourceFinder;
import com.metamatrix.dqp.internal.datamgr.ConnectorID;
import com.metamatrix.dqp.message.AtomicRequestID;
import com.metamatrix.dqp.message.AtomicRequestMessage;
@@ -103,11 +105,13 @@
private ConnectorID connectorID;
private WorkerPool connectorWorkerPool;
private ResultSetCache rsCache;
+ private ConnectorWorkItemFactory workItemFactory;
private String connectorName;
private int maxResultRows;
private boolean exceptionOnMaxRows = true;
private boolean synchWorkers;
private boolean isXa;
+ private boolean isImmutable;
//services acquired in start
private MetadataService metadataService;
@@ -127,13 +131,11 @@
public void initialize(Properties props) {
this.props = props;
+ this.isImmutable = PropertiesUtils.getBooleanProperty(props, ConnectorPropertyNames.IS_IMMUTABLE, false);
}
public boolean isImmutable() {
- if ( this.props == null ) {
- this.props = new Properties();
- }
- return PropertiesUtils.getBooleanProperty(props, ConnectorPropertyNames.IS_IMMUTABLE, false);
+ return isImmutable;
}
public ClassLoader getClassloader() {
@@ -160,7 +162,7 @@
"capabilities-request", //$NON-NLS-1$
connectorID.getID(),
requestID.toString(),
- "capabilities-request", "0", false); //$NON-NLS-1$ //$NON-NLS-2$
+ "capabilities-request", "0"); //$NON-NLS-1$ //$NON-NLS-2$
conn = connector.getConnection(context);
caps = conn.getCapabilities();
@@ -183,19 +185,14 @@
rsCache.clear();
}
}
-
+
public void executeRequest(ResultsReceiver<AtomicResultsMessage> receiver, AtomicRequestMessage message) {
// Set the connector ID to be used; if not already set.
AtomicRequestID atomicRequestId = message.getAtomicRequestID();
LogManager.logDetail(LogConstants.CTX_CONNECTOR, new Object[] {atomicRequestId, "Create State"}); //$NON-NLS-1$
+
+ ConnectorWorkItem item = workItemFactory.createWorkItem(message, receiver);
- ConnectorWorkItem item = null;
- if (synchWorkers) {
- item = new SynchConnectorWorkItem(message, this, receiver);
- } else {
- item = new AsynchConnectorWorkItem(message, this, receiver);
- }
-
Assertion.isNull(requestStates.put(atomicRequestId, item), "State already existed"); //$NON-NLS-1$
message.markProcessingStart();
enqueueRequest(item);
@@ -337,7 +334,19 @@
// Initialize and start the connector
initStartConnector(connectorEnv);
-
+ try {
+ //check result set cache
+ if(PropertiesUtils.getBooleanProperty(props, ConnectorPropertyNames.USE_RESULTSET_CACHE, false)) {
+ Properties rsCacheProps = new Properties();
+ rsCacheProps.setProperty(ResultSetCache.RS_CACHE_MAX_SIZE, props.getProperty(ConnectorPropertyNames.MAX_RESULTSET_CACHE_SIZE, "0")); //$NON-NLS-1$
+ rsCacheProps.setProperty(ResultSetCache.RS_CACHE_MAX_AGE, props.getProperty(ConnectorPropertyNames.MAX_RESULTSET_CACHE_AGE, "0")); //$NON-NLS-1$
+ rsCacheProps.setProperty(ResultSetCache.RS_CACHE_SCOPE, props.getProperty(ConnectorPropertyNames.RESULTSET_CACHE_SCOPE, ResultSetCache.RS_CACHE_SCOPE_VDB));
+ this.rsCache = createResultSetCache(rsCacheProps);
+ }
+ } catch (MetaMatrixComponentException e) {
+ throw new ApplicationLifecycleException(e);
+ }
+ this.workItemFactory = new ConnectorWorkItemFactory(this, this.rsCache, synchWorkers);
this.started = true;
}
@@ -457,6 +466,11 @@
}
return c;
}
+
+ protected ResultSetCache createResultSetCache(Properties rsCacheProps)
+ throws MetaMatrixComponentException {
+ return new ResultSetCache(rsCacheProps, ResourceFinder.getCacheFactory());
+ }
/**
* Queries the Connector Manager, if it already has been started.
@@ -644,6 +658,10 @@
this.classloader = classloader;
}
+ public void setWorkItemFactory(ConnectorWorkItemFactory workItemFactory) {
+ this.workItemFactory = workItemFactory;
+ }
+
/**
* Overloads the connector capabilities with one defined in the connector binding properties
*/
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWorkItem.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWorkItem.java 2009-03-19 17:17:58 UTC (rev 592)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWorkItem.java 2009-03-19 19:11:15 UTC (rev 593)
@@ -86,8 +86,8 @@
private List<Integer> convertToDesiredRuntimeType;
/* End state information */
- private boolean lastBatch;
- private int rowCount;
+ protected boolean lastBatch;
+ protected int rowCount;
protected enum RequestState {
NEW, MORE, CLOSE
@@ -100,33 +100,31 @@
private volatile boolean closeRequested;
private boolean isClosed;
- ResultsReceiver<AtomicResultsMessage> resultsReceiver;
+ protected ResultsReceiver<AtomicResultsMessage> resultsReceiver;
ConnectorWorkItem(AtomicRequestMessage message, ConnectorManager manager, ResultsReceiver<AtomicResultsMessage> resultsReceiver) {
this.id = message.getAtomicRequestID();
this.requestMsg = message;
this.manager = manager;
this.resultsReceiver = resultsReceiver;
+ AtomicRequestID requestID = this.requestMsg.getAtomicRequestID();
+ this.securityContext = new ExecutionContextImpl(requestMsg.getWorkContext().getVdbName(),
+ requestMsg.getWorkContext().getVdbVersion(),
+ requestMsg.getWorkContext().getUserName(),
+ requestMsg.getWorkContext().getTrustedPayload(),
+ requestMsg.getExecutionPayload(),
+ requestMsg.getWorkContext().getConnectionID(),
+ requestMsg.getConnectorID().getID(),
+ requestMsg.getRequestID().toString(),
+ Integer.toString(requestID.getNodeID()),
+ Integer.toString(requestID.getExecutionId())
+ );
+ this.securityContext.setBatchSize(this.requestMsg.getFetchSize());
}
protected void createConnection(Connector connector, QueryMetadataInterface queryMetadata) throws ConnectorException, MetaMatrixComponentException {
LogManager.logTrace(LogConstants.CTX_CONNECTOR, new Object[] {id, "creating connection for atomic-request"}); //$NON-NLS-1$
- AtomicRequestID requestID = this.requestMsg.getAtomicRequestID();
- this.securityContext = new ExecutionContextImpl(requestMsg.getWorkContext().getVdbName(),
- requestMsg.getWorkContext().getVdbVersion(),
- requestMsg.getWorkContext().getUserName(),
- requestMsg.getWorkContext().getTrustedPayload(),
- requestMsg.getExecutionPayload(),
- requestMsg.getWorkContext().getConnectionID(),
- requestMsg.getConnectorID().getID(),
- requestMsg.getRequestID().toString(),
- Integer.toString(requestID.getNodeID()),
- Integer.toString(requestID.getExecutionId()),
- requestMsg.useResultSetCache()
- && (requestMsg.getCommand()).areResultsCachable()
- );
- this.securityContext.setBatchSize(this.requestMsg.getFetchSize());
-
+
if (requestMsg.isTransactional()){
if (manager.isXa()) {
connection = ((XAConnector)connector).getXAConnection(this.securityContext, requestMsg.getTransactionContext());
@@ -273,7 +271,7 @@
this.resultsReceiver.receiveResults(response);
}
- protected void processNewRequest() throws ConnectorException, CommunicationException {
+ protected void processNewRequest() throws ConnectorException {
// Execute query
this.execution.execute();
LogManager.logDetail(LogConstants.CTX_CONNECTOR, new Object[] {this.id, "Executed command"}); //$NON-NLS-1$
@@ -404,34 +402,39 @@
if (row != null) {
correctTypes(row);
rows.add(row);
+ this.rowCount++;
}
}
LogManager.logDetail(LogConstants.CTX_CONNECTOR, new Object[] {this.id, "Obtained last batch, total row count:", rowCount}); //$NON-NLS-1$
}
if (sendResults) {
- int currentRowCount = rows.size();
- if ( !lastBatch && currentRowCount == 0 ) {
- // Defect 13366 - Should send all batches, even if they're zero size.
- // Log warning if received a zero-size non-last batch from the connector.
- LogManager.logWarning(LogConstants.CTX_CONNECTOR, DQPPlugin.Util.getString("ConnectorWorker.zero_size_non_last_batch", requestMsg.getConnectorID())); //$NON-NLS-1$
- }
-
- AtomicResultsMessage response = createResultsMessage(this.requestMsg, rows.toArray(new List[currentRowCount]), requestMsg.getCommand().getProjectedSymbols());
-
- // if we need to keep the execution alive, then we can not support
- // implicit close.
- response.setSupportsImplicitClose(!this.securityContext.keepExecutionAlive());
- response.setTransactional(this.securityContext.isTransactional());
- response.setWarnings(this.securityContext.getWarnings());
-
- if ( lastBatch ) {
- response.setFinalRow(rowCount);
- }
- this.resultsReceiver.receiveResults(response);
+ sendResults(rows);
}
}
+ protected void sendResults(List<List> rows) {
+ int currentRowCount = rows.size();
+ if ( !lastBatch && currentRowCount == 0 ) {
+ // Defect 13366 - Should send all batches, even if they're zero size.
+ // Log warning if received a zero-size non-last batch from the connector.
+ LogManager.logWarning(LogConstants.CTX_CONNECTOR, DQPPlugin.Util.getString("ConnectorWorker.zero_size_non_last_batch", requestMsg.getConnectorID())); //$NON-NLS-1$
+ }
+
+ AtomicResultsMessage response = createResultsMessage(this.requestMsg, rows.toArray(new List[currentRowCount]), requestMsg.getCommand().getProjectedSymbols());
+
+ // if we need to keep the execution alive, then we can not support
+ // implicit close.
+ response.setSupportsImplicitClose(!this.securityContext.keepExecutionAlive());
+ response.setTransactional(this.securityContext.isTransactional());
+ response.setWarnings(this.securityContext.getWarnings());
+
+ if ( lastBatch ) {
+ response.setFinalRow(rowCount);
+ }
+ this.resultsReceiver.receiveResults(response);
+ }
+
private void correctTypes(List row) throws ConnectorException {
//TODO: add a proper source schema
for (int i = convertToRuntimeType.size() - 1; i >= 0; i--) {
Added: trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWorkItemFactory.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWorkItemFactory.java (rev 0)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWorkItemFactory.java 2009-03-19 19:11:15 UTC (rev 593)
@@ -0,0 +1,173 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.dqp.internal.datamgr.impl;
+
+import java.util.Arrays;
+
+import org.teiid.connector.api.ConnectorException;
+import org.teiid.dqp.internal.cache.CacheID;
+import org.teiid.dqp.internal.cache.CacheResults;
+import org.teiid.dqp.internal.cache.ResultSetCache;
+
+import com.metamatrix.api.exception.MetaMatrixComponentException;
+import com.metamatrix.common.comm.api.ResultsReceiver;
+import com.metamatrix.common.log.LogManager;
+import com.metamatrix.dqp.message.AtomicRequestID;
+import com.metamatrix.dqp.message.AtomicRequestMessage;
+import com.metamatrix.dqp.message.AtomicResultsMessage;
+import com.metamatrix.dqp.util.LogConstants;
+import com.metamatrix.query.sql.lang.Command;
+
+public class ConnectorWorkItemFactory {
+
+ private final static char DELIMITER = '.';
+
+ private ResultSetCache rsCache;
+ private ConnectorManager manager;
+ private boolean synchWorkers;
+
+ /**
+ * A work item that can get results from cache.
+ */
+ private final class CachedResultsConnectorWorkItem extends
+ AsynchConnectorWorkItem {
+ private final CacheID cacheID;
+
+ private CachedResultsConnectorWorkItem(AtomicRequestMessage message,
+ ConnectorManager manager,
+ ResultsReceiver<AtomicResultsMessage> resultsReceiver,
+ CacheID cacheID) {
+ super(message, manager, resultsReceiver);
+ this.cacheID = cacheID;
+ }
+
+ @Override
+ protected void createExecution()
+ throws MetaMatrixComponentException,
+ ConnectorException {
+ }
+
+ @Override
+ protected void processNewRequest() throws ConnectorException {
+ handleBatch();
+ }
+
+ @Override
+ protected void handleBatch() throws ConnectorException {
+ int firstRow = rowCount + 1;
+ //already in cache
+ CacheResults results = rsCache.getResults(cacheID, new int[]{firstRow, firstRow + requestMsg.getFetchSize() -1});
+ this.rowCount = rowCount + results.getResults().length;
+ if(results.isFinal()){
+ this.lastBatch = true;
+ }
+
+ LogManager.logTrace(LogConstants.CTX_DQP,
+ new Object[] { "CacheSynchQueryExecution - returnning batch from cache, startRow =", //$NON-NLS-1$
+ new Integer(firstRow),
+ ", endRow =", //$NON-NLS-1$
+ new Integer(rowCount)});
+ sendResults(Arrays.asList(results.getResults()));
+ }
+ }
+
+ /**
+ * Intercepts results for cachable commands
+ */
+ public class CachedResultsReceiver implements ResultsReceiver<AtomicResultsMessage> {
+
+ private ResultsReceiver<AtomicResultsMessage> actual;
+ private AtomicRequestID requestId;
+ private CacheID cacheID;
+ private int firstRow = 1;
+
+ public CachedResultsReceiver(ResultsReceiver<AtomicResultsMessage> actual,
+ CacheID cacheID, AtomicRequestID requestId) {
+ this.actual = actual;
+ this.cacheID = cacheID;
+ this.requestId = requestId;
+ }
+
+ @Override
+ public void receiveResults(AtomicResultsMessage results) {
+ boolean isFinal = results.getFinalRow() >= 0;
+ if (results.isRequestClosed()) {
+ rsCache.removeTempResults(cacheID);
+ } else {
+ CacheResults cr = new CacheResults(results.getResults(), firstRow, isFinal);
+ firstRow += results.getResults().length;
+ rsCache.setResults(cacheID, cr, requestId);
+ }
+ actual.receiveResults(results);
+ }
+
+ @Override
+ public void exceptionOccurred(Throwable e) {
+ rsCache.removeTempResults(cacheID);
+ actual.exceptionOccurred(e);
+ }
+
+ }
+
+ public ConnectorWorkItemFactory(ConnectorManager manager,
+ ResultSetCache rsCache, boolean synchWorkers) {
+ this.manager = manager;
+ this.rsCache = rsCache;
+ this.synchWorkers = synchWorkers;
+ }
+
+ public ConnectorWorkItem createWorkItem(AtomicRequestMessage message, ResultsReceiver<AtomicResultsMessage> receiver) {
+ if (this.rsCache != null && message.useResultSetCache()) {
+ final CacheID cacheID = createCacheID(message);
+
+ if (cacheID != null) {
+ if (rsCache.hasResults(cacheID)) {
+ return new CachedResultsConnectorWorkItem(message, manager,
+ receiver, cacheID);
+ }
+ receiver = new CachedResultsReceiver(receiver, cacheID, message.getAtomicRequestID());
+ }
+ }
+
+ if (synchWorkers) {
+ return new SynchConnectorWorkItem(message, manager, receiver);
+ }
+ return new AsynchConnectorWorkItem(message, manager, receiver);
+ }
+
+ private CacheID createCacheID(AtomicRequestMessage message) {
+ Command command = message.getCommand();
+ if (!command.areResultsCachable()) {
+ return null;
+ }
+ String scope = rsCache.getCacheScope();
+ String scopeId = null;
+ if(ResultSetCache.RS_CACHE_SCOPE_VDB.equalsIgnoreCase(scope)){
+ scopeId = message.getWorkContext().getVdbName() + DELIMITER + message.getWorkContext().getVdbVersion();
+ }else{
+ scopeId = message.getWorkContext().getConnectionID();
+ }
+ return new CacheID(scopeId, command.toString());
+ }
+
+}
Property changes on: trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWorkItemFactory.java
___________________________________________________________________
Name: svn:mime-type
+ text/plain
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ExecutionContextImpl.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ExecutionContextImpl.java 2009-03-19 17:17:58 UTC (rev 592)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ExecutionContextImpl.java 2009-03-19 19:11:15 UTC (rev 593)
@@ -55,8 +55,6 @@
private Serializable executionPayload;
// ID of the parent JDBC Connection which is executing the statement
private String requestConnectionID;
- // uses the result set chache or not
- private boolean useResultSetCache;
// Execute count of the query
private String executeCount;
// keep the execution object alive during the processing. default:false
@@ -71,7 +69,7 @@
public ExecutionContextImpl(String vdbName, String vdbVersion, String userName,
Serializable trustedPayload, Serializable executionPayload,
- String originalConnectionID, String connectorId, String requestId, String partId, String execCount, boolean useResultSetCache) {
+ String originalConnectionID, String connectorId, String requestId, String partId, String execCount) {
this.vdbName = vdbName;
this.vdbVersion = vdbVersion;
@@ -83,7 +81,6 @@
this.partID = partId;
this.requestConnectionID = originalConnectionID;
this.executeCount = execCount;
- this.useResultSetCache = useResultSetCache;
}
public String getConnectorIdentifier() {
@@ -124,10 +121,6 @@
return requestConnectionID;
}
- public boolean useResultSetCache() {
- return useResultSetCache;
- }
-
public void keepExecutionAlive(boolean alive) {
this.keepAlive = alive;
}
Modified: trunk/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/FakeConnector.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/FakeConnector.java 2009-03-19 17:17:58 UTC (rev 592)
+++ trunk/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/FakeConnector.java 2009-03-19 19:11:15 UTC (rev 593)
@@ -60,6 +60,17 @@
private long simulatedBatchRetrievalTime = 1000L;
private ClassLoader classloader;
+ private int connectionCount;
+ private int executionCount;
+
+ public int getConnectionCount() {
+ return connectionCount;
+ }
+
+ public int getExecutionCount() {
+ return executionCount;
+ }
+
@Override
public Connection getConnection(org.teiid.connector.api.ExecutionContext context) throws ConnectorException {
return new FakeConnection();
@@ -78,8 +89,14 @@
}
private class FakeConnection extends BasicConnection implements XAConnection {
+
+ public FakeConnection() {
+ connectionCount++;
+ }
+
public boolean released = false;
public Execution createExecution(ICommand command, ExecutionContext executionContext, RuntimeMetadata metadata) throws ConnectorException {
+ executionCount++;
return new FakeBlockingExecution(executionContext);
}
public ConnectorCapabilities getCapabilities() {
Modified: trunk/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/FakeExecutionContextImpl.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/FakeExecutionContextImpl.java 2009-03-19 17:17:58 UTC (rev 592)
+++ trunk/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/FakeExecutionContextImpl.java 2009-03-19 19:11:15 UTC (rev 593)
@@ -46,8 +46,7 @@
"ConnectorID"+unique, //$NON-NLS-1$
"RequestID"+unique, //$NON-NLS-1$
"PartID"+unique, //$NON-NLS-1$
- "ExecCount"+unique, //$NON-NLS-1$
- false);
+ "ExecCount"+unique);
}
}
Modified: trunk/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/TestConnectorManagerImpl.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/TestConnectorManagerImpl.java 2009-03-19 17:17:58 UTC (rev 592)
+++ trunk/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/TestConnectorManagerImpl.java 2009-03-19 19:11:15 UTC (rev 593)
@@ -26,23 +26,31 @@
*/
package org.teiid.dqp.internal.datamgr.impl;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
import java.net.URL;
import java.net.URLClassLoader;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
-import junit.framework.TestCase;
-
+import org.junit.Test;
import org.mockito.Mockito;
import org.teiid.connector.internal.ConnectorPropertyNames;
-import org.teiid.dqp.internal.datamgr.impl.ConnectorManager;
+import org.teiid.dqp.internal.cache.ResultSetCache;
import org.teiid.dqp.internal.datamgr.impl.TestConnectorWorkItem.QueueResultsReceiver;
import org.teiid.dqp.internal.pooling.connector.FakeSourceConnectionFactory;
import org.teiid.dqp.internal.process.DQPWorkContext;
+import com.metamatrix.api.exception.MetaMatrixComponentException;
+import com.metamatrix.cache.FakeCache;
import com.metamatrix.common.application.ApplicationEnvironment;
import com.metamatrix.common.application.exception.ApplicationLifecycleException;
import com.metamatrix.dqp.message.AtomicRequestMessage;
+import com.metamatrix.dqp.message.AtomicResultsMessage;
import com.metamatrix.dqp.service.DQPServiceNames;
import com.metamatrix.dqp.service.FakeMetadataService;
import com.metamatrix.query.optimizer.capabilities.SourceCapabilities;
@@ -51,18 +59,7 @@
/**
* JUnit test for TestConnectorManagerImpl
*/
-public final class TestConnectorManagerImpl extends TestCase {
- // =========================================================================
- // F R A M E W O R K
- // =========================================================================
- /**
- * Constructor for TestConnectorManagerImpl.
- * @param name
- */
- public TestConnectorManagerImpl(final String name) {
- super(name);
- }
-
+public final class TestConnectorManagerImpl {
private Properties helpGetAppProps() {
Properties appProperties = new Properties();
@@ -77,7 +74,7 @@
// T E S T C A S E S
// =========================================================================
- public void testStartFailsWithNullRequiredProp() throws Exception {
+ @Test public void testStartFailsWithNullRequiredProp() throws Exception {
ConnectorManager cm = new ConnectorManager();
Properties appProperties = helpGetAppProps();
// Remove required property
@@ -92,7 +89,7 @@
}
}
- public void testReceive() throws Exception {
+ @Test public void testReceive() throws Exception {
ConnectorManager cm = new ConnectorManager();
startConnectorManager(cm, helpGetAppProps());
@@ -103,7 +100,7 @@
cm.stop();
}
- public void testConnectorCapabilitiesOverride() throws Exception {
+ @Test public void testConnectorCapabilitiesOverride() throws Exception {
ConnectorManager cm = new ConnectorManager();
startConnectorManager(cm, helpGetAppProps());
@@ -134,7 +131,7 @@
cm.start(env);
}
- public void testIsXA() throws Exception {
+ @Test public void testIsXA() throws Exception {
ConnectorManager cm = new ConnectorManager();
Properties props = new Properties();
props.setProperty(ConnectorPropertyNames.CONNECTOR_CLASS, FakeConnector.class.getName());
@@ -144,7 +141,7 @@
cm.stop();
}
- public void testIsXA_Failure() throws Exception {
+ @Test public void testIsXA_Failure() throws Exception {
ConnectorManager cm = new ConnectorManager();
Properties props = new Properties();
props.setProperty(ConnectorPropertyNames.CONNECTOR_CLASS, FakeSourceConnectionFactory.class.getName());
@@ -157,7 +154,51 @@
cm.stop();
}
- public void testDefect19049() throws Exception {
+ @Test public void testCaching() throws Exception {
+ ConnectorManager cm = new ConnectorManager() {
+ @Override
+ protected ResultSetCache createResultSetCache(
+ Properties rsCacheProps)
+ throws MetaMatrixComponentException {
+ assertEquals(rsCacheProps.get(ResultSetCache.RS_CACHE_MAX_AGE), String.valueOf(0));
+ return new ResultSetCache(rsCacheProps, new FakeCache.FakeCacheFactory());
+ }
+ };
+ Properties props = new Properties();
+ props.setProperty(ConnectorPropertyNames.CONNECTOR_CLASS, FakeConnector.class.getName());
+ props.setProperty(ConnectorPropertyNames.USE_RESULTSET_CACHE, Boolean.TRUE.toString());
+ startConnectorManager(cm, props);
+ ConnectorWrapper wrapper = cm.getConnector();
+ FakeConnector fc = (FakeConnector)wrapper.getActualConnector();
+ assertEquals(0, fc.getConnectionCount());
+ assertEquals(0, fc.getExecutionCount());
+ AtomicRequestMessage request = TestConnectorWorkItem.createNewAtomicRequestMessage(1, 1);
+ request.setUseResultSetCache(true);
+ QueueResultsReceiver receiver = new QueueResultsReceiver();
+ cm.executeRequest(receiver, request);
+ AtomicResultsMessage arm = receiver.getResults().poll(1000, TimeUnit.MILLISECONDS);
+ assertEquals(-1, arm.getFinalRow());
+ //get the last batch - it will be 0 sized
+ cm.requstMore(request.getAtomicRequestID());
+ assertNotNull(receiver.getResults().poll(1000, TimeUnit.MILLISECONDS));
+ cm.closeRequest(request.getAtomicRequestID());
+ assertEquals(1, fc.getConnectionCount());
+ assertEquals(1, fc.getExecutionCount());
+
+ //this request should hit the cache
+ AtomicRequestMessage request1 = TestConnectorWorkItem.createNewAtomicRequestMessage(2, 1);
+ request1.setUseResultSetCache(true);
+ QueueResultsReceiver receiver1 = new QueueResultsReceiver();
+ cm.executeRequest(receiver1, request1);
+ arm = receiver1.getResults().poll(1000, TimeUnit.MILLISECONDS);
+ assertEquals(5, arm.getFinalRow());
+ assertEquals(1, fc.getConnectionCount());
+ assertEquals(1, fc.getExecutionCount());
+
+ cm.stop();
+ }
+
+ @Test public void testDefect19049() throws Exception {
ConnectorManager cm = new ConnectorManager();
Properties props = new Properties();
final String connectorName = FakeConnector.class.getName();
Modified: trunk/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/TestConnectorStateManager.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/TestConnectorStateManager.java 2009-03-19 17:17:58 UTC (rev 592)
+++ trunk/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/TestConnectorStateManager.java 2009-03-19 19:11:15 UTC (rev 593)
@@ -66,6 +66,7 @@
csm = new ConnectorManager();
csm.setConnectorWorkerPool(Mockito.mock(WorkerPool.class));
csm.setConnector(new ConnectorWrapper(new FakeConnector()));
+ csm.setWorkItemFactory(new ConnectorWorkItemFactory(csm, null, true));
}
void helpAssureOneState() {
Modified: trunk/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/TestExecutionContextImpl.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/TestExecutionContextImpl.java 2009-03-19 17:17:58 UTC (rev 592)
+++ trunk/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/TestExecutionContextImpl.java 2009-03-19 19:11:15 UTC (rev 593)
@@ -38,7 +38,7 @@
public ExecutionContextImpl createContext(String requestID, String partID) {
return new ExecutionContextImpl("vdb", "1", "user", null, null, //$NON-NLS-1$//$NON-NLS-2$ //$NON-NLS-3$
- "Connection", "Connector", requestID, partID, "0", false); //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$
+ "Connection", "Connector", requestID, partID, "0"); //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$
}
public void testEqivalenceSemanticsSame() {
Modified: trunk/engine/src/test/java/org/teiid/dqp/internal/pooling/connector/TestConnectionPool.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/dqp/internal/pooling/connector/TestConnectionPool.java 2009-03-19 17:17:58 UTC (rev 592)
+++ trunk/engine/src/test/java/org/teiid/dqp/internal/pooling/connector/TestConnectionPool.java 2009-03-19 19:11:15 UTC (rev 593)
@@ -71,7 +71,7 @@
}
public static ExecutionContext createContext(final String user, boolean userIdentity) {
- ExecutionContextImpl context = new ExecutionContextImpl(null, null, user, null, null, null, null, null, null, null, false);
+ ExecutionContextImpl context = new ExecutionContextImpl(null, null, user, null, null, null, null, null, null, null);
if (userIdentity) {
context.setConnectorIdentity(new MappedUserIdentity(context.getUser(), null, null));
}
Modified: trunk/engine/src/test/java/org/teiid/dqp/internal/pooling/connector/TestPerUserPool.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/dqp/internal/pooling/connector/TestPerUserPool.java 2009-03-19 17:17:58 UTC (rev 592)
+++ trunk/engine/src/test/java/org/teiid/dqp/internal/pooling/connector/TestPerUserPool.java 2009-03-19 19:11:15 UTC (rev 593)
@@ -111,7 +111,7 @@
}
// session payload
- ExecutionContextImpl impl = new ExecutionContextImpl(null, null, null, credentials, null, null, null, null, null, null, false);
+ ExecutionContextImpl impl = new ExecutionContextImpl(null, null, null, credentials, null, null, null, null, null, null);
impl.setConnectorIdentity(factory.createIdentity(impl));
return impl;
}
Modified: trunk/test-integration/src/test/java/com/metamatrix/connector/jdbc/extension/TestSQLConversionVisitor.java
===================================================================
--- trunk/test-integration/src/test/java/com/metamatrix/connector/jdbc/extension/TestSQLConversionVisitor.java 2009-03-19 17:17:58 UTC (rev 592)
+++ trunk/test-integration/src/test/java/com/metamatrix/connector/jdbc/extension/TestSQLConversionVisitor.java 2009-03-19 19:11:15 UTC (rev 593)
@@ -74,8 +74,7 @@
"ExecutionPayload", //$NON-NLS-1$
"ConnectionID", //$NON-NLS-1$
"Connector",
- "RequestID", "PartID", "ExecCount", //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$
- false);
+ "RequestID", "PartID", "ExecCount");
/**
* Constructor for TestSQLConversionVisitor.
* @param name
Modified: trunk/test-integration/src/test/java/com/metamatrix/connector/jdbc/oracle/TestOracleSQLConversionVisitor.java
===================================================================
--- trunk/test-integration/src/test/java/com/metamatrix/connector/jdbc/oracle/TestOracleSQLConversionVisitor.java 2009-03-19 17:17:58 UTC (rev 592)
+++ trunk/test-integration/src/test/java/com/metamatrix/connector/jdbc/oracle/TestOracleSQLConversionVisitor.java 2009-03-19 19:11:15 UTC (rev 593)
@@ -407,7 +407,7 @@
String output = "SELECT /*+ ALL_ROWS */ PARTS.PART_NAME, ROWNUM FROM PARTS"; //$NON-NLS-1$
String hint = "/*+ ALL_ROWS */"; //$NON-NLS-1$
- ExecutionContext context = new ExecutionContextImpl(null, null, null, null, hint, null, "", null, null, null, false); //$NON-NLS-1$
+ ExecutionContext context = new ExecutionContextImpl(null, null, null, null, hint, null, "", null, null, null); //$NON-NLS-1$
helpTestVisitor(getTestVDB(),
input,
More information about the teiid-commits
mailing list