Author: shawkins
Date: 2010-12-24 13:47:11 -0500 (Fri, 24 Dec 2010)
New Revision: 2803
Modified:
trunk/common-core/src/main/java/org/teiid/core/types/InputStreamFactory.java
trunk/common-core/src/main/java/org/teiid/core/util/ObjectConverterUtil.java
trunk/engine/src/main/java/org/teiid/common/buffer/LobManager.java
trunk/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
trunk/engine/src/main/java/org/teiid/query/optimizer/relational/RelationalPlanner.java
trunk/engine/src/main/java/org/teiid/query/processor/xml/XMLUtil.java
Log:
TEIID-1227: expand the persistent check and ensure that procedures with LOB parameters are
not cached.
Modified: trunk/common-core/src/main/java/org/teiid/core/types/InputStreamFactory.java
===================================================================
---
trunk/common-core/src/main/java/org/teiid/core/types/InputStreamFactory.java 2010-12-24
02:09:29 UTC (rev 2802)
+++
trunk/common-core/src/main/java/org/teiid/core/types/InputStreamFactory.java 2010-12-24
18:47:11 UTC (rev 2803)
@@ -83,6 +83,10 @@
return null;
}
+ public boolean isPersistent() {
+ return false;
+ }
+
public static class FileInputStreamFactory extends InputStreamFactory {
private File f;
@@ -102,6 +106,11 @@
return new BufferedInputStream(new FileInputStream(f));
}
+ @Override
+ public boolean isPersistent() {
+ return true;
+ }
+
}
public static class ClobInputStreamFactory extends InputStreamFactory implements
DataSource {
Modified: trunk/common-core/src/main/java/org/teiid/core/util/ObjectConverterUtil.java
===================================================================
---
trunk/common-core/src/main/java/org/teiid/core/util/ObjectConverterUtil.java 2010-12-24
02:09:29 UTC (rev 2802)
+++
trunk/common-core/src/main/java/org/teiid/core/util/ObjectConverterUtil.java 2010-12-24
18:47:11 UTC (rev 2803)
@@ -95,10 +95,9 @@
return out.toByteArray();
}
- public static void write(final OutputStream out, final InputStream is, int length)
throws IOException {
+ public static void write(final OutputStream out, final InputStream is, byte[]
l_buffer, int length) throws IOException {
int writen = 0;
try {
- byte[] l_buffer = new byte[DEFAULT_READING_SIZE]; // buffer holding bytes to be
transferred
int l_nbytes = 0; // Number of bytes read
while ((l_nbytes = is.read(l_buffer)) != -1) {
if (length != -1 && writen > length - l_nbytes) {
@@ -117,6 +116,10 @@
}
}
+ public static void write(final OutputStream out, final InputStream is, int length)
throws IOException {
+ write(out, is, new byte[DEFAULT_READING_SIZE], length); // buffer holding bytes to
be transferred
+ }
+
public static void write(final Writer out, final Reader is, int length) throws
IOException {
int writen = 0;
try {
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/LobManager.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/LobManager.java 2010-12-24 02:09:29
UTC (rev 2802)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/LobManager.java 2010-12-24 18:47:11
UTC (rev 2803)
@@ -22,7 +22,6 @@
package org.teiid.common.buffer;
-import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -30,14 +29,13 @@
import java.sql.Clob;
import java.sql.SQLException;
import java.sql.SQLXML;
-import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.teiid.core.TeiidComponentException;
+import org.teiid.core.types.BaseLob;
import org.teiid.core.types.BlobImpl;
import org.teiid.core.types.BlobType;
import org.teiid.core.types.ClobImpl;
@@ -47,8 +45,8 @@
import org.teiid.core.types.SQLXMLImpl;
import org.teiid.core.types.Streamable;
import org.teiid.core.types.XMLType;
+import org.teiid.core.util.ObjectConverterUtil;
import org.teiid.query.QueryPlugin;
-import org.teiid.query.processor.xml.XMLUtil.FileStoreInputStreamFactory;
import org.teiid.query.sql.symbol.Expression;
/**
@@ -56,9 +54,7 @@
* TODO: for temp tables we may need to have a copy by value management strategy
*/
public class LobManager {
- private static final int IO_BUFFER_SIZE = 1 << 14;
private Map<String, Streamable<?>> lobReferences = new
ConcurrentHashMap<String, Streamable<?>>();
- private Map<String, Streamable<?>> lobFilestores = new
ConcurrentHashMap<String, Streamable<?>>();
public void updateReferences(int[] lobIndexes, List<?> tuple)
throws TeiidComponentException {
@@ -78,26 +74,13 @@
}
public Streamable<?> getLobReference(String id) throws TeiidComponentException
{
- Streamable<?> lob = null;
- if (this.lobReferences != null) {
- lob = this.lobReferences.get(id);
- }
-
+ Streamable<?> lob = this.lobReferences.get(id);
if (lob == null) {
- lob = this.lobFilestores.get(id);
- }
-
- if (lob == null) {
throw new
TeiidComponentException(QueryPlugin.Util.getString("ProcessWorker.wrongdata"));
//$NON-NLS-1$
}
return lob;
}
- public void clear() {
- this.lobReferences.clear();
- this.lobFilestores.clear();
- }
-
public static int[] getLobIndexes(List expressions) {
if (expressions == null) {
return null;
@@ -116,54 +99,33 @@
return Arrays.copyOf(result, resultIndex);
}
- public Collection<Streamable<?>> getLobReferences(){
- return lobReferences.values();
- }
-
public void persist(FileStore lobStore) throws TeiidComponentException {
- ArrayList<Streamable<?>> lobs = new
ArrayList<Streamable<?>>(this.lobReferences.values());
- for (Streamable<?> lob:lobs) {
- persist(lob.getReferenceStreamId(), lobStore);
+ // stream the contents of lob into file store.
+ byte[] bytes = new byte[102400]; // 100k
+
+ for (Map.Entry<String, Streamable<?>> entry :
this.lobReferences.entrySet()) {
+ entry.setValue(persistLob(entry.getValue(), lobStore, bytes));
}
}
- public Streamable<?> persist(String id, FileStore fs) throws
TeiidComponentException {
- Streamable<?> persistedLob = this.lobFilestores.get(id);
- if (persistedLob == null) {
- Streamable<?> lobReference = this.lobReferences.get(id);
- if (lobReference == null) {
- throw new
TeiidComponentException(QueryPlugin.Util.getString("ProcessWorker.wrongdata"));
//$NON-NLS-1$
- }
-
- persistedLob = persistLob(lobReference, fs);
- synchronized (this) {
- this.lobFilestores.put(id, persistedLob);
- this.lobReferences.remove(id);
- }
- }
- return persistedLob;
- }
-
- private Streamable<?> persistLob(final Streamable<?> lob, final FileStore
store) throws TeiidComponentException {
- long offset = store.getLength();
- int length = 0;
- Streamable<?> persistedLob;
+ private Streamable<?> persistLob(final Streamable<?> lob, final FileStore
store, byte[] bytes) throws TeiidComponentException {
- // if this is XML and already saved to disk just return
- if (lob.getReference() instanceof SQLXMLImpl) {
+ // if this is already saved to disk just return
+ if (lob.getReference() instanceof BaseLob) {
try {
- SQLXMLImpl xml = (SQLXMLImpl)lob.getReference();
- InputStreamFactory isf = xml.getStreamFactory();
- if (isf instanceof FileStoreInputStreamFactory) {
+ BaseLob baseLob = (BaseLob)lob.getReference();
+ InputStreamFactory isf = baseLob.getStreamFactory();
+ if (isf.isPersistent()) {
return lob;
}
} catch (SQLException e) {
// go through regular persistence.
}
}
+ long offset = store.getLength();
+ int length = 0;
+ Streamable<?> persistedLob;
- // stream the contents of lob into file store.
- byte[] bytes = new byte[102400]; // 100k
try {
InputStreamFactory isf = new InputStreamFactory() {
@Override
@@ -178,20 +140,11 @@
}
};
InputStream is = isf.getInputStream();
- OutputStream fsos = new BufferedOutputStream(store.createOutputStream(),
IO_BUFFER_SIZE);
- while(true) {
- int read = is.read(bytes, 0, 102400);
- if (read == -1) {
- break;
- }
- length += read;
- fsos.write(bytes, 0, read);
- }
- fsos.close();
- is.close();
+ OutputStream fsos = store.createOutputStream();
+ ObjectConverterUtil.write(fsos, is, bytes, -1);
} catch (IOException e) {
throw new TeiidComponentException(e);
- }
+ }
// re-construct the new lobs based on the file store
final long lobOffset = offset;
@@ -201,6 +154,11 @@
public InputStream getInputStream() throws IOException {
return store.createInputStream(lobOffset, lobLength);
}
+
+ @Override
+ public boolean isPersistent() {
+ return true;
+ }
};
try {
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java 2010-12-24
02:09:29 UTC (rev 2802)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java 2010-12-24
18:47:11 UTC (rev 2803)
@@ -160,7 +160,6 @@
public void persistLobs() throws TeiidComponentException {
if (this.lobManager != null) {
- saveBatch(true, true);
this.lobManager.persist(this.lobStore);
}
}
@@ -236,6 +235,9 @@
if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR,
MessageLevel.DETAIL)) {
LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, "Removing
TupleBuffer:", this.tupleSourceID); //$NON-NLS-1$
}
+ if (this.lobStore != null) {
+ this.lobStore.remove();
+ }
this.batchBuffer = null;
purge();
this.manager.remove();
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java 2010-12-24
02:09:29 UTC (rev 2802)
+++
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java 2010-12-24
18:47:11 UTC (rev 2803)
@@ -33,7 +33,6 @@
import java.lang.ref.SoftReference;
import java.lang.ref.WeakReference;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
@@ -61,7 +60,6 @@
import org.teiid.core.TeiidComponentException;
import org.teiid.core.TeiidRuntimeException;
import org.teiid.core.types.DataTypeManager;
-import org.teiid.core.types.Streamable;
import org.teiid.core.util.Assertion;
import org.teiid.logging.LogConstants;
import org.teiid.logging.LogManager;
@@ -318,10 +316,6 @@
if (lobManager != null) {
for (List<?> tuple : batch.getTuples()) {
lobManager.updateReferences(batchManager.lobIndexes, tuple);
- Collection<Streamable<?>> lobs = lobManager.getLobReferences();
- for(Streamable<?> lob: lobs) {
- lobManager.persist(lob.getReferenceStreamId(), batchManager.store);
- }
}
}
synchronized (batchManager.store) {
@@ -368,9 +362,6 @@
if (info != null) {
batchManager.unusedSpace.addAndGet(info[1]);
}
- if (lobManager != null) {
- lobManager.clear();
- }
activeBatch = null;
batchReference = null;
}
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java 2010-12-24
02:09:29 UTC (rev 2802)
+++
trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java 2010-12-24
18:47:11 UTC (rev 2803)
@@ -294,14 +294,12 @@
rowcount = resultsBuffer.getRowCount();
if (this.cid == null || !this.doneProducingBatches) {
resultsBuffer.remove();
- }
-
- try {
- if (cid != null && this.resultsBuffer.isLobs()) {
+ } else {
+ try {
this.resultsBuffer.persistLobs();
+ } catch (TeiidComponentException e) {
+ LogManager.logDetail(LogConstants.CTX_DQP,
QueryPlugin.Util.getString("failed_to_cache")); //$NON-NLS-1$
}
- } catch (TeiidComponentException e) {
- LogManager.logDetail(LogConstants.CTX_DQP,
QueryPlugin.Util.getString("failed_to_cache")); //$NON-NLS-1$
}
for (DataTierTupleSource connectorRequest : this.connectorInfo.values()) {
Modified:
trunk/engine/src/main/java/org/teiid/query/optimizer/relational/RelationalPlanner.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/query/optimizer/relational/RelationalPlanner.java 2010-12-24
02:09:29 UTC (rev 2802)
+++
trunk/engine/src/main/java/org/teiid/query/optimizer/relational/RelationalPlanner.java 2010-12-24
18:47:11 UTC (rev 2803)
@@ -39,6 +39,7 @@
import org.teiid.api.exception.query.QueryValidatorException;
import org.teiid.client.plan.Annotation;
import org.teiid.client.plan.Annotation.Priority;
+import org.teiid.common.buffer.LobManager;
import org.teiid.core.TeiidComponentException;
import org.teiid.core.TeiidProcessingException;
import org.teiid.core.id.IDGenerator;
@@ -582,7 +583,7 @@
if (container instanceof StoredProcedure) {
boolean noCache = isNoCacheGroup(metadata, ((StoredProcedure)
container).getProcedureID(), option);
if (!noCache) {
- if (container.areResultsCachable() &&
Query.areResultsCachable(container.getProcedureParameters().keySet()) &&
context.isResultSetCacheEnabled()) {
+ if (context.isResultSetCacheEnabled() && container.areResultsCachable()
&& LobManager.getLobIndexes(new
ArrayList<ElementSymbol>(container.getProcedureParameters().keySet())) == null) {
container.getGroup().setGlobalTable(true);
container.setCacheHint(c.getCacheHint());
recordAnnotation(analysisRecord, Annotation.CACHED_PROCEDURE, Priority.LOW,
"SimpleQueryResolver.procedure_cache_used", container.getGroup());
//$NON-NLS-1$
Modified: trunk/engine/src/main/java/org/teiid/query/processor/xml/XMLUtil.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/xml/XMLUtil.java 2010-12-24
02:09:29 UTC (rev 2802)
+++ trunk/engine/src/main/java/org/teiid/query/processor/xml/XMLUtil.java 2010-12-24
18:47:11 UTC (rev 2803)
@@ -86,6 +86,11 @@
public void free() throws IOException {
lobBuffer.remove();
}
+
+ @Override
+ public boolean isPersistent() {
+ return true;
+ }
}
/**