[teiid-commits] teiid SVN: r593 - in trunk: connector-api/src/main/java/org/teiid/connector/internal and 7 other directories.

teiid-commits at lists.jboss.org teiid-commits at lists.jboss.org
Thu Mar 19 15:11:15 EDT 2009


Author: shawkins
Date: 2009-03-19 15:11:15 -0400 (Thu, 19 Mar 2009)
New Revision: 593

Added:
   trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWorkItemFactory.java
Modified:
   trunk/connector-api/src/main/java/org/teiid/connector/api/ExecutionContext.java
   trunk/connector-api/src/main/java/org/teiid/connector/internal/ConnectorPropertyNames.java
   trunk/connector-metadata/src/test/java/com/metamatrix/connector/metadata/TestIndexConnector.java
   trunk/connector-sdk/src/main/java/com/metamatrix/cdk/api/ConnectorHost.java
   trunk/connector-sdk/src/main/java/com/metamatrix/cdk/api/EnvironmentUtility.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/AsynchConnectorWorkItem.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorManager.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWorkItem.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ExecutionContextImpl.java
   trunk/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/FakeConnector.java
   trunk/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/FakeExecutionContextImpl.java
   trunk/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/TestConnectorManagerImpl.java
   trunk/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/TestConnectorStateManager.java
   trunk/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/TestExecutionContextImpl.java
   trunk/engine/src/test/java/org/teiid/dqp/internal/pooling/connector/TestConnectionPool.java
   trunk/engine/src/test/java/org/teiid/dqp/internal/pooling/connector/TestPerUserPool.java
   trunk/test-integration/src/test/java/com/metamatrix/connector/jdbc/extension/TestSQLConversionVisitor.java
   trunk/test-integration/src/test/java/com/metamatrix/connector/jdbc/oracle/TestOracleSQLConversionVisitor.java
Log:
TEIID-164 adding back connector resultset caching.

