Author: shawkins
Date: 2009-08-12 13:39:14 -0400 (Wed, 12 Aug 2009)
New Revision: 1235
Added:
trunk/engine/src/main/java/com/metamatrix/common/buffer/BufferManagerLobChunkStream.java
Removed:
trunk/common-core/src/main/java/com/metamatrix/common/types/InvalidReferenceException.java
trunk/engine/src/main/java/com/metamatrix/common/buffer/LobTupleBatch.java
trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/MemoryState.java
trunk/engine/src/main/java/com/metamatrix/common/lob/
Modified:
trunk/client-jdbc/src/main/java/com/metamatrix/jdbc/MMBlob.java
trunk/client-jdbc/src/main/java/com/metamatrix/jdbc/MMClob.java
trunk/client-jdbc/src/main/java/com/metamatrix/jdbc/MMSQLXML.java
trunk/common-core/src/main/java/com/metamatrix/common/types/BlobType.java
trunk/common-core/src/main/java/com/metamatrix/common/types/ClobType.java
trunk/common-core/src/main/java/com/metamatrix/common/types/Streamable.java
trunk/common-core/src/main/java/com/metamatrix/common/types/XMLType.java
trunk/common-core/src/main/java/com/metamatrix/common/types/basic/ClobToStringTransform.java
trunk/common-core/src/main/java/com/metamatrix/common/types/basic/SQLXMLToStringTransform.java
trunk/common-core/src/main/resources/com/metamatrix/core/i18n.properties
trunk/common-core/src/test/java/com/metamatrix/common/types/TestBlobValue.java
trunk/common-core/src/test/java/com/metamatrix/common/types/TestClobValue.java
trunk/common-core/src/test/java/com/metamatrix/common/types/TestXMLValue.java
trunk/engine/src/main/java/com/metamatrix/common/buffer/BufferManager.java
trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/BufferManagerImpl.java
trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/BufferStats.java
trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/TupleGroupInfo.java
trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/TupleSourceInfo.java
trunk/engine/src/main/java/com/metamatrix/common/buffer/storage/file/FileStorageManager.java
trunk/engine/src/main/java/com/metamatrix/query/processor/QueryProcessor.java
trunk/engine/src/main/java/com/metamatrix/query/processor/dynamic/XMLSource.java
trunk/engine/src/main/java/com/metamatrix/query/processor/proc/ProcedurePlan.java
trunk/engine/src/main/java/com/metamatrix/query/processor/relational/DependentCriteriaProcessor.java
trunk/engine/src/main/java/com/metamatrix/query/processor/relational/DependentValueSource.java
trunk/engine/src/main/java/com/metamatrix/query/processor/relational/GroupingNode.java
trunk/engine/src/main/java/com/metamatrix/query/processor/relational/PartitionedSortJoin.java
trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SortUtility.java
trunk/engine/src/main/java/com/metamatrix/query/processor/xml/XMLPlan.java
trunk/engine/src/main/java/com/metamatrix/query/processor/xml/XMLUtil.java
trunk/engine/src/main/java/com/metamatrix/query/processor/xquery/XQueryPlan.java
trunk/engine/src/main/java/com/metamatrix/query/util/CommandContext.java
trunk/engine/src/main/java/com/metamatrix/query/xquery/saxon/SaxonXQueryExpression.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/LobChunkStream.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/LobWorkItem.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
trunk/engine/src/test/java/com/metamatrix/common/buffer/impl/TestBufferManagerImpl.java
trunk/engine/src/test/java/com/metamatrix/query/processor/TestProcessor.java
trunk/engine/src/test/java/com/metamatrix/query/processor/relational/TestSortNode.java
trunk/runtime/src/main/java/com/metamatrix/dqp/embedded/services/EmbeddedBufferService.java
Log:
TEIID-767 TEIID-771 simplifying the lob handling of buffermanager and merging in the
memorystate logic. reviewed by RR.
Modified: trunk/client-jdbc/src/main/java/com/metamatrix/jdbc/MMBlob.java
===================================================================
--- trunk/client-jdbc/src/main/java/com/metamatrix/jdbc/MMBlob.java 2009-08-12 16:36:33
UTC (rev 1234)
+++ trunk/client-jdbc/src/main/java/com/metamatrix/jdbc/MMBlob.java 2009-08-12 17:39:14
UTC (rev 1235)
@@ -55,7 +55,7 @@
*/
public static Blob newInstance(StreamingLobChunckProducer.Factory lobChunckFactory,
BlobType blob) {
if (!Boolean.getBoolean(Streamable.FORCE_STREAMING)) {
- Blob sourceBlob = blob.getSourceBlob();
+ Blob sourceBlob = blob.getReference();
if (sourceBlob != null) {
return sourceBlob;
}
Modified: trunk/client-jdbc/src/main/java/com/metamatrix/jdbc/MMClob.java
===================================================================
--- trunk/client-jdbc/src/main/java/com/metamatrix/jdbc/MMClob.java 2009-08-12 16:36:33
UTC (rev 1234)
+++ trunk/client-jdbc/src/main/java/com/metamatrix/jdbc/MMClob.java 2009-08-12 17:39:14
UTC (rev 1235)
@@ -72,7 +72,7 @@
public static Clob newInstance(StreamingLobChunckProducer.Factory lobChunckFactory,
ClobType clob) throws SQLException {
if (!Boolean.getBoolean(Streamable.FORCE_STREAMING)) {
- Clob sourceClob = clob.getSourceClob();
+ Clob sourceClob = clob.getReference();
if (sourceClob != null) {
return sourceClob;
}
Modified: trunk/client-jdbc/src/main/java/com/metamatrix/jdbc/MMSQLXML.java
===================================================================
--- trunk/client-jdbc/src/main/java/com/metamatrix/jdbc/MMSQLXML.java 2009-08-12 16:36:33
UTC (rev 1234)
+++ trunk/client-jdbc/src/main/java/com/metamatrix/jdbc/MMSQLXML.java 2009-08-12 17:39:14
UTC (rev 1235)
@@ -59,7 +59,7 @@
public static SQLXML newInstance(StreamingLobChunckProducer.Factory lobChunckFactory,
XMLType srcXML) throws SQLException {
if (Boolean.getBoolean(Streamable.FORCE_STREAMING)) {
- SQLXML sourceSQLXML = srcXML.getSourceSQLXML();
+ SQLXML sourceSQLXML = srcXML.getReference();
if (sourceSQLXML != null) {
return sourceSQLXML;
}
Modified: trunk/common-core/src/main/java/com/metamatrix/common/types/BlobType.java
===================================================================
--- trunk/common-core/src/main/java/com/metamatrix/common/types/BlobType.java 2009-08-12
16:36:33 UTC (rev 1234)
+++ trunk/common-core/src/main/java/com/metamatrix/common/types/BlobType.java 2009-08-12
17:39:14 UTC (rev 1235)
@@ -30,20 +30,15 @@
import java.sql.SQLException;
import javax.sql.rowset.serial.SerialBlob;
-import javax.sql.rowset.serial.SerialException;
-import com.metamatrix.core.CorePlugin;
import com.metamatrix.core.MetaMatrixRuntimeException;
-import com.metamatrix.core.util.ArgCheck;
/**
* Represent a value of type "blob", which can be streamable from client
*/
-public final class BlobType implements Streamable, Blob {
+public final class BlobType extends Streamable<Blob> implements Blob {
- private transient Blob srcBlob;
- private String streamId;
- private String persistentId;
+ private static final long serialVersionUID = 1294191629070433450L;
private long length = -1;
/**
@@ -54,8 +49,7 @@
}
public BlobType(Blob blob) {
- ArgCheck.isNotNull(blob);
- this.srcBlob = blob;
+ super(blob);
try {
this.length = blob.length();
} catch (SQLException e) {
@@ -63,52 +57,18 @@
}
}
- public Blob getSourceBlob() {
- return srcBlob;
- }
-
/**
- * @see com.metamatrix.common.types.Streamable#getReferenceStreamId()
- */
- public String getReferenceStreamId() {
- return this.streamId;
- }
-
- /**
- * @see
com.metamatrix.common.types.Streamable#setReferenceStreamId(java.lang.String)
- */
- public void setReferenceStreamId(String id) {
- this.streamId = id;
- }
-
- /**
- * @see com.metamatrix.common.types.Streamable#getPersistenceStreamId()
- */
- public String getPersistenceStreamId() {
- return persistentId;
- }
-
- /**
- * @see
com.metamatrix.common.types.Streamable#setPersistenceStreamId(java.lang.String)
- */
- public void setPersistenceStreamId(String id) {
- this.persistentId = id;
- }
-
- /**
* @see java.sql.Blob#getBinaryStream()
*/
public InputStream getBinaryStream() throws SQLException {
- checkReference();
- return this.srcBlob.getBinaryStream();
+ return this.reference.getBinaryStream();
}
/**
* @see java.sql.Blob#getBytes(long, int)
*/
public byte[] getBytes(long pos, int length) throws SQLException {
- checkReference();
- return this.srcBlob.getBytes(pos, length);
+ return this.reference.getBytes(pos, length);
}
/**
@@ -121,32 +81,28 @@
}
// if did not find before then do it again.
- checkReference();
- return this.srcBlob.length();
+ return this.reference.length();
}
/**
* @see java.sql.Blob#position(java.sql.Blob, long)
*/
public long position(Blob pattern, long start) throws SQLException {
- checkReference();
- return this.srcBlob.position(pattern, start);
+ return this.reference.position(pattern, start);
}
/**
* @see java.sql.Blob#position(byte[], long)
*/
public long position(byte[] pattern, long start) throws SQLException {
- checkReference();
- return this.srcBlob.position(pattern, start);
+ return this.reference.position(pattern, start);
}
/**
* @see java.sql.Blob#setBinaryStream(long)
*/
public OutputStream setBinaryStream(long pos) throws SQLException {
- checkReference();
- return this.srcBlob.setBinaryStream(pos);
+ return this.reference.setBinaryStream(pos);
}
/**
@@ -157,63 +113,23 @@
byte[] bytes,
int offset,
int len) throws SQLException {
- checkReference();
- return this.srcBlob.setBytes(pos, bytes, offset, len);
+ return this.reference.setBytes(pos, bytes, offset, len);
}
/**
* @see java.sql.Blob#setBytes(long, byte[])
*/
public int setBytes(long pos, byte[] bytes) throws SQLException {
- checkReference();
- return this.srcBlob.setBytes(pos, bytes);
+ return this.reference.setBytes(pos, bytes);
}
/**
* @see java.sql.Blob#truncate(long)
*/
public void truncate(long len) throws SQLException {
- checkReference();
- this.srcBlob.truncate(len);
+ this.reference.truncate(len);
}
- /**
- * @see java.lang.Object#equals(java.lang.Object)
- */
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
-
- if (!(o instanceof BlobType)) {
- return false;
- }
-
- BlobType other = (BlobType)o;
-
- if (this.srcBlob != null) {
- return this.srcBlob.equals(other.srcBlob);
- }
-
- return this.persistentId == other.persistentId
- && this.streamId == other.streamId;
-
- }
-
- /**
- * @see java.lang.Object#toString()
- */
- public String toString() {
- checkReference();
- return srcBlob.toString();
- }
-
- private void checkReference() {
- if (this.srcBlob == null) {
- throw new
InvalidReferenceException(CorePlugin.Util.getString("BlobValue.InvalidReference"));
//$NON-NLS-1$
- }
- }
-
/**
* Utility Method to convert blob into byte array
* @param blob
@@ -234,14 +150,12 @@
}
//## JDBC4.0-begin ##
public void free() throws SQLException {
- checkReference();
- this.srcBlob.free();
+ this.reference.free();
}
public InputStream getBinaryStream(long pos, long length)
throws SQLException {
- checkReference();
- return this.srcBlob.getBinaryStream(pos, length);
+ return this.reference.getBinaryStream(pos, length);
}
//## JDBC4.0-end ##
Modified: trunk/common-core/src/main/java/com/metamatrix/common/types/ClobType.java
===================================================================
--- trunk/common-core/src/main/java/com/metamatrix/common/types/ClobType.java 2009-08-12
16:36:33 UTC (rev 1234)
+++ trunk/common-core/src/main/java/com/metamatrix/common/types/ClobType.java 2009-08-12
17:39:14 UTC (rev 1235)
@@ -31,41 +31,27 @@
import java.sql.Clob;
import java.sql.SQLException;
-import javax.sql.rowset.serial.SerialBlob;
import javax.sql.rowset.serial.SerialClob;
-import javax.sql.rowset.serial.SerialException;
-import com.metamatrix.core.CorePlugin;
import com.metamatrix.core.MetaMatrixRuntimeException;
/**
* This is wrapper on top of a "clob" object, which implements the
"java.sql.Clob"
* interface. This class also implements the Streamable interface
*/
-public final class ClobType implements Streamable, Clob, Sequencable {
+public final class ClobType extends Streamable<Clob> implements Clob, Sequencable
{
- private transient Clob srcClob;
- private String streamId;
- private String persistentId;
+ private static final long serialVersionUID = 2753412502127824104L;
private long length = -1;
/**
* Can't construct
*/
ClobType() {
- super();
}
- public Clob getSourceClob() {
- return this.srcClob;
- }
-
public ClobType(Clob clob) {
- if (clob == null) {
- throw new
IllegalArgumentException(CorePlugin.Util.getString("ClobValue.isNUll"));
//$NON-NLS-1$
- }
- // this will serve as the in VM reference
- this.srcClob = clob;
+ super(clob);
try {
this.length = clob.length();
@@ -73,57 +59,26 @@
// ignore.
}
}
-
+
/**
- * @see com.metamatrix.common.types.Streamable#getReferenceStreamId()
- */
- public String getReferenceStreamId() {
- return this.streamId;
- }
-
- /**
- * @see
com.metamatrix.common.types.Streamable#setReferenceStreamId(java.lang.String)
- */
- public void setReferenceStreamId(String id) {
- this.streamId = id;
- }
-
- /**
- * @see com.metamatrix.common.types.Streamable#getPersistenceStreamId()
- */
- public String getPersistenceStreamId() {
- return persistentId;
- }
-
- /**
- * @see
com.metamatrix.common.types.Streamable#setPersistenceStreamId(java.lang.String)
- */
- public void setPersistenceStreamId(String id) {
- this.persistentId = id;
- }
-
- /**
* @see java.sql.Clob#getAsciiStream()
*/
public InputStream getAsciiStream() throws SQLException {
- checkReference();
- return this.srcClob.getAsciiStream();
+ return this.reference.getAsciiStream();
}
/**
* @see java.sql.Clob#getCharacterStream()
*/
public Reader getCharacterStream() throws SQLException {
- checkReference();
- return this.srcClob.getCharacterStream();
+ return this.reference.getCharacterStream();
}
/**
* @see java.sql.Clob#getSubString(long, int)
*/
public String getSubString(long pos, int length) throws SQLException {
- checkReference();
- return this.srcClob.getSubString(pos, length);
+ return this.reference.getSubString(pos, length);
}
/**
@@ -134,40 +89,35 @@
return this.length;
}
- checkReference();
- return this.srcClob.length();
+ return this.reference.length();
}
/**
* @see java.sql.Clob#position(java.sql.Clob, long)
*/
public long position(Clob searchstr, long start) throws SQLException {
- checkReference();
- return this.srcClob.position(searchstr, start);
+ return this.reference.position(searchstr, start);
}
/**
* @see java.sql.Clob#position(java.lang.String, long)
*/
public long position(String searchstr, long start) throws SQLException {
- checkReference();
- return this.srcClob.position(searchstr, start);
+ return this.reference.position(searchstr, start);
}
/**
* @see java.sql.Clob#setAsciiStream(long)
*/
public OutputStream setAsciiStream(long pos) throws SQLException {
- checkReference();
- return this.srcClob.setAsciiStream(pos);
+ return this.reference.setAsciiStream(pos);
}
/**
* @see java.sql.Clob#setCharacterStream(long)
*/
public Writer setCharacterStream(long pos) throws SQLException {
- checkReference();
- return this.srcClob.setCharacterStream(pos);
+ return this.reference.setCharacterStream(pos);
}
/**
@@ -177,62 +127,23 @@
String str,
int offset,
int len) throws SQLException {
- checkReference();
- return this.srcClob.setString(pos, str, offset, len);
+ return this.reference.setString(pos, str, offset, len);
}
/**
* @see java.sql.Clob#setString(long, java.lang.String)
*/
public int setString(long pos, String str) throws SQLException {
- checkReference();
- return this.srcClob.setString(pos, str);
+ return this.reference.setString(pos, str);
}
/**
* @see java.sql.Clob#truncate(long)
*/
public void truncate(long len) throws SQLException {
- checkReference();
- this.srcClob.truncate(len);
+ this.reference.truncate(len);
}
- /**
- * @see java.lang.Object#equals(java.lang.Object)
- */
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
-
- if (!(o instanceof ClobType)) {
- return false;
- }
-
- ClobType other = (ClobType)o;
-
- if (this.srcClob != null) {
- return this.srcClob.equals(other.srcClob);
- }
-
- return this.persistentId == other.persistentId
- && this.streamId == other.streamId;
- }
-
- /**
- * @see java.lang.Object#toString()
- */
- public String toString() {
- checkReference();
- return srcClob.toString();
- }
-
- private void checkReference() {
- if (this.srcClob == null) {
- throw new
InvalidReferenceException(CorePlugin.Util.getString("ClobValue.InvalidReference"));
//$NON-NLS-1$
- }
- }
-
/**
* Utility method to convert to String
* @param clob
@@ -255,7 +166,6 @@
private final static int CHAR_SEQUENCE_BUFFER_SIZE = 2 << 12;
public CharSequence getCharSequence() {
- checkReference();
return new CharSequence() {
private String buffer;
@@ -299,13 +209,11 @@
}
//## JDBC4.0-begin ##
public void free() throws SQLException {
- checkReference();
- this.srcClob.free();
+ this.reference.free();
}
public Reader getCharacterStream(long pos, long length) throws SQLException {
- checkReference();
- return this.srcClob.getCharacterStream(pos, length);
+ return this.reference.getCharacterStream(pos, length);
}
//## JDBC4.0-end ##
Deleted:
trunk/common-core/src/main/java/com/metamatrix/common/types/InvalidReferenceException.java
===================================================================
---
trunk/common-core/src/main/java/com/metamatrix/common/types/InvalidReferenceException.java 2009-08-12
16:36:33 UTC (rev 1234)
+++
trunk/common-core/src/main/java/com/metamatrix/common/types/InvalidReferenceException.java 2009-08-12
17:39:14 UTC (rev 1235)
@@ -1,34 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * See the COPYRIGHT.txt file distributed with this work for information
- * regarding copyright ownership. Some portions may be licensed
- * to Red Hat, Inc. under one or more contributor license agreements.
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
- * 02110-1301 USA.
- */
-
-package com.metamatrix.common.types;
-
-
-/**
- * A exception class to define that a invalid reference has been accessed.
- */
-public class InvalidReferenceException extends RuntimeException {
- public InvalidReferenceException() {}
- public InvalidReferenceException(String msg){
- super(msg);
- }
-}
Modified: trunk/common-core/src/main/java/com/metamatrix/common/types/Streamable.java
===================================================================
--- trunk/common-core/src/main/java/com/metamatrix/common/types/Streamable.java 2009-08-12
16:36:33 UTC (rev 1234)
+++ trunk/common-core/src/main/java/com/metamatrix/common/types/Streamable.java 2009-08-12
17:39:14 UTC (rev 1235)
@@ -23,8 +23,11 @@
package com.metamatrix.common.types;
import java.io.Serializable;
+import java.lang.ref.PhantomReference;
+import com.metamatrix.core.CorePlugin;
+
/**
* A large value object which can be streamable in chunks of data each time
*
@@ -38,31 +41,71 @@
* the process worker to in case the reference object has lost its state and we
* need to reinsate the object from the disk.
*/
-public interface Streamable extends Serializable {
- static final String FORCE_STREAMING = "FORCE_STREAMING"; //$NON-NLS-1$
+public abstract class Streamable<T> implements Serializable {
+ public static final String FORCE_STREAMING = "FORCE_STREAMING";
//$NON-NLS-1$
public static final int STREAMING_BATCH_SIZE_IN_BYTES = 102400; // 100K
+
+ private String referenceStreamId;
+ private String persistenceStreamId;
+ protected transient T reference;
- /**
- * Reference Stream ID in the server
- * @return string - this is buffer managers tuple source id.
- */
- String getReferenceStreamId();
+ public Streamable() {
+
+ }
- /**
- * Reference Stream ID in the server
- * @param id this is buffer managers tuple source id.
- */
- void setReferenceStreamId(String id);
+ public Streamable(T reference) {
+ if (reference == null) {
+ throw new
IllegalArgumentException(CorePlugin.Util.getString("Streamable.isNUll"));
//$NON-NLS-1$
+ }
+
+ this.reference = reference;
+ }
- /**
- * Persitence Stream ID in the server
- * @return string - this is buffer managers tuple source id.
- */
- String getPersistenceStreamId();
+ public T getReference() {
+ return reference;
+ }
- /**
- * Persitence Stream ID in the server
- * @param id this is buffer managers tuple source id.
- */
- void setPersistenceStreamId(String id);
+ public void setReference(T reference) {
+ this.reference = reference;
+ }
+
+ public String getReferenceStreamId() {
+ return this.referenceStreamId;
+ }
+
+ public void setReferenceStreamId(String id) {
+ this.referenceStreamId = id;
+ }
+
+ public String getPersistenceStreamId() {
+ return persistenceStreamId;
+ }
+
+ public void setPersistenceStreamId(String id) {
+ this.persistenceStreamId = id;
+ }
+
+ @Override
+ public String toString() {
+ return reference.toString();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (!(obj instanceof Streamable<?>)) {
+ return false;
+ }
+ Streamable<?> other = (Streamable<?>)obj;
+
+ if (this.reference != null) {
+ return this.reference.equals(other.reference);
+ }
+
+ return this.persistenceStreamId == other.persistenceStreamId
+ && this.referenceStreamId == other.referenceStreamId;
+ }
+
}
Modified: trunk/common-core/src/main/java/com/metamatrix/common/types/XMLType.java
===================================================================
--- trunk/common-core/src/main/java/com/metamatrix/common/types/XMLType.java 2009-08-12
16:36:33 UTC (rev 1234)
+++ trunk/common-core/src/main/java/com/metamatrix/common/types/XMLType.java 2009-08-12
17:39:14 UTC (rev 1235)
@@ -38,125 +38,57 @@
import javax.xml.transform.Result;
import javax.xml.transform.Source;
-import com.metamatrix.core.CorePlugin;
-
/**
* This class represents the SQLXML object along with the Streamable interface. This is
* class used everywhere in the MetaMatrix framework, but clients are restricted to use
* only SQLXML interface on top of this.
*/
-public final class XMLType implements Streamable, SQLXML {
+public final class XMLType extends Streamable<SQLXML> implements SQLXML {
- private transient SQLXML srcXML;
- private String referenceStreamId;
- private String persistenceStreamId;
+ private static final long serialVersionUID = -7922647237095135723L;
public XMLType(){
}
- public SQLXML getSourceSQLXML() {
- return srcXML;
- }
-
public XMLType(SQLXML xml) {
- if (xml == null) {
- throw new
IllegalArgumentException(CorePlugin.Util.getString("XMLValue.isNUll"));
//$NON-NLS-1$
- }
-
- // this will serve as the in VM reference
- this.srcXML = xml;
+ super(xml);
}
-
- public String getReferenceStreamId() {
- return this.referenceStreamId;
- }
-
- public void setReferenceStreamId(String id) {
- this.referenceStreamId = id;
- }
-
- public String getPersistenceStreamId() {
- return persistenceStreamId;
- }
-
- public void setPersistenceStreamId(String id) {
- this.persistenceStreamId = id;
- }
-
+
public InputStream getBinaryStream() throws SQLException {
- checkReference();
- return this.srcXML.getBinaryStream();
+ return this.reference.getBinaryStream();
}
public Reader getCharacterStream() throws SQLException {
- checkReference();
- return this.srcXML.getCharacterStream();
+ return this.reference.getCharacterStream();
}
public <T extends Source> T getSource(Class<T> sourceClass) throws
SQLException {
- checkReference();
- return this.srcXML.getSource(sourceClass);
+ return this.reference.getSource(sourceClass);
}
public String getString() throws SQLException {
- checkReference();
- return this.srcXML.getString();
+ return this.reference.getString();
}
public OutputStream setBinaryStream() throws SQLException {
- checkReference();
- return this.srcXML.setBinaryStream();
+ return this.reference.setBinaryStream();
}
public Writer setCharacterStream() throws SQLException {
- checkReference();
- return this.srcXML.setCharacterStream();
+ return this.reference.setCharacterStream();
}
public void setString(String value) throws SQLException {
- checkReference();
- this.srcXML.setString(value);
+ this.reference.setString(value);
}
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
-
- if (!(o instanceof XMLType)) {
- return false;
- }
-
- XMLType other = (XMLType)o;
-
- if (this.srcXML != null) {
- return this.srcXML.equals(other.srcXML);
- }
-
- return this.persistenceStreamId == other.persistenceStreamId
- && this.referenceStreamId == other.referenceStreamId;
- }
-
- public String toString() {
- checkReference();
- return srcXML.toString();
- }
-
- private void checkReference() {
- if (this.srcXML == null) {
- throw new
InvalidReferenceException(CorePlugin.Util.getString("XMLValue.InvalidReference"));
//$NON-NLS-1$
- }
- }
-
public void free() throws SQLException {
- checkReference();
- this.srcXML.free();
+ this.reference.free();
}
public <T extends Result> T setResult(Class<T> resultClass)
throws SQLException {
- checkReference();
- return this.srcXML.setResult(resultClass);
+ return this.reference.setResult(resultClass);
}
}
Modified:
trunk/common-core/src/main/java/com/metamatrix/common/types/basic/ClobToStringTransform.java
===================================================================
---
trunk/common-core/src/main/java/com/metamatrix/common/types/basic/ClobToStringTransform.java 2009-08-12
16:36:33 UTC (rev 1234)
+++
trunk/common-core/src/main/java/com/metamatrix/common/types/basic/ClobToStringTransform.java 2009-08-12
17:39:14 UTC (rev 1235)
@@ -28,7 +28,6 @@
import com.metamatrix.common.types.ClobType;
import com.metamatrix.common.types.DataTypeManager;
-import com.metamatrix.common.types.InvalidReferenceException;
import com.metamatrix.common.types.TransformationException;
import com.metamatrix.core.CorePlugin;
@@ -65,9 +64,7 @@
throw new TransformationException(e,
CorePlugin.Util.getString("failed_convert", new Object[]
{getSourceType().getName(), getTargetType().getName()})); //$NON-NLS-1$
} catch(IOException e) {
throw new TransformationException(e,
CorePlugin.Util.getString("failed_convert", new Object[]
{getSourceType().getName(), getTargetType().getName()})); //$NON-NLS-1$
- } catch(InvalidReferenceException e) {
- throw new TransformationException(e,
CorePlugin.Util.getString("remote_lob_access")); //$NON-NLS-1$
- }
+ }
}
/**
Modified:
trunk/common-core/src/main/java/com/metamatrix/common/types/basic/SQLXMLToStringTransform.java
===================================================================
---
trunk/common-core/src/main/java/com/metamatrix/common/types/basic/SQLXMLToStringTransform.java 2009-08-12
16:36:33 UTC (rev 1234)
+++
trunk/common-core/src/main/java/com/metamatrix/common/types/basic/SQLXMLToStringTransform.java 2009-08-12
17:39:14 UTC (rev 1235)
@@ -26,7 +26,6 @@
import java.sql.SQLException;
import com.metamatrix.common.types.DataTypeManager;
-import com.metamatrix.common.types.InvalidReferenceException;
import com.metamatrix.common.types.TransformationException;
import com.metamatrix.common.types.XMLType;
import com.metamatrix.core.CorePlugin;
@@ -54,8 +53,6 @@
return new String(result, 0, read);
} catch (SQLException e) {
throw new TransformationException(e,
CorePlugin.Util.getString("failed_convert", new Object[]
{getSourceType().getName(), getTargetType().getName()})); //$NON-NLS-1$
- } catch(InvalidReferenceException e) {
- throw new TransformationException(e,
CorePlugin.Util.getString("remote_lob_access")); //$NON-NLS-1$
} catch (IOException e) {
throw new TransformationException(e,
CorePlugin.Util.getString("failed_convert", new Object[]
{getSourceType().getName(), getTargetType().getName()})); //$NON-NLS-1$
}
Modified: trunk/common-core/src/main/resources/com/metamatrix/core/i18n.properties
===================================================================
--- trunk/common-core/src/main/resources/com/metamatrix/core/i18n.properties 2009-08-12
16:36:33 UTC (rev 1234)
+++ trunk/common-core/src/main/resources/com/metamatrix/core/i18n.properties 2009-08-12
17:39:14 UTC (rev 1235)
@@ -325,12 +325,8 @@
BlobImpl.Invalid_start_position=The position to begin searching, "{0}", is not
valid.
-BlobValue.isNUll=Blob object argument can not be null
-BlobValue.InvalidReference=Blob Contents are not available, use the Streaming interface
to get the contents.
-ClobValue.isNUll=Clob object argument can not be null
-ClobValue.InvalidReference=Clob Contents are not available, use the Streaming interface
to get the contents.
-XMLValue.InvalidReference=XML Contents are not available, use the Streaming interface to
get the contents.
-XMLValue.isNUll=SQLXML object argument can not be null
+Streamable.isNUll=Streamable object argument can not be null
+Streamable.InvalidReference=Streamable contents are not available, use the Streaming
interface to get the contents.
WorkerPool.New_thread=Created worker thread "{0}".
WorkerPool.uncaughtException=Uncaught exception processing work
Modified: trunk/common-core/src/test/java/com/metamatrix/common/types/TestBlobValue.java
===================================================================
---
trunk/common-core/src/test/java/com/metamatrix/common/types/TestBlobValue.java 2009-08-12
16:36:33 UTC (rev 1234)
+++
trunk/common-core/src/test/java/com/metamatrix/common/types/TestBlobValue.java 2009-08-12
17:39:14 UTC (rev 1235)
@@ -68,12 +68,7 @@
assertEquals(key, read.getReferenceStreamId());
// and lost the original object
- try {
- read.getBinaryStream();
- fail("this must thrown a reference stream exception");
//$NON-NLS-1$
- } catch (InvalidReferenceException e) {
- // pass
- }
+ assertNull(read.getReference());
saved.delete();
}
Modified: trunk/common-core/src/test/java/com/metamatrix/common/types/TestClobValue.java
===================================================================
---
trunk/common-core/src/test/java/com/metamatrix/common/types/TestClobValue.java 2009-08-12
16:36:33 UTC (rev 1234)
+++
trunk/common-core/src/test/java/com/metamatrix/common/types/TestClobValue.java 2009-08-12
17:39:14 UTC (rev 1235)
@@ -68,12 +68,7 @@
assertEquals(key, read.getReferenceStreamId());
// and lost the original object
- try {
- read.getCharacterStream();
- fail("this must thrown a reference stream exception");
//$NON-NLS-1$
- } catch (InvalidReferenceException e) {
- // pass
- }
+ assertNull(read.getReference());
saved.delete();
}
Modified: trunk/common-core/src/test/java/com/metamatrix/common/types/TestXMLValue.java
===================================================================
---
trunk/common-core/src/test/java/com/metamatrix/common/types/TestXMLValue.java 2009-08-12
16:36:33 UTC (rev 1234)
+++
trunk/common-core/src/test/java/com/metamatrix/common/types/TestXMLValue.java 2009-08-12
17:39:14 UTC (rev 1235)
@@ -69,12 +69,7 @@
assertEquals(pkey, read.getPersistenceStreamId());
// and lost the original object
- try {
- read.getCharacterStream();
- fail("this must thrown a reference stream exception");
//$NON-NLS-1$
- } catch (InvalidReferenceException e) {
- // pass
- }
+ assertNull(read.getReference());
saved.delete();
}
Modified: trunk/engine/src/main/java/com/metamatrix/common/buffer/BufferManager.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/common/buffer/BufferManager.java 2009-08-12
16:36:33 UTC (rev 1234)
+++ trunk/engine/src/main/java/com/metamatrix/common/buffer/BufferManager.java 2009-08-12
17:39:14 UTC (rev 1235)
@@ -27,6 +27,7 @@
import com.metamatrix.api.exception.MetaMatrixComponentException;
import com.metamatrix.common.lob.LobChunk;
+import com.metamatrix.common.types.Streamable;
/**
* The buffer manager controls how memory is used and how data flows through
@@ -268,4 +269,23 @@
* to ensure that the memory can be freed.
*/
void releasePinnedBatches() throws MetaMatrixComponentException;
+
+ /**
+ * Return the LOB associated with the referenceId
+ * @param id
+ * @param referenceId
+ * @return
+ * @throws TupleSourceNotFoundException
+ * @throws MetaMatrixComponentException
+ */
+ public Streamable<?> getStreamable(TupleSourceID id, String referenceId)
+ throws TupleSourceNotFoundException, MetaMatrixComponentException;
+
+ /**
+ * Assign the tuplesource as the persistent stream for the streamable
+ * @param id
+ * @param s
+ * @throws TupleSourceNotFoundException
+ */
+ public void setPersistentTupleSource(TupleSourceID id, Streamable<? extends
Object> s) throws TupleSourceNotFoundException;
}
Copied:
trunk/engine/src/main/java/com/metamatrix/common/buffer/BufferManagerLobChunkStream.java
(from rev 1231,
trunk/engine/src/main/java/com/metamatrix/common/lob/BufferManagerLobChunkStream.java)
===================================================================
---
trunk/engine/src/main/java/com/metamatrix/common/buffer/BufferManagerLobChunkStream.java
(rev 0)
+++
trunk/engine/src/main/java/com/metamatrix/common/buffer/BufferManagerLobChunkStream.java 2009-08-12
17:39:14 UTC (rev 1235)
@@ -0,0 +1,67 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package com.metamatrix.common.buffer;
+
+
+import java.io.IOException;
+
+import com.metamatrix.api.exception.MetaMatrixComponentException;
+import com.metamatrix.common.lob.LobChunk;
+import com.metamatrix.common.lob.LobChunkProducer;
+import com.metamatrix.common.log.LogManager;
+import com.metamatrix.dqp.DQPPlugin;
+import com.metamatrix.dqp.util.LogConstants;
+
+public class BufferManagerLobChunkStream implements LobChunkProducer {
+ private TupleSourceID sourceId;
+ private BufferManager bufferMgr;
+ private int position = 0;
+
+ public BufferManagerLobChunkStream(String persitentId, BufferManager bufferMgr) {
+ this.sourceId = new TupleSourceID(persitentId);
+ this.bufferMgr = bufferMgr;
+ }
+
+ public LobChunk getNextChunk() throws IOException {
+ try {
+ this.position++;
+ return bufferMgr.getStreamablePart(sourceId, position);
+ } catch (TupleSourceNotFoundException e) {
+ String msg =
DQPPlugin.Util.getString("BufferManagerLobChunkStream.no_tuple_source", new
Object[] {sourceId}); //$NON-NLS-1$
+ LogManager.logWarning(LogConstants.CTX_BUFFER_MGR, e, msg);
+ throw new IOException(msg);
+ } catch (MetaMatrixComponentException e) {
+ String msg =
DQPPlugin.Util.getString("BufferManagerLobChunkStream.error_processing", new
Object[] {sourceId}); //$NON-NLS-1$
+ LogManager.logWarning(LogConstants.CTX_BUFFER_MGR, e, msg);
+ throw new IOException(msg);
+ }
+ }
+
+ /**
+ * @see com.metamatrix.common.lob.LobChunkProducer#close()
+ */
+ public void close() throws IOException {
+ // we could remove the buffer tuple here but, this is just a stream, so we need
to delete
+ // that when we close th eplan.
+ }
+}
\ No newline at end of file
Deleted: trunk/engine/src/main/java/com/metamatrix/common/buffer/LobTupleBatch.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/common/buffer/LobTupleBatch.java 2009-08-12
16:36:33 UTC (rev 1234)
+++ trunk/engine/src/main/java/com/metamatrix/common/buffer/LobTupleBatch.java 2009-08-12
17:39:14 UTC (rev 1235)
@@ -1,40 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * See the COPYRIGHT.txt file distributed with this work for information
- * regarding copyright ownership. Some portions may be licensed
- * to Red Hat, Inc. under one or more contributor license agreements.
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
- * 02110-1301 USA.
- */
-
-package com.metamatrix.common.buffer;
-
-import java.util.List;
-
-
-
-/**
- * A marker class file holding the lob based data in a separate batch holder.
- */
-public class LobTupleBatch extends TupleBatch {
-
- public LobTupleBatch() {
- }
-
- public LobTupleBatch(int beginRow, List listOfTupleLists) {
- super(beginRow, listOfTupleLists);
- }
-}
Modified:
trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/BufferManagerImpl.java
===================================================================
---
trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/BufferManagerImpl.java 2009-08-12
16:36:33 UTC (rev 1234)
+++
trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/BufferManagerImpl.java 2009-08-12
17:39:14 UTC (rev 1235)
@@ -25,7 +25,9 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
+import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -39,7 +41,6 @@
import com.metamatrix.api.exception.MetaMatrixComponentException;
import com.metamatrix.common.buffer.BufferManager;
import com.metamatrix.common.buffer.IndexedTupleSource;
-import com.metamatrix.common.buffer.LobTupleBatch;
import com.metamatrix.common.buffer.MemoryNotAvailableException;
import com.metamatrix.common.buffer.StorageManager;
import com.metamatrix.common.buffer.TupleBatch;
@@ -50,23 +51,28 @@
import com.metamatrix.common.types.Streamable;
import com.metamatrix.core.log.MessageLevel;
import com.metamatrix.core.util.Assertion;
+import com.metamatrix.dqp.DQPPlugin;
import com.metamatrix.dqp.util.LogConstants;
import com.metamatrix.query.execution.QueryExecPlugin;
/**
* <p>Default implementation of BufferManager. This buffer manager implementation
* assumes the usage of a StorageManager of type memory and optionally (preferred)
- * an additional StorageManager of type FILE or DISK. If no persistent manager
- * is specified, everything managed by this BufferManager is assumed to fit in
- * memory. This can be useful for testing or for small uses.</p>
- *
- * <p>Lots of state is cached in memory. The tupleSourceMap contains a map of
- * TupleSourceID --> TupleSourceInfo. Everything about a particular tuple
- * source is stored there. The memoryState contains everything pertaining to
- * memory management. The config contains all config info.</p>
+ * an additional StorageManager of type FILE.</p>
*/
public class BufferManagerImpl implements BufferManager {
-
+
+ //memory availability when reserveMemory() is called
+ static final int MEMORY_AVAILABLE = 1;
+ static final int MEMORY_EXCEED_MAX = 2; //exceed buffer manager max memory
+ static final int MEMORY_EXCEED_SESSION_MAX = 3; //exceed session max memory
+
+ private static ThreadLocal<Set<ManagedBatch>> PINNED_BY_THREAD = new
ThreadLocal<Set<ManagedBatch>>() {
+ protected Set<ManagedBatch> initialValue() {
+ return new HashSet<ManagedBatch>();
+ };
+ };
+
// Initialized stuff
private String lookup;
private BufferConfig config;
@@ -76,15 +82,18 @@
// groupName (String) -> TupleGroupInfo map
private Map<String, TupleGroupInfo> groupInfos = new HashMap<String,
TupleGroupInfo>();
- // Storage managers
+ // Storage manager
private StorageManager diskMgr;
// ID creator
private AtomicLong currentTuple = new AtomicLong(0);
- // Memory management
- private MemoryState memoryState;
-
+ // Keep track of how memory usage
+ private volatile long memoryUsed = 0;
+
+ // Track the currently unpinned stuff in a sorted set
+ private Set<ManagedBatch> unpinned = Collections.synchronizedSet(new
LinkedHashSet<ManagedBatch>());
+
// Trigger to handle management and stats logging
private Timer timer;
@@ -94,6 +103,7 @@
private AtomicInteger pinnedFromDisk = new AtomicInteger(0);
private AtomicInteger cleanings = new AtomicInteger(0);
private AtomicLong totalCleaned = new AtomicLong(0);
+ private AtomicInteger pinned = new AtomicInteger();
/**
* See {@link com.metamatrix.common.buffer.BufferManagerPropertyNames} for a
@@ -108,15 +118,13 @@
// Set up config based on properties
this.config = new BufferConfig(properties);
- // Set up memory state object
- this.memoryState = new MemoryState(config);
-
// Set up alarms based on config
if(this.config.getManagementInterval() > 0) {
TimerTask mgmtTask = new TimerTask() {
public void run() {
- clean(0);
+ clean(0, null);
}
+
};
getTimer().schedule(mgmtTask, 0, this.config.getManagementInterval());
}
@@ -156,7 +164,8 @@
BufferStats stats = new BufferStats();
// Get memory info
- this.memoryState.fillStats(stats);
+ stats.memoryUsed = this.memoryUsed;
+ stats.memoryFree = config.getTotalAvailableMemory() - memoryUsed;
// Get picture of what's happening
Set<TupleSourceID> copyKeys = tupleSourceMap.keySet();
@@ -245,11 +254,8 @@
TupleSourceID newID = new
TupleSourceID(String.valueOf(this.currentTuple.getAndIncrement()), this.lookup);
TupleGroupInfo tupleGroupInfo = getGroupInfo(groupName);
TupleSourceInfo info = new TupleSourceInfo(newID, schema, types, tupleGroupInfo,
tupleSourceType);
- synchronized (tupleGroupInfo) {
- tupleGroupInfo.getTupleSourceIDs().add(newID);
- }
+ tupleGroupInfo.getTupleSourceIDs().add(newID);
tupleSourceMap.put(newID, info);
-
if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR,
MessageLevel.DETAIL)) {
LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, new Object[]{"Creating
TupleSource:", newID, "of type "+tupleSourceType}); //$NON-NLS-1$
//$NON-NLS-2$
}
@@ -289,31 +295,25 @@
ManagedBatch batch = iter.next();
switch(batch.getLocation()) {
case ManagedBatch.UNPINNED:
- memoryState.removeUnpinned(batch);
- memoryState.releaseMemory(batch.getSize(),
info.getGroupInfo());
+ this.unpinned.remove(batch);
+ releaseMemory(batch.getSize(), info.getGroupInfo());
break;
case ManagedBatch.PINNED:
- memoryState.removePinned(info.getTupleSourceID(),
batch.getBeginRow());
- memoryState.releaseMemory(batch.getSize(),
info.getGroupInfo());
+ PINNED_BY_THREAD.get().remove(batch);
+ this.pinned.getAndDecrement();
+ releaseMemory(batch.getSize(), info.getGroupInfo());
break;
}
}
}
}
TupleGroupInfo tupleGroupInfo = info.getGroupInfo();
- synchronized (tupleGroupInfo) {
- tupleGroupInfo.getTupleSourceIDs().remove(tupleSourceID);
- }
+ tupleGroupInfo.getTupleSourceIDs().remove(tupleSourceID);
// Remove disk storage
if (this.diskMgr != null){
this.diskMgr.removeBatches(tupleSourceID);
}
-
- // remove any dependent tuple sources on this tuple source
- // lob frame work uses the parent tuple's name as group name to
- // tie it back to the original source.
- removeTupleSources(tupleSourceID.getStringID());
}
/**
@@ -338,28 +338,25 @@
return;
}
List<TupleSourceID> tupleSourceIDs = null;
- synchronized (tupleGroupInfo) {
+ synchronized (tupleGroupInfo.getTupleSourceIDs()) {
tupleSourceIDs = new
ArrayList<TupleSourceID>(tupleGroupInfo.getTupleSourceIDs());
}
- // Remove them
- if(tupleSourceIDs.size() > 0) {
- MetaMatrixComponentException ex = null;
+ MetaMatrixComponentException ex = null;
- for (TupleSourceID tsID : tupleSourceIDs) {
- try {
- this.removeTupleSource(tsID);
- } catch(TupleSourceNotFoundException e) {
- // ignore and go on
- } catch(MetaMatrixComponentException e) {
- if(ex == null) {
- ex = e;
- }
- }
- }
+ for (TupleSourceID tsID : tupleSourceIDs) {
+ try {
+ this.removeTupleSource(tsID);
+ } catch(TupleSourceNotFoundException e) {
+ // ignore and go on
+ } catch(MetaMatrixComponentException e) {
+ if(ex == null) {
+ ex = e;
+ }
+ }
+ }
- if(ex != null) {
- throw ex;
- }
+ if(ex != null) {
+ throw ex;
}
}
@@ -466,7 +463,7 @@
// if there are lobs in source then we need to keep manage then
// in a separate tuple sources.
if (info.lobsInSource()) {
- createTupleSourcesForLobs(tupleSourceID, tupleBatch);
+ correctLobReferences(info, tupleBatch);
}
// Determine where to store
@@ -474,13 +471,13 @@
tupleBatch.setSize(bytes);
short location = ManagedBatch.PERSISTENT;
- if(memoryState.reserveMemory(bytes, info.getGroupInfo()) ==
MemoryState.MEMORY_AVAILABLE) {
+ if(reserveMemory(bytes, info.getGroupInfo()) ==
BufferManagerImpl.MEMORY_AVAILABLE) {
location = ManagedBatch.UNPINNED;
}
synchronized(info) {
if(info.isRemoved()) {
- memoryState.releaseMemory(bytes, info.getGroupInfo());
+ releaseMemory(bytes, info.getGroupInfo());
throw new
TupleSourceNotFoundException(QueryExecPlugin.Util.getString("BufferManagerImpl.tuple_source_not_found",
tupleSourceID)); //$NON-NLS-1$
}
@@ -498,14 +495,14 @@
} catch(MetaMatrixComponentException e) {
// If we were storing to memory, clean up memory we reserved
if(location != ManagedBatch.PERSISTENT) {
- memoryState.releaseMemory(bytes, info.getGroupInfo());
+ releaseMemory(bytes, info.getGroupInfo());
}
throw e;
}
// Add to memory state if in memory
if(location == ManagedBatch.UNPINNED) {
- this.memoryState.addUnpinned(managedBatch);
+ this.unpinned.add(managedBatch);
}
// Update info with new rows
@@ -533,18 +530,39 @@
this.pinRequests.incrementAndGet();
- TupleBatch memoryBatch = null;
+ TupleSourceInfo info = getTupleSourceInfo(tupleSourceID, true);
+ int memoryAvailability = BufferManagerImpl.MEMORY_AVAILABLE;
+
+ ManagedBatch mbatch = null;
+ synchronized (info) {
+ mbatch = info.getBatch(beginRow);
+ }
+ if(mbatch == null) {
+ return new TupleBatch(beginRow, Collections.EMPTY_LIST);
+ }
+
int endRow = 0;
- int count = 0;
int pass = 0;
+ //if the client request previous batch, the end row
+ //is smaller than the begin row
+ if(beginRow > maxEndRow) {
+ endRow = Math.min(beginRow, mbatch.getEndRow());
+ beginRow = Math.max(maxEndRow, mbatch.getBeginRow());
+ }else {
+ endRow = Math.min(maxEndRow, mbatch.getEndRow());
+ }
+ int count = endRow - beginRow + 1;
+ if (count == 0) {
+ return new TupleBatch(beginRow, Collections.EMPTY_LIST);
+ }
+ long memoryRequiredByBatch = mbatch.getSize();
- TupleSourceInfo info = getTupleSourceInfo(tupleSourceID, true);
- long memoryRequiredByBatch = 0;
- int memoryAvailability = MemoryState.MEMORY_AVAILABLE;
+ TupleBatch memoryBatch = null;
+
while(pass < 2) {
if(pass == 1) {
- if(memoryAvailability == MemoryState.MEMORY_EXCEED_MAX) {
- clean(memoryRequiredByBatch);
+ if(memoryAvailability == BufferManagerImpl.MEMORY_EXCEED_MAX) {
+ clean(memoryRequiredByBatch, null);
}else {
//exceed session limit
clean(memoryRequiredByBatch, info.getGroupInfo());
@@ -552,28 +570,23 @@
}
synchronized(info) {
- ManagedBatch mbatch = info.getBatch(beginRow);
- if(mbatch == null) {
- return new TupleBatch(beginRow, Collections.EMPTY_LIST);
-
- } else if(mbatch.getLocation() == ManagedBatch.PINNED) {
+ if(mbatch.getLocation() == ManagedBatch.PINNED) {
// Load batch from memory - already pinned
memoryBatch = mbatch.getBatch();
} else if(mbatch.getLocation() == ManagedBatch.UNPINNED) {
// Already in memory - just move from unpinned to pinned
- mbatch.setLocation(ManagedBatch.PINNED);
- this.memoryState.removeUnpinned(mbatch);
- this.memoryState.addPinned(mbatch);
+ this.unpinned.remove(mbatch);
+ pin(mbatch);
// Load batch from memory
memoryBatch = mbatch.getBatch();
- } else if(mbatch.getLocation() == ManagedBatch.PERSISTENT) {
+ } else {
memoryRequiredByBatch = mbatch.getSize();
// Try to reserve some memory
- if((memoryAvailability =
memoryState.reserveMemory(memoryRequiredByBatch, info.getGroupInfo())) !=
MemoryState.MEMORY_AVAILABLE) {
+ if((memoryAvailability = reserveMemory(memoryRequiredByBatch,
info.getGroupInfo())) != BufferManagerImpl.MEMORY_AVAILABLE) {
if(pass == 0) {
// Break and try to clean - it is important to break out of
the synchronized block
// here so that the clean does not cause a deadlock on this
TupleSourceInfo
@@ -595,22 +608,11 @@
memoryBatch = diskMgr.getBatch(tupleSourceID, internalBeginRow,
info.getTypes());
mbatch.setBatch(memoryBatch);
- mbatch.setLocation(ManagedBatch.PINNED);
- this.memoryState.addPinned(mbatch);
+ if (info.lobsInSource()) {
+ correctLobReferences(info, memoryBatch);
+ }
+ pin(mbatch);
}
-
- //if the client request previous batch, the end row
- //is smaller than the begin row
- if(beginRow > maxEndRow) {
- endRow = Math.min(beginRow, memoryBatch.getEndRow());
- beginRow = Math.max(maxEndRow, memoryBatch.getBeginRow());
- }else {
- endRow = Math.min(maxEndRow, memoryBatch.getEndRow());
- }
- count = endRow - beginRow + 1;
- if(count > 0) {
- mbatch.pin();
- }
}
break;
@@ -618,7 +620,7 @@
// Batch should now be pinned in memory, so grab it and build a correctly
// sized batch to return
- if(memoryBatch.getRowCount() == 0 || count == 0 || (beginRow ==
memoryBatch.getBeginRow() && count == memoryBatch.getRowCount())) {
+ if(beginRow == memoryBatch.getBeginRow() && count ==
memoryBatch.getRowCount()) {
return memoryBatch;
}
@@ -628,6 +630,15 @@
System.arraycopy(memoryRows, firstOffset, rows, 0, count);
return new TupleBatch(beginRow, rows);
}
+
+ private void pin(ManagedBatch mbatch) {
+ mbatch.setLocation(ManagedBatch.PINNED);
+ PINNED_BY_THREAD.get().add(mbatch);
+ if (!mbatch.hasPinnedRows()) {
+ pinned.getAndIncrement();
+ }
+ mbatch.pin();
+ }
/**
* Unpin a tuple source batch.
@@ -655,8 +666,9 @@
// Determine whether batch itself should be unpinned
if(! mbatch.hasPinnedRows()) {
mbatch.setLocation(ManagedBatch.UNPINNED);
- memoryState.removePinned(tupleSourceID, mbatch.getBeginRow());
- memoryState.addUnpinned(mbatch);
+ PINNED_BY_THREAD.get().remove(mbatch);
+ this.unpinned.add(mbatch);
+ pinned.getAndDecrement();
}
}
}
@@ -698,83 +710,71 @@
}
return info;
}
-
+
/**
- * Clean the memory state, using LRU. This can be done either via the background
- * cleaning thread or actively if someone wants memory and none is free.
+ * This can be done actively if someone wants memory and none is free.
*/
- protected void clean(long memoryRequired) {
- // Defect 14573 - this method needs to know how much memory is required, so that
(even if we're not past the active memory
- // threshold) if the memory available is less than the memory required, we should
clean up unpinned batches.
+ protected void clean(long memoryRequired, TupleGroupInfo targetGroupInfo) {
+ cleanLobTupleSource();
+
+ long released = 0;
+
long targetLevel = config.getActiveMemoryLevel();
long totalMemory = config.getTotalAvailableMemory();
- long released = 0;
-
- Iterator<ManagedBatch> unpinnedIter = this.memoryState.getAllUnpinned();
- while(unpinnedIter.hasNext() && // If there are unpinned batches in
memory, AND
- // Defect 14573 - if we require more than what's available, then
cleanup regardless of the threshold
- (memoryRequired > totalMemory - memoryState.getMemoryUsed() || // if the
memory needed is more than what's available, or
- memoryState.getMemoryUsed() > targetLevel)){ // if we've crossed
the active memory threshold, then cleanup
-
- ManagedBatch batch = unpinnedIter.next();
+
+ boolean generalCleaningDone = false;
+ List<ManagedBatch> toClean = null;
+ synchronized (unpinned) {
+ //TODO: re-implement without having to scan and compete
+ toClean = new ArrayList<ManagedBatch>(unpinned);
+ }
+ for (ManagedBatch batch : toClean) {
TupleSourceID tsID = batch.getTupleSourceID();
-
- released += releaseMemory(batch, tsID);
- }
-
- if(released > 0) {
- this.cleanings.incrementAndGet();
- this.totalCleaned.addAndGet(released);
- }
-
- }
-
- /**
- * Over memory limit for this session. Clean the memory for this session.
- * Clean the memory state, using LRU. This can be done actively if someone wants
memory and none is free.
- */
- protected void clean(long memoryRequired, TupleGroupInfo targetGroupInfo) throws
TupleSourceNotFoundException{
- boolean cleanForSessionSucceeded = false;
- long released = 0;
-
- Iterator<ManagedBatch> unpinnedIter = this.memoryState.getAllUnpinned();
- while(unpinnedIter.hasNext()) {
- ManagedBatch batch = unpinnedIter.next();
- TupleSourceID tsID = batch.getTupleSourceID();
- TupleSourceInfo tsInfo = getTupleSourceInfo(tsID, false);
+ TupleSourceInfo tsInfo = this.tupleSourceMap.get(tsID);
if(tsInfo == null) {
//may be removed by another thread
continue;
}
- if(!tsInfo.getGroupInfo().equals(targetGroupInfo)) {
- //continue if they are not the same tuple group
- continue;
+
+ long currentMemoryUsed = memoryUsed;
+ if (!generalCleaningDone && (memoryRequired <= totalMemory -
currentMemoryUsed && // if the memory needed is more than what's available,
or
+ currentMemoryUsed <= targetLevel)) { // if we've crossed the
active memory threshold, then cleanup
+ generalCleaningDone = true;
}
+ if (generalCleaningDone) {
+ if (targetGroupInfo == null) {
+ break;
+ }
+ if (targetGroupInfo == tsInfo.getGroupInfo()) {
+ //if the memory needed is more than what is available for the session,
then cleanup. Otherwise, break the loop.
+ if(memoryRequired <= config.getMaxAvailableSession() -
targetGroupInfo.getGroupMemoryUsed()) {
+ break;
+ }
+ }
+ }
- long groupMemoryUsed = memoryState.getGroupMemoryUsed(targetGroupInfo);
- //if the memory needed is more than what is available for the session, then
cleanup. Otherwise, break the loop.
- if(memoryRequired <= config.getMaxAvailableSession() - groupMemoryUsed) {
- cleanForSessionSucceeded = true;
- break;
- }
-
- released += releaseMemory(batch, tsID);
+ released += releaseMemory(batch, tsInfo);
}
if(released > 0) {
this.cleanings.incrementAndGet();
this.totalCleaned.addAndGet(released);
}
-
- if(!cleanForSessionSucceeded) {
- //if we cannot clean enough memory for this session, it fails
- return;
+ }
+
+ //TODO: run asynch
+ private void cleanLobTupleSource() {
+ String tupleSourceId = TupleSourceInfo.getStaleLobTupleSource();
+ if (tupleSourceId != null) {
+ try {
+ removeTupleSource(new TupleSourceID(tupleSourceId));
+ } catch (TupleSourceNotFoundException e) {
+ } catch (MetaMatrixComponentException e) {
+ LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, e, "Exception removing stale
lob tuple source"); //$NON-NLS-1$
+ }
}
+ }
- //make sure it is not over the buffer manager memory limit
- clean(memoryRequired);
- }
-
/**
* Release the memory for the given unpinned batch.
* @param batch Batch to be released from memory
@@ -782,48 +782,32 @@
* @return The size of memory released in bytes
* @since 4.3
*/
- private long releaseMemory(ManagedBatch batch, TupleSourceID tsID) {
+ private long releaseMemory(ManagedBatch batch, TupleSourceInfo info) {
// Find info and lock on it
- try {
- TupleSourceInfo info = getTupleSourceInfo(tsID, false);
- if(info == null) {
+ synchronized(info) {
+ if(info.isRemoved() || batch.getLocation() != ManagedBatch.UNPINNED) {
return 0;
}
- synchronized(info) {
- if(info.isRemoved()) {
- return 0;
- }
+ // This batch is still unpinned - move to persistent storage
+ TupleBatch dataBatch = batch.getBatch();
- // Re-get the batch and check that it still exists and is unpinned
- batch = info.getBatch(batch.getBeginRow());
- if(batch == null || batch.getLocation() != ManagedBatch.UNPINNED) {
- return 0;
- }
+ try {
+ diskMgr.addBatch(info.getTupleSourceID(), dataBatch, info.getTypes());
+ } catch(MetaMatrixComponentException e) {
+ // Can't move
+ return 0;
+ }
- // This batch is still unpinned - move to persistent storage
- TupleBatch dataBatch = batch.getBatch();
+ batch.setBatch(null);
- try {
- diskMgr.addBatch(tsID, dataBatch, info.getTypes());
- } catch(MetaMatrixComponentException e) {
- // Can't move
- return 0;
- }
+ // Update memory
+ batch.setLocation(ManagedBatch.PERSISTENT);
+ this.unpinned.remove(batch);
+ releaseMemory(batch.getSize(), info.getGroupInfo());
- batch.setBatch(null);
-
- // Update memory
- batch.setLocation(ManagedBatch.PERSISTENT);
- memoryState.removeUnpinned(batch);
- memoryState.releaseMemory(batch.getSize(), info.getGroupInfo());
-
- return batch.getSize();
- }
- } catch(TupleSourceNotFoundException e) {
- // ignore, go to next batch
- return 0;
- }
+ return batch.getSize();
+ }
}
/**
@@ -855,114 +839,49 @@
}
/**
- * If a tuple batch is being added with Lobs, then maintain the LOB
- * objects in a separate TupleSource than from the original, so that
- * the original can only serilize the id, but the otherone can serialize
- * the contents.
+ * If a tuple batch is being added with Lobs, then references to
+ * the lobs will be held on the {@link TupleSourceInfo}
* @param batch
*/
- private void createTupleSourcesForLobs(TupleSourceID parentId, TupleBatch batch)
- throws MetaMatrixComponentException, TupleSourceNotFoundException {
-
- TupleSourceInfo info = getTupleSourceInfo(parentId, false);
+ @SuppressWarnings("unchecked")
+ private void correctLobReferences(TupleSourceInfo info, TupleBatch batch) {
List parentSchema = info.getTupleSchema();
List[] rows = batch.getAllTuples();
-
+ int columns = parentSchema.size();
// walk through the results and find all the lobs
for (int row = 0; row < rows.length; row++) {
-
- int col = 0;
- for (Iterator i = rows[row].iterator(); i.hasNext();) {
- Object anObj = i.next();
+ for (int col = 0; col < columns; col++) {
+ Object anObj = rows[row].get(col);
- if (anObj instanceof Streamable) {
- // once lob is found check to see if this has already been assigned
- // a streming id or not; if one is not assigned create one and assign
it
- // to the lob; if one is already assigned just return;
- // this will prohibit calling lob on itself into this routine.
- Streamable lob = (Streamable)anObj;
-
- if (lob.getReferenceStreamId() == null ||
lobIsNotKnownInTupleSourceMap( lob, parentId) ) {
- List schema = new ArrayList();
- schema.add(parentSchema.get(col));
-
- TupleSourceID id = createTupleSource(schema, new String[]
{info.getTypes()[col]}, parentId.getStringID(), TupleSourceType.PROCESSOR);
- lob.setReferenceStreamId(id.getStringID());
-
- List results = new ArrayList();
- results.add(lob);
-
- List listOfRows = new ArrayList();
- listOfRows.add(results);
-
- // these batches are wrapped in a special marker batch tag
- // which are saved from forcing them to disk.
- LobTupleBatch separateBatch = new LobTupleBatch(1, listOfRows);
- separateBatch.setTerminationFlag(true);
-
- // now save this as separate tuple source.
- addTupleBatch(id, separateBatch);
- } else {
- // this means the XML object being moved from one tuple to
another tuple
- // i.e. one plan to another plan. So update the group info.
-
- // First update the reference tuple source.
- if (!lob.getReferenceStreamId().equals(parentId.getStringID()))
{
- TupleGroupInfo groupInfo =
getGroupInfo(parentId.getStringID());
- TupleSourceID id = new
TupleSourceID(lob.getReferenceStreamId());
- TupleSourceInfo lobInfo = getTupleSourceInfo(id, false);
- reassignGroup(groupInfo, lobInfo);
-
- // if the lob moving parent has a assosiated persistent
- // tuple source, then move that one to same parent too.
- if (lob.getPersistenceStreamId() != null) {
- id = new TupleSourceID(lob.getPersistenceStreamId());
- lobInfo = getTupleSourceInfo(id, false);
- reassignGroup(groupInfo, lobInfo);
- }
- }
- }
+ if (!(anObj instanceof Streamable<?>)) {
+ continue;
}
- col++;
+ Streamable lob = (Streamable)anObj;
+ info.addLobReference(lob);
+ if (lob.getReference() == null) {
+
lob.setReference(info.getLobReference(lob.getReferenceStreamId()).getReference());
+ }
}
}
}
-
- private void reassignGroup(TupleGroupInfo groupInfo, TupleSourceInfo lobInfo) {
- TupleGroupInfo tupleGroupInfo = lobInfo.getGroupInfo();
- synchronized (tupleGroupInfo) {
- tupleGroupInfo.getTupleSourceIDs().remove(lobInfo.getTupleSourceID());
- }
- lobInfo.setGroupInfo(groupInfo);
- synchronized (groupInfo) {
- groupInfo.getTupleSourceIDs().add(lobInfo.getTupleSourceID());
- }
- }
- private boolean lobIsNotKnownInTupleSourceMap( Streamable lob, TupleSourceID
parentId) throws TupleSourceNotFoundException {
- /*
- * The need for this defensive feature arises because there are multiple uses of
the TupleSourceMap which
- * are somewhat inconsistent with one another. In the case of LOBs we use the
parent/child group feature
- * of tuplesources to associate a parent tuplesource containing metadata about
the LOB with a second
- * tuplesource that contains the LOB. When such a group is no longer needed
(for example, see SubqueryProcessorUtility.close()),
- * removing the child tupleSources has the unfortunate side effect of leaving the
actual LOBs with references to
- * tuplesources that no longer exist, and are therefore no longer in the
tupleSourceMap.
- *
- * This test ensures that such orphaned LOBs will be treated correctly
(TEIID-54).
- *
- */
- if (!lob.getReferenceStreamId().equals(parentId.getStringID())) {
- TupleSourceID id = new TupleSourceID(lob.getReferenceStreamId());
- TupleSourceInfo lobInfo = getTupleSourceInfo(id, false);
-
- if ( lobInfo == null ) {
- return true; // is not known
- }
- return false; // is known
- }
- return false; // don't care if known
+ @Override
+ public Streamable<?> getStreamable(TupleSourceID id, String referenceId) throws
TupleSourceNotFoundException, MetaMatrixComponentException {
+ TupleSourceInfo tsInfo = getTupleSourceInfo(id, true);
+ Streamable<?> s = tsInfo.getLobReference(referenceId);
+ if (s == null) {
+ throw new
MetaMatrixComponentException(DQPPlugin.Util.getString("ProcessWorker.wrongdata"));
//$NON-NLS-1$
+ }
+ return s;
}
+ @Override
+ public void setPersistentTupleSource(TupleSourceID id, Streamable<?> s) throws
TupleSourceNotFoundException {
+ cleanLobTupleSource();
+ TupleSourceInfo tsInfo = getTupleSourceInfo(id, true);
+ s.setPersistenceStreamId(id.getStringID());
+ tsInfo.setContainingLobReference(s);
+ }
/**
* @see
com.metamatrix.common.buffer.BufferManager#addStreamablePart(com.metamatrix.common.buffer.TupleSourceID,
com.metamatrix.common.lob.LobChunk, int)
@@ -976,7 +895,7 @@
synchronized(info) {
- List data = new ArrayList();
+ List<LobChunk> data = new ArrayList<LobChunk>();
data.add(streamChunk);
TupleBatch batch = new TupleBatch(beginRow, new List[] {data});
this.diskMgr.addBatch(tupleSourceID, batch, info.getTypes());
@@ -1008,44 +927,62 @@
* @see com.metamatrix.common.buffer.BufferManager#releasePinnedBatches()
*/
public void releasePinnedBatches() throws MetaMatrixComponentException {
- Map<TupleSourceID, Map<Integer, ManagedBatch>> threadPinned =
memoryState.getPinnedByCurrentThread();
- if (threadPinned == null) {
- return;
- }
- for (Iterator<Map.Entry<TupleSourceID, Map<Integer,
ManagedBatch>>> i = threadPinned.entrySet().iterator(); i.hasNext();) {
- Map.Entry<TupleSourceID, Map<Integer, ManagedBatch>> entry =
i.next();
- i.remove();
- TupleSourceID tsid = entry.getKey();
- Map<Integer, ManagedBatch> pinnedBatches = entry.getValue();
- try {
- for (Iterator<ManagedBatch> j = pinnedBatches.values().iterator();
j.hasNext();) {
- ManagedBatch batch = j.next();
-
- //TODO: add trace logging about the batch that is being unpinned
- unpinTupleBatch(tsid, batch.getBeginRow(), batch.getEndRow());
- }
- } catch (TupleSourceNotFoundException err) {
- continue;
- }
- }
+ MetaMatrixComponentException e = null;
+ List<ManagedBatch> pinnedByThread = new
ArrayList<ManagedBatch>(PINNED_BY_THREAD.get());
+ for (ManagedBatch managedBatch : pinnedByThread) {
+ try {
+ //TODO: add trace logging about the batch that is being unpinned
+ unpinTupleBatch(managedBatch.getTupleSourceID(), managedBatch.getBeginRow(),
managedBatch.getEndRow());
+ } catch (TupleSourceNotFoundException err) {
+ } catch (MetaMatrixComponentException err) {
+ e = err;
+ }
+ }
+ if (e != null) {
+ throw e;
+ }
}
/**
- * for testing purposes
+ * Check for whether the specified amount of memory can be reserved,
+ * and if so reserve it. This is done in the same method so that the
+ * memory is not taken away by a different thread between checking and
+ * reserving - standard "test and set" behavior.
+ * @param bytes Bytes requested
+ * @return One of MEMORY_AVAILABLE, MEMORY_EXCEED_MAX, or MEMORY_EXCEED_SESSION_MAX
*/
- public int getPinnedCount() {
- Map<TupleSourceID, Map<Integer, ManagedBatch>> pinned =
memoryState.getAllPinned();
-
- int count = 0;
-
- if (pinned == null) {
- return count;
+ private synchronized int reserveMemory(long bytes, TupleGroupInfo groupInfo) {
+ //check session limit first
+ long sessionMax = config.getMaxAvailableSession();
+ if(sessionMax - groupInfo.getGroupMemoryUsed() < bytes) {
+ return BufferManagerImpl.MEMORY_EXCEED_SESSION_MAX;
}
- for (Iterator<Map<Integer, ManagedBatch>> i =
pinned.values().iterator(); i.hasNext();) {
- count += i.next().size();
+ //then check the total memory limit
+ long max = config.getTotalAvailableMemory();
+ if(max - memoryUsed < bytes) {
+ return BufferManagerImpl.MEMORY_EXCEED_MAX;
}
- return count;
+ groupInfo.reserveMemory(bytes);
+ memoryUsed += bytes;
+
+ return BufferManagerImpl.MEMORY_AVAILABLE;
}
+
+ /**
+ * Release memory
+ * @param bytes Bytes to release
+ */
+ private synchronized void releaseMemory(long bytes, TupleGroupInfo groupInfo) {
+ groupInfo.releaseMemory(bytes);
+ memoryUsed -= bytes;
+ }
+
+ /**
+ * for testing purposes
+ */
+ public int getPinnedCount() {
+ return pinned.get();
+ }
}
Modified: trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/BufferStats.java
===================================================================
---
trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/BufferStats.java 2009-08-12
16:36:33 UTC (rev 1234)
+++
trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/BufferStats.java 2009-08-12
17:39:14 UTC (rev 1235)
@@ -54,7 +54,7 @@
public long totalCleaned;
// Pinned batch details
- public List pinnedManagedBatches = new LinkedList();
+ public List<ManagedBatch> pinnedManagedBatches = new
LinkedList<ManagedBatch>();
/**
* Constructor for BufferStats.
@@ -90,7 +90,7 @@
LogManager.logInfo(LogConstants.CTX_BUFFER_MGR, " avgCleaned = " +
avgCleaned); //$NON-NLS-1$
if ( LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR,
MessageLevel.TRACE) ) {
- HashMap stackTraces = new HashMap();
+ HashMap<List<String>, Integer> stackTraces = new
HashMap<List<String>, Integer>();
if ( pinnedManagedBatches.isEmpty() ) {
return;
@@ -102,12 +102,10 @@
int stackNumber = 1;
// pinned batch details
- Iterator it = pinnedManagedBatches.iterator();
- while ( it.hasNext() ) {
- ManagedBatch batch = (ManagedBatch)it.next();
+ for (ManagedBatch batch : pinnedManagedBatches) {
LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, " TupleSourceID:
" + batch.getTupleSourceID() + " Begin: " + batch.getBeginRow() + "
End: " + batch.getEndRow()); //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$
- Integer stackKey = (Integer)stackTraces.get(batch.getCallStack());
+ Integer stackKey = stackTraces.get(batch.getCallStack());
boolean isFirst = false;
@@ -119,7 +117,7 @@
LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, " Pinned at:
" + batch.getCallStackTimeStamp() + " by call# " + stackKey); //$NON-NLS-1$
//$NON-NLS-2$
if (isFirst) {
- for (Iterator j = batch.getCallStack().iterator(); j.hasNext();) {
+ for (Iterator<String> j = batch.getCallStack().iterator();
j.hasNext();) {
LogManager.logTrace( LogConstants.CTX_BUFFER_MGR, "
" + j.next() ); //$NON-NLS-1$
}
}
Deleted: trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/MemoryState.java
===================================================================
---
trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/MemoryState.java 2009-08-12
16:36:33 UTC (rev 1234)
+++
trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/MemoryState.java 2009-08-12
17:39:14 UTC (rev 1235)
@@ -1,260 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * See the COPYRIGHT.txt file distributed with this work for information
- * regarding copyright ownership. Some portions may be licensed
- * to Red Hat, Inc. under one or more contributor license agreements.
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
- * 02110-1301 USA.
- */
-
-package com.metamatrix.common.buffer.impl;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import com.metamatrix.common.buffer.TupleSourceID;
-import com.metamatrix.common.log.LogManager;
-import com.metamatrix.core.log.MessageLevel;
-import com.metamatrix.dqp.util.LogConstants;
-
-/**
- * <p>This class represents the memory state of the BufferManagerImpl. The
- * critical thing to know is what batches are in memory but not being
- * used and what batches in memory but being used. The access patterns
- * for these two types of information are very different, so they are stored
- * in very different data structures.</p>
- *
- * <p>The unpinned batches are stored in a linked list, ordered by a least
- * recently used timestamp. This is optimized for cleanup. It's not very good
- * for finding stuff but fortunately, we don't need to most of the time.</p>
- *
- * <p>The pinned batches are stored in a Map, keyed by TupleSourceID. The value
- * is another map, keyed by beginRow, with a ManagedBatch as the value. This
- * is really good for finding batches quickly, which is exactly what we need
- * to do with pinned batches.</p>
- *
- * <p>All methods on this class are synchronized to preserve state.</p>
- */
-class MemoryState {
-
- private static ThreadLocal<Map<TupleSourceID, Map<Integer,
ManagedBatch>>> PINNED_BY_THREAD = new ThreadLocal<Map<TupleSourceID,
Map<Integer, ManagedBatch>>>();
-
- //memory availability when reserveMemory() is called
- static final int MEMORY_AVAILABLE = 1;
- static final int MEMORY_EXCEED_MAX = 2; //exceed buffer manager max memory
- static final int MEMORY_EXCEED_SESSION_MAX = 3; //exceed session max memory
-
- // Configuration, used to get available memory info
- private BufferConfig config;
-
- // Keep track of how memory we are using
- private volatile long memoryUsed = 0;
-
- // Track the currently pinned stuff by TupleSourceID for easy lookup
- private Map<TupleSourceID, Map<Integer, ManagedBatch>> pinned = new
HashMap<TupleSourceID, Map<Integer, ManagedBatch>>();
-
- // Track the currently unpinned stuff in a sorted list, sorted by access time
- private Set<ManagedBatch> unpinned = Collections.synchronizedSet(new
LinkedHashSet<ManagedBatch>());
-
- /**
- * Constructor for MemoryState, based on config.
- * @param config Configuration
- */
- public MemoryState(BufferConfig config) {
- this.config = config;
- }
-
- /**
- * Fill the stats object with stats about memory
- * @param stats Stats info to be filled
- */
- public void fillStats(BufferStats stats) {
- stats.memoryUsed = this.memoryUsed;
- stats.memoryFree = config.getTotalAvailableMemory() - memoryUsed;
- }
-
- /**
- * Get the amount of memory currently being used in bytes
- * @return Used memory, in bytes
- */
- public long getMemoryUsed() {
- return this.memoryUsed;
- }
-
- /**
- * Check for whether the specified amount of memory can be reserved,
- * and if so reserve it. This is done in the same method so that the
- * memory is not taken away by a different thread between checking and
- * reserving - standard "test and set" behavior.
- * @param bytes Bytes requested
- * @return One of MEMORY_AVAILABLE, MEMORY_EXCEED_MAX, or MEMORY_EXCEED_SESSION_MAX
- */
- public synchronized int reserveMemory(long bytes, TupleGroupInfo groupInfo) {
- //check session limit first
- long sessionMax = config.getMaxAvailableSession();
- if(sessionMax - groupInfo.getGroupMemoryUsed() < bytes) {
- return MEMORY_EXCEED_SESSION_MAX;
- }
-
- //then check the total memory limit
- long max = config.getTotalAvailableMemory();
- if(max - memoryUsed < bytes) {
- return MEMORY_EXCEED_MAX;
- }
-
- /* NOTE1
- * Since the groupInfo call is being made in the synchronized block for the
entire buffer,
- * groupInfo doesn't need additional locking.
- */
- groupInfo.reserveMemory(bytes);
- memoryUsed += bytes;
-
- return MEMORY_AVAILABLE;
- }
-
- /**
- * Release memory
- * @param bytes Bytes to release
- */
- public synchronized void releaseMemory(long bytes, TupleGroupInfo groupInfo) {
- // see NOTE1
- groupInfo.releaseMemory(bytes);
- memoryUsed -= bytes;
- }
-
- /**
- * Get the amount of memory currently being used for the specified group in bytes
- * @param groupInfo TupleGroupInfo
- * @return Used memory, in bytes
- */
- public synchronized long getGroupMemoryUsed(TupleGroupInfo groupInfo) {
- // see NOTE1
- return groupInfo.getGroupMemoryUsed();
- }
-
- /**
- * Add a pinned batch
- * @param batch Pinned batch to add
- */
- public void addPinned(ManagedBatch batch) {
- synchronized (this) {
- addPinnedInternal(pinned, batch);
- }
- Map<TupleSourceID, Map<Integer, ManagedBatch>> theadPinned =
PINNED_BY_THREAD.get();
- if (theadPinned == null) {
- theadPinned = new HashMap<TupleSourceID, Map<Integer,
ManagedBatch>>();
- PINNED_BY_THREAD.set(theadPinned);
- }
- addPinnedInternal(theadPinned, batch);
- if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR,
MessageLevel.TRACE )) {
- batch.captureCallStack();
- }
- }
-
- private void addPinnedInternal(Map<TupleSourceID, Map<Integer,
ManagedBatch>> pinnedMap, ManagedBatch batch) {
- TupleSourceID tsID = batch.getTupleSourceID();
- Map<Integer, ManagedBatch> tsPinned = pinnedMap.get(tsID);
- if(tsPinned == null) {
- tsPinned = new HashMap<Integer, ManagedBatch>();
- pinnedMap.put(tsID, tsPinned);
- }
-
- // Add batch, indexed by beginRow
- tsPinned.put(new Integer(batch.getBeginRow()), batch);
- }
-
- /**
- * Remove a pinned batch, if not found do nothing and return null
- * @param tsID Tuple source id
- * @param beginRow Beginning row
- * @return Removed batch or null if not found
- */
- public ManagedBatch removePinned(TupleSourceID tsID, int beginRow) {
- ManagedBatch result = null;
- synchronized (this) {
- result = removePinnedInternal(pinned, tsID, beginRow);
- }
- if (result != null) {
- Map<TupleSourceID, Map<Integer, ManagedBatch>> theadPinned =
PINNED_BY_THREAD.get();
- if (theadPinned != null) {
- removePinnedInternal(theadPinned, tsID, beginRow);
- }
- }
- return result;
- }
-
- private ManagedBatch removePinnedInternal(Map<TupleSourceID, Map<Integer,
ManagedBatch>> pinnedMap, TupleSourceID tsID,
- int beginRow) {
- Map<Integer, ManagedBatch> tsPinned = pinnedMap.get(tsID);
- if(tsPinned != null) {
- ManagedBatch mbatch = tsPinned.remove(new Integer(beginRow));
-
- if(tsPinned.size() == 0) {
- pinnedMap.remove(tsID);
- }
-
- return mbatch;
- }
- return null;
- }
-
- /**
- * Add an unpinned batch
- * @param batch Unpinned batch to add
- */
- public void addUnpinned(ManagedBatch batch) {
- unpinned.add(batch);
- }
-
- /**
- * Remove an unpinned batch
- * @param batch Batch to remove
- */
- public void removeUnpinned(ManagedBatch batch) {
- unpinned.remove(batch);
- }
-
- /**
- * Get an iterator on all unpinned batches, typically for clean up
- * purposes. This iterator is "safe" in that it is based on a copy
- * of the real list and will not be invalidated by changes to the original
- * list. However, this means that it also may contain batches that
- * are no longer in the unpinned list, so the user of this iterator
- * should check that each batch is still unpinned.
- * @return Safe (but possibly out of date) iterator on unpinned batches
- */
- public Iterator<ManagedBatch> getAllUnpinned() {
- synchronized (unpinned) {
- List<ManagedBatch> copy = new ArrayList<ManagedBatch>(unpinned);
- return copy.iterator();
- }
- }
-
- public synchronized Map<TupleSourceID, Map<Integer, ManagedBatch>>
getAllPinned() {
- return new HashMap<TupleSourceID, Map<Integer,
ManagedBatch>>(pinned);
- }
-
- public Map<TupleSourceID, Map<Integer, ManagedBatch>>
getPinnedByCurrentThread() {
- return PINNED_BY_THREAD.get();
- }
-
-}
Modified:
trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/TupleGroupInfo.java
===================================================================
---
trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/TupleGroupInfo.java 2009-08-12
16:36:33 UTC (rev 1234)
+++
trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/TupleGroupInfo.java 2009-08-12
17:39:14 UTC (rev 1235)
@@ -22,6 +22,7 @@
package com.metamatrix.common.buffer.impl;
+import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
@@ -38,8 +39,8 @@
private String groupName;
/** The bytes of memory used by this tuple group*/
- private long memoryUsed;
- private Set<TupleSourceID> tupleSourceIDs = new
HashSet<TupleSourceID>();
+ private volatile long memoryUsed;
+ private Set<TupleSourceID> tupleSourceIDs = Collections.synchronizedSet(new
HashSet<TupleSourceID>());
TupleGroupInfo(String groupName) {
this.groupName = groupName;
@@ -54,17 +55,14 @@
}
long getGroupMemoryUsed() {
- // no locking required. See MemoryState.NOTE1
return memoryUsed;
}
long reserveMemory(long bytes) {
- // no locking required. See MemoryState.NOTE1
return memoryUsed += bytes;
}
long releaseMemory(long bytes) {
- // no locking required. See MemoryState.NOTE1
return memoryUsed -= bytes;
}
}
Modified:
trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/TupleSourceInfo.java
===================================================================
---
trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/TupleSourceInfo.java 2009-08-12
16:36:33 UTC (rev 1234)
+++
trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/TupleSourceInfo.java 2009-08-12
17:39:14 UTC (rev 1235)
@@ -22,30 +22,55 @@
package com.metamatrix.common.buffer.impl;
-import java.util.*;
+import java.lang.ref.PhantomReference;
+import java.lang.ref.ReferenceQueue;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicLong;
import com.metamatrix.common.buffer.TupleSourceID;
import com.metamatrix.common.buffer.BufferManager.TupleSourceStatus;
import com.metamatrix.common.buffer.BufferManager.TupleSourceType;
import com.metamatrix.common.types.DataTypeManager;
+import com.metamatrix.common.types.Streamable;
/**
* Describe a TupleSource and all important information about it.
*/
public class TupleSourceInfo {
+ private static final AtomicLong LOB_ID = new AtomicLong();
+ private static final ReferenceQueue<Streamable<?>> LOB_QUEUE = new
ReferenceQueue<Streamable<?>>();
+
+ private static class LobReference extends PhantomReference<Streamable<?>> {
+
+ String persistentStreamId;
+
+ public LobReference(Streamable<?> lob) {
+ super(lob, LOB_QUEUE);
+ this.persistentStreamId = lob.getPersistenceStreamId();
+ }
+ }
+
private TupleSourceType type; // Type of TupleSource, as defined in
BufferManager constants
private TupleSourceID tsID;
private List schema;
private String[] types;
private int rowCount;
- private TupleSourceStatus status;
+ private TupleSourceStatus status = TupleSourceStatus.ACTIVE;
private TupleGroupInfo groupInfo;
private boolean removed = false;
private TreeMap<Integer, ManagedBatch> batches = new TreeMap<Integer,
ManagedBatch>();
-
+ private Map<String, Streamable<?>> lobReferences; //references to
contained lobs
private boolean lobs;
+ @SuppressWarnings("unused")
+ private LobReference containingLobReference; //reference to containing lob
+
/**
* Construct a TupleSourceInfo given information about it.
* @param tsID Identifier
@@ -58,16 +83,51 @@
this.schema = schema;
this.types = types;
this.groupInfo = groupInfo;
- this.status = TupleSourceStatus.ACTIVE;
- this.rowCount = 0;
this.type = type;
this.lobs = checkForLobs();
}
+ public void setContainingLobReference(Streamable<?> s) {
+ this.containingLobReference = new LobReference(s);
+ }
+
+ public void addLobReference(Streamable<Object> lob) {
+ String id = lob.getReferenceStreamId();
+ if (id == null) {
+ id = String.valueOf(LOB_ID.getAndIncrement());
+ lob.setReferenceStreamId(id);
+ }
+ if (this.lobReferences == null) {
+ this.lobReferences = Collections.synchronizedMap(new HashMap<String,
Streamable<?>>());
+ }
+ this.lobReferences.put(id, lob);
+ }
+
+ public static String getStaleLobTupleSource() {
+ LobReference ref = (LobReference)LOB_QUEUE.poll();
+ if (ref == null) {
+ return null;
+ }
+ return ref.persistentStreamId;
+ }
+
+ public Streamable<?> getLobReference(String id) {
+ if (this.lobReferences == null) {
+ return null;
+ }
+ return this.lobReferences.get(id);
+ }
+
public void addBatch(ManagedBatch batch) {
batches.put(batch.getBeginRow(), batch);
}
+ /**
+ * Returns the batch containing the begin row or null
+ * if it doesn't exist
+ * @param beginRow
+ * @return
+ */
public ManagedBatch getBatch(int beginRow) {
Map.Entry<Integer, ManagedBatch> entry = batches.floorEntry(beginRow);
if (entry != null && entry.getValue().getEndRow() >= beginRow) {
@@ -184,19 +244,16 @@
return "TupleSourceInfo[" + this.tsID + "]";
//$NON-NLS-1$ //$NON-NLS-2$
}
-
private boolean checkForLobs() {
- boolean lob = false;
- if (types != null) {
- for (int i = 0; i < types.length; i++) {
- lob |= DataTypeManager.isLOB(types[i]);
+ if (types == null) {
+ // assume the worst
+ return true;
+ }
+ for (int i = 0; i < types.length; i++) {
+ if (DataTypeManager.isLOB(types[i]) || types[i] ==
DataTypeManager.DefaultDataTypes.OBJECT) {
+ return true;
}
}
- else {
- // if incase the user did not specify the types, then make
- // them walk the batch; pay the penalty of performence
- return true;
- }
- return lob;
+ return false;
}
}
Modified:
trunk/engine/src/main/java/com/metamatrix/common/buffer/storage/file/FileStorageManager.java
===================================================================
---
trunk/engine/src/main/java/com/metamatrix/common/buffer/storage/file/FileStorageManager.java 2009-08-12
16:36:33 UTC (rev 1234)
+++
trunk/engine/src/main/java/com/metamatrix/common/buffer/storage/file/FileStorageManager.java 2009-08-12
17:39:14 UTC (rev 1235)
@@ -41,7 +41,6 @@
import com.metamatrix.api.exception.MetaMatrixComponentException;
import com.metamatrix.common.buffer.BufferManagerPropertyNames;
-import com.metamatrix.common.buffer.LobTupleBatch;
import com.metamatrix.common.buffer.StorageManager;
import com.metamatrix.common.buffer.TupleBatch;
import com.metamatrix.common.buffer.TupleSourceID;
@@ -172,16 +171,6 @@
public void addBatch(TupleSourceID sourceID, TupleBatch batch, String[] types)
throws MetaMatrixComponentException {
- /* Right now we do not support the saving of the lobs to the disk.
- * by throwing an exception the memory is never released for lobs, which is same
- * as keeping them in a map. This is not going to be memory hog because, the
actual
- * lob (clob or blob) are backed by connector, xml is backed by already persisted
- * tuple source. Here we are only saving the referenes to the actual objects.
- */
- if (batch instanceof LobTupleBatch) {
- throw new
MetaMatrixComponentException(QueryExecPlugin.Util.getString("FileStorageManager.can_not_save_lobs"));
//$NON-NLS-1$
- }
-
// Defect 13342 - addBatch method now creates spill files if the total bytes
exceeds the max file size limit
TupleSourceInfo tsInfo = getTupleSourceInfo(sourceID, true);
synchronized (tsInfo) {
Modified: trunk/engine/src/main/java/com/metamatrix/query/processor/QueryProcessor.java
===================================================================
---
trunk/engine/src/main/java/com/metamatrix/query/processor/QueryProcessor.java 2009-08-12
16:36:33 UTC (rev 1234)
+++
trunk/engine/src/main/java/com/metamatrix/query/processor/QueryProcessor.java 2009-08-12
17:39:14 UTC (rev 1235)
@@ -181,7 +181,10 @@
}
throw new MetaMatrixComponentException(e);
} finally {
- bufferMgr.releasePinnedBatches();
+ //iff this is the root command context release any pinned (Unclosed
tuplesources)
+ if (this.context.getParent() == null) {
+ bufferMgr.releasePinnedBatches();
+ }
}
if(done || requestClosed) {
Modified:
trunk/engine/src/main/java/com/metamatrix/query/processor/dynamic/XMLSource.java
===================================================================
---
trunk/engine/src/main/java/com/metamatrix/query/processor/dynamic/XMLSource.java 2009-08-12
16:36:33 UTC (rev 1234)
+++
trunk/engine/src/main/java/com/metamatrix/query/processor/dynamic/XMLSource.java 2009-08-12
17:39:14 UTC (rev 1235)
@@ -24,7 +24,6 @@
import java.io.StringReader;
import java.util.List;
-import java.util.Properties;
import javax.xml.transform.Source;
import javax.xml.transform.stream.StreamSource;
@@ -32,10 +31,7 @@
import com.metamatrix.api.exception.MetaMatrixProcessingException;
import com.metamatrix.common.buffer.BufferManager;
import com.metamatrix.common.buffer.TupleSource;
-import com.metamatrix.common.buffer.TupleSourceID;
-import com.metamatrix.common.types.InvalidReferenceException;
import com.metamatrix.common.types.XMLType;
-import com.metamatrix.query.processor.xml.XMLUtil;
/**
@@ -59,12 +55,7 @@
// as processing excceptions.
if (value instanceof XMLType) {
XMLType xml = (XMLType)value;
- try {
- return xml.getSource(null);
- } catch (InvalidReferenceException e) {
- xml = XMLUtil.getFromBufferManager(bufferMgr, new
TupleSourceID(xml.getPersistenceStreamId()), new Properties());
- return xml.getSource(null);
- }
+ return xml.getSource(null);
}
return new StreamSource(new StringReader((String)value));
}
Modified:
trunk/engine/src/main/java/com/metamatrix/query/processor/proc/ProcedurePlan.java
===================================================================
---
trunk/engine/src/main/java/com/metamatrix/query/processor/proc/ProcedurePlan.java 2009-08-12
16:36:33 UTC (rev 1234)
+++
trunk/engine/src/main/java/com/metamatrix/query/processor/proc/ProcedurePlan.java 2009-08-12
17:39:14 UTC (rev 1235)
@@ -561,7 +561,6 @@
String rsKey = rsName.toUpperCase();
CursorState state = this.cursorStates.remove(rsKey);
if(state != null) {
- state.ts.closeSource();
try {
this.bufferMgr.removeTupleSource(state.tsID);
} catch (TupleSourceNotFoundException e) {
Modified:
trunk/engine/src/main/java/com/metamatrix/query/processor/relational/DependentCriteriaProcessor.java
===================================================================
---
trunk/engine/src/main/java/com/metamatrix/query/processor/relational/DependentCriteriaProcessor.java 2009-08-12
16:36:33 UTC (rev 1234)
+++
trunk/engine/src/main/java/com/metamatrix/query/processor/relational/DependentCriteriaProcessor.java 2009-08-12
17:39:14 UTC (rev 1235)
@@ -36,7 +36,6 @@
import com.metamatrix.api.exception.MetaMatrixComponentException;
import com.metamatrix.api.exception.query.ExpressionEvaluationException;
import com.metamatrix.common.buffer.BlockedException;
-import com.metamatrix.common.buffer.TupleSource;
import com.metamatrix.common.buffer.TupleSourceNotFoundException;
import com.metamatrix.query.rewriter.QueryRewriter;
import com.metamatrix.query.sql.lang.AbstractSetCriteria;
@@ -88,12 +87,6 @@
sortSymbols.add(dependentSetStates.get(i).valueExpression);
}
DependentValueSource originalVs =
(DependentValueSource)dependentNode.getContext().getVariableContext().getGlobalValue(valueSource);
- TupleSource ts;
- try {
- ts =
dependentNode.getBufferManager().getTupleSource(originalVs.getTupleSourceID());
- } catch (TupleSourceNotFoundException e) {
- throw new MetaMatrixComponentException(e);
- }
this.sortUtility = new SortUtility(originalVs.getTupleSourceID(),
sortSymbols, sortDirection, true, dependentNode.getBufferManager(),
dependentNode.getConnectionID());
}
dvs = new DependentValueSource(sortUtility.sort(),
dependentNode.getBufferManager());
Modified:
trunk/engine/src/main/java/com/metamatrix/query/processor/relational/DependentValueSource.java
===================================================================
---
trunk/engine/src/main/java/com/metamatrix/query/processor/relational/DependentValueSource.java 2009-08-12
16:36:33 UTC (rev 1234)
+++
trunk/engine/src/main/java/com/metamatrix/query/processor/relational/DependentValueSource.java 2009-08-12
17:39:14 UTC (rev 1235)
@@ -103,6 +103,7 @@
result.add(value);
}
}
+ its.closeSource();
if (cachedSets == null) {
cachedSets = new HashMap<Expression, HashSet<Object>>();
}
Modified:
trunk/engine/src/main/java/com/metamatrix/query/processor/relational/GroupingNode.java
===================================================================
---
trunk/engine/src/main/java/com/metamatrix/query/processor/relational/GroupingNode.java 2009-08-12
16:36:33 UTC (rev 1234)
+++
trunk/engine/src/main/java/com/metamatrix/query/processor/relational/GroupingNode.java 2009-08-12
17:39:14 UTC (rev 1235)
@@ -368,7 +368,7 @@
currentGroupTuple = null;
groupBegin++;
}
-
+ this.groupTupleSource.closeSource();
if(rowCount != 0 || sortElements == null) {
// Close last group
List row = new ArrayList(functions.length);
Modified:
trunk/engine/src/main/java/com/metamatrix/query/processor/relational/PartitionedSortJoin.java
===================================================================
---
trunk/engine/src/main/java/com/metamatrix/query/processor/relational/PartitionedSortJoin.java 2009-08-12
16:36:33 UTC (rev 1234)
+++
trunk/engine/src/main/java/com/metamatrix/query/processor/relational/PartitionedSortJoin.java 2009-08-12
17:39:14 UTC (rev 1235)
@@ -299,6 +299,7 @@
matchEnd = -1;
partitionedTuple = null;
}
+ currentSource.closeSource();
currentSource = null;
currentPartition++;
}
Modified:
trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SortUtility.java
===================================================================
---
trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SortUtility.java 2009-08-12
16:36:33 UTC (rev 1234)
+++
trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SortUtility.java 2009-08-12
17:39:14 UTC (rev 1235)
@@ -363,6 +363,7 @@
} finally {
tc.saveBatch();
}
+ outTs.closeSource();
outTs = null;
}
}
Modified: trunk/engine/src/main/java/com/metamatrix/query/processor/xml/XMLPlan.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/processor/xml/XMLPlan.java 2009-08-12
16:36:33 UTC (rev 1234)
+++ trunk/engine/src/main/java/com/metamatrix/query/processor/xml/XMLPlan.java 2009-08-12
17:39:14 UTC (rev 1235)
@@ -71,7 +71,6 @@
import com.metamatrix.common.lob.LobChunk;
import com.metamatrix.common.log.LogManager;
import com.metamatrix.common.types.DataTypeManager;
-import com.metamatrix.common.types.InvalidReferenceException;
import com.metamatrix.common.types.SQLXMLImpl;
import com.metamatrix.common.types.Streamable;
import com.metamatrix.common.types.XMLType;
@@ -274,7 +273,7 @@
// if this is the first chunk, then create a tuple source id for this
sequence of chunks
if (!this.docInProgress) {
this.docInProgress = true;
- this.docInProgressTupleSourceId =
XMLUtil.createXMLTupleSource(this.bufferMgr, this.resultsTupleSourceId.getStringID());
+ this.docInProgressTupleSourceId =
XMLUtil.createXMLTupleSource(this.bufferMgr, this.getContext().getConnectionID());
this.chunkPosition = 1;
}
@@ -287,8 +286,8 @@
// we want this to be naturally feed by chunks whether inside
// or out side the processor
- xml = new XMLType();
-
xml.setPersistenceStreamId(this.docInProgressTupleSourceId.getStringID());
+ xml = new XMLType(XMLUtil.getFromBufferManager(bufferMgr,
this.docInProgressTupleSourceId, getProperties()));
+ this.bufferMgr.setPersistentTupleSource(this.docInProgressTupleSourceId,
xml);
//reset current document state.
this.docInProgress = false;
@@ -356,12 +355,7 @@
Reader source = null;
try {
- try {
- source = xmlDoc.getCharacterStream();
- } catch (InvalidReferenceException e) {
- xmlDoc = XMLUtil.getFromBufferManager(this.bufferMgr, new
TupleSourceID(xmlDoc.getPersistenceStreamId()), props);
- source = xmlDoc.getCharacterStream();
- }
+ source = xmlDoc.getCharacterStream();
// Validate against schema
if(this.shouldValidate) {
Modified: trunk/engine/src/main/java/com/metamatrix/query/processor/xml/XMLUtil.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/processor/xml/XMLUtil.java 2009-08-12
16:36:33 UTC (rev 1234)
+++ trunk/engine/src/main/java/com/metamatrix/query/processor/xml/XMLUtil.java 2009-08-12
17:39:14 UTC (rev 1235)
@@ -32,18 +32,17 @@
import com.metamatrix.api.exception.MetaMatrixComponentException;
import com.metamatrix.common.buffer.BufferManager;
+import com.metamatrix.common.buffer.BufferManagerLobChunkStream;
import com.metamatrix.common.buffer.TupleSourceID;
import com.metamatrix.common.buffer.TupleSourceNotFoundException;
import com.metamatrix.common.buffer.BufferManager.TupleSourceStatus;
import com.metamatrix.common.buffer.BufferManager.TupleSourceType;
-import com.metamatrix.common.lob.BufferManagerLobChunkStream;
import com.metamatrix.common.lob.ByteLobChunkStream;
import com.metamatrix.common.lob.LobChunk;
import com.metamatrix.common.lob.LobChunkInputStream;
import com.metamatrix.common.types.DataTypeManager;
import com.metamatrix.common.types.SQLXMLImpl;
import com.metamatrix.common.types.XMLReaderFactory;
-import com.metamatrix.common.types.XMLType;
import com.metamatrix.query.sql.symbol.ElementSymbol;
@@ -95,15 +94,8 @@
* This will reconstruct the XML object from the buffer manager from given
* buffer manager id.
*/
- public static XMLType getFromBufferManager(final BufferManager bufferMgr, final
TupleSourceID sourceId, Properties props) {
- SQLXML sqlXML = new SQLXMLImpl(new BufferMangerXMLReaderFactory(bufferMgr,
sourceId), props);
-
- // this is object to be sent to the client. The reference
- // id will be set by the buffer manager.
- XMLType xml = new XMLType(sqlXML);
- xml.setPersistenceStreamId(sourceId.getStringID());
-
- return xml;
+ public static SQLXML getFromBufferManager(final BufferManager bufferMgr, final
TupleSourceID sourceId, Properties props) {
+ return new SQLXMLImpl(new BufferMangerXMLReaderFactory(bufferMgr, sourceId),
props);
}
/**
Modified:
trunk/engine/src/main/java/com/metamatrix/query/processor/xquery/XQueryPlan.java
===================================================================
---
trunk/engine/src/main/java/com/metamatrix/query/processor/xquery/XQueryPlan.java 2009-08-12
16:36:33 UTC (rev 1234)
+++
trunk/engine/src/main/java/com/metamatrix/query/processor/xquery/XQueryPlan.java 2009-08-12
17:39:14 UTC (rev 1235)
@@ -160,21 +160,15 @@
List rows = new ArrayList(1);
List row = new ArrayList(1);
- // this may be little confusing, but the top layer is not immediately going
- // to disk for saving; when it does it only saves the streaming id, not the
- // contents. The below one saves immediately to disk, and when client refers
to top
- // id, processor know about the *saved* and gets the contents from it.
+ TupleSourceID savedId = XMLUtil.saveToBufferManager(this.bufferMgr,
this.getContext().getConnectionID(), srcXML, this.chunkSize);
+
+ //for large documents use the buffermanager version instead
+ if (this.bufferMgr.getFinalRowCount(savedId) > 1) {
+ srcXML = XMLUtil.getFromBufferManager(this.bufferMgr, savedId,
getFormatProperties());
+ }
- // the one which saves to disk
- TupleSourceID savedId = XMLUtil.saveToBufferManager(this.bufferMgr,
this.resultsTupleSourceId.getStringID(), srcXML, this.chunkSize);
-
- // here we have 2 options; create xml from original source or from buffer
- // manager. since buffer manager is slow we will choose the first option.
- // incase this xml used in processor it will be faster; if it used in the
- // client using steaming will be slow.
XMLType xml = new XMLType(srcXML);
- xml.setPersistenceStreamId(savedId.getStringID());
- //XMLValue xml = XMLUtil.getFromBufferManager(this.bufferMgr, savedId,
this.chunkSize, getFormatProperties());
+ this.bufferMgr.setPersistentTupleSource(savedId, xml);
// now build the top batch with information from the saved one.
row.add(xml);
Modified: trunk/engine/src/main/java/com/metamatrix/query/util/CommandContext.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/util/CommandContext.java 2009-08-12
16:36:33 UTC (rev 1234)
+++ trunk/engine/src/main/java/com/metamatrix/query/util/CommandContext.java 2009-08-12
17:39:14 UTC (rev 1235)
@@ -30,18 +30,14 @@
import com.metamatrix.api.exception.MetaMatrixComponentException;
import com.metamatrix.api.exception.query.QueryProcessingException;
-import com.metamatrix.common.util.PropertiesUtils;
import com.metamatrix.core.util.ArgCheck;
import com.metamatrix.query.QueryPlugin;
import com.metamatrix.query.eval.SecurityFunctionEvaluator;
import com.metamatrix.query.execution.QueryExecPlugin;
import com.metamatrix.query.optimizer.relational.PlanToProcessConverter;
import com.metamatrix.query.processor.QueryProcessor;
-import com.metamatrix.query.sql.symbol.ContextReference;
import com.metamatrix.query.sql.symbol.ElementSymbol;
import com.metamatrix.query.sql.symbol.Expression;
-import com.metamatrix.query.sql.util.ValueIterator;
-import com.metamatrix.query.sql.util.ValueIteratorSource;
import com.metamatrix.query.sql.util.VariableContext;
/**
@@ -159,6 +155,10 @@
public CommandContext() {
}
+ public CommandContext getParent() {
+ return parent;
+ }
+
public boolean isSessionFunctionEvaluated() {
if (parent != null) {
return parent.isSessionFunctionEvaluated();
Modified:
trunk/engine/src/main/java/com/metamatrix/query/xquery/saxon/SaxonXQueryExpression.java
===================================================================
---
trunk/engine/src/main/java/com/metamatrix/query/xquery/saxon/SaxonXQueryExpression.java 2009-08-12
16:36:33 UTC (rev 1234)
+++
trunk/engine/src/main/java/com/metamatrix/query/xquery/saxon/SaxonXQueryExpression.java 2009-08-12
17:39:14 UTC (rev 1235)
@@ -44,10 +44,7 @@
import com.metamatrix.api.exception.MetaMatrixComponentException;
import com.metamatrix.api.exception.MetaMatrixProcessingException;
import com.metamatrix.common.types.SQLXMLImpl;
-import com.metamatrix.core.util.StringUtil;
import com.metamatrix.query.QueryPlugin;
-import com.metamatrix.query.eval.Evaluator;
-import com.metamatrix.query.sql.symbol.Expression;
import com.metamatrix.query.util.XMLFormatConstants;
import com.metamatrix.query.xquery.XQueryExpression;
import com.metamatrix.query.xquery.XQuerySQLEvaluator;
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/LobChunkStream.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/dqp/internal/process/LobChunkStream.java 2009-08-12
16:36:33 UTC (rev 1234)
+++
trunk/engine/src/main/java/org/teiid/dqp/internal/process/LobChunkStream.java 2009-08-12
17:39:14 UTC (rev 1235)
@@ -27,17 +27,14 @@
import java.sql.SQLException;
import com.metamatrix.common.buffer.BufferManager;
-import com.metamatrix.common.lob.BufferManagerLobChunkStream;
import com.metamatrix.common.lob.ByteLobChunkStream;
import com.metamatrix.common.lob.LobChunk;
import com.metamatrix.common.lob.LobChunkProducer;
import com.metamatrix.common.lob.ReaderInputStream;
import com.metamatrix.common.types.BlobType;
import com.metamatrix.common.types.ClobType;
-import com.metamatrix.common.types.InvalidReferenceException;
import com.metamatrix.common.types.Streamable;
import com.metamatrix.common.types.XMLType;
-import com.metamatrix.dqp.DQPPlugin;
/**
* A Lob Stream builder class. Given the Lob object this object can build
@@ -48,7 +45,7 @@
LobChunkProducer internalStream = null;
- public LobChunkStream(Streamable streamable, int chunkSize, BufferManager bufferMgr)
+ public LobChunkStream(Streamable<?> streamable, int chunkSize, BufferManager
bufferMgr)
throws IOException {
try {
@@ -64,14 +61,6 @@
BlobType blob = (BlobType)streamable;
this.internalStream = new ByteLobChunkStream(blob.getBinaryStream(),
chunkSize);
}
- } catch (InvalidReferenceException e) {
- // if the lob did not have a persistent id, there is no way for us to
re-create the
- // object. so throw an error.
- if (streamable.getPersistenceStreamId() == null) {
- throw new
IOException(DQPPlugin.Util.getString("LobStream.noreference")); //$NON-NLS-1$
- }
- // otherwise read directly from the buffer manager.
- this.internalStream = new
BufferManagerLobChunkStream(streamable.getPersistenceStreamId(), bufferMgr);
} catch(SQLException e) {
IOException ex = new IOException();
ex.initCause(e);
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/LobWorkItem.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/LobWorkItem.java 2009-08-12
16:36:33 UTC (rev 1234)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/LobWorkItem.java 2009-08-12
17:39:14 UTC (rev 1235)
@@ -23,14 +23,9 @@
package org.teiid.dqp.internal.process;
import java.io.IOException;
-import java.util.List;
import com.metamatrix.api.exception.MetaMatrixComponentException;
-import com.metamatrix.common.CommonPlugin;
import com.metamatrix.common.buffer.BlockedOnMemoryException;
-import com.metamatrix.common.buffer.MemoryNotAvailableException;
-import com.metamatrix.common.buffer.TupleBatch;
-import com.metamatrix.common.buffer.TupleSourceID;
import com.metamatrix.common.buffer.TupleSourceNotFoundException;
import com.metamatrix.common.comm.api.ResultsReceiver;
import com.metamatrix.common.lob.LobChunk;
@@ -73,7 +68,7 @@
// If no previous stream is not found for this request create one and
// save for future
if (stream == null) {
- stream = createLobStream(new TupleSourceID(streamId));
+ stream = createLobStream(streamId);
}
// now get the chunk from stream
@@ -119,35 +114,13 @@
* Create a object which can create a sequence of LobChunk objects on a given
* LOB object
*/
- private LobChunkStream createLobStream(TupleSourceID referenceStreamId)
+ private LobChunkStream createLobStream(String referenceStreamId)
throws BlockedOnMemoryException, MetaMatrixComponentException, IOException,
TupleSourceNotFoundException {
// get the reference object in the buffer manager, and try to stream off
// the original sources.
- TupleBatch batch = null;
- try {
- batch = dqpCore.getBufferManager().pinTupleBatch(referenceStreamId, 1, 1);
- List[] tuples = batch.getAllTuples();
-
- if (tuples != null && tuples.length > 0) {
- Object anObj = tuples[0].get(0);
- if (anObj instanceof Streamable) {
- Streamable streamable = (Streamable)anObj;
- return new LobChunkStream(streamable, chunkSize,
dqpCore.getBufferManager());
- }
- }
- throw new
MetaMatrixComponentException(DQPPlugin.Util.getString("ProcessWorker.wrongdata"));
//$NON-NLS-1$
- } catch (MemoryNotAvailableException e) {
- throw BlockedOnMemoryException.INSTANCE;
- } finally {
- try {
- if (batch != null) {
- dqpCore.getBufferManager().unpinTupleBatch(referenceStreamId,
batch.getBeginRow(), batch.getEndRow());
- }
- } catch (MetaMatrixComponentException e) {
- LogManager.logDetail(LogConstants.CTX_DQP, e, "Call to unpin failed
during lob stream creation"); //$NON-NLS-1$
- }
- }
+ Streamable<?> streamable =
dqpCore.getBufferManager().getStreamable(parent.resultsID, referenceStreamId);
+ return new LobChunkStream(streamable, chunkSize, dqpCore.getBufferManager());
}
synchronized void setResultsReceiver(ResultsReceiver<LobChunk> resultsReceiver)
{
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java 2009-08-12
16:36:33 UTC (rev 1234)
+++
trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java 2009-08-12
17:39:14 UTC (rev 1235)
@@ -141,7 +141,7 @@
protected Command originalCommand;
private AnalysisRecord analysisRecord;
private TransactionContext transactionContext;
- private TupleSourceID resultsID;
+ protected TupleSourceID resultsID;
private Collection schemas; // These are schemas associated with XML results
private boolean returnsUpdateCount;
Modified:
trunk/engine/src/test/java/com/metamatrix/common/buffer/impl/TestBufferManagerImpl.java
===================================================================
---
trunk/engine/src/test/java/com/metamatrix/common/buffer/impl/TestBufferManagerImpl.java 2009-08-12
16:36:33 UTC (rev 1234)
+++
trunk/engine/src/test/java/com/metamatrix/common/buffer/impl/TestBufferManagerImpl.java 2009-08-12
17:39:14 UTC (rev 1235)
@@ -236,19 +236,7 @@
assertTrue(xml1.getPersistenceStreamId() == null);
assertTrue(xml2.getPersistenceStreamId() == null);
- TupleSourceInfo info = mgr.getTupleSourceInfo(new
TupleSourceID(xml1.getReferenceStreamId()), true);
- // make sure the group name of the reference lob, is same as part batch id
- assertEquals(id.getStringID(), info.getGroupInfo().getGroupName());
-
- // now delete the parent tuple source, this should delete the
- // all the kids with same name
- mgr.removeTupleSource(id);
-
- try {
- mgr.getTupleSource(new TupleSourceID(xml2.getReferenceStreamId()));
- fail("this is already should have been cleaned up by above one");
//$NON-NLS-1$
- } catch (TupleSourceNotFoundException e) {
- }
+ assertNotNull(mgr.getStreamable(id, xml1.getReferenceStreamId()));
}
public void testAddStreamablePart() throws Exception {
Modified: trunk/engine/src/test/java/com/metamatrix/query/processor/TestProcessor.java
===================================================================
---
trunk/engine/src/test/java/com/metamatrix/query/processor/TestProcessor.java 2009-08-12
16:36:33 UTC (rev 1234)
+++
trunk/engine/src/test/java/com/metamatrix/query/processor/TestProcessor.java 2009-08-12
17:39:14 UTC (rev 1235)
@@ -50,6 +50,7 @@
import com.metamatrix.common.buffer.BufferManagerFactory;
import com.metamatrix.common.buffer.TupleSource;
import com.metamatrix.common.buffer.TupleSourceID;
+import com.metamatrix.common.buffer.TupleSourceNotFoundException;
import com.metamatrix.common.buffer.impl.BufferManagerImpl;
import com.metamatrix.common.types.DataTypeManager;
import com.metamatrix.common.types.XMLType;
@@ -215,16 +216,18 @@
}
}
- private void helpProcessException(ProcessorPlan plan, ProcessorDataManager
dataManager) {
+ private void helpProcessException(ProcessorPlan plan, ProcessorDataManager
dataManager) throws TupleSourceNotFoundException, MetaMatrixComponentException {
helpProcessException(plan, dataManager, null);
}
- private void helpProcessException(ProcessorPlan plan, ProcessorDataManager
dataManager, String expectedErrorMessage) {
-
+ private void helpProcessException(ProcessorPlan plan, ProcessorDataManager
dataManager, String expectedErrorMessage) throws TupleSourceNotFoundException,
MetaMatrixComponentException {
+ TupleSourceID tsId = null;
+ BufferManager bufferMgr = null;
try {
- BufferManager bufferMgr = BufferManagerFactory.getStandaloneBufferManager();
+ bufferMgr = BufferManagerFactory.getStandaloneBufferManager();
CommandContext context = new CommandContext("0", "test",
null, null, null); //$NON-NLS-1$ //$NON-NLS-2$
QueryProcessor processor = new QueryProcessor(plan, context, bufferMgr,
dataManager);
+ tsId = processor.getResultsID();
processor.process();
fail("Expected error during processing, but got none.");
//$NON-NLS-1$
} catch(MetaMatrixCoreException e) {
@@ -232,6 +235,8 @@
if(expectedErrorMessage != null) {
assertEquals(expectedErrorMessage, e.getMessage());
}
+ } finally {
+ bufferMgr.removeTupleSource(tsId);
}
}
@@ -2296,7 +2301,7 @@
* Tests a scalar subquery which returns more than one rows
* causes the expected Exception
*/
- @Test public void testSubqueryScalarException() {
+ @Test public void testSubqueryScalarException() throws Exception {
String sql = "SELECT e1, (SELECT e2 FROM pm2.g1) FROM pm1.g1";
//$NON-NLS-1$
// Construct data manager with data
Modified:
trunk/engine/src/test/java/com/metamatrix/query/processor/relational/TestSortNode.java
===================================================================
---
trunk/engine/src/test/java/com/metamatrix/query/processor/relational/TestSortNode.java 2009-08-12
16:36:33 UTC (rev 1234)
+++
trunk/engine/src/test/java/com/metamatrix/query/processor/relational/TestSortNode.java 2009-08-12
17:39:14 UTC (rev 1235)
@@ -107,13 +107,12 @@
}
public static int getIntBatchSize() {
- List[] expected = new List[] {
- Arrays.asList(new Object[] { new Integer(0) }),
- };
+ List[] expected = new List[BATCH_SIZE];
+ Arrays.fill(expected, Arrays.asList(1));
String[] types = { "integer" }; //$NON-NLS-1$
- int size = (int)SizeUtility.getBatchSize( types, expected ) * BATCH_SIZE;
+ int size = (int)SizeUtility.getBatchSize( types, expected );
return size;
}
Modified:
trunk/runtime/src/main/java/com/metamatrix/dqp/embedded/services/EmbeddedBufferService.java
===================================================================
---
trunk/runtime/src/main/java/com/metamatrix/dqp/embedded/services/EmbeddedBufferService.java 2009-08-12
16:36:33 UTC (rev 1234)
+++
trunk/runtime/src/main/java/com/metamatrix/dqp/embedded/services/EmbeddedBufferService.java 2009-08-12
17:39:14 UTC (rev 1235)
@@ -54,7 +54,6 @@
private static final String DEFAULT_MANAGEMENT_INTERVAL = "0";
//$NON-NLS-1$
private static final String DEFAULT_LOG_STATS_INTERVAL =
DEFAULT_MANAGEMENT_INTERVAL;
private static final String DEFAULT_SESSION_USE_PERCENTAGE = "100";
//$NON-NLS-1$
- private static final String DEFAULT_ID_CREATOR =
"com.metamatrix.common.buffer.impl.LongIDCreator"; //$NON-NLS-1$
private static final String DEFAULT_MAX_OPEN_FILES = "10"; //$NON-NLS-1$
// Instance