[teiid-commits] teiid SVN: r2803 - in trunk: common-core/src/main/java/org/teiid/core/util and 5 other directories.

teiid-commits at lists.jboss.org teiid-commits at lists.jboss.org
Fri Dec 24 13:47:11 EST 2010


Author: shawkins
Date: 2010-12-24 13:47:11 -0500 (Fri, 24 Dec 2010)
New Revision: 2803

Modified:
   trunk/common-core/src/main/java/org/teiid/core/types/InputStreamFactory.java
   trunk/common-core/src/main/java/org/teiid/core/util/ObjectConverterUtil.java
   trunk/engine/src/main/java/org/teiid/common/buffer/LobManager.java
   trunk/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java
   trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
   trunk/engine/src/main/java/org/teiid/query/optimizer/relational/RelationalPlanner.java
   trunk/engine/src/main/java/org/teiid/query/processor/xml/XMLUtil.java
Log:
TEIID-1227 expanding the persistent check and ensuring that procedures with lob params are not cached.

Modified: trunk/common-core/src/main/java/org/teiid/core/types/InputStreamFactory.java
===================================================================
--- trunk/common-core/src/main/java/org/teiid/core/types/InputStreamFactory.java	2010-12-24 02:09:29 UTC (rev 2802)
+++ trunk/common-core/src/main/java/org/teiid/core/types/InputStreamFactory.java	2010-12-24 18:47:11 UTC (rev 2803)
@@ -83,6 +83,10 @@
     	return null;
     }
     
+    public boolean isPersistent() {
+    	return false;
+    }
+    
     public static class FileInputStreamFactory extends InputStreamFactory {
     	
     	private File f;
@@ -102,6 +106,11 @@
     		return new BufferedInputStream(new FileInputStream(f));
     	}
     	
+    	@Override
+    	public boolean isPersistent() {
+    		return true;
+    	}
+    	
     }
     
     public static class ClobInputStreamFactory extends InputStreamFactory implements DataSource {

Modified: trunk/common-core/src/main/java/org/teiid/core/util/ObjectConverterUtil.java
===================================================================
--- trunk/common-core/src/main/java/org/teiid/core/util/ObjectConverterUtil.java	2010-12-24 02:09:29 UTC (rev 2802)
+++ trunk/common-core/src/main/java/org/teiid/core/util/ObjectConverterUtil.java	2010-12-24 18:47:11 UTC (rev 2803)
@@ -95,10 +95,9 @@
         return out.toByteArray();
     }
     