Modified: trunk/connector-api/src/main/java/org/teiid/connector/api/ExecutionContext.java
===================================================================
--- trunk/connector-api/src/main/java/org/teiid/connector/api/ExecutionContext.java	2009-03-19 17:17:58 UTC (rev 592)
+++ trunk/connector-api/src/main/java/org/teiid/connector/api/ExecutionContext.java	2009-03-19 19:11:15 UTC (rev 593)
@@ -130,12 +130,6 @@
     String getConnectionIdentifier();
     
     /**
-     * Whether to use ResultSet cache if it is enabled.
-     * @return True if use ResultSet cache; false otherwise.
-     */
-    boolean useResultSetCache();
-    
-    /**
      * When the execution is turned on with "alive=true", the execution object will not
      * be implicitly closed at the end of the last batch.  It will only be closed at end
      * of the user query. This is useful in keeping the connection open for 

Modified: trunk/connector-api/src/main/java/org/teiid/connector/internal/ConnectorPropertyNames.java
===================================================================
--- trunk/connector-api/src/main/java/org/teiid/connector/internal/ConnectorPropertyNames.java	2009-03-19 17:17:58 UTC (rev 592)
+++ trunk/connector-api/src/main/java/org/teiid/connector/internal/ConnectorPropertyNames.java	2009-03-19 19:11:15 UTC (rev 593)
@@ -104,6 +104,11 @@
     
     public static final String ADMIN_CONNECTIONS_ALLOWED = "AdminConnectionsAllowed"; //$NON-NLS-1$
 
+    public static final String USE_RESULTSET_CACHE = "ResultSetCacheEnabled"; //$NON-NLS-1$
+    public static final String MAX_RESULTSET_CACHE_SIZE = "ResultSetCacheMaxSize"; //$NON-NLS-1$
+    public static final String MAX_RESULTSET_CACHE_AGE = "ResultSetCacheMaxAge"; //$NON-NLS-1$
+    public static final String RESULTSET_CACHE_SCOPE = "ResultSetCacheScope"; //$NON-NLS-1$
+    
     /**
      * This property can be used to bypass the normal logic that throws an exception when a command
      * is about to be executed by a non-XA compatible connector, but there is a global transaction.

Modified: trunk/connector-metadata/src/test/java/com/metamatrix/connector/metadata/TestIndexConnector.java
===================================================================
--- trunk/connector-metadata/src/test/java/com/metamatrix/connector/metadata/TestIndexConnector.java	2009-03-19 17:17:58 UTC (rev 592)
+++ trunk/connector-metadata/src/test/java/com/metamatrix/connector/metadata/TestIndexConnector.java	2009-03-19 19:11:15 UTC (rev 593)
@@ -163,7 +163,7 @@
     }
     
     private ExecutionContext helpGetSecurityContext() {
-        return new ExecutionContextImpl("testname", "1", null, null, null, null, null, null, null, null, false); //$NON-NLS-1$ //$NON-NLS-2$
+        return new ExecutionContextImpl("testname", "1", null, null, null, null, null, null, null, null); //$NON-NLS-1$ //$NON-NLS-2$
     }
     /* 
      * @see junit.framework.TestCase#tearDown()

Modified: trunk/connector-sdk/src/main/java/com/metamatrix/cdk/api/ConnectorHost.java
===================================================================
--- trunk/connector-sdk/src/main/java/com/metamatrix/cdk/api/ConnectorHost.java	2009-03-19 17:17:58 UTC (rev 592)
+++ trunk/connector-sdk/src/main/java/com/metamatrix/cdk/api/ConnectorHost.java	2009-03-19 19:11:15 UTC (rev 593)
@@ -128,7 +128,7 @@
     }
     
     public void setSecurityContext(String vdbName, String vdbVersion, String userName, Serializable trustedPayload, Serializable executionPayload) {          
-        this.executionContext = new ExecutionContextImpl(vdbName, vdbVersion, userName, trustedPayload, executionPayload, "Connection", "Connector<CDK>", "Request", "1", "0", false); //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$ //$NON-NLS-4$ //$NON-NLS-5$  
+        this.executionContext = new ExecutionContextImpl(vdbName, vdbVersion, userName, trustedPayload, executionPayload, "Connection", "Connector<CDK>", "Request", "1", "0"); //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$ //$NON-NLS-4$ //$NON-NLS-5$  
     }
     
     public void setExecutionContext(ExecutionContext context) {

Modified: trunk/connector-sdk/src/main/java/com/metamatrix/cdk/api/EnvironmentUtility.java
===================================================================
--- trunk/connector-sdk/src/main/java/com/metamatrix/cdk/api/EnvironmentUtility.java	2009-03-19 17:17:58 UTC (rev 592)
+++ trunk/connector-sdk/src/main/java/com/metamatrix/cdk/api/EnvironmentUtility.java	2009-03-19 19:11:15 UTC (rev 593)
@@ -98,7 +98,7 @@
      * @return A SecurityContext / ExecutionContext instance
      */
     public static ExecutionContext createSecurityContext(String user) {
-        return new ExecutionContextImpl("vdb", "1", user, null, null, "Connection", "ConnectorID<CDK>", "Request", "1", "0", false);  //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$ //$NON-NLS-4$ //$NON-NLS-5$ //$NON-NLS-6$ //$NON-NLS-7$
+        return new ExecutionContextImpl("vdb", "1", user, null, null, "Connection", "ConnectorID<CDK>", "Request", "1", "0");  //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$ //$NON-NLS-4$ //$NON-NLS-5$ //$NON-NLS-6$ //$NON-NLS-7$
     }
 
     /**
@@ -111,7 +111,7 @@
      * @return A SecurityContext / ExecutionContext instance
      */
     public static ExecutionContext createSecurityContext(String vdbName, String vdbVersion, String user, Serializable trustedToken) {
-        return new ExecutionContextImpl(vdbName, vdbVersion, user, trustedToken, null, "Connection", "ConnectorID<CDK>", "Request", "1", "0", false);  //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$ //$NON-NLS-4$ //$NON-NLS-5$
+        return new ExecutionContextImpl(vdbName, vdbVersion, user, trustedToken, null, "Connection", "ConnectorID<CDK>", "Request", "1", "0");  //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$ //$NON-NLS-4$ //$NON-NLS-5$
     }
     
     /**
@@ -122,7 +122,7 @@
      * @return A SecurityContext / ExecutionContext instance
      */
     public static ExecutionContext createExecutionContext(String requestID, String partID) {
-        return new ExecutionContextImpl("vdb", "1", "user", null, null, "Connection", "ConnectorID<CDK>", requestID, partID, "0", false);   //$NON-NLS-1$//$NON-NLS-2$//$NON-NLS-3$//$NON-NLS-4$ //$NON-NLS-5$ //$NON-NLS-6$
+        return new ExecutionContextImpl("vdb", "1", "user", null, null, "Connection", "ConnectorID<CDK>", requestID, partID, "0");   //$NON-NLS-1$//$NON-NLS-2$//$NON-NLS-3$//$NON-NLS-4$ //$NON-NLS-5$ //$NON-NLS-6$
     }
 
     /**
@@ -142,7 +142,7 @@
     public static ExecutionContext createExecutionContext(String vdbName, String vdbVersion, String user,
                                                         Serializable trustedToken, Serializable executionPayload,                                                         
 														String connectionID, String connectorID, String requestID, String partID, boolean useResultSetCache) {
-        return new ExecutionContextImpl(vdbName, vdbVersion, user, trustedToken, executionPayload, connectionID, connectorID, requestID, partID, "0", useResultSetCache); //$NON-NLS-1$
+        return new ExecutionContextImpl(vdbName, vdbVersion, user, trustedToken, executionPayload, connectionID, connectorID, requestID, partID, "0"); //$NON-NLS-1$
     }
 
 }

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/AsynchConnectorWorkItem.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/AsynchConnectorWorkItem.java	2009-03-19 17:17:58 UTC (rev 592)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/AsynchConnectorWorkItem.java	2009-03-19 19:11:15 UTC (rev 593)
@@ -22,15 +22,27 @@
 
 package org.teiid.dqp.internal.datamgr.impl;
 
+import org.teiid.connector.api.Connector;
+import org.teiid.connector.api.ConnectorException;
+
+import com.metamatrix.api.exception.MetaMatrixComponentException;
 import com.metamatrix.common.comm.api.ResultsReceiver;
 import com.metamatrix.core.util.Assertion;
 import com.metamatrix.dqp.message.AtomicRequestMessage;
 import com.metamatrix.dqp.message.AtomicResultsMessage;
+import com.metamatrix.query.metadata.QueryMetadataInterface;
 
 public class AsynchConnectorWorkItem extends ConnectorWorkItem {
                 
     AsynchConnectorWorkItem(AtomicRequestMessage message, ConnectorManager manager, ResultsReceiver<AtomicResultsMessage> resultsReceiver) {
     	super(message, manager, resultsReceiver);
+    }
+    
+    @Override
+    protected void createConnection(Connector connector,
+    		QueryMetadataInterface queryMetadata) throws ConnectorException,
+    		MetaMatrixComponentException {
+    	super.createConnection(connector, queryMetadata);
     	Assertion.assertTrue(!this.isTransactional, "Asynch work items are not suitable for transactions"); //$NON-NLS-1$
     }
     

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorManager.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorManager.java	2009-03-19 17:17:58 UTC (rev 592)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorManager.java	2009-03-19 19:11:15 UTC (rev 593)
@@ -58,6 +58,7 @@
 import org.teiid.dqp.internal.process.DQPWorkContext;
 import org.teiid.dqp.internal.transaction.TransactionProvider;
 
+import com.metamatrix.api.exception.MetaMatrixComponentException;
 import com.metamatrix.common.application.ApplicationEnvironment;
 import com.metamatrix.common.application.ApplicationService;
 import com.metamatrix.common.application.exception.ApplicationLifecycleException;
@@ -72,6 +73,7 @@
 import com.metamatrix.core.util.ReflectionHelper;
 import com.metamatrix.core.util.StringUtil;
 import com.metamatrix.dqp.DQPPlugin;
+import com.metamatrix.dqp.ResourceFinder;
 import com.metamatrix.dqp.internal.datamgr.ConnectorID;
 import com.metamatrix.dqp.message.AtomicRequestID;
 import com.metamatrix.dqp.message.AtomicRequestMessage;
@@ -103,11 +105,13 @@
     private ConnectorID connectorID;
     private WorkerPool connectorWorkerPool;
     private ResultSetCache rsCache;
+    private ConnectorWorkItemFactory workItemFactory;
 	private String connectorName;
     private int maxResultRows;
     private boolean exceptionOnMaxRows = true;
     private boolean synchWorkers;
     private boolean isXa;
+    private boolean isImmutable;
 
     //services acquired in start
     private MetadataService metadataService;
@@ -127,13 +131,11 @@
     
     public void initialize(Properties props) {
     	this.props = props;
+    	this.isImmutable = PropertiesUtils.getBooleanProperty(props, ConnectorPropertyNames.IS_IMMUTABLE, false);
     }
     
     public boolean isImmutable() {
-    	if ( this.props == null ) {
-    		this.props = new Properties();
-    	}
-        return PropertiesUtils.getBooleanProperty(props, ConnectorPropertyNames.IS_IMMUTABLE, false);
+        return isImmutable;
     }
 
     public ClassLoader getClassloader() {
@@ -160,7 +162,7 @@
                         "capabilities-request", //$NON-NLS-1$
                         connectorID.getID(), 
                         requestID.toString(), 
-                        "capabilities-request", "0", false); //$NON-NLS-1$ //$NON-NLS-2$ 
+                        "capabilities-request", "0"); //$NON-NLS-1$ //$NON-NLS-2$ 
 
             	conn = connector.getConnection(context);
             	caps = conn.getCapabilities();
@@ -183,19 +185,14 @@
         	rsCache.clear();
         }
     }
-        
+     
     public void executeRequest(ResultsReceiver<AtomicResultsMessage> receiver, AtomicRequestMessage message) {
         // Set the connector ID to be used; if not already set. 
     	AtomicRequestID atomicRequestId = message.getAtomicRequestID();
     	LogManager.logDetail(LogConstants.CTX_CONNECTOR, new Object[] {atomicRequestId, "Create State"}); //$NON-NLS-1$
+
+    	ConnectorWorkItem item = workItemFactory.createWorkItem(message, receiver);
     	
-    	ConnectorWorkItem item = null;
-    	if (synchWorkers) {
-    		item = new SynchConnectorWorkItem(message, this, receiver);
-    	} else {
-    		item = new AsynchConnectorWorkItem(message, this, receiver);
-    	}
-
         Assertion.isNull(requestStates.put(atomicRequestId, item), "State already existed"); //$NON-NLS-1$
         message.markProcessingStart();
         enqueueRequest(item);
@@ -337,7 +334,19 @@
 
         // Initialize and start the connector
         initStartConnector(connectorEnv);
-
+        try {
+            //check result set cache
+            if(PropertiesUtils.getBooleanProperty(props, ConnectorPropertyNames.USE_RESULTSET_CACHE, false)) {
+	            Properties rsCacheProps = new Properties();
+	        	rsCacheProps.setProperty(ResultSetCache.RS_CACHE_MAX_SIZE, props.getProperty(ConnectorPropertyNames.MAX_RESULTSET_CACHE_SIZE, "0")); //$NON-NLS-1$
+	        	rsCacheProps.setProperty(ResultSetCache.RS_CACHE_MAX_AGE, props.getProperty(ConnectorPropertyNames.MAX_RESULTSET_CACHE_AGE, "0")); //$NON-NLS-1$
+	        	rsCacheProps.setProperty(ResultSetCache.RS_CACHE_SCOPE, props.getProperty(ConnectorPropertyNames.RESULTSET_CACHE_SCOPE, ResultSetCache.RS_CACHE_SCOPE_VDB)); 
+	    		this.rsCache = createResultSetCache(rsCacheProps);
+            }
+		} catch (MetaMatrixComponentException e) {
+			throw new ApplicationLifecycleException(e);
+		}
+		this.workItemFactory = new ConnectorWorkItemFactory(this, this.rsCache, synchWorkers);
         this.started = true;
     }
     
@@ -457,6 +466,11 @@
         }         
         return c;
     }
+    
+    protected ResultSetCache createResultSetCache(Properties rsCacheProps)
+			throws MetaMatrixComponentException {
+		return new ResultSetCache(rsCacheProps, ResourceFinder.getCacheFactory());
+	}
 
     /**
      * Queries the Connector Manager, if it already has been started. 
@@ -644,6 +658,10 @@
 		this.classloader = classloader;
 	}
 	
+	public void setWorkItemFactory(ConnectorWorkItemFactory workItemFactory) {
+		this.workItemFactory = workItemFactory;
+	}
+	
 	/**
 	 * Overloads the connector capabilities with one defined in the connector binding properties
 	 */

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWorkItem.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWorkItem.java	2009-03-19 17:17:58 UTC (rev 592)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWorkItem.java	2009-03-19 19:11:15 UTC (rev 593)
@@ -86,8 +86,8 @@
     private List<Integer> convertToDesiredRuntimeType;
         
     /* End state information */    
