[teiid-commits] teiid SVN: r2522 - in branches/7.1.x: connectors/translator-ws/src/main/java/org/teiid/translator/ws and 6 other directories.

teiid-commits at lists.jboss.org teiid-commits at lists.jboss.org
Thu Sep 2 15:54:15 EDT 2010


Author: shawkins
Date: 2010-09-02 15:54:14 -0400 (Thu, 02 Sep 2010)
New Revision: 2522

Modified:
   branches/7.1.x/connectors/connector-ws/src/main/java/org/teiid/resource/adapter/ws/WSConnectionImpl.java
   branches/7.1.x/connectors/translator-ws/src/main/java/org/teiid/translator/ws/BinaryWSProcedureExecution.java
   branches/7.1.x/connectors/translator-ws/src/main/java/org/teiid/translator/ws/WSExecutionFactory.java
   branches/7.1.x/connectors/translator-ws/src/main/java/org/teiid/translator/ws/WSProcedureExecution.java
   branches/7.1.x/documentation/admin-guide/src/main/docbook/en-US/content/vdb-deployment.xml
   branches/7.1.x/documentation/reference/src/main/docbook/en-US/content/scalar_functions.xml
   branches/7.1.x/documentation/reference/src/main/docbook/en-US/content/translators.xml
   branches/7.1.x/engine/src/main/java/org/teiid/dqp/internal/datamgr/ConnectorWorkItem.java
   branches/7.1.x/engine/src/main/java/org/teiid/dqp/internal/process/DataTierManagerImpl.java
   branches/7.1.x/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java
   branches/7.1.x/engine/src/test/java/org/teiid/dqp/internal/process/TestDataTierManager.java
   branches/7.1.x/engine/src/test/java/org/teiid/dqp/service/AutoGenDataService.java
Log:
TEIID-1138 finish the binary webservice feature.  also moved the conversion logic of datasource/source to runtime types into datatiertuplesource.