-    public static void write(final OutputStream out, final InputStream is, int length) throws IOException {
+    public static void write(final OutputStream out, final InputStream is, byte[] l_buffer, int length) throws IOException {
     	int writen = 0;
         try {
-	        byte[] l_buffer = new byte[DEFAULT_READING_SIZE]; // buffer holding bytes to be transferred
 	        int l_nbytes = 0;  // Number of bytes read
 	        while ((l_nbytes = is.read(l_buffer)) != -1) {
 	        	if (length != -1 && writen > length - l_nbytes) {
@@ -117,6 +116,10 @@
         }
     }
     
+    public static void write(final OutputStream out, final InputStream is, int length) throws IOException {
+    	write(out, is, new byte[DEFAULT_READING_SIZE], length); // buffer holding bytes to be transferred
+    }
+    
     public static void write(final Writer out, final Reader is, int length) throws IOException {
     	int writen = 0;
         try {

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/LobManager.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/LobManager.java	2010-12-24 02:09:29 UTC (rev 2802)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/LobManager.java	2010-12-24 18:47:11 UTC (rev 2803)
@@ -22,7 +22,6 @@
 
 package org.teiid.common.buffer;
 
-import java.io.BufferedOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -30,14 +29,13 @@
 import java.sql.Clob;
 import java.sql.SQLException;
 import java.sql.SQLXML;
-import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.teiid.core.TeiidComponentException;
+import org.teiid.core.types.BaseLob;
 import org.teiid.core.types.BlobImpl;
 import org.teiid.core.types.BlobType;
 import org.teiid.core.types.ClobImpl;
@@ -47,8 +45,8 @@
 import org.teiid.core.types.SQLXMLImpl;
 import org.teiid.core.types.Streamable;
 import org.teiid.core.types.XMLType;
+import org.teiid.core.util.ObjectConverterUtil;
 import org.teiid.query.QueryPlugin;
-import org.teiid.query.processor.xml.XMLUtil.FileStoreInputStreamFactory;
 import org.teiid.query.sql.symbol.Expression;
 
 /**
@@ -56,9 +54,7 @@
  * TODO: for temp tables we may need to have a copy by value management strategy
  */
 public class LobManager {
-	private static final int IO_BUFFER_SIZE = 1 << 14;
 	private Map<String, Streamable<?>> lobReferences = new ConcurrentHashMap<String, Streamable<?>>();
-	private Map<String, Streamable<?>> lobFilestores = new ConcurrentHashMap<String, Streamable<?>>();
 
 	public void updateReferences(int[] lobIndexes, List<?> tuple)
 			throws TeiidComponentException {
@@ -78,26 +74,13 @@
 	}
 	
     public Streamable<?> getLobReference(String id) throws TeiidComponentException {
-    	Streamable<?> lob = null;
-    	if (this.lobReferences != null) {
-    		lob = this.lobReferences.get(id);
-    	}
-    	
+    	Streamable<?> lob = this.lobReferences.get(id);
     	if (lob == null) {
-			lob = this.lobFilestores.get(id);
-    	}
-    	
-    	if (lob == null) {
     		throw new TeiidComponentException(QueryPlugin.Util.getString("ProcessWorker.wrongdata")); //$NON-NLS-1$
     	}
     	return lob;
     }
         
-    public void clear() {
-    	this.lobReferences.clear();
-    	this.lobFilestores.clear();
-    }
-    
     public static int[] getLobIndexes(List expressions) {
     	if (expressions == null) {
     		return null;
@@ -116,54 +99,33 @@
 	    return Arrays.copyOf(result, resultIndex);
     }
 
-    public Collection<Streamable<?>> getLobReferences(){
-    	return lobReferences.values();
-    }
-    
 	public void persist(FileStore lobStore) throws TeiidComponentException {
-		ArrayList<Streamable<?>> lobs = new ArrayList<Streamable<?>>(this.lobReferences.values());
-		for (Streamable<?> lob:lobs) {
-			persist(lob.getReferenceStreamId(), lobStore);
+		// stream the contents of lob into file store.
+		byte[] bytes = new byte[102400]; // 100k
+
+		for (Map.Entry<String, Streamable<?>> entry : this.lobReferences.entrySet()) {
+			entry.setValue(persistLob(entry.getValue(), lobStore, bytes));
 		}
 	}    
     
-	public Streamable<?> persist(String id, FileStore fs) throws TeiidComponentException {
-		Streamable<?> persistedLob = this.lobFilestores.get(id);
-		if (persistedLob == null) {
-			Streamable<?> lobReference =  this.lobReferences.get(id);
-			if (lobReference == null) {
-	    		throw new TeiidComponentException(QueryPlugin.Util.getString("ProcessWorker.wrongdata")); //$NON-NLS-1$
-	    	}
-	    	
-	    	persistedLob = persistLob(lobReference, fs);
-			synchronized (this) {
-				this.lobFilestores.put(id, persistedLob);
-				this.lobReferences.remove(id);		
-			}
-		}
-		return persistedLob;
-	}
-	
-	private Streamable<?> persistLob(final Streamable<?> lob, final FileStore store) throws TeiidComponentException {
-		long offset = store.getLength();
-		int length = 0;
-		Streamable<?> persistedLob;
+	private Streamable<?> persistLob(final Streamable<?> lob, final FileStore store, byte[] bytes) throws TeiidComponentException {
 		
-		// if this is XML and already saved to disk just return
-		if (lob.getReference() instanceof SQLXMLImpl) {
+		// if this is already saved to disk just return
+		if (lob.getReference() instanceof BaseLob) {
 			try {
-				SQLXMLImpl xml = (SQLXMLImpl)lob.getReference();
-				InputStreamFactory isf = xml.getStreamFactory();
-				if (isf instanceof FileStoreInputStreamFactory) {
+				BaseLob baseLob = (BaseLob)lob.getReference();
+				InputStreamFactory isf = baseLob.getStreamFactory();
+				if (isf.isPersistent()) {
 					return lob;
 				}
 			} catch (SQLException e) {
 				// go through regular persistence.
 			}
 		}
+		long offset = store.getLength();
+		int length = 0;
+		Streamable<?> persistedLob;
 					
-		// stream the contents of lob into file store.
-		byte[] bytes = new byte[102400]; // 100k
 		try {
 			InputStreamFactory isf = new InputStreamFactory() {
 				@Override
@@ -178,20 +140,11 @@
 				}					
 			};
 			InputStream is = isf.getInputStream();
-			OutputStream fsos = new BufferedOutputStream(store.createOutputStream(), IO_BUFFER_SIZE);
-			while(true) {
-				int read = is.read(bytes, 0, 102400);
-				if (read == -1) {
-					break;
-				}
-				length += read;
-				fsos.write(bytes, 0, read);
-			}
-			fsos.close();
-			is.close();
+			OutputStream fsos = store.createOutputStream();
+			ObjectConverterUtil.write(fsos, is, bytes, -1);
 		} catch (IOException e) {
 			throw new TeiidComponentException(e);
-		}		
+		}
 		
 		// re-construct the new lobs based on the file store
 		final long lobOffset = offset;
@@ -201,6 +154,11 @@
 			public InputStream getInputStream() throws IOException {
 				return store.createInputStream(lobOffset, lobLength);
 			}
+			
+			@Override
+			public boolean isPersistent() {
+				return true;
+			}
 		};			
 		
 		try {

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java	2010-12-24 02:09:29 UTC (rev 2802)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java	2010-12-24 18:47:11 UTC (rev 2803)
@@ -160,7 +160,6 @@
 	
 	public void persistLobs() throws TeiidComponentException {
 		if (this.lobManager != null) {
-			saveBatch(true, true);
 			this.lobManager.persist(this.lobStore);
 		}
 	}
@@ -236,6 +235,9 @@
 			if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.DETAIL)) {
 	            LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, "Removing TupleBuffer:", this.tupleSourceID); //$NON-NLS-1$
 	        }
+			if (this.lobStore != null) {
+				this.lobStore.remove();
+			}
 			this.batchBuffer = null;
 			purge();
 			this.manager.remove();

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java	2010-12-24 02:09:29 UTC (rev 2802)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java	2010-12-24 18:47:11 UTC (rev 2803)
@@ -33,7 +33,6 @@
 import java.lang.ref.SoftReference;
 import java.lang.ref.WeakReference;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.Iterator;
@@ -61,7 +60,6 @@
 import org.teiid.core.TeiidComponentException;
 import org.teiid.core.TeiidRuntimeException;
 import org.teiid.core.types.DataTypeManager;
-import org.teiid.core.types.Streamable;
 import org.teiid.core.util.Assertion;
 import org.teiid.logging.LogConstants;
 import org.teiid.logging.LogManager;
@@ -318,10 +316,6 @@
 						if (lobManager != null) {
 							for (List<?> tuple : batch.getTuples()) {
 								lobManager.updateReferences(batchManager.lobIndexes, tuple);
-								Collection<Streamable<?>> lobs = lobManager.getLobReferences();
-								for(Streamable<?> lob: lobs) {
-						            lobManager.persist(lob.getReferenceStreamId(), batchManager.store);
-								}
 							}
 						}
 						synchronized (batchManager.store) {
@@ -368,9 +362,6 @@
 			if (info != null) {
 				batchManager.unusedSpace.addAndGet(info[1]); 
 			}
-			if (lobManager != null) {
-				lobManager.clear();
-			}
 			activeBatch = null;
 			batchReference = null;
 		}

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java	2010-12-24 02:09:29 UTC (rev 2802)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java	2010-12-24 18:47:11 UTC (rev 2803)
@@ -294,14 +294,12 @@
 				rowcount = resultsBuffer.getRowCount();
 				if (this.cid == null || !this.doneProducingBatches) {
 					resultsBuffer.remove();
-				}
-				
-				try {
-					if (cid != null && this.resultsBuffer.isLobs()) {
+				} else {
+					try {
 						this.resultsBuffer.persistLobs();
+					} catch (TeiidComponentException e) {
+						LogManager.logDetail(LogConstants.CTX_DQP, QueryPlugin.Util.getString("failed_to_cache")); //$NON-NLS-1$
 					}
-				} catch (TeiidComponentException e) {
-					LogManager.logDetail(LogConstants.CTX_DQP, QueryPlugin.Util.getString("failed_to_cache")); //$NON-NLS-1$
 				}
 				
 				for (DataTierTupleSource connectorRequest : this.connectorInfo.values()) {

Modified: trunk/engine/src/main/java/org/teiid/query/optimizer/relational/RelationalPlanner.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/optimizer/relational/RelationalPlanner.java	2010-12-24 02:09:29 UTC (rev 2802)
+++ trunk/engine/src/main/java/org/teiid/query/optimizer/relational/RelationalPlanner.java	2010-12-24 18:47:11 UTC (rev 2803)
@@ -39,6 +39,7 @@
 import org.teiid.api.exception.query.QueryValidatorException;
 import org.teiid.client.plan.Annotation;
 import org.teiid.client.plan.Annotation.Priority;
+import org.teiid.common.buffer.LobManager;
 import org.teiid.core.TeiidComponentException;
 import org.teiid.core.TeiidProcessingException;
 import org.teiid.core.id.IDGenerator;
@@ -582,7 +583,7 @@
 				if (container instanceof StoredProcedure) {
 					boolean noCache = isNoCacheGroup(metadata, ((StoredProcedure) container).getProcedureID(), option);
 					if (!noCache) {
-						if (container.areResultsCachable() && Query.areResultsCachable(container.getProcedureParameters().keySet()) && context.isResultSetCacheEnabled()) {
+						if (context.isResultSetCacheEnabled() && container.areResultsCachable() && LobManager.getLobIndexes(new ArrayList<ElementSymbol>(container.getProcedureParameters().keySet())) == null) {
 							container.getGroup().setGlobalTable(true);
 							container.setCacheHint(c.getCacheHint());
 							recordAnnotation(analysisRecord, Annotation.CACHED_PROCEDURE, Priority.LOW, "SimpleQueryResolver.procedure_cache_used", container.getGroup()); //$NON-NLS-1$

Modified: trunk/engine/src/main/java/org/teiid/query/processor/xml/XMLUtil.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/xml/XMLUtil.java	2010-12-24 02:09:29 UTC (rev 2802)
+++ trunk/engine/src/main/java/org/teiid/query/processor/xml/XMLUtil.java	2010-12-24 18:47:11 UTC (rev 2803)
@@ -86,6 +86,11 @@
 		public void free() throws IOException {
 			lobBuffer.remove();
 		}
+		
+		@Override
+		public boolean isPersistent() {
+			return true;
+		}
 	}
 
     /**



More information about the teiid-commits mailing list