[teiid-commits] teiid SVN: r2800 - in trunk: engine/src/main/java/org/teiid/common/buffer and 8 other directories.

teiid-commits at lists.jboss.org teiid-commits at lists.jboss.org
Thu Dec 23 17:11:02 EST 2010


Author: rareddy
Date: 2010-12-23 17:11:01 -0500 (Thu, 23 Dec 2010)
New Revision: 2800

Added:
   trunk/engine/src/test/java/org/teiid/common/buffer/TestLobManager.java
Modified:
   trunk/build/kits/jboss-container/teiid-releasenotes.html
   trunk/engine/src/main/java/org/teiid/common/buffer/BatchManager.java
   trunk/engine/src/main/java/org/teiid/common/buffer/FileStore.java
   trunk/engine/src/main/java/org/teiid/common/buffer/LobManager.java
   trunk/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java
   trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/process/CachedResults.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/process/SessionAwareCache.java
   trunk/engine/src/main/java/org/teiid/query/sql/lang/Query.java
   trunk/engine/src/main/resources/org/teiid/query/i18n.properties
   trunk/engine/src/test/java/org/teiid/common/buffer/TestTupleBuffer.java
   trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestFileStorageManager.java
   trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestCachedResults.java
   trunk/engine/src/test/java/org/teiid/query/sql/lang/TestQuery.java
Log:
TEIID-1227: adding support to cache lob values in the resultset to disk and releasing the lob references kept to release the connections.

Modified: trunk/build/kits/jboss-container/teiid-releasenotes.html
===================================================================
--- trunk/build/kits/jboss-container/teiid-releasenotes.html	2010-12-23 21:35:33 UTC (rev 2799)
+++ trunk/build/kits/jboss-container/teiid-releasenotes.html	2010-12-23 22:11:01 UTC (rev 2800)
@@ -39,6 +39,7 @@
 	be used in aggregation or joins and the optimizer will take advantage of the partitioning information.  For example, when a partitioned union is joined against another partitioned union, the optimizer will reorganize the join of unions into a union of joins.
 	<LI><B>Delegate Translator</B> - A new translator added that is capable of delegating all the calls to another configured translator.
 	<LI><B>JDBC Reauthentication</B> - Teiid connections (defined by the org.teiid.jdbc.TeiidConnection interface) now support the changeUser method to reauthenticate a given connection.
+	<LI><B>Lob Caching</B> - Lobs are allowed to cache to disk as part of ResultSet caching. Distributed lob caching is not allowed. 
 </UL>
 
 <h2><a name="Compatibility">Compatibility Issues</a></h2>

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/BatchManager.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/BatchManager.java	2010-12-23 21:35:33 UTC (rev 2799)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/BatchManager.java	2010-12-23 22:11:01 UTC (rev 2800)
@@ -38,4 +38,5 @@
 	
 	void remove();
 	
+	FileStore createStorage(String prefix);
 }

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/FileStore.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/FileStore.java	2010-12-23 21:35:33 UTC (rev 2799)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/FileStore.java	2010-12-23 22:11:01 UTC (rev 2800)
@@ -202,23 +202,38 @@
 	
 	protected abstract void removeDirect();
 	
-	public InputStream createInputStream(final long start) {
+	public InputStream createInputStream(final long start, final long length) {
 		return new InputStream() {
 			private long offset = start;
+			private long streamLength = length;
 			
 			@Override
 			public int read() throws IOException {
-				throw new UnsupportedOperationException("buffered reading must be used"); //$NON-NLS-1$
+				byte[] buffer = new byte[1];
+				int read = read(buffer, 0, 1);
+				if (read == -1) {
+					return -1;
+				}
+				return buffer[0];
 			}
 			
 			@Override
 			public int read(byte[] b, int off, int len) throws IOException {
 				try {
-					int bytes = FileStore.this.read(offset, b, off, len);
-					if (bytes != -1) {
-						this.offset += bytes;
+					if (this.streamLength != -1 && len > this.streamLength) {
+						len = (int)this.streamLength;
 					}
-					return bytes;
+					if (this.streamLength == -1 || this.streamLength > 0) {
+						int bytes = FileStore.this.read(offset, b, off, len);
+						if (bytes != -1) {
+							this.offset += bytes;
+							if (this.streamLength != -1) {
+								this.streamLength -= bytes;
+							}
+						}
+						return bytes;
+					}
+					return -1;
 				} catch (TeiidComponentException e) {
 					throw new IOException(e);
 				}
@@ -226,6 +241,10 @@
 		};
 	}
 	
+	public InputStream createInputStream(final long start) {
+		return createInputStream(start, -1);
+	}
+	
 	public OutputStream createOutputStream() {
 		return new OutputStream() {
 			

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/LobManager.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/LobManager.java	2010-12-23 21:35:33 UTC (rev 2799)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/LobManager.java	2010-12-23 22:11:01 UTC (rev 2800)
@@ -22,15 +22,33 @@
 
 package org.teiid.common.buffer;
 
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.sql.Blob;
+import java.sql.Clob;
+import java.sql.SQLException;
+import java.sql.SQLXML;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.teiid.core.TeiidComponentException;
+import org.teiid.core.types.BlobImpl;
+import org.teiid.core.types.BlobType;
+import org.teiid.core.types.ClobImpl;
+import org.teiid.core.types.ClobType;
 import org.teiid.core.types.DataTypeManager;
+import org.teiid.core.types.InputStreamFactory;
+import org.teiid.core.types.SQLXMLImpl;
 import org.teiid.core.types.Streamable;
+import org.teiid.core.types.XMLType;
 import org.teiid.query.QueryPlugin;
+import org.teiid.query.processor.xml.XMLUtil.FileStoreInputStreamFactory;
 import org.teiid.query.sql.symbol.Expression;
 
 /**
@@ -38,9 +56,10 @@
  * TODO: for temp tables we may need to have a copy by value management strategy
  */
 public class LobManager {
+	private static final int IO_BUFFER_SIZE = 1 << 14;
+	private Map<String, Streamable<?>> lobReferences = new ConcurrentHashMap<String, Streamable<?>>();
+	private Map<String, Streamable<?>> lobFilestores = new ConcurrentHashMap<String, Streamable<?>>();
 
-	private Map<String, Streamable<?>> lobReferences = new ConcurrentHashMap<String, Streamable<?>>(); 
-
 	public void updateReferences(int[] lobIndexes, List<?> tuple)
 			throws TeiidComponentException {
 		for (int i = 0; i < lobIndexes.length; i++) {
@@ -63,14 +82,20 @@
     	if (this.lobReferences != null) {
     		lob = this.lobReferences.get(id);
     	}
+    	
     	if (lob == null) {
+			lob = this.lobFilestores.get(id);
+    	}
+    	
+    	if (lob == null) {
     		throw new TeiidComponentException(QueryPlugin.Util.getString("ProcessWorker.wrongdata")); //$NON-NLS-1$
     	}
     	return lob;
     }
-    
+        
     public void clear() {
     	this.lobReferences.clear();
+    	this.lobFilestores.clear();
     }
     
     public static int[] getLobIndexes(List expressions) {
@@ -91,4 +116,108 @@
 	    return Arrays.copyOf(result, resultIndex);
     }
 
+    public Collection<Streamable<?>> getLobReferences(){
+    	return lobReferences.values();
+    }
+    
+	public void persist(FileStore lobStore) throws TeiidComponentException {
+		ArrayList<Streamable<?>> lobs = new ArrayList<Streamable<?>>(this.lobReferences.values());
+		for (Streamable<?> lob:lobs) {
+			persist(lob.getReferenceStreamId(), lobStore);
+		}
+	}    
+    
+	public Streamable<?> persist(String id, FileStore fs) throws TeiidComponentException {
+		Streamable<?> persistedLob = this.lobFilestores.get(id);
+		if (persistedLob == null) {
+			Streamable<?> lobReference =  this.lobReferences.get(id);
+			if (lobReference == null) {
+	    		throw new TeiidComponentException(QueryPlugin.Util.getString("ProcessWorker.wrongdata")); //$NON-NLS-1$
+	    	}
+	    	
+	    	persistedLob = persistLob(lobReference, fs);
+			synchronized (this) {
+				this.lobFilestores.put(id, persistedLob);
+				this.lobReferences.remove(id);		
+			}
+		}
+		return persistedLob;
+	}
+	
+	private Streamable<?> persistLob(final Streamable<?> lob, final FileStore store) throws TeiidComponentException {
+		long offset = store.getLength();
+		int length = 0;
+		Streamable<?> persistedLob;
+		
+		// if this is XML and already saved to disk just return
+		if (lob.getReference() instanceof SQLXMLImpl) {
+			try {
+				SQLXMLImpl xml = (SQLXMLImpl)lob.getReference();
+				InputStreamFactory isf = xml.getStreamFactory();
+				if (isf instanceof FileStoreInputStreamFactory) {
+					return lob;
+				}
+			} catch (SQLException e) {
+				// go through regular persistence.
+			}
+		}
+					
+		// stream the contents of lob into file store.
+		byte[] bytes = new byte[102400]; // 100k
+		try {
+			InputStreamFactory isf = new InputStreamFactory() {
+				@Override
+				public InputStream getInputStream() throws IOException {
+			    	if (lob instanceof BlobType) {
+			    		return new BlobInputStreamFactory((Blob)lob).getInputStream();
+			    	}
+			    	else if (lob instanceof ClobType) {
+			    		return new ClobInputStreamFactory((Clob)lob).getInputStream();
+			    	}
+			    	return new SQLXMLInputStreamFactory((SQLXML)lob).getInputStream();
+				}					
+			};
+			InputStream is = isf.getInputStream();
+			OutputStream fsos = new BufferedOutputStream(store.createOutputStream(), IO_BUFFER_SIZE);
+			while(true) {
+				int read = is.read(bytes, 0, 102400);
+				if (read == -1) {
+					break;
+				}
+				length += read;
+				fsos.write(bytes, 0, read);
+			}
+			fsos.close();
+			is.close();
+		} catch (IOException e) {
+			throw new TeiidComponentException(e);
+		}		
+		
+		// re-construct the new lobs based on the file store
+		final long lobOffset = offset;
+		final int lobLength = length;
+		InputStreamFactory isf = new InputStreamFactory() {
+			@Override
+			public InputStream getInputStream() throws IOException {
+				return store.createInputStream(lobOffset, lobLength);
+			}
+		};			
+		
+		try {
+			if (lob instanceof BlobType) {
+				persistedLob = new BlobType(new BlobImpl(isf));
+			}
+			else if (lob instanceof ClobType) {
+				persistedLob = new ClobType(new ClobImpl(isf, ((ClobType)lob).length()));
+			}
+			else {
+				persistedLob = new XMLType(new SQLXMLImpl(isf));
+				((XMLType)persistedLob).setEncoding(((XMLType)lob).getEncoding());
+				((XMLType)persistedLob).setType(((XMLType)lob).getType());
+			}
+		} catch (SQLException e) {
+			throw new TeiidComponentException(e);
+		}		
+		return persistedLob;		
+	}
 }

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java	2010-12-23 21:35:33 UTC (rev 2799)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java	2010-12-23 22:11:01 UTC (rev 2800)
@@ -77,6 +77,7 @@
 	private LobManager lobManager;
 	private int[] lobIndexes;
 	private String uuid;
+	private FileStore lobStore;
 	
 	public TupleBuffer(BatchManager manager, String id, List<?> schema, int[] lobIndexes, int batchSize) {
 		this.manager = manager;
@@ -86,6 +87,8 @@
 		this.lobIndexes = lobIndexes;
 		if (this.lobIndexes != null) {
 			this.lobManager = new LobManager();
+			this.lobStore = this.manager.createStorage("_lobs"); //$NON-NLS-1$
+			this.lobStore.setCleanupReference(this);
 		}
 		this.batchSize = batchSize;		
 	}
@@ -155,6 +158,13 @@
 		this.batches.clear();
 	}
 	
+	public void persistLobs() throws TeiidComponentException {
+		if (this.lobManager != null) {
+			saveBatch(true, true);
+			this.lobManager.persist(this.lobStore);
+		}
+	}
+	
 	/**
 	 * Force the persistence of any rows held in memory.
 	 * @throws TeiidComponentException

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java	2010-12-23 21:35:33 UTC (rev 2799)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java	2010-12-23 22:11:01 UTC (rev 2800)
@@ -33,6 +33,7 @@
 import java.lang.ref.SoftReference;
 import java.lang.ref.WeakReference;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.Iterator;
@@ -60,6 +61,7 @@
 import org.teiid.core.TeiidComponentException;
 import org.teiid.core.TeiidRuntimeException;
 import org.teiid.core.types.DataTypeManager;
+import org.teiid.core.types.Streamable;
 import org.teiid.core.util.Assertion;
 import org.teiid.logging.LogConstants;
 import org.teiid.logging.LogManager;
@@ -104,6 +106,10 @@
 			this.store.setCleanupReference(this);
 			this.lobIndexes = lobIndexes;
 		}
+		
+		public FileStore createStorage(String prefix) {
+			return createFileStore(id+prefix);
+		}
 
 		@Override
 		public ManagedBatch createManagedBatch(TupleBatch batch, boolean softCache)
@@ -312,6 +318,10 @@
 						if (lobManager != null) {
 							for (List<?> tuple : batch.getTuples()) {
 								lobManager.updateReferences(batchManager.lobIndexes, tuple);
+								Collection<Streamable<?>> lobs = lobManager.getLobReferences();
+								for(Streamable<?> lob: lobs) {
+						            lobManager.persist(lob.getReferenceStreamId(), batchManager.store);
+								}
 							}
 						}
 						synchronized (batchManager.store) {

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/CachedResults.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/CachedResults.java	2010-12-23 21:35:33 UTC (rev 2799)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/CachedResults.java	2010-12-23 22:11:01 UTC (rev 2800)
@@ -63,6 +63,7 @@
 	private int batchSize;
 	private String uuid;
 	private int rowCount;
+	private boolean hasLobs;
 	
 	public String getId() {
 		return this.uuid;
@@ -86,6 +87,7 @@
 		this.types = TupleBuffer.getTypeNames(results.getSchema());
 		this.rowCount = results.getRowCount();
 		this.uuid = results.getId();
+		this.hasLobs = results.isLobs();
 	}
 	
 	public void setCommand(Command command) {
@@ -120,6 +122,9 @@
 	public synchronized boolean restore(Cache cache, BufferManager bufferManager) {
 		try {
 			if (this.results == null) {
+				if (this.hasLobs) {
+					return false;
+				}
 				List<ElementSymbol> schema = new ArrayList<ElementSymbol>(types.length);
 				for (String type : types) {
 					ElementSymbol es = new ElementSymbol("x"); //$NON-NLS-1$

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java	2010-12-23 21:35:33 UTC (rev 2799)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java	2010-12-23 22:11:01 UTC (rev 2800)
@@ -558,6 +558,7 @@
 	private void clearResultSetCache(String vdbName, int version) {
 		//clear cache in server
 		if(rsCache != null){
+			LogManager.logInfo(LogConstants.CTX_DQP, QueryPlugin.Util.getString("DQPCore.clearing_resultset_cache", vdbName, version)); //$NON-NLS-1$
 			rsCache.clearForVDB(vdbName, version);
 		}
 	}

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java	2010-12-23 21:35:33 UTC (rev 2799)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java	2010-12-23 22:11:01 UTC (rev 2800)
@@ -296,6 +296,14 @@
 					resultsBuffer.remove();
 				}
 				
+				try {
+					if (cid != null && this.resultsBuffer.isLobs()) {
+						this.resultsBuffer.persistLobs();
+					}
+				} catch (TeiidComponentException e) {
+					LogManager.logDetail(LogConstants.CTX_DQP, QueryPlugin.Util.getString("failed_to_cache")); //$NON-NLS-1$
+				}
+				
 				for (DataTierTupleSource connectorRequest : this.connectorInfo.values()) {
 					connectorRequest.fullyCloseSource();
 			    }
@@ -377,7 +385,7 @@
 		collector = new BatchCollector(processor, resultsBuffer) {
 			protected void flushBatchDirect(TupleBatch batch, boolean add) throws TeiidComponentException,TeiidProcessingException {
 				boolean added = false;
-				if (cid != null || resultsBuffer.isLobs()) {
+				if (cid != null) {
 					super.flushBatchDirect(batch, add);
 					added = true;
 				}
@@ -391,7 +399,7 @@
 	                cr.setAnalysisRecord(analysisRecord);
 	                cr.setResults(resultsBuffer);
 	                
-					if (originalCommand.getCacheHint() != null && originalCommand.getCacheHint().getDeterminism() != null) {
+	                if (originalCommand.getCacheHint() != null && originalCommand.getCacheHint().getDeterminism() != null) {
 						determinismLevel = originalCommand.getCacheHint().getDeterminism();
 						LogManager.logTrace(LogConstants.CTX_DQP, new Object[] { "Cache hint modified the query determinism from ",processor.getContext().getDeterminismLevel(), " to ", determinismLevel }); //$NON-NLS-1$ //$NON-NLS-2$
 					}		                

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/SessionAwareCache.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/SessionAwareCache.java	2010-12-23 21:35:33 UTC (rev 2799)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/SessionAwareCache.java	2010-12-23 22:11:01 UTC (rev 2800)
@@ -253,7 +253,7 @@
 					}
 					
 					Class<?> type = DataTypeManager.determineDataTypeClass(obj);
-					if (DataTypeManager.isLOB(type) || type == DataTypeManager.DefaultDataClasses.OBJECT) {
+					if (type == DataTypeManager.DefaultDataClasses.OBJECT) {
 						return false;
 					}
 					this.parameters.add((Serializable)obj);

Modified: trunk/engine/src/main/java/org/teiid/query/sql/lang/Query.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/sql/lang/Query.java	2010-12-23 21:35:33 UTC (rev 2799)
+++ trunk/engine/src/main/java/org/teiid/query/sql/lang/Query.java	2010-12-23 22:11:01 UTC (rev 2800)
@@ -431,7 +431,7 @@
 
 	public static boolean areResultsCachable(Collection<? extends SingleElementSymbol> projectedSymbols) {
 		for (SingleElementSymbol projectedSymbol : projectedSymbols) {
-			if(DataTypeManager.isLOB(projectedSymbol.getType()) || projectedSymbol.getType() == DataTypeManager.DefaultDataClasses.OBJECT) {
+			if(projectedSymbol.getType() == DataTypeManager.DefaultDataClasses.OBJECT) {
 				return false;
 			}
 		}

Modified: trunk/engine/src/main/resources/org/teiid/query/i18n.properties
===================================================================
--- trunk/engine/src/main/resources/org/teiid/query/i18n.properties	2010-12-23 21:35:33 UTC (rev 2799)
+++ trunk/engine/src/main/resources/org/teiid/query/i18n.properties	2010-12-23 22:11:01 UTC (rev 2800)
@@ -830,6 +830,7 @@
 DQPCore.Unable_to_load_metadata_for_VDB_name__{0},_version__{1}=Unable to load metadata for VDB name= {0}, version= {1}
 DQPCore.Unknown_query_metadata_exception_while_registering_query__{0}.=Unknown query metadata exception while registering query: {0}.
 DQPCore.Clearing_prepared_plan_cache=Clearing prepared plan cache
+DQPCore.clearing_resultset_cache=Clearing the resultset cache for vdb {0}.{1}
 DQPCore.The_request_has_been_closed.=The request {0} has been closed.
 DQPCore.The_atomic_request_has_been_cancelled=The atomic request {0} has been canceled.
 DQPCore.failed_to_cancel=Failed to Cancel request, as request already finished processing
@@ -888,6 +889,7 @@
 
 RequestWorkItem.cache_nondeterministic=Caching command "{0}" at a session level, but less deterministic functions were evaluated. 
 not_found_cache=Results not found in cache
+failed_to_cache=Failed to store the result set contents to disk.
 failed_to_unwrap_connection=Failed to unwrap the source connection.
 connection_factory_not_found=Failed to find the Connection Factory with JNDI name {0}. Please check the name or deploy the Connection Factory with specified name.
 

Added: trunk/engine/src/test/java/org/teiid/common/buffer/TestLobManager.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/common/buffer/TestLobManager.java	                        (rev 0)
+++ trunk/engine/src/test/java/org/teiid/common/buffer/TestLobManager.java	2010-12-23 22:11:01 UTC (rev 2800)
@@ -0,0 +1,98 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership.  Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ * 
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ * 
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ * 
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+package org.teiid.common.buffer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.StringReader;
+import java.nio.charset.Charset;
+import java.util.Arrays;
+
+import org.junit.Test;
+import org.teiid.common.buffer.impl.BufferManagerImpl;
+import org.teiid.core.types.BlobImpl;
+import org.teiid.core.types.BlobType;
+import org.teiid.core.types.ClobImpl;
+import org.teiid.core.types.ClobType;
+import org.teiid.core.types.InputStreamFactory;
+import org.teiid.core.types.Streamable;
+import org.teiid.core.util.ReaderInputStream;
+
+ at SuppressWarnings("nls")
+public class TestLobManager {
+
+	@Test
+	public void testLobPeristence() throws Exception{
+		
+		BufferManagerImpl buffMgr = BufferManagerFactory.createBufferManager();
+		FileStore fs = buffMgr.createFileStore("temp");
+		
+		ClobType clob = new ClobType(new ClobImpl(new InputStreamFactory() {
+			@Override
+			public InputStream getInputStream() throws IOException {
+				return new ReaderInputStream(new StringReader("Clob contents One"),  Charset.forName(Streamable.ENCODING)); 
+			}
+			
+		}, -1));
+		
+		BlobType blob = new BlobType(new BlobImpl(new InputStreamFactory() {
+			@Override
+			public InputStream getInputStream() throws IOException {
+				return new ReaderInputStream(new StringReader("Blob contents Two"),  Charset.forName(Streamable.ENCODING));
+			}
+			
+		}));		
+		
+		LobManager lobManager = new LobManager();
+		lobManager.updateReferences(new int[] {0,1}, Arrays.asList(clob, blob));
+		lobManager.persist(fs);
+		
+		Streamable<?>lob = lobManager.getLobReference(clob.getReferenceStreamId());
+		assertTrue(lob.getClass().isAssignableFrom(ClobType.class));
+		ClobType clobRead = (ClobType)lob;
+		assertEquals(ClobType.getString(clob), ClobType.getString(clobRead));
+		assertTrue(clobRead.length() != -1);
+		
+		lob = lobManager.getLobReference(blob.getReferenceStreamId());
+		assertTrue(lob.getClass().isAssignableFrom(BlobType.class));
+		BlobType blobRead = (BlobType)lob;
+		assertTrue(Arrays.equals(read(blob.getBinaryStream()), read(blobRead.getBinaryStream())));
+		
+	}
+	
+	private byte[] read(InputStream in) throws Exception {
+		ByteArrayOutputStream out = new ByteArrayOutputStream();
+		byte[] bytes = new byte[100];
+		while (true) {
+			int c = in.read(bytes, 0, 100);
+			if (c == -1) {
+				break;
+			}
+			out.write(c);
+		}
+		return out.toByteArray();
+	}
+}


Property changes on: trunk/engine/src/test/java/org/teiid/common/buffer/TestLobManager.java
___________________________________________________________________
Name: svn:mime-type
   + text/plain

Modified: trunk/engine/src/test/java/org/teiid/common/buffer/TestTupleBuffer.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/common/buffer/TestTupleBuffer.java	2010-12-23 21:35:33 UTC (rev 2799)
+++ trunk/engine/src/test/java/org/teiid/common/buffer/TestTupleBuffer.java	2010-12-23 22:11:01 UTC (rev 2800)
@@ -30,6 +30,7 @@
 import javax.sql.rowset.serial.SerialClob;
 
 import org.junit.Test;
+import org.mockito.Mockito;
 import org.teiid.core.TeiidComponentException;
 import org.teiid.core.types.ClobType;
 import org.teiid.core.types.DataTypeManager;
@@ -61,6 +62,11 @@
 				}
 			};
 		}
+
+		@Override
+		public FileStore createStorage(String prefix) {
+			return Mockito.mock(FileStore.class);
+		}
 	}
 
 	@Test public void testForwardOnly() throws Exception {

Modified: trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestFileStorageManager.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestFileStorageManager.java	2010-12-23 21:35:33 UTC (rev 2799)
+++ trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestFileStorageManager.java	2010-12-23 22:11:01 UTC (rev 2800)
@@ -25,6 +25,8 @@
 import static org.junit.Assert.*;
 
 import java.io.File;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.io.RandomAccessFile;
 import java.util.Arrays;
 import java.util.Map;
@@ -36,6 +38,7 @@
 import org.teiid.core.TeiidComponentException;
 import org.teiid.core.util.UnitTestUtil;
 
+ at SuppressWarnings("nls")
 public class TestFileStorageManager {
 		
 	public FileStorageManager getStorageManager(Integer maxFileSize, Integer openFiles, String dir) throws TeiidComponentException {
@@ -111,4 +114,37 @@
         assertTrue(Arrays.equals(bytes, bytesRead));
 	}
     
+	
+    @Test public void testWritingMultipleFiles() throws Exception {
+    	FileStorageManager sm = getStorageManager(1024, null, null); 
+        String tsID = "0";     //$NON-NLS-1$
+        // Add one batch
+        FileStore store = sm.createFileStore(tsID);
+        String contentOrig = new String("some file content this will stored in same tmp file with another");
+        OutputStream out = store.createOutputStream();
+        out.write(contentOrig.getBytes(), 0, contentOrig.getBytes().length);
+        out.close();
+
+        out = store.createOutputStream();
+        long start = store.getLength();
+        byte[] bytesOrig = new byte[2048];
+        r.nextBytes(bytesOrig);
+        out.write(bytesOrig, 0, 2048);
+        
+        byte[] readContent = new byte[2048];
+        InputStream in = store.createInputStream(0, contentOrig.getBytes().length);        
+    	int c = in.read(readContent, 0, 3000);
+       	assertEquals(contentOrig, new String(readContent, 0, c));       	
+       	c = in.read(readContent, 0, 3000);
+       	assertEquals(-1, c);
+       	in.close();
+        
+        in = store.createInputStream(start, 2048);
+        c = in.read(readContent, 0, 3000);
+        assertTrue(Arrays.equals(bytesOrig, readContent));
+       	c = in.read(readContent, 0, 3000);
+       	assertEquals(-1, c);
+       	in.close();        
+    }	
+	
 }

Modified: trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestCachedResults.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestCachedResults.java	2010-12-23 21:35:33 UTC (rev 2799)
+++ trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestCachedResults.java	2010-12-23 22:11:01 UTC (rev 2800)
@@ -32,10 +32,12 @@
 import java.util.List;
 
 import org.junit.Test;
+import org.mockito.Mockito;
 import org.teiid.cache.Cache;
 import org.teiid.cache.DefaultCache;
 import org.teiid.common.buffer.BatchManager;
 import org.teiid.common.buffer.BufferManager;
+import org.teiid.common.buffer.FileStore;
 import org.teiid.common.buffer.TupleBatch;
 import org.teiid.common.buffer.TupleBuffer;
 import org.teiid.core.TeiidComponentException;
@@ -70,6 +72,11 @@
 				}
 			};
 		}
+
+		@Override
+		public FileStore createStorage(String prefix) {
+			return Mockito.mock(FileStore.class);
+		}
 	}
 	
 	@Test

Modified: trunk/engine/src/test/java/org/teiid/query/sql/lang/TestQuery.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/query/sql/lang/TestQuery.java	2010-12-23 21:35:33 UTC (rev 2799)
+++ trunk/engine/src/test/java/org/teiid/query/sql/lang/TestQuery.java	2010-12-23 22:11:01 UTC (rev 2800)
@@ -135,21 +135,21 @@
         select.addSymbol(column);        
         query.setSelect(select);        
         query.setFrom(from);
-        assertTrue(!query.areResultsCachable());
+        assertTrue(query.areResultsCachable());
         select = new Select();
         column = new ElementSymbol("y");//$NON-NLS-1$
         column.setType(ClobType.class);
         select.addSymbol(column);        
         query.setSelect(select);        
         query.setFrom(from);
-        assertTrue(!query.areResultsCachable());
+        assertTrue(query.areResultsCachable());
         select = new Select();
         column = new ElementSymbol("y");//$NON-NLS-1$
         column.setType(XMLType.class);
         select.addSymbol(column);        
         query.setSelect(select);        
         query.setFrom(from);
-        assertTrue(!query.areResultsCachable());        
+        assertTrue(query.areResultsCachable());        
     }
     
     public void testClone2() {    



More information about the teiid-commits mailing list