Modified: branches/7.1.x/connectors/connector-ws/src/main/java/org/teiid/resource/adapter/ws/WSConnectionImpl.java
===================================================================
--- branches/7.1.x/connectors/connector-ws/src/main/java/org/teiid/resource/adapter/ws/WSConnectionImpl.java	2010-09-02 18:41:23 UTC (rev 2521)
+++ branches/7.1.x/connectors/connector-ws/src/main/java/org/teiid/resource/adapter/ws/WSConnectionImpl.java	2010-09-02 19:54:14 UTC (rev 2522)
@@ -98,7 +98,6 @@
 			try {
 				final URL url = new URL(endpoint);
 				final HttpURLConnection httpConn = (HttpURLConnection) url.openConnection();
-				httpConn.setDoOutput(true);
 				httpConn.setRequestMethod((String) requestContext.get(MessageContext.HTTP_REQUEST_METHOD));
 				Map<String, List<String>> header = (Map<String, List<String>>)requestContext.get(MessageContext.HTTP_REQUEST_HEADERS);
 				for (Map.Entry<String, List<String>> entry : header.entrySet()) {
@@ -113,7 +112,7 @@
 				}
 				
 				if (msg != null) {
-					httpConn.setDoInput(true);
+					httpConn.setDoOutput(true);
 					OutputStream os = httpConn.getOutputStream();
 					InputStream is = msg.getInputStream();
 					ObjectConverterUtil.write(os, is, -1);
@@ -220,16 +219,12 @@
 		} else {
 			//TODO: cache service/port/dispatch instances?
 			Bus bus = BusFactory.getThreadDefaultBus();
-			if (mcf.getSecurityType() == WSManagedConnectionFactory.SecurityType.WSSecurity) {
-				BusFactory.setThreadDefaultBus(mcf.getBus());
-			}
+			BusFactory.setThreadDefaultBus(mcf.getBus());
 			Service svc;
 			try {
 				svc = Service.create(svcQname);
 			} finally {
-				if (mcf.getSecurityType() == WSManagedConnectionFactory.SecurityType.WSSecurity) {
-					BusFactory.setThreadDefaultBus(bus);
-				}
+				BusFactory.setThreadDefaultBus(bus);
 			}
 			if (LogManager.isMessageToBeRecorded(LogConstants.CTX_WS, MessageLevel.DETAIL)) {
 				LogManager.logDetail(LogConstants.CTX_WS, "Creating a dispatch with endpoint", endpoint); //$NON-NLS-1$

Modified: branches/7.1.x/connectors/translator-ws/src/main/java/org/teiid/translator/ws/BinaryWSProcedureExecution.java
===================================================================
--- branches/7.1.x/connectors/translator-ws/src/main/java/org/teiid/translator/ws/BinaryWSProcedureExecution.java	2010-09-02 18:41:23 UTC (rev 2521)
+++ branches/7.1.x/connectors/translator-ws/src/main/java/org/teiid/translator/ws/BinaryWSProcedureExecution.java	2010-09-02 19:54:14 UTC (rev 2522)
@@ -114,7 +114,6 @@
     
     @Override
     public List<?> getOutputParameterValues() throws TranslatorException {
-    	//TODO: this blob should bet buffered
         return Arrays.asList(returnValue, returnValue.getContentType());
     }    
     

Modified: branches/7.1.x/connectors/translator-ws/src/main/java/org/teiid/translator/ws/WSExecutionFactory.java
===================================================================
--- branches/7.1.x/connectors/translator-ws/src/main/java/org/teiid/translator/ws/WSExecutionFactory.java	2010-09-02 18:41:23 UTC (rev 2521)
+++ branches/7.1.x/connectors/translator-ws/src/main/java/org/teiid/translator/ws/WSExecutionFactory.java	2010-09-02 19:54:14 UTC (rev 2522)
@@ -22,12 +22,10 @@
 
 package org.teiid.translator.ws;
 
-import java.sql.SQLXML;
 import java.util.Collections;
 import java.util.List;
 
 import javax.resource.cci.ConnectionFactory;
-import javax.xml.transform.Source;
 import javax.xml.ws.Service.Mode;
 import javax.xml.ws.http.HTTPBinding;
 import javax.xml.ws.soap.SOAPBinding;
@@ -112,10 +110,6 @@
 		return new WSProcedureExecution(command, metadata, executionContext, this, connection);
     }
     
-    public SQLXML convertToXMLType(Source value) {
-    	return (SQLXML)getTypeFacility().convertToRuntimeType(value);
-    } 	
-	
 	@Override
     public final List getSupportedFunctions() {
         return Collections.EMPTY_LIST;

Modified: branches/7.1.x/connectors/translator-ws/src/main/java/org/teiid/translator/ws/WSProcedureExecution.java
===================================================================
--- branches/7.1.x/connectors/translator-ws/src/main/java/org/teiid/translator/ws/WSProcedureExecution.java	2010-09-02 18:41:23 UTC (rev 2521)
+++ branches/7.1.x/connectors/translator-ws/src/main/java/org/teiid/translator/ws/WSProcedureExecution.java	2010-09-02 19:54:14 UTC (rev 2522)
@@ -42,9 +42,6 @@
 import org.teiid.core.types.XMLType;
 import org.teiid.language.Argument;
 import org.teiid.language.Call;
-import org.teiid.logging.LogConstants;
-import org.teiid.logging.LogManager;
-import org.teiid.logging.MessageLevel;
 import org.teiid.metadata.RuntimeMetadata;
 import org.teiid.translator.DataNotAvailableException;
 import org.teiid.translator.ExecutionContext;
@@ -62,7 +59,7 @@
 	RuntimeMetadata metadata;
     ExecutionContext context;
     private Call procedure;
-    private SQLXML returnValue;
+    private Source returnValue;
     private WSConnection conn;
     private WSExecutionFactory executionFactory;
     
@@ -132,14 +129,7 @@
 				// JBoss Native DispatchImpl throws exception when the source is null
 				source = new StreamSource(new StringReader("<none/>")); //$NON-NLS-1$
 			}
-			Source result = dispatch.invoke(source);
-			this.returnValue = this.executionFactory.convertToXMLType(result);
-			if (LogManager.isMessageToBeRecorded(LogConstants.CTX_WS, MessageLevel.DETAIL)) {
-	        	try {
-					LogManager.logDetail(LogConstants.CTX_CONNECTOR, "WebService Response: " + this.returnValue.getString()); //$NON-NLS-1$
-				} catch (SQLException e) {
-				}
-	        }
+			this.returnValue = dispatch.invoke(source);
 		} catch (SQLException e) {
 			throw new TranslatorException(e);
 		} catch (WebServiceException e) {
@@ -153,9 +143,6 @@
 		if (xml == null) {
 			return null;
 		}
-		if (LogManager.isMessageToBeRecorded(LogConstants.CTX_WS, MessageLevel.DETAIL)) { 
-			LogManager.logDetail(LogConstants.CTX_CONNECTOR, "Request " + xml.getString()); //$NON-NLS-1$
-	    }
 		return xml.getSource(StreamSource.class);
 	}
     

Modified: branches/7.1.x/documentation/admin-guide/src/main/docbook/en-US/content/vdb-deployment.xml
===================================================================
--- branches/7.1.x/documentation/admin-guide/src/main/docbook/en-US/content/vdb-deployment.xml	2010-09-02 18:41:23 UTC (rev 2521)
+++ branches/7.1.x/documentation/admin-guide/src/main/docbook/en-US/content/vdb-deployment.xml	2010-09-02 19:54:14 UTC (rev 2522)
@@ -156,7 +156,7 @@
 				 <code>WsSecurityConfigName</code>.  The namespace URI for the QName should be http://teiid.org.</para>
 				 
 				 <example>
-            	<title>Example web service based data source</title>
+            	<title>Example WS-Security enabled data source</title>
                 <programlisting><![CDATA[<?xml version="1.0" encoding="UTF-8"?>
 <connection-factories>
    <no-tx-connection-factory>
@@ -204,6 +204,45 @@
 	please consult the <ulink url="https://cwiki.apache.org/CXF20DOC/ws-security.html">CXF documentation</ulink> or 
 	the <ulink url="http://community.jboss.org/wiki/JBossWS-StackCXFUserGuide#WSSecurity">JBossWS-CXF documentation</ulink>.</para>
 			</section>
+			<section>
+				<title>Logging</title>
+				<para>Even when not using WS-Security, the <code>WsSecurityConfigURL</code> config property may be used to supply the Spring XML configuration 
+				file.  Logging of requests and responses may be enabled for the entire bus or for specific port configurations.  
+				If a specific port configuration is used, just as with WS-Security, the local name of the port 
+				QName should match <code>WsSecurityConfigName</code> and the namespace URI should be http://teiid.org.</para>
+				<example>
+            	<title>Example logging data source</title>
+                <programlisting><![CDATA[<?xml version="1.0" encoding="UTF-8"?>
+<connection-factories>
+   <no-tx-connection-factory>
+      <jndi-name>somewhere-ws-source</jndi-name>
+      <rar-name>teiid-connector-ws.rar</rar-name>
+      <connection-definition>javax.resource.cci.ConnectionFactory</connection-definition>      
+      <config-property name="EndPoint">http://somewhere.com</config-property>
+      <config-property name="WsSecurityConfigURL">${jboss.server.home.dir}/server/default/conf/xxx-jbossws-cxf.xml</config-property>
+      <config-property name="WsSecurityConfigName">port_y</config-property>
+   </no-tx-connection-factory>
+</connection-factories>]]></programlisting>
+<para>Corresponding xxx-jbossws-cxf.xml</para>
+<programlisting><![CDATA[<beans xmlns="http://www.springframework.org/schema/beans"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xmlns:jaxws="http://cxf.apache.org/jaxws"
+       xsi:schemaLocation="http://www.springframework.org/schema/beans
+          http://www.springframework.org/schema/beans/spring-beans.xsd
+          http://cxf.apache.org/jaxws
+          http://cxf.apache.org/schemas/jaxws.xsd">
+
+    <jaxws:client name="{http://teiid.org}port_y" 
+        createdFromAPI="true">
+        <jaxws:features>
+            <bean class="org.apache.cxf.feature.LoggingFeature"/>
+        </jaxws:features>
+    </jaxws:client>
+    
+</beans>]]></programlisting>
+</example>
+			<para>Logging is then performed at an INFO level to the org.apache.cxf.interceptor context.  Logging is only performed for non-binary web service calls.</para>
+			</section>
         </section>
         
         <section>

Modified: branches/7.1.x/documentation/reference/src/main/docbook/en-US/content/scalar_functions.xml
===================================================================
--- branches/7.1.x/documentation/reference/src/main/docbook/en-US/content/scalar_functions.xml	2010-09-02 18:41:23 UTC (rev 2521)
+++ branches/7.1.x/documentation/reference/src/main/docbook/en-US/content/scalar_functions.xml	2010-09-02 19:54:14 UTC (rev 2522)
@@ -1036,7 +1036,7 @@
             </entry>
           </row><row>
             <entry>
-              <para>TO_BYTES(x, encoding)</para>
+              <para id="to_bytes">TO_BYTES(x, encoding)</para>
             </entry>
             <entry>
               <para>Return a blob from the clob with the given encoding.  

Modified: branches/7.1.x/documentation/reference/src/main/docbook/en-US/content/translators.xml
===================================================================
--- branches/7.1.x/documentation/reference/src/main/docbook/en-US/content/translators.xml	2010-09-02 18:41:23 UTC (rev 2521)
+++ branches/7.1.x/documentation/reference/src/main/docbook/en-US/content/translators.xml	2010-09-02 19:54:14 UTC (rev 2522)
@@ -693,21 +693,31 @@
 </tgroup>
 </table>
 <para>There are no ws importer settings, but it does provide metadata for dynamic vdbs.</para>
-<note>
-<para>Setting the org.teiid.CONNECTOR.WS logging context to detail will show the request/response documents as part of the log.</para>
-</note>
 <section><title>Usage</title>
-<para>The main procedure, invoke, allows for multiple binding, or protocol modes, including HTTP, SOAP11, and SOAP12. 
+<para>The WS translator exposes low level procedures for accessing web services.  See also the ws-weather example in the kit.</para>
+<section>
+<title>Invoke Procedure</title>
+<para>invoke allows for multiple binding, or protocol modes, including HTTP, SOAP11, and SOAP12. 
 <programlisting>Procedure invoke(binding in STRING, action in STRING, request in XML, endpoint in STRING) returns XML</programlisting>
 </para>
 <para>The binding may be one of null (to use the default) HTTP, SOAP11, or SOAP12.  Action with a SOAP binding indicates the SOAPAction value.  Action with a HTTP binding indicates the HTTP method (GET, POST, etc.), which defaults to POST.</para>
 <para>A null value for the binding or endpoint will use the default value.  The default endpoint is specified in the WS resource adapter configuration.  The endpoint URL may be absolute or relative.  If it's relative then it will be combined with the default endpoint.</para>
-<para>Since multiple parameters are not required to have values, it is often more clear to call the invoke procedure with named parameter syntax. e.g. <programlisting>call invoke(binding='HTTP', action='GET')</programlisting></para>
+<para>Since multiple parameters are not required to have values, it is often more clear to call the invoke procedure with named parameter syntax. e.g. <programlisting>call invoke(binding=>'HTTP', action=>'GET')</programlisting></para>
 <para>The request XML should be a valid XML document or root element.</para>
-<para>See the ws-weather example in the kit and database metadata for a full description of invoke.</para>
 </section>
+<section>
+<title>InvokeHTTP Procedure</title>
+<para>invokeHttp can return the byte contents of an HTTP(S) call. 
+<programlisting>Procedure invokeHttp(action in STRING, request in OBJECT, endpoint in STRING, contentType out STRING) returns BLOB</programlisting>
+</para>
+<para>Action indicates the HTTP method (GET, POST, etc.), which defaults to POST.</para>
+<para>A null value for endpoint will use the default value.  The default endpoint is specified in the WS resource adapter configuration.  The endpoint URL may be absolute or relative.  If it's relative then it will be combined with the default endpoint.</para>
+<para>Since multiple parameters are not required to have values, it is often clearer to call the invokeHttp procedure with named parameter syntax, e.g. <programlisting>call invokeHttp(action=>'GET')</programlisting></para>
+<para>The request can be one of SQLXML, STRING, BLOB, or CLOB.  The request will be sent as the POST payload in byte form.  For STRING/CLOB values this will default to the UTF-8 encoding.  To control the byte encoding, see the <link linkend="to_bytes">to_bytes</link> function.</para>
 </section>
 </section>
+</section>
+</section>
 <section id="dynamic_vdbs">
 <title>Dynamic VDBs</title>
 <para>

Modified: branches/7.1.x/engine/src/main/java/org/teiid/dqp/internal/datamgr/ConnectorWorkItem.java
===================================================================
--- branches/7.1.x/engine/src/main/java/org/teiid/dqp/internal/datamgr/ConnectorWorkItem.java	2010-09-02 18:41:23 UTC (rev 2521)
+++ branches/7.1.x/engine/src/main/java/org/teiid/dqp/internal/datamgr/ConnectorWorkItem.java	2010-09-02 19:54:14 UTC (rev 2522)
@@ -31,8 +31,6 @@
 import org.teiid.common.buffer.BlockedException;
 import org.teiid.common.buffer.TupleBuffer;
 import org.teiid.core.TeiidProcessingException;
-import org.teiid.core.types.DataTypeManager;
-import org.teiid.core.types.TransformationException;
 import org.teiid.core.util.Assertion;
 import org.teiid.dqp.DQPPlugin;
 import org.teiid.dqp.message.AtomicRequestID;
@@ -49,7 +47,6 @@
 import org.teiid.query.metadata.TempMetadataStore;
 import org.teiid.query.sql.lang.Command;
 import org.teiid.query.sql.lang.StoredProcedure;
-import org.teiid.query.sql.symbol.SingleElementSymbol;
 import org.teiid.translator.DataNotAvailableException;
 import org.teiid.translator.Execution;
 import org.teiid.translator.ExecutionFactory;
@@ -75,9 +72,7 @@
     private volatile ResultSetExecution execution;
     private ProcedureBatchHandler procedureBatchHandler;
     private org.teiid.language.Command translatedCommand;
-    private Class<?>[] schema;
-    private List<Integer> convertToRuntimeType;
-    private boolean[] convertToDesiredRuntimeType;
+    private int expectedColumns;
         
     /* End state information */    
     private boolean lastBatch;
@@ -200,17 +195,7 @@
 	        this.connection = this.connector.getConnection(this.connectionFactory);
 	        // Translate the command
 	        Command command = this.requestMsg.getCommand();
-			List<SingleElementSymbol> symbols = this.requestMsg.getCommand().getProjectedSymbols();
-			this.schema = new Class[symbols.size()];
-			this.convertToDesiredRuntimeType = new boolean[symbols.size()];
-			this.convertToRuntimeType = new ArrayList<Integer>(symbols.size());
-			for (int i = 0; i < schema.length; i++) {
-				SingleElementSymbol symbol = symbols.get(i);
-				this.schema[i] = symbol.getType();
-				this.convertToDesiredRuntimeType[i] = true;
-				this.convertToRuntimeType.add(i);
-			}
-	
+	        this.expectedColumns = command.getProjectedSymbols().size();
 	        LanguageBridgeFactory factory = new LanguageBridgeFactory(queryMetadata);
 	        this.translatedCommand = factory.translate(command);
 	
@@ -285,8 +270,8 @@
             		this.lastBatch = true;
             		break;
             	}
-            	if (row.size() != this.schema.length) {
-            		throw new AssertionError("Inproper results returned.  Expected " + this.schema.length + " columns, but was " + row.size()); //$NON-NLS-1$ //$NON-NLS-2$
+            	if (row.size() != this.expectedColumns) {
+            		throw new AssertionError("Inproper results returned.  Expected " + this.expectedColumns + " columns, but was " + row.size()); //$NON-NLS-1$ //$NON-NLS-2$
         		}
             	this.rowCount += 1;
             	batchSize++;
@@ -294,7 +279,6 @@
             		row = this.procedureBatchHandler.padRow(row);
             	}
             	
-            	correctTypes(row);
             	rows.add(row);
 	            // Check for max result rows exceeded
 	            if(this.requestMsg.getMaxResultRows() > -1 && this.rowCount >= this.requestMsg.getMaxResultRows()){
@@ -318,7 +302,6 @@
         	if (this.procedureBatchHandler != null) {
         		List row = this.procedureBatchHandler.getParameterRow();
         		if (row != null) {
-        			correctTypes(row);
         			rows.add(row);
         			this.rowCount++;
         		}
@@ -346,46 +329,6 @@
 		return response;
 	}
 
-	private void correctTypes(List row) throws TranslatorException {
-		//TODO: add a proper source schema
-		for (int i = convertToRuntimeType.size() - 1; i >= 0; i--) {
-			int index = convertToRuntimeType.get(i);
-			Object value = row.get(index);
-			if (value != null) {
-				Object result = DataTypeManager.convertToRuntimeType(value);
-				if (DataTypeManager.isLOB(result.getClass())) {
-					this.securityContext.keepExecutionAlive(true);
-				}
-				if (value == result && !DataTypeManager.DefaultDataClasses.OBJECT.equals(this.schema[index])) {
-					convertToRuntimeType.remove(i);
-					continue;
-				}
-				row.set(index, result);
-			}
-		}
-		//TODO: add a proper intermediate schema
-		for (int i = 0; i < row.size(); i++) {
-			if (convertToDesiredRuntimeType[i]) {
-				Object value = row.get(i);
-				if (value != null) {
-					Object result;
-					try {
-						result = DataTypeManager.transformValue(value, value.getClass(), this.schema[i]);
-					} catch (TransformationException e) {
-						throw new TranslatorException(e);
-					}
-					if (value == result) {
-						convertToDesiredRuntimeType[i] = false;
-						continue;
-					}
-					row.set(i, result);
-				}
-			} else {
-				row.set(i, DataTypeManager.getCanonicalValue(row.get(i)));
-			}
-		}
-	}
-    
     public static AtomicResultsMessage createResultsMessage(List[] batch, List columnSymbols) {
         String[] dataTypes = TupleBuffer.getTypeNames(columnSymbols);        
         return new AtomicResultsMessage(batch, dataTypes);

Modified: branches/7.1.x/engine/src/main/java/org/teiid/dqp/internal/process/DataTierManagerImpl.java
===================================================================
--- branches/7.1.x/engine/src/main/java/org/teiid/dqp/internal/process/DataTierManagerImpl.java	2010-09-02 18:41:23 UTC (rev 2521)
+++ branches/7.1.x/engine/src/main/java/org/teiid/dqp/internal/process/DataTierManagerImpl.java	2010-09-02 19:54:14 UTC (rev 2522)
@@ -38,6 +38,7 @@
 import org.teiid.client.RequestMessage;
 import org.teiid.client.util.ResultsFuture;
 import org.teiid.common.buffer.BlockedException;
+import org.teiid.common.buffer.BufferManager;
 import org.teiid.common.buffer.TupleSource;
 import org.teiid.core.CoreConstants;
 import org.teiid.core.TeiidComponentException;
@@ -398,4 +399,8 @@
     	requestMgr.scheduleWork(r, priority, delay);
     }
     
+    BufferManager getBufferManager() {
+		return bufferService.getBufferManager();
+	}
+    
 }

Modified: branches/7.1.x/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java
===================================================================
--- branches/7.1.x/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java	2010-09-02 18:41:23 UTC (rev 2521)
+++ branches/7.1.x/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java	2010-09-02 19:54:14 UTC (rev 2522)
@@ -22,22 +22,41 @@
 
 package org.teiid.dqp.internal.process;
 
+import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 
+import javax.activation.DataSource;
+import javax.xml.transform.Source;
+
 import org.teiid.client.SourceWarning;
 import org.teiid.client.util.ResultsFuture;
 import org.teiid.common.buffer.BlockedException;
+import org.teiid.common.buffer.FileStore;
 import org.teiid.common.buffer.TupleSource;
 import org.teiid.core.TeiidComponentException;
 import org.teiid.core.TeiidProcessingException;
 import org.teiid.core.TeiidRuntimeException;
+import org.teiid.core.types.BlobImpl;
+import org.teiid.core.types.BlobType;
+import org.teiid.core.types.DataTypeManager;
+import org.teiid.core.types.InputStreamFactory;
+import org.teiid.core.types.SQLXMLImpl;
+import org.teiid.core.types.StandardXMLTranslator;
+import org.teiid.core.types.Streamable;
+import org.teiid.core.types.TransformationException;
+import org.teiid.core.types.XMLType;
 import org.teiid.core.util.Assertion;
+import org.teiid.core.util.ObjectConverterUtil;
 import org.teiid.dqp.internal.datamgr.ConnectorWork;
 import org.teiid.dqp.message.AtomicRequestMessage;
 import org.teiid.dqp.message.AtomicResultsMessage;
+import org.teiid.query.processor.xml.XMLUtil;
+import org.teiid.query.processor.xml.XMLUtil.FileStoreInputStreamFactory;
+import org.teiid.query.sql.symbol.SingleElementSymbol;
 import org.teiid.translator.DataNotAvailableException;
 import org.teiid.translator.TranslatorException;
 
@@ -58,6 +77,10 @@
     private final ConnectorWork cwi;
     private final DataTierManagerImpl dtm;
     
+    private List<Integer> convertToRuntimeType;
+    private boolean[] convertToDesiredRuntimeType;
+    private Class<?>[] schema;
+    
     // Data state
     private int index;
     private int rowsProcessed;
@@ -66,6 +89,7 @@
     private volatile boolean canceled;
     private boolean executed;
     private volatile boolean done;
+    private boolean explicitClose;
     
     private volatile ResultsFuture<AtomicResultsMessage> futureResult;
     private volatile boolean running;
@@ -75,6 +99,17 @@
         this.workItem = workItem;
         this.cwi = cwi;
         this.dtm = dtm;
+		List<SingleElementSymbol> symbols = this.aqr.getCommand().getProjectedSymbols();
+		this.schema = new Class[symbols.size()];
+        this.convertToDesiredRuntimeType = new boolean[symbols.size()];
+		this.convertToRuntimeType = new ArrayList<Integer>(symbols.size());
+		for (int i = 0; i < symbols.size(); i++) {
+			SingleElementSymbol symbol = symbols.get(i);
+			this.schema[i] = symbol.getType();
+			this.convertToDesiredRuntimeType[i] = true;
+			this.convertToRuntimeType.add(i);
+		}
+        
     	Assertion.isNull(workItem.getConnectorRequest(aqr.getAtomicRequestID()));
         workItem.addConnectorRequest(aqr.getAtomicRequestID(), this);
         if (!aqr.isTransactional()) {
@@ -96,10 +131,75 @@
 		});
 	}
 
-    public List getSchema() {
-        return this.aqr.getCommand().getProjectedSymbols();
-    }
+	private List correctTypes(List row) throws TransformationException {
+		for (int i = convertToRuntimeType.size() - 1; i >= 0; i--) {
+			int idx = convertToRuntimeType.get(i);
+			Object value = row.get(idx);
+			if (value != null) {
+				Object result = convertToRuntimeType(value, this.schema[idx]);
+				if (DataTypeManager.isLOB(result.getClass())) {
+					explicitClose = true;
+				}
+				if (value == result && !DataTypeManager.DefaultDataClasses.OBJECT.equals(this.schema[idx])) {
+					convertToRuntimeType.remove(i);
+					continue;
+				}
+				row.set(idx, result);
+			}
+		}
+		//TODO: add a proper intermediate schema
+		for (int i = 0; i < row.size(); i++) {
+			if (convertToDesiredRuntimeType[i]) {
+				Object value = row.get(i);
+				if (value != null) {
+					Object result = DataTypeManager.transformValue(value, value.getClass(), this.schema[i]);
+					if (value == result) {
+						convertToDesiredRuntimeType[i] = false;
+						continue;
+					}
+					row.set(i, result);
+				}
+			} else {
+				row.set(i, DataTypeManager.getCanonicalValue(row.get(i)));
+			}
+		}
+		return row;
+	}
 
+	private Object convertToRuntimeType(Object value, Class<?> desiredType) throws TransformationException {
+		if (value instanceof DataSource && (!(value instanceof Source) || desiredType != DataTypeManager.DefaultDataClasses.XML)) {
+			if (value instanceof InputStreamFactory) {
+				return new BlobType(new BlobImpl((InputStreamFactory)value));
+			}
+			FileStore fs = dtm.getBufferManager().createFileStore("bytes"); //$NON-NLS-1$
+			//TODO: guess at the encoding from the content type
+			FileStoreInputStreamFactory fsisf = new FileStoreInputStreamFactory(fs, Streamable.ENCODING);
+			
+			try {
+				ObjectConverterUtil.write(fsisf.getOuputStream(), ((DataSource)value).getInputStream(), -1);
+			} catch (IOException e) {
+				throw new TransformationException(e, e.getMessage());
+			}
+			return new BlobType(new BlobImpl(fsisf));
+		}
+		if (value instanceof Source) {
+			if (value instanceof InputStreamFactory) {
+				return new XMLType(new SQLXMLImpl((InputStreamFactory)value));
+			}
+			StandardXMLTranslator sxt = new StandardXMLTranslator((Source)value);
+			SQLXMLImpl sqlxml;
+			try {
+				sqlxml = XMLUtil.saveToBufferManager(dtm.getBufferManager(), sxt);
+			} catch (TeiidComponentException e) {
+				throw new TeiidRuntimeException(e);
+			} catch (TeiidProcessingException e) {
+				throw new TeiidRuntimeException(e);
+			}
+			return new XMLType(sqlxml);
+		}
+		return DataTypeManager.convertToRuntimeType(value);
+	}
+
     public List<?> nextTuple() throws TeiidComponentException, TeiidProcessingException {
     	while (true) {
     		if (arm == null) {
@@ -124,7 +224,7 @@
     			receiveResults(results);
     		}
 	    	if (index < arm.getResults().length) {
-	            return this.arm.getResults()[index++];
+	            return correctTypes(this.arm.getResults()[index++]);
 	        }
 	    	arm = null;
 	    	if (isDone()) {
@@ -247,7 +347,7 @@
      * @see TupleSource#closeSource()
      */
     public void closeSource() {
-    	if (this.arm == null || this.arm.supportsImplicitClose()) {
+    	if (!explicitClose) {
         	fullyCloseSource();
     	}
     }
@@ -274,6 +374,7 @@
 
 	void receiveResults(AtomicResultsMessage response) {
 		this.arm = response;
+		explicitClose |= !arm.supportsImplicitClose();
         rowsProcessed += response.getResults().length;
         index = 0;
 		if (response.getWarnings() != null) {

Modified: branches/7.1.x/engine/src/test/java/org/teiid/dqp/internal/process/TestDataTierManager.java
===================================================================
--- branches/7.1.x/engine/src/test/java/org/teiid/dqp/internal/process/TestDataTierManager.java	2010-09-02 18:41:23 UTC (rev 2521)
+++ branches/7.1.x/engine/src/test/java/org/teiid/dqp/internal/process/TestDataTierManager.java	2010-09-02 19:54:14 UTC (rev 2522)
@@ -24,6 +24,7 @@
 
 import static org.junit.Assert.*;
 
+import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
 import org.teiid.cache.DefaultCacheFactory;
@@ -52,9 +53,13 @@
     private AtomicRequestMessage request;
     private Command command;
     private DataTierTupleSource info;
-    private AutoGenDataService connectorManager;
+    private AutoGenDataService connectorManager = new AutoGenDataService();
     private RequestWorkItem workItem;
     
+    @Before public void setUp() {
+    	connectorManager = new AutoGenDataService();
+    }
+    
     private static Command helpGetCommand(String sql, QueryMetadataInterface metadata) throws Exception {
         Command command = QueryParser.getQueryParser().parseCommand(sql);
         QueryResolver.resolveCommand(command, metadata);
@@ -69,7 +74,6 @@
         QueryMetadataInterface metadata = FakeMetadataFactory.exampleBQTCached();
         DQPWorkContext workContext = FakeMetadataFactory.buildWorkContext(metadata, FakeMetadataFactory.exampleBQTVDB());
         
-        connectorManager = new AutoGenDataService();
         rm = new DQPCore();
         rm.setTransactionService(new FakeTransactionService());
         rm.setBufferService(new FakeBufferService());
@@ -126,8 +130,8 @@
     }
     
     @Test public void testNoRowsException() throws Exception {
+    	this.connectorManager.setRows(0);
     	helpSetup(3);
-    	this.connectorManager.setRows(0);
     	while (true) {
 	    	try {
 	        	assertNull(info.nextTuple());

Modified: branches/7.1.x/engine/src/test/java/org/teiid/dqp/service/AutoGenDataService.java
===================================================================
--- branches/7.1.x/engine/src/test/java/org/teiid/dqp/service/AutoGenDataService.java	2010-09-02 18:41:23 UTC (rev 2521)
+++ branches/7.1.x/engine/src/test/java/org/teiid/dqp/service/AutoGenDataService.java	2010-09-02 19:54:14 UTC (rev 2522)
@@ -119,6 +119,7 @@
                 Class type = symbol.getType();
                 row.add( getValue(type) );
             }
+            rows[i] = row;
         }   
         
         return rows;



More information about the teiid-commits mailing list