-    private boolean lastBatch;
-    private int rowCount;
+    protected boolean lastBatch;
+    protected int rowCount;
     
     protected enum RequestState {
     	NEW, MORE, CLOSE
@@ -100,33 +100,31 @@
     private volatile boolean closeRequested;
     private boolean isClosed;
 
-    ResultsReceiver<AtomicResultsMessage> resultsReceiver;
+    protected ResultsReceiver<AtomicResultsMessage> resultsReceiver;
     
     ConnectorWorkItem(AtomicRequestMessage message, ConnectorManager manager, ResultsReceiver<AtomicResultsMessage> resultsReceiver) {
         this.id = message.getAtomicRequestID();
         this.requestMsg = message;
         this.manager = manager;
         this.resultsReceiver = resultsReceiver;
+        AtomicRequestID requestID = this.requestMsg.getAtomicRequestID();
+        this.securityContext = new ExecutionContextImpl(requestMsg.getWorkContext().getVdbName(),
+                requestMsg.getWorkContext().getVdbVersion(),
+                requestMsg.getWorkContext().getUserName(),
+                requestMsg.getWorkContext().getTrustedPayload(),
+                requestMsg.getExecutionPayload(),                                                                       
+                requestMsg.getWorkContext().getConnectionID(),                                                                      
+                requestMsg.getConnectorID().getID(),
+                requestMsg.getRequestID().toString(),
+                Integer.toString(requestID.getNodeID()),
+                Integer.toString(requestID.getExecutionId())
+                );
+        this.securityContext.setBatchSize(this.requestMsg.getFetchSize());
     }
 
     protected void createConnection(Connector connector, QueryMetadataInterface queryMetadata) throws ConnectorException, MetaMatrixComponentException {
         LogManager.logTrace(LogConstants.CTX_CONNECTOR, new Object[] {id, "creating connection for atomic-request"});  //$NON-NLS-1$
-        AtomicRequestID requestID = this.requestMsg.getAtomicRequestID();
-        this.securityContext = new ExecutionContextImpl(requestMsg.getWorkContext().getVdbName(),
-                                                       requestMsg.getWorkContext().getVdbVersion(),
-                                                       requestMsg.getWorkContext().getUserName(),
-                                                       requestMsg.getWorkContext().getTrustedPayload(),
-                                                       requestMsg.getExecutionPayload(),                                                                       
-                                                       requestMsg.getWorkContext().getConnectionID(),                                                                      
-                                                       requestMsg.getConnectorID().getID(),
-                                                       requestMsg.getRequestID().toString(),
-                                                       Integer.toString(requestID.getNodeID()),
-                                                       Integer.toString(requestID.getExecutionId()),
-                                                       requestMsg.useResultSetCache()
-                                                       && (requestMsg.getCommand()).areResultsCachable()
-                                                       ); 
-        this.securityContext.setBatchSize(this.requestMsg.getFetchSize());
-
+         
         if (requestMsg.isTransactional()){
         	if (manager.isXa()) {
 	    		connection = ((XAConnector)connector).getXAConnection(this.securityContext, requestMsg.getTransactionContext());
@@ -273,7 +271,7 @@
 		this.resultsReceiver.receiveResults(response);
 	}
     
-    protected void processNewRequest() throws ConnectorException, CommunicationException {
+    protected void processNewRequest() throws ConnectorException {
     	// Execute query
     	this.execution.execute();
         LogManager.logDetail(LogConstants.CTX_CONNECTOR, new Object[] {this.id, "Executed command"}); //$NON-NLS-1$
@@ -404,34 +402,39 @@
         		if (row != null) {
         			correctTypes(row);
         			rows.add(row);
+        			this.rowCount++;
         		}
         	}
             LogManager.logDetail(LogConstants.CTX_CONNECTOR, new Object[] {this.id, "Obtained last batch, total row count:", rowCount}); //$NON-NLS-1$
         }   
         
         if (sendResults) {
-        	int currentRowCount = rows.size();
-            if ( !lastBatch && currentRowCount == 0 ) {
-                // Defect 13366 - Should send all batches, even if they're zero size.
-                // Log warning if received a zero-size non-last batch from the connector.
-                LogManager.logWarning(LogConstants.CTX_CONNECTOR, DQPPlugin.Util.getString("ConnectorWorker.zero_size_non_last_batch", requestMsg.getConnectorID())); //$NON-NLS-1$
-            }
-
-            AtomicResultsMessage response = createResultsMessage(this.requestMsg, rows.toArray(new List[currentRowCount]), requestMsg.getCommand().getProjectedSymbols());
-            
-            // if we need to keep the execution alive, then we can not support
-            // implicit close.
-            response.setSupportsImplicitClose(!this.securityContext.keepExecutionAlive());
-            response.setTransactional(this.securityContext.isTransactional());
-            response.setWarnings(this.securityContext.getWarnings());
-
-            if ( lastBatch ) {
-                response.setFinalRow(rowCount);
-            } 
-            this.resultsReceiver.receiveResults(response);
+        	sendResults(rows);
         }
     }
 
+	protected void sendResults(List<List> rows) {
+		int currentRowCount = rows.size();
+		if ( !lastBatch && currentRowCount == 0 ) {
+		    // Defect 13366 - Should send all batches, even if they're zero size.
+		    // Log warning if received a zero-size non-last batch from the connector.
+		    LogManager.logWarning(LogConstants.CTX_CONNECTOR, DQPPlugin.Util.getString("ConnectorWorker.zero_size_non_last_batch", requestMsg.getConnectorID())); //$NON-NLS-1$
+		}
+
+		AtomicResultsMessage response = createResultsMessage(this.requestMsg, rows.toArray(new List[currentRowCount]), requestMsg.getCommand().getProjectedSymbols());
+		
+		// if we need to keep the execution alive, then we can not support
+		// implicit close.
+		response.setSupportsImplicitClose(!this.securityContext.keepExecutionAlive());
+		response.setTransactional(this.securityContext.isTransactional());
+		response.setWarnings(this.securityContext.getWarnings());
+
+		if ( lastBatch ) {
+		    response.setFinalRow(rowCount);
+		} 
+		this.resultsReceiver.receiveResults(response);
+	}
+
 	private void correctTypes(List row) throws ConnectorException {
 		//TODO: add a proper source schema
 		for (int i = convertToRuntimeType.size() - 1; i >= 0; i--) {

Added: trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWorkItemFactory.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWorkItemFactory.java	                        (rev 0)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWorkItemFactory.java	2009-03-19 19:11:15 UTC (rev 593)
@@ -0,0 +1,173 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership.  Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ * 
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ * 
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ * 
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.dqp.internal.datamgr.impl;
+
+import java.util.Arrays;
+
+import org.teiid.connector.api.ConnectorException;
+import org.teiid.dqp.internal.cache.CacheID;
+import org.teiid.dqp.internal.cache.CacheResults;
+import org.teiid.dqp.internal.cache.ResultSetCache;
+
+import com.metamatrix.api.exception.MetaMatrixComponentException;
+import com.metamatrix.common.comm.api.ResultsReceiver;
+import com.metamatrix.common.log.LogManager;
+import com.metamatrix.dqp.message.AtomicRequestID;
+import com.metamatrix.dqp.message.AtomicRequestMessage;
+import com.metamatrix.dqp.message.AtomicResultsMessage;
+import com.metamatrix.dqp.util.LogConstants;
+import com.metamatrix.query.sql.lang.Command;
+
+public class ConnectorWorkItemFactory {
+	
+    private final static char DELIMITER = '.';
+
+	private ResultSetCache rsCache;
+	private ConnectorManager manager;
+	private boolean synchWorkers;
+
+	/**
+	 * A work item that can get results from cache.
+	 */
+	private final class CachedResultsConnectorWorkItem extends
+			AsynchConnectorWorkItem {
+		private final CacheID cacheID;
+
+		private CachedResultsConnectorWorkItem(AtomicRequestMessage message,
+				ConnectorManager manager,
+				ResultsReceiver<AtomicResultsMessage> resultsReceiver,
+				CacheID cacheID) {
+			super(message, manager, resultsReceiver);
+			this.cacheID = cacheID;
+		}
+
+		@Override
+		protected void createExecution()
+				throws MetaMatrixComponentException,
+				ConnectorException {
+		}
+		
+		@Override
+		protected void processNewRequest() throws ConnectorException {
+			handleBatch();
+		}
+
+		@Override
+		protected void handleBatch() throws ConnectorException {
+			int firstRow = rowCount + 1;
+			//already in cache
+			CacheResults results = rsCache.getResults(cacheID, new int[]{firstRow, firstRow + requestMsg.getFetchSize() -1});
+			this.rowCount = rowCount + results.getResults().length;
+			if(results.isFinal()){
+				this.lastBatch = true;
+			}
+
+		    LogManager.logTrace(LogConstants.CTX_DQP, 
+		                        new Object[] { "CacheSynchQueryExecution - returnning batch from cache, startRow =",  //$NON-NLS-1$
+		                                       new Integer(firstRow),
+		                                       ", endRow =", //$NON-NLS-1$
+		                                       new Integer(rowCount)});
+		    sendResults(Arrays.asList(results.getResults()));
+		}
+	}
+
+	/**
+	 * Intercepts results for cachable commands
+	 */
+	public class CachedResultsReceiver implements ResultsReceiver<AtomicResultsMessage> {
+		
+		private ResultsReceiver<AtomicResultsMessage> actual;
+		private AtomicRequestID requestId;
+		private CacheID cacheID;
+		private int firstRow = 1;
+		
+		public CachedResultsReceiver(ResultsReceiver<AtomicResultsMessage> actual,
+				CacheID cacheID, AtomicRequestID requestId) {
+			this.actual = actual;
+			this.cacheID = cacheID;
+			this.requestId = requestId;
+		}
+
+		@Override
+		public void receiveResults(AtomicResultsMessage results) {
+			boolean isFinal = results.getFinalRow() >= 0;
+			if (results.isRequestClosed()) {
+				rsCache.removeTempResults(cacheID);
+			} else {
+				CacheResults cr = new CacheResults(results.getResults(), firstRow, isFinal);
+				firstRow += results.getResults().length;
+				rsCache.setResults(cacheID, cr, requestId);
+			}
+			actual.receiveResults(results);
+		}
+
+		@Override
+		public void exceptionOccurred(Throwable e) {
+			rsCache.removeTempResults(cacheID);
+			actual.exceptionOccurred(e);
+		}
+
+	}
+	
+	public ConnectorWorkItemFactory(ConnectorManager manager,
+			ResultSetCache rsCache, boolean synchWorkers) {
+		this.manager = manager;
+		this.rsCache = rsCache;
+		this.synchWorkers = synchWorkers;
+	}
+	
+	public ConnectorWorkItem createWorkItem(AtomicRequestMessage message, ResultsReceiver<AtomicResultsMessage> receiver) {
+    	if (this.rsCache != null && message.useResultSetCache()) {
+        	final CacheID cacheID = createCacheID(message);
+
+        	if (cacheID != null) {
+	        	if (rsCache.hasResults(cacheID)) {
+	        		return new CachedResultsConnectorWorkItem(message, manager,
+							receiver, cacheID);
+	        	}
+        		receiver = new CachedResultsReceiver(receiver, cacheID, message.getAtomicRequestID());
+        	}
+    	}
+    	
+    	if (synchWorkers) {
+    		return new SynchConnectorWorkItem(message, manager, receiver);
+    	} 
+    	return new AsynchConnectorWorkItem(message, manager, receiver);
+	}
+	
+	private CacheID createCacheID(AtomicRequestMessage message) {
+		Command command = message.getCommand();
+		if (!command.areResultsCachable()) {
+			return null;
+		}
+		String scope = rsCache.getCacheScope();
+		String scopeId = null;
+		if(ResultSetCache.RS_CACHE_SCOPE_VDB.equalsIgnoreCase(scope)){
+			scopeId = message.getWorkContext().getVdbName() + DELIMITER + message.getWorkContext().getVdbVersion();
+		}else{
+			scopeId = message.getWorkContext().getConnectionID();
+		}
+		return new CacheID(scopeId, command.toString());
+	}
+
+}


Property changes on: trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWorkItemFactory.java
___________________________________________________________________
Name: svn:mime-type
   + text/plain

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ExecutionContextImpl.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ExecutionContextImpl.java	2009-03-19 17:17:58 UTC (rev 592)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ExecutionContextImpl.java	2009-03-19 19:11:15 UTC (rev 593)
@@ -55,8 +55,6 @@
     private Serializable executionPayload;
     // ID of the parent JDBC Connection which is executing the statement
     private String requestConnectionID;
-    // uses the result set chache or not
-	private boolean useResultSetCache;
     // Execute count of the query
     private String executeCount;
     // keep the execution object alive during the processing. default:false 
@@ -71,7 +69,7 @@
     
     public ExecutionContextImpl(String vdbName, String vdbVersion, String userName,
                                 Serializable trustedPayload, Serializable executionPayload, 
-                                String originalConnectionID, String connectorId, String requestId, String partId, String execCount, boolean useResultSetCache) {
+                                String originalConnectionID, String connectorId, String requestId, String partId, String execCount) {
         
         this.vdbName = vdbName;
         this.vdbVersion = vdbVersion;
@@ -83,7 +81,6 @@
         this.partID = partId;        
         this.requestConnectionID = originalConnectionID;
         this.executeCount = execCount;
-        this.useResultSetCache = useResultSetCache;        
     }
     
     public String getConnectorIdentifier() {
@@ -124,10 +121,6 @@
 		return requestConnectionID;
 	}
 
-	public boolean useResultSetCache() {
-		return useResultSetCache;
-	}
-    
     public void keepExecutionAlive(boolean alive) {
         this.keepAlive = alive;
     }    

Modified: trunk/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/FakeConnector.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/FakeConnector.java	2009-03-19 17:17:58 UTC (rev 592)
+++ trunk/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/FakeConnector.java	2009-03-19 19:11:15 UTC (rev 593)
@@ -60,6 +60,17 @@
     private long simulatedBatchRetrievalTime = 1000L;
     private ClassLoader classloader;
     
+    private int connectionCount;
+    private int executionCount;
+    
+    public int getConnectionCount() {
+		return connectionCount;
+	}
+    
+    public int getExecutionCount() {
+		return executionCount;
+	}
+    
     @Override
     public Connection getConnection(org.teiid.connector.api.ExecutionContext context) throws ConnectorException {
         return new FakeConnection();
@@ -78,8 +89,14 @@
 	}
 	
     private class FakeConnection extends BasicConnection implements XAConnection {
+    	
+    	public FakeConnection() {
+			connectionCount++;
+		}
+    	
         public boolean released = false;
         public Execution createExecution(ICommand command, ExecutionContext executionContext, RuntimeMetadata metadata) throws ConnectorException {
+        	executionCount++;
             return new FakeBlockingExecution(executionContext);
         }
         public ConnectorCapabilities getCapabilities() {

Modified: trunk/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/FakeExecutionContextImpl.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/FakeExecutionContextImpl.java	2009-03-19 17:17:58 UTC (rev 592)
+++ trunk/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/FakeExecutionContextImpl.java	2009-03-19 19:11:15 UTC (rev 593)
@@ -46,8 +46,7 @@
             "ConnectorID"+unique,   //$NON-NLS-1$
             "RequestID"+unique,   //$NON-NLS-1$
             "PartID"+unique, //$NON-NLS-1$
-            "ExecCount"+unique, //$NON-NLS-1$
-            false); 
+            "ExecCount"+unique); 
     }
     
 }

Modified: trunk/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/TestConnectorManagerImpl.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/TestConnectorManagerImpl.java	2009-03-19 17:17:58 UTC (rev 592)
+++ trunk/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/TestConnectorManagerImpl.java	2009-03-19 19:11:15 UTC (rev 593)
@@ -26,23 +26,31 @@
  */
 package org.teiid.dqp.internal.datamgr.impl;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 import java.net.URL;
 import java.net.URLClassLoader;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 
-import junit.framework.TestCase;
-
+import org.junit.Test;
 import org.mockito.Mockito;
 import org.teiid.connector.internal.ConnectorPropertyNames;
-import org.teiid.dqp.internal.datamgr.impl.ConnectorManager;
+import org.teiid.dqp.internal.cache.ResultSetCache;
 import org.teiid.dqp.internal.datamgr.impl.TestConnectorWorkItem.QueueResultsReceiver;
 import org.teiid.dqp.internal.pooling.connector.FakeSourceConnectionFactory;
 import org.teiid.dqp.internal.process.DQPWorkContext;
 
+import com.metamatrix.api.exception.MetaMatrixComponentException;
+import com.metamatrix.cache.FakeCache;
 import com.metamatrix.common.application.ApplicationEnvironment;
 import com.metamatrix.common.application.exception.ApplicationLifecycleException;
 import com.metamatrix.dqp.message.AtomicRequestMessage;
+import com.metamatrix.dqp.message.AtomicResultsMessage;
 import com.metamatrix.dqp.service.DQPServiceNames;
 import com.metamatrix.dqp.service.FakeMetadataService;
 import com.metamatrix.query.optimizer.capabilities.SourceCapabilities;
@@ -51,18 +59,7 @@
 /**
  * JUnit test for TestConnectorManagerImpl
  */
-public final class TestConnectorManagerImpl extends TestCase {
-    // =========================================================================
-    //                        F R A M E W O R K
-    // =========================================================================
-    /**
-     * Constructor for TestConnectorManagerImpl.
-     * @param name
-     */
-    public TestConnectorManagerImpl(final String name) {
-        super(name);
-    }
-
+public final class TestConnectorManagerImpl {
     private Properties helpGetAppProps() {
         Properties appProperties = new Properties();
 
@@ -77,7 +74,7 @@
     //                         T E S T   C A S E S
     // =========================================================================
 
-    public void testStartFailsWithNullRequiredProp() throws Exception {
+    @Test public void testStartFailsWithNullRequiredProp() throws Exception {
         ConnectorManager cm = new ConnectorManager();
         Properties appProperties = helpGetAppProps();
         // Remove required property
@@ -92,7 +89,7 @@
         } 
     }
 
-    public void testReceive() throws Exception {
+    @Test public void testReceive() throws Exception {
     	ConnectorManager cm = new ConnectorManager();
     	startConnectorManager(cm, helpGetAppProps());
         
@@ -103,7 +100,7 @@
         cm.stop();
     }
     
-    public void testConnectorCapabilitiesOverride() throws Exception {
+    @Test public void testConnectorCapabilitiesOverride() throws Exception {
     	ConnectorManager cm = new ConnectorManager();
     	startConnectorManager(cm, helpGetAppProps());
 
@@ -134,7 +131,7 @@
         cm.start(env);
 	}
     
-    public void testIsXA() throws Exception {
+    @Test public void testIsXA() throws Exception {
     	ConnectorManager cm = new ConnectorManager();
         Properties props = new Properties();
         props.setProperty(ConnectorPropertyNames.CONNECTOR_CLASS, FakeConnector.class.getName());
@@ -144,7 +141,7 @@
         cm.stop();
     }
     
-    public void testIsXA_Failure() throws Exception {
+    @Test public void testIsXA_Failure() throws Exception {
         ConnectorManager cm = new ConnectorManager();
         Properties props = new Properties();
         props.setProperty(ConnectorPropertyNames.CONNECTOR_CLASS, FakeSourceConnectionFactory.class.getName());
@@ -157,7 +154,51 @@
         cm.stop();
     }
     
-    public void testDefect19049() throws Exception {
+    @Test public void testCaching() throws Exception {
+    	ConnectorManager cm = new ConnectorManager() {
+    		@Override
+    		protected ResultSetCache createResultSetCache(
+    				Properties rsCacheProps)
+    				throws MetaMatrixComponentException {
+    			assertEquals(rsCacheProps.get(ResultSetCache.RS_CACHE_MAX_AGE), String.valueOf(0));
+    			return new ResultSetCache(rsCacheProps, new FakeCache.FakeCacheFactory());
+    		}
+    	};
+        Properties props = new Properties();
+        props.setProperty(ConnectorPropertyNames.CONNECTOR_CLASS, FakeConnector.class.getName());
+        props.setProperty(ConnectorPropertyNames.USE_RESULTSET_CACHE, Boolean.TRUE.toString());
+        startConnectorManager(cm, props);
+        ConnectorWrapper wrapper = cm.getConnector();
+        FakeConnector fc = (FakeConnector)wrapper.getActualConnector();
+        assertEquals(0, fc.getConnectionCount());
+        assertEquals(0, fc.getExecutionCount());
+        AtomicRequestMessage request = TestConnectorWorkItem.createNewAtomicRequestMessage(1, 1);
+        request.setUseResultSetCache(true);
+        QueueResultsReceiver receiver = new QueueResultsReceiver();
+        cm.executeRequest(receiver, request);
+        AtomicResultsMessage arm = receiver.getResults().poll(1000, TimeUnit.MILLISECONDS);
+        assertEquals(-1, arm.getFinalRow());
+        //get the last batch - it will be 0 sized
+        cm.requstMore(request.getAtomicRequestID());
+        assertNotNull(receiver.getResults().poll(1000, TimeUnit.MILLISECONDS));
+        cm.closeRequest(request.getAtomicRequestID());
+        assertEquals(1, fc.getConnectionCount());
+        assertEquals(1, fc.getExecutionCount());
+
+        //this request should hit the cache
+        AtomicRequestMessage request1 = TestConnectorWorkItem.createNewAtomicRequestMessage(2, 1);
+        request1.setUseResultSetCache(true);
+        QueueResultsReceiver receiver1 = new QueueResultsReceiver();
+        cm.executeRequest(receiver1, request1);
+        arm = receiver1.getResults().poll(1000, TimeUnit.MILLISECONDS);
+        assertEquals(5, arm.getFinalRow());
+        assertEquals(1, fc.getConnectionCount());
+        assertEquals(1, fc.getExecutionCount());
+        
+        cm.stop();
+    }
+    
+    @Test public void testDefect19049() throws Exception {
         ConnectorManager cm = new ConnectorManager();
         Properties props = new Properties();
         final String connectorName = FakeConnector.class.getName();

Modified: trunk/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/TestConnectorStateManager.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/TestConnectorStateManager.java	2009-03-19 17:17:58 UTC (rev 592)
+++ trunk/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/TestConnectorStateManager.java	2009-03-19 19:11:15 UTC (rev 593)
@@ -66,6 +66,7 @@
         csm = new ConnectorManager();
         csm.setConnectorWorkerPool(Mockito.mock(WorkerPool.class));
         csm.setConnector(new ConnectorWrapper(new FakeConnector()));
+        csm.setWorkItemFactory(new ConnectorWorkItemFactory(csm, null, true));
     }
 
     void helpAssureOneState() {

Modified: trunk/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/TestExecutionContextImpl.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/TestExecutionContextImpl.java	2009-03-19 17:17:58 UTC (rev 592)
+++ trunk/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/TestExecutionContextImpl.java	2009-03-19 19:11:15 UTC (rev 593)
@@ -38,7 +38,7 @@
 
     public ExecutionContextImpl createContext(String requestID, String partID) {
         return new ExecutionContextImpl("vdb", "1", "user", null, null,   //$NON-NLS-1$//$NON-NLS-2$ //$NON-NLS-3$
-                                        "Connection", "Connector", requestID, partID, "0", false); //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$
+                                        "Connection", "Connector", requestID, partID, "0"); //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$
     }
     
     public void testEqivalenceSemanticsSame() {

Modified: trunk/engine/src/test/java/org/teiid/dqp/internal/pooling/connector/TestConnectionPool.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/dqp/internal/pooling/connector/TestConnectionPool.java	2009-03-19 17:17:58 UTC (rev 592)
+++ trunk/engine/src/test/java/org/teiid/dqp/internal/pooling/connector/TestConnectionPool.java	2009-03-19 19:11:15 UTC (rev 593)
@@ -71,7 +71,7 @@
     }
         
     public static ExecutionContext createContext(final String user, boolean userIdentity) {
-    	ExecutionContextImpl context = new ExecutionContextImpl(null, null, user, null, null, null, null, null, null, null, false);
+    	ExecutionContextImpl context = new ExecutionContextImpl(null, null, user, null, null, null, null, null, null, null);
     	if (userIdentity) {
     		context.setConnectorIdentity(new MappedUserIdentity(context.getUser(), null, null));
     	}

Modified: trunk/engine/src/test/java/org/teiid/dqp/internal/pooling/connector/TestPerUserPool.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/dqp/internal/pooling/connector/TestPerUserPool.java	2009-03-19 17:17:58 UTC (rev 592)
+++ trunk/engine/src/test/java/org/teiid/dqp/internal/pooling/connector/TestPerUserPool.java	2009-03-19 19:11:15 UTC (rev 593)
@@ -111,7 +111,7 @@
         }
     	
         // session payload
-        ExecutionContextImpl impl = new ExecutionContextImpl(null, null, null, credentials, null, null, null, null, null, null, false);
+        ExecutionContextImpl impl = new ExecutionContextImpl(null, null, null, credentials, null, null, null, null, null, null);
         impl.setConnectorIdentity(factory.createIdentity(impl));
         return impl;
     }

Modified: trunk/test-integration/src/test/java/com/metamatrix/connector/jdbc/extension/TestSQLConversionVisitor.java
===================================================================
--- trunk/test-integration/src/test/java/com/metamatrix/connector/jdbc/extension/TestSQLConversionVisitor.java	2009-03-19 17:17:58 UTC (rev 592)
+++ trunk/test-integration/src/test/java/com/metamatrix/connector/jdbc/extension/TestSQLConversionVisitor.java	2009-03-19 19:11:15 UTC (rev 593)
@@ -74,8 +74,7 @@
                                                                             "ExecutionPayload",  //$NON-NLS-1$            
                                                                             "ConnectionID",   //$NON-NLS-1$
                                                                             "Connector",
-                                                                            "RequestID", "PartID", "ExecCount", //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$
-                                                                            false);    
+                                                                            "RequestID", "PartID", "ExecCount");    
     /**
      * Constructor for TestSQLConversionVisitor.
      * @param name

Modified: trunk/test-integration/src/test/java/com/metamatrix/connector/jdbc/oracle/TestOracleSQLConversionVisitor.java
===================================================================
--- trunk/test-integration/src/test/java/com/metamatrix/connector/jdbc/oracle/TestOracleSQLConversionVisitor.java	2009-03-19 17:17:58 UTC (rev 592)
+++ trunk/test-integration/src/test/java/com/metamatrix/connector/jdbc/oracle/TestOracleSQLConversionVisitor.java	2009-03-19 19:11:15 UTC (rev 593)
@@ -407,7 +407,7 @@
         String output = "SELECT /*+ ALL_ROWS */ PARTS.PART_NAME, ROWNUM FROM PARTS"; //$NON-NLS-1$
                
         String hint = "/*+ ALL_ROWS */"; //$NON-NLS-1$
-        ExecutionContext context = new ExecutionContextImpl(null, null, null, null, hint, null, "", null, null, null, false); //$NON-NLS-1$
+        ExecutionContext context = new ExecutionContextImpl(null, null, null, null, hint, null, "", null, null, null); //$NON-NLS-1$
         
         helpTestVisitor(getTestVDB(),
             input, 




More information about the teiid-commits mailing list