[teiid-commits] teiid SVN: r2800 - in trunk: engine/src/main/java/org/teiid/common/buffer and 8 other directories.
teiid-commits at lists.jboss.org
teiid-commits at lists.jboss.org
Thu Dec 23 17:11:02 EST 2010
Author: rareddy
Date: 2010-12-23 17:11:01 -0500 (Thu, 23 Dec 2010)
New Revision: 2800
Added:
trunk/engine/src/test/java/org/teiid/common/buffer/TestLobManager.java
Modified:
trunk/build/kits/jboss-container/teiid-releasenotes.html
trunk/engine/src/main/java/org/teiid/common/buffer/BatchManager.java
trunk/engine/src/main/java/org/teiid/common/buffer/FileStore.java
trunk/engine/src/main/java/org/teiid/common/buffer/LobManager.java
trunk/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/CachedResults.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/SessionAwareCache.java
trunk/engine/src/main/java/org/teiid/query/sql/lang/Query.java
trunk/engine/src/main/resources/org/teiid/query/i18n.properties
trunk/engine/src/test/java/org/teiid/common/buffer/TestTupleBuffer.java
trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestFileStorageManager.java
trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestCachedResults.java
trunk/engine/src/test/java/org/teiid/query/sql/lang/TestQuery.java
Log:
TEIID-1227: adding support for caching lob values in the result set to disk, and releasing the retained lob references so that the underlying connections can be released.
Modified: trunk/build/kits/jboss-container/teiid-releasenotes.html
===================================================================
--- trunk/build/kits/jboss-container/teiid-releasenotes.html 2010-12-23 21:35:33 UTC (rev 2799)
+++ trunk/build/kits/jboss-container/teiid-releasenotes.html 2010-12-23 22:11:01 UTC (rev 2800)
@@ -39,6 +39,7 @@
be used in aggregation or joins and the optimizer will take advantage of the partitioning information. For example, when a partitioned union is joined against another partitioned union, the optimizer will reorganize the join of unions into a union of joins.
<LI><B>Delegate Translator</B> - A new translator added that is capable of delegating all the calls to another configured translator.
<LI><B>JDBC Reauthentication</B> - Teiid connections (defined by the org.teiid.jdbc.TeiidConnection interface) now support the changeUser method to reauthenticate a given connection.
+ <LI><B>Lob Caching</B> - Lobs are allowed to cache to disk as part of ResultSet caching. Distributed lob caching is not allowed.
</UL>
<h2><a name="Compatibility">Compatibility Issues</a></h2>
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/BatchManager.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/BatchManager.java 2010-12-23 21:35:33 UTC (rev 2799)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/BatchManager.java 2010-12-23 22:11:01 UTC (rev 2800)
@@ -38,4 +38,5 @@
void remove();
+ FileStore createStorage(String prefix);
}
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/FileStore.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/FileStore.java 2010-12-23 21:35:33 UTC (rev 2799)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/FileStore.java 2010-12-23 22:11:01 UTC (rev 2800)
@@ -202,23 +202,38 @@
protected abstract void removeDirect();
- public InputStream createInputStream(final long start) {
+ public InputStream createInputStream(final long start, final long length) {
return new InputStream() {
private long offset = start;
+ private long streamLength = length;
@Override
public int read() throws IOException {
- throw new UnsupportedOperationException("buffered reading must be used"); //$NON-NLS-1$
+ byte[] buffer = new byte[1];
+ int read = read(buffer, 0, 1);
+ if (read == -1) {
+ return -1;
+ }
+ return buffer[0];
}
@Override
public int read(byte[] b, int off, int len) throws IOException {
try {
- int bytes = FileStore.this.read(offset, b, off, len);
- if (bytes != -1) {
- this.offset += bytes;
+ if (this.streamLength != -1 && len > this.streamLength) {
+ len = (int)this.streamLength;
}
- return bytes;
+ if (this.streamLength == -1 || this.streamLength > 0) {
+ int bytes = FileStore.this.read(offset, b, off, len);
+ if (bytes != -1) {
+ this.offset += bytes;
+ if (this.streamLength != -1) {
+ this.streamLength -= bytes;
+ }
+ }
+ return bytes;
+ }
+ return -1;
} catch (TeiidComponentException e) {
throw new IOException(e);
}
@@ -226,6 +241,10 @@
};
}
+ public InputStream createInputStream(final long start) {
+ return createInputStream(start, -1);
+ }
+
public OutputStream createOutputStream() {
return new OutputStream() {
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/LobManager.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/LobManager.java 2010-12-23 21:35:33 UTC (rev 2799)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/LobManager.java 2010-12-23 22:11:01 UTC (rev 2800)
@@ -22,15 +22,33 @@
package org.teiid.common.buffer;
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.sql.Blob;
+import java.sql.Clob;
+import java.sql.SQLException;
+import java.sql.SQLXML;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.teiid.core.TeiidComponentException;
+import org.teiid.core.types.BlobImpl;
+import org.teiid.core.types.BlobType;
+import org.teiid.core.types.ClobImpl;
+import org.teiid.core.types.ClobType;
import org.teiid.core.types.DataTypeManager;
+import org.teiid.core.types.InputStreamFactory;
+import org.teiid.core.types.SQLXMLImpl;
import org.teiid.core.types.Streamable;
+import org.teiid.core.types.XMLType;
import org.teiid.query.QueryPlugin;
+import org.teiid.query.processor.xml.XMLUtil.FileStoreInputStreamFactory;
import org.teiid.query.sql.symbol.Expression;
/**
@@ -38,9 +56,10 @@
* TODO: for temp tables we may need to have a copy by value management strategy
*/
public class LobManager {
+ private static final int IO_BUFFER_SIZE = 1 << 14;
+ private Map<String, Streamable<?>> lobReferences = new ConcurrentHashMap<String, Streamable<?>>();
+ private Map<String, Streamable<?>> lobFilestores = new ConcurrentHashMap<String, Streamable<?>>();
- private Map<String, Streamable<?>> lobReferences = new ConcurrentHashMap<String, Streamable<?>>();
-
public void updateReferences(int[] lobIndexes, List<?> tuple)
throws TeiidComponentException {
for (int i = 0; i < lobIndexes.length; i++) {
@@ -63,14 +82,20 @@
if (this.lobReferences != null) {
lob = this.lobReferences.get(id);
}
+
if (lob == null) {
+ lob = this.lobFilestores.get(id);
+ }
+
+ if (lob == null) {
throw new TeiidComponentException(QueryPlugin.Util.getString("ProcessWorker.wrongdata")); //$NON-NLS-1$
}
return lob;
}
-
+
public void clear() {
this.lobReferences.clear();
+ this.lobFilestores.clear();
}
public static int[] getLobIndexes(List expressions) {
@@ -91,4 +116,108 @@
return Arrays.copyOf(result, resultIndex);
}
+ public Collection<Streamable<?>> getLobReferences(){
+ return lobReferences.values();
+ }
+
+ public void persist(FileStore lobStore) throws TeiidComponentException {
+ ArrayList<Streamable<?>> lobs = new ArrayList<Streamable<?>>(this.lobReferences.values());
+ for (Streamable<?> lob:lobs) {
+ persist(lob.getReferenceStreamId(), lobStore);
+ }
+ }
+
+ public Streamable<?> persist(String id, FileStore fs) throws TeiidComponentException {
+ Streamable<?> persistedLob = this.lobFilestores.get(id);
+ if (persistedLob == null) {
+ Streamable<?> lobReference = this.lobReferences.get(id);
+ if (lobReference == null) {
+ throw new TeiidComponentException(QueryPlugin.Util.getString("ProcessWorker.wrongdata")); //$NON-NLS-1$
+ }
+
+ persistedLob = persistLob(lobReference, fs);
+ synchronized (this) {
+ this.lobFilestores.put(id, persistedLob);
+ this.lobReferences.remove(id);
+ }
+ }
+ return persistedLob;
+ }
+
+ private Streamable<?> persistLob(final Streamable<?> lob, final FileStore store) throws TeiidComponentException {
+ long offset = store.getLength();
+ int length = 0;
+ Streamable<?> persistedLob;
+
+ // if this is XML and already saved to disk just return
+ if (lob.getReference() instanceof SQLXMLImpl) {
+ try {
+ SQLXMLImpl xml = (SQLXMLImpl)lob.getReference();
+ InputStreamFactory isf = xml.getStreamFactory();
+ if (isf instanceof FileStoreInputStreamFactory) {
+ return lob;
+ }
+ } catch (SQLException e) {
+ // go through regular persistence.
+ }
+ }
+
+ // stream the contents of lob into file store.
+ byte[] bytes = new byte[102400]; // 100k
+ try {
+ InputStreamFactory isf = new InputStreamFactory() {
+ @Override
+ public InputStream getInputStream() throws IOException {
+ if (lob instanceof BlobType) {
+ return new BlobInputStreamFactory((Blob)lob).getInputStream();
+ }
+ else if (lob instanceof ClobType) {
+ return new ClobInputStreamFactory((Clob)lob).getInputStream();
+ }
+ return new SQLXMLInputStreamFactory((SQLXML)lob).getInputStream();
+ }
+ };
+ InputStream is = isf.getInputStream();
+ OutputStream fsos = new BufferedOutputStream(store.createOutputStream(), IO_BUFFER_SIZE);
+ while(true) {
+ int read = is.read(bytes, 0, 102400);
+ if (read == -1) {
+ break;
+ }
+ length += read;
+ fsos.write(bytes, 0, read);
+ }
+ fsos.close();
+ is.close();
+ } catch (IOException e) {
+ throw new TeiidComponentException(e);
+ }
+
+ // re-construct the new lobs based on the file store
+ final long lobOffset = offset;
+ final int lobLength = length;
+ InputStreamFactory isf = new InputStreamFactory() {
+ @Override
+ public InputStream getInputStream() throws IOException {
+ return store.createInputStream(lobOffset, lobLength);
+ }
+ };
+
+ try {
+ if (lob instanceof BlobType) {
+ persistedLob = new BlobType(new BlobImpl(isf));
+ }
+ else if (lob instanceof ClobType) {
+ persistedLob = new ClobType(new ClobImpl(isf, ((ClobType)lob).length()));
+ }
+ else {
+ persistedLob = new XMLType(new SQLXMLImpl(isf));
+ ((XMLType)persistedLob).setEncoding(((XMLType)lob).getEncoding());
+ ((XMLType)persistedLob).setType(((XMLType)lob).getType());
+ }
+ } catch (SQLException e) {
+ throw new TeiidComponentException(e);
+ }
+ return persistedLob;
+ }
}
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java 2010-12-23 21:35:33 UTC (rev 2799)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java 2010-12-23 22:11:01 UTC (rev 2800)
@@ -77,6 +77,7 @@
private LobManager lobManager;
private int[] lobIndexes;
private String uuid;
+ private FileStore lobStore;
public TupleBuffer(BatchManager manager, String id, List<?> schema, int[] lobIndexes, int batchSize) {
this.manager = manager;
@@ -86,6 +87,8 @@
this.lobIndexes = lobIndexes;
if (this.lobIndexes != null) {
this.lobManager = new LobManager();
+ this.lobStore = this.manager.createStorage("_lobs"); //$NON-NLS-1$
+ this.lobStore.setCleanupReference(this);
}
this.batchSize = batchSize;
}
@@ -155,6 +158,13 @@
this.batches.clear();
}
+ public void persistLobs() throws TeiidComponentException {
+ if (this.lobManager != null) {
+ saveBatch(true, true);
+ this.lobManager.persist(this.lobStore);
+ }
+ }
+
/**
* Force the persistence of any rows held in memory.
* @throws TeiidComponentException
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java 2010-12-23 21:35:33 UTC (rev 2799)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java 2010-12-23 22:11:01 UTC (rev 2800)
@@ -33,6 +33,7 @@
import java.lang.ref.SoftReference;
import java.lang.ref.WeakReference;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
@@ -60,6 +61,7 @@
import org.teiid.core.TeiidComponentException;
import org.teiid.core.TeiidRuntimeException;
import org.teiid.core.types.DataTypeManager;
+import org.teiid.core.types.Streamable;
import org.teiid.core.util.Assertion;
import org.teiid.logging.LogConstants;
import org.teiid.logging.LogManager;
@@ -104,6 +106,10 @@
this.store.setCleanupReference(this);
this.lobIndexes = lobIndexes;
}
+
+ public FileStore createStorage(String prefix) {
+ return createFileStore(id+prefix);
+ }
@Override
public ManagedBatch createManagedBatch(TupleBatch batch, boolean softCache)
@@ -312,6 +318,10 @@
if (lobManager != null) {
for (List<?> tuple : batch.getTuples()) {
lobManager.updateReferences(batchManager.lobIndexes, tuple);
+ Collection<Streamable<?>> lobs = lobManager.getLobReferences();
+ for(Streamable<?> lob: lobs) {
+ lobManager.persist(lob.getReferenceStreamId(), batchManager.store);
+ }
}
}
synchronized (batchManager.store) {
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/CachedResults.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/CachedResults.java 2010-12-23 21:35:33 UTC (rev 2799)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/CachedResults.java 2010-12-23 22:11:01 UTC (rev 2800)
@@ -63,6 +63,7 @@
private int batchSize;
private String uuid;
private int rowCount;
+ private boolean hasLobs;
public String getId() {
return this.uuid;
@@ -86,6 +87,7 @@
this.types = TupleBuffer.getTypeNames(results.getSchema());
this.rowCount = results.getRowCount();
this.uuid = results.getId();
+ this.hasLobs = results.isLobs();
}
public void setCommand(Command command) {
@@ -120,6 +122,9 @@
public synchronized boolean restore(Cache cache, BufferManager bufferManager) {
try {
if (this.results == null) {
+ if (this.hasLobs) {
+ return false;
+ }
List<ElementSymbol> schema = new ArrayList<ElementSymbol>(types.length);
for (String type : types) {
ElementSymbol es = new ElementSymbol("x"); //$NON-NLS-1$
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java 2010-12-23 21:35:33 UTC (rev 2799)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java 2010-12-23 22:11:01 UTC (rev 2800)
@@ -558,6 +558,7 @@
private void clearResultSetCache(String vdbName, int version) {
//clear cache in server
if(rsCache != null){
+ LogManager.logInfo(LogConstants.CTX_DQP, QueryPlugin.Util.getString("DQPCore.clearing_resultset_cache", vdbName, version)); //$NON-NLS-1$
rsCache.clearForVDB(vdbName, version);
}
}
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java 2010-12-23 21:35:33 UTC (rev 2799)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java 2010-12-23 22:11:01 UTC (rev 2800)
@@ -296,6 +296,14 @@
resultsBuffer.remove();
}
+ try {
+ if (cid != null && this.resultsBuffer.isLobs()) {
+ this.resultsBuffer.persistLobs();
+ }
+ } catch (TeiidComponentException e) {
+ LogManager.logDetail(LogConstants.CTX_DQP, QueryPlugin.Util.getString("failed_to_cache")); //$NON-NLS-1$
+ }
+
for (DataTierTupleSource connectorRequest : this.connectorInfo.values()) {
connectorRequest.fullyCloseSource();
}
@@ -377,7 +385,7 @@
collector = new BatchCollector(processor, resultsBuffer) {
protected void flushBatchDirect(TupleBatch batch, boolean add) throws TeiidComponentException,TeiidProcessingException {
boolean added = false;
- if (cid != null || resultsBuffer.isLobs()) {
+ if (cid != null) {
super.flushBatchDirect(batch, add);
added = true;
}
@@ -391,7 +399,7 @@
cr.setAnalysisRecord(analysisRecord);
cr.setResults(resultsBuffer);
- if (originalCommand.getCacheHint() != null && originalCommand.getCacheHint().getDeterminism() != null) {
+ if (originalCommand.getCacheHint() != null && originalCommand.getCacheHint().getDeterminism() != null) {
determinismLevel = originalCommand.getCacheHint().getDeterminism();
LogManager.logTrace(LogConstants.CTX_DQP, new Object[] { "Cache hint modified the query determinism from ",processor.getContext().getDeterminismLevel(), " to ", determinismLevel }); //$NON-NLS-1$ //$NON-NLS-2$
}
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/SessionAwareCache.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/SessionAwareCache.java 2010-12-23 21:35:33 UTC (rev 2799)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/SessionAwareCache.java 2010-12-23 22:11:01 UTC (rev 2800)
@@ -253,7 +253,7 @@
}
Class<?> type = DataTypeManager.determineDataTypeClass(obj);
- if (DataTypeManager.isLOB(type) || type == DataTypeManager.DefaultDataClasses.OBJECT) {
+ if (type == DataTypeManager.DefaultDataClasses.OBJECT) {
return false;
}
this.parameters.add((Serializable)obj);
Modified: trunk/engine/src/main/java/org/teiid/query/sql/lang/Query.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/sql/lang/Query.java 2010-12-23 21:35:33 UTC (rev 2799)
+++ trunk/engine/src/main/java/org/teiid/query/sql/lang/Query.java 2010-12-23 22:11:01 UTC (rev 2800)
@@ -431,7 +431,7 @@
public static boolean areResultsCachable(Collection<? extends SingleElementSymbol> projectedSymbols) {
for (SingleElementSymbol projectedSymbol : projectedSymbols) {
- if(DataTypeManager.isLOB(projectedSymbol.getType()) || projectedSymbol.getType() == DataTypeManager.DefaultDataClasses.OBJECT) {
+ if(projectedSymbol.getType() == DataTypeManager.DefaultDataClasses.OBJECT) {
return false;
}
}
Modified: trunk/engine/src/main/resources/org/teiid/query/i18n.properties
===================================================================
--- trunk/engine/src/main/resources/org/teiid/query/i18n.properties 2010-12-23 21:35:33 UTC (rev 2799)
+++ trunk/engine/src/main/resources/org/teiid/query/i18n.properties 2010-12-23 22:11:01 UTC (rev 2800)
@@ -830,6 +830,7 @@
DQPCore.Unable_to_load_metadata_for_VDB_name__{0},_version__{1}=Unable to load metadata for VDB name= {0}, version= {1}
DQPCore.Unknown_query_metadata_exception_while_registering_query__{0}.=Unknown query metadata exception while registering query: {0}.
DQPCore.Clearing_prepared_plan_cache=Clearing prepared plan cache
+DQPCore.clearing_resultset_cache=Clearing the resultset cache for vdb {0}.{1}
DQPCore.The_request_has_been_closed.=The request {0} has been closed.
DQPCore.The_atomic_request_has_been_cancelled=The atomic request {0} has been canceled.
DQPCore.failed_to_cancel=Failed to Cancel request, as request already finished processing
@@ -888,6 +889,7 @@
RequestWorkItem.cache_nondeterministic=Caching command "{0}" at a session level, but less deterministic functions were evaluated.
not_found_cache=Results not found in cache
+failed_to_cache=Failed to store the result set contents to disk.
failed_to_unwrap_connection=Failed to unwrap the source connection.
connection_factory_not_found=Failed to find the Connection Factory with JNDI name {0}. Please check the name or deploy the Connection Factory with specified name.
Added: trunk/engine/src/test/java/org/teiid/common/buffer/TestLobManager.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/common/buffer/TestLobManager.java (rev 0)
+++ trunk/engine/src/test/java/org/teiid/common/buffer/TestLobManager.java 2010-12-23 22:11:01 UTC (rev 2800)
@@ -0,0 +1,98 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+package org.teiid.common.buffer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.StringReader;
+import java.nio.charset.Charset;
+import java.util.Arrays;
+
+import org.junit.Test;
+import org.teiid.common.buffer.impl.BufferManagerImpl;
+import org.teiid.core.types.BlobImpl;
+import org.teiid.core.types.BlobType;
+import org.teiid.core.types.ClobImpl;
+import org.teiid.core.types.ClobType;
+import org.teiid.core.types.InputStreamFactory;
+import org.teiid.core.types.Streamable;
+import org.teiid.core.util.ReaderInputStream;
+
+@SuppressWarnings("nls")
+public class TestLobManager {
+
+ @Test
+ public void testLobPeristence() throws Exception{
+
+ BufferManagerImpl buffMgr = BufferManagerFactory.createBufferManager();
+ FileStore fs = buffMgr.createFileStore("temp");
+
+ ClobType clob = new ClobType(new ClobImpl(new InputStreamFactory() {
+ @Override
+ public InputStream getInputStream() throws IOException {
+ return new ReaderInputStream(new StringReader("Clob contents One"), Charset.forName(Streamable.ENCODING));
+ }
+
+ }, -1));
+
+ BlobType blob = new BlobType(new BlobImpl(new InputStreamFactory() {
+ @Override
+ public InputStream getInputStream() throws IOException {
+ return new ReaderInputStream(new StringReader("Blob contents Two"), Charset.forName(Streamable.ENCODING));
+ }
+
+ }));
+
+ LobManager lobManager = new LobManager();
+ lobManager.updateReferences(new int[] {0,1}, Arrays.asList(clob, blob));
+ lobManager.persist(fs);
+
+ Streamable<?>lob = lobManager.getLobReference(clob.getReferenceStreamId());
+ assertTrue(lob.getClass().isAssignableFrom(ClobType.class));
+ ClobType clobRead = (ClobType)lob;
+ assertEquals(ClobType.getString(clob), ClobType.getString(clobRead));
+ assertTrue(clobRead.length() != -1);
+
+ lob = lobManager.getLobReference(blob.getReferenceStreamId());
+ assertTrue(lob.getClass().isAssignableFrom(BlobType.class));
+ BlobType blobRead = (BlobType)lob;
+ assertTrue(Arrays.equals(read(blob.getBinaryStream()), read(blobRead.getBinaryStream())));
+
+ }
+
+ private byte[] read(InputStream in) throws Exception {
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ byte[] bytes = new byte[100];
+ while (true) {
+ int c = in.read(bytes, 0, 100);
+ if (c == -1) {
+ break;
+ }
+ out.write(c);
+ }
+ return out.toByteArray();
+ }
+}
Property changes on: trunk/engine/src/test/java/org/teiid/common/buffer/TestLobManager.java
___________________________________________________________________
Name: svn:mime-type
+ text/plain
Modified: trunk/engine/src/test/java/org/teiid/common/buffer/TestTupleBuffer.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/common/buffer/TestTupleBuffer.java 2010-12-23 21:35:33 UTC (rev 2799)
+++ trunk/engine/src/test/java/org/teiid/common/buffer/TestTupleBuffer.java 2010-12-23 22:11:01 UTC (rev 2800)
@@ -30,6 +30,7 @@
import javax.sql.rowset.serial.SerialClob;
import org.junit.Test;
+import org.mockito.Mockito;
import org.teiid.core.TeiidComponentException;
import org.teiid.core.types.ClobType;
import org.teiid.core.types.DataTypeManager;
@@ -61,6 +62,11 @@
}
};
}
+
+ @Override
+ public FileStore createStorage(String prefix) {
+ return Mockito.mock(FileStore.class);
+ }
}
@Test public void testForwardOnly() throws Exception {
Modified: trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestFileStorageManager.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestFileStorageManager.java 2010-12-23 21:35:33 UTC (rev 2799)
+++ trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestFileStorageManager.java 2010-12-23 22:11:01 UTC (rev 2800)
@@ -25,6 +25,8 @@
import static org.junit.Assert.*;
import java.io.File;
+import java.io.InputStream;
+import java.io.OutputStream;
import java.io.RandomAccessFile;
import java.util.Arrays;
import java.util.Map;
@@ -36,6 +38,7 @@
import org.teiid.core.TeiidComponentException;
import org.teiid.core.util.UnitTestUtil;
+@SuppressWarnings("nls")
public class TestFileStorageManager {
public FileStorageManager getStorageManager(Integer maxFileSize, Integer openFiles, String dir) throws TeiidComponentException {
@@ -111,4 +114,37 @@
assertTrue(Arrays.equals(bytes, bytesRead));
}
+
+ @Test public void testWritingMultipleFiles() throws Exception {
+ FileStorageManager sm = getStorageManager(1024, null, null);
+ String tsID = "0"; //$NON-NLS-1$
+ // Add one batch
+ FileStore store = sm.createFileStore(tsID);
+ String contentOrig = new String("some file content this will stored in same tmp file with another");
+ OutputStream out = store.createOutputStream();
+ out.write(contentOrig.getBytes(), 0, contentOrig.getBytes().length);
+ out.close();
+
+ out = store.createOutputStream();
+ long start = store.getLength();
+ byte[] bytesOrig = new byte[2048];
+ r.nextBytes(bytesOrig);
+ out.write(bytesOrig, 0, 2048);
+
+ byte[] readContent = new byte[2048];
+ InputStream in = store.createInputStream(0, contentOrig.getBytes().length);
+ int c = in.read(readContent, 0, 3000);
+ assertEquals(contentOrig, new String(readContent, 0, c));
+ c = in.read(readContent, 0, 3000);
+ assertEquals(-1, c);
+ in.close();
+
+ in = store.createInputStream(start, 2048);
+ c = in.read(readContent, 0, 3000);
+ assertTrue(Arrays.equals(bytesOrig, readContent));
+ c = in.read(readContent, 0, 3000);
+ assertEquals(-1, c);
+ in.close();
+ }
+
}
Modified: trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestCachedResults.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestCachedResults.java 2010-12-23 21:35:33 UTC (rev 2799)
+++ trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestCachedResults.java 2010-12-23 22:11:01 UTC (rev 2800)
@@ -32,10 +32,12 @@
import java.util.List;
import org.junit.Test;
+import org.mockito.Mockito;
import org.teiid.cache.Cache;
import org.teiid.cache.DefaultCache;
import org.teiid.common.buffer.BatchManager;
import org.teiid.common.buffer.BufferManager;
+import org.teiid.common.buffer.FileStore;
import org.teiid.common.buffer.TupleBatch;
import org.teiid.common.buffer.TupleBuffer;
import org.teiid.core.TeiidComponentException;
@@ -70,6 +72,11 @@
}
};
}
+
+ @Override
+ public FileStore createStorage(String prefix) {
+ return Mockito.mock(FileStore.class);
+ }
}
@Test
Modified: trunk/engine/src/test/java/org/teiid/query/sql/lang/TestQuery.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/query/sql/lang/TestQuery.java 2010-12-23 21:35:33 UTC (rev 2799)
+++ trunk/engine/src/test/java/org/teiid/query/sql/lang/TestQuery.java 2010-12-23 22:11:01 UTC (rev 2800)
@@ -135,21 +135,21 @@
select.addSymbol(column);
query.setSelect(select);
query.setFrom(from);
- assertTrue(!query.areResultsCachable());
+ assertTrue(query.areResultsCachable());
select = new Select();
column = new ElementSymbol("y");//$NON-NLS-1$
column.setType(ClobType.class);
select.addSymbol(column);
query.setSelect(select);
query.setFrom(from);
- assertTrue(!query.areResultsCachable());
+ assertTrue(query.areResultsCachable());
select = new Select();
column = new ElementSymbol("y");//$NON-NLS-1$
column.setType(XMLType.class);
select.addSymbol(column);
query.setSelect(select);
query.setFrom(from);
- assertTrue(!query.areResultsCachable());
+ assertTrue(query.areResultsCachable());
}
public void testClone2() {
More information about the teiid-commits
mailing list