teiid SVN: r1242 - trunk/build/kit-runtime/deploy.
by teiid-commits@lists.jboss.org
Author: rareddy
Date: 2009-08-13 15:08:38 -0400 (Thu, 13 Aug 2009)
New Revision: 1242
Added:
trunk/build/kit-runtime/deploy/log4j.xml
Removed:
trunk/build/kit-runtime/deploy/log4j.properties
Log:
TEIID-769
Deleted: trunk/build/kit-runtime/deploy/log4j.properties
===================================================================
--- trunk/build/kit-runtime/deploy/log4j.properties 2009-08-13 18:40:27 UTC (rev 1241)
+++ trunk/build/kit-runtime/deploy/log4j.properties 2009-08-13 19:08:38 UTC (rev 1242)
@@ -1,34 +0,0 @@
-#### Log4J configuration for Teiid
-## Available log levels are TRACE,DEBUG,INFO,WARN,ERROR and FATAL
-##
-log4j.rootCategory=FATAL, TEIID
-
-# Print only messages of priority ERROR or higher for your category
-# This is for JGroups
-log4j.category.org.jgroups=WARN
-
-# JBoss transactions
-log4j.category.com.arjuna=WARN
-
-# This one controls for JBoss Cache
-log4j.category.org.jboss=WARN
-
-# This one controls Teiid
-log4j.category.org.teiid=WARN
-
-# un-comment for command logging (user, source commands submitted)
-#log4j.category.org.teiid.COMMAND_LOG=INFO
-
-# un-comment for audit logging
-#log4j.category.org.teiid.AUDIT_LOG=INFO
-
-# Appender writes to a file
-# Control the maximum log file size, number of backup files
-log4j.appender.TEIID=org.apache.log4j.RollingFileAppender
-log4j.appender.TEIID.File=${dqp.log4jFile}
-log4j.appender.TEIID.MaxFileSize=1000KB
-log4j.appender.TEIID.MaxBackupIndex=25
-
-# layout {date, priority, thread, category, message, newline}
-log4j.appender.TEIID.layout=org.apache.log4j.PatternLayout
-log4j.appender.TEIID.layout.ConversionPattern=%d %p [%t] %c - %m%n
\ No newline at end of file
Added: trunk/build/kit-runtime/deploy/log4j.xml
===================================================================
--- trunk/build/kit-runtime/deploy/log4j.xml (rev 0)
+++ trunk/build/kit-runtime/deploy/log4j.xml 2009-08-13 19:08:38 UTC (rev 1242)
@@ -0,0 +1,85 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/" debug="false">
+
+ <appender name="ASYNC" class="org.apache.log4j.AsyncAppender">
+ <appender-ref ref="FILE"/>
+ </appender>
+
+ <appender name="FILE" class="org.apache.log4j.RollingFileAppender">
+ <param name="File" value="${dqp.log4jFile}"/>
+ <param name="MaxFileSize" value="1000KB"/>
+ <param name="MaxBackupIndex" value="25"/>
+
+ <layout class="org.apache.log4j.PatternLayout">
+ <param name="ConversionPattern" value="%d %p [%t] %c - %m%n"/>
+ </layout>
+ </appender>
+
+ <!-- Console Appender
+ <appender name="CONSOLE" class="org.apache.log4j.ConsoleAppender">
+ <param name="Target" value="System.out"/>
+ <param name="Threshold" value="INFO"/>
+ <layout class="org.apache.log4j.PatternLayout">
+ <param name="ConversionPattern" value="%d %p [%t] %c - %m%n"/>
+ </layout>
+ </appender>
+ -->
+
+ <!-- ================ -->
+ <!-- categories -->
+ <!-- ================ -->
+
+ <category name="org.jgroups">
+ <priority value="WARN"/>
+ </category>
+
+ <category name="com.arjuna">
+ <priority value="WARN" />
+ </category>
+
+ <category name="org.jboss">
+ <priority value="WARN"/>
+ </category>
+
+ <category name="org.teiid">
+ <priority value="WARN" />
+ </category>
+
+ <!-- un-comment to enable COMMAND log
+ <appender name="COMMAND" class="org.apache.log4j.RollingFileAppender">
+ <param name="File" value="log/command.log"/>
+ <param name="MaxFileSize" value="1000KB"/>
+ <param name="MaxBackupIndex" value="25"/>
+ <layout class="org.apache.log4j.PatternLayout">
+ <param name="ConversionPattern" value="%d %p [%t] %c - %m%n"/>
+ </layout>
+ </appender>
+
+ <category name="org.teiid.COMMAND_LOG">
+ <priority value="INFO"/>
+ <appender-ref ref="COMMAND"/>
+ </category>
+ -->
+
+ <!-- Un-comment to enable AUDIT log
+ <appender name="AUDIT" class="org.apache.log4j.RollingFileAppender">
+ <param name="File" value="log/audit.log"/>
+ <param name="MaxFileSize" value="1000KB"/>
+ <param name="MaxBackupIndex" value="25"/>
+ <layout class="org.apache.log4j.PatternLayout">
+ <param name="ConversionPattern" value="%d %p [%t] %c - %m%n"/>
+ </layout>
+ </appender>
+
+ <category name="org.teiid.AUDIT_LOG">
+ <priority value="INFO"/>
+ <appender-ref ref="AUDIT"/>
+ </category>
+ -->
+
+ <root>
+ <appender-ref ref="ASYNC"/>
+ </root>
+
+</log4j:configuration>
Property changes on: trunk/build/kit-runtime/deploy/log4j.xml
___________________________________________________________________
Name: svn:mime-type
+ text/plain
15 years, 5 months
teiid SVN: r1241 - trunk/adminshell/src/main/resources/commands.
by teiid-commits@lists.jboss.org
Author: rareddy
Date: 2009-08-13 14:40:27 -0400 (Thu, 13 Aug 2009)
New Revision: 1241
Modified:
trunk/adminshell/src/main/resources/commands/mparse.bsh
Log:
TEIID-224
Modified: trunk/adminshell/src/main/resources/commands/mparse.bsh
===================================================================
--- trunk/adminshell/src/main/resources/commands/mparse.bsh 2009-08-13 14:58:47 UTC (rev 1240)
+++ trunk/adminshell/src/main/resources/commands/mparse.bsh 2009-08-13 18:40:27 UTC (rev 1241)
@@ -8,7 +8,7 @@
String str = orig_str.toLowerCase();
boolean record = false;
- if (str.endsWith(";") && str.matches("(select|insert|delete|update|exec|execute|create|drop)\\W.+")){
+ if (str.endsWith(";") && str.matches("(select|insert|delete|update|exec|create|drop)\\W.+")){
mmstr="execute(\""+orig_str.substring(0,orig_str.length()-1)+"\");";
record = true;
}
15 years, 5 months
teiid SVN: r1240 - in trunk: client-jdbc/src/main/java/com/metamatrix/jdbc and 3 other directories.
by teiid-commits@lists.jboss.org
Author: shawkins
Date: 2009-08-13 10:58:47 -0400 (Thu, 13 Aug 2009)
New Revision: 1240
Modified:
trunk/client-jdbc/src/main/java/com/metamatrix/jdbc/BatchResults.java
trunk/client-jdbc/src/main/java/com/metamatrix/jdbc/MMDatabaseMetaData.java
trunk/client-jdbc/src/main/java/com/metamatrix/jdbc/MMResultSet.java
trunk/client-jdbc/src/main/java/com/metamatrix/jdbc/MMStatement.java
trunk/client-jdbc/src/test/java/com/metamatrix/jdbc/TestAllResultsImpl.java
trunk/client/src/main/java/com/metamatrix/dqp/message/ResultsMessage.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/CachedRequestWorkItem.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
trunk/engine/src/test/java/com/metamatrix/dqp/message/TestResultsMessage.java
Log:
TEIID-783: allow the fetch size to be set on result sets, and remove batch caching for forward-only iteration.
Modified: trunk/client/src/main/java/com/metamatrix/dqp/message/ResultsMessage.java
===================================================================
--- trunk/client/src/main/java/com/metamatrix/dqp/message/ResultsMessage.java 2009-08-13 03:32:14 UTC (rev 1239)
+++ trunk/client/src/main/java/com/metamatrix/dqp/message/ResultsMessage.java 2009-08-13 14:58:47 UTC (rev 1240)
@@ -66,9 +66,6 @@
/** Last row index */
private int lastRow;
- /** Flag indicating whether this result set is part or all of the entire result set */
- private boolean partialResultsFlag;
-
/** Final row index in complete result set, if known */
private int finalRow = -1;
@@ -81,12 +78,6 @@
/** This object represents the time when results are produced on the server. */
private Date completedTimestamp;
- /** Fetch size for the results, if appropriate */
- private int fetchSize;
-
- /** Cursor type for the results, if appropriate */
- private int cursorType;
-
/** OPTION DEBUG log if OPTION DEBUG was used */
private String debugLog;
@@ -115,8 +106,6 @@
if(requestMsg != null){
this.processingTimestamp = requestMsg.getProcessingTimestamp();
this.completedTimestamp = new Date();
- this.fetchSize = requestMsg.getFetchSize();
- this.cursorType = requestMsg.getCursorType();
}
this.results = new ArrayList[0];
@@ -127,7 +116,6 @@
setResults( results );
setFirstRow( 1 );
setLastRow( results.length );
- setPartialResults( false );
this.columnNames = columnNames;
this.dataTypes = dataTypes;
@@ -180,13 +168,6 @@
/**
* @return
*/
- public boolean isPartialResults() {
- return partialResultsFlag;
- }
-
- /**
- * @return
- */
public Map getPlanDescription() {
return planDescription;
}
@@ -239,13 +220,6 @@
}
/**
- * @param b
- */
- public void setPartialResults(boolean b) {
- partialResultsFlag = b;
- }
-
- /**
* @param object
*/
public void setPlanDescription(Map object) {
@@ -301,34 +275,6 @@
this.dataTypes = dataTypes;
}
- /**
- * @return
- */
- public int getFetchSize() {
- return fetchSize;
- }
-
- /**
- * @param i
- */
- public void setFetchSize(int fetchSize) {
- this.fetchSize = fetchSize;
- }
-
- /**
- * @return
- */
- public int getCursorType() {
- return cursorType;
- }
-
- /**
- * @param i
- */
- public void setCursorType(int cursorType) {
- this.cursorType = cursorType;
- }
-
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
columnNames = ExternalizeUtil.readStringArray(in);
@@ -354,7 +300,6 @@
firstRow = in.readInt();
lastRow = in.readInt();
- partialResultsFlag = in.readBoolean();
finalRow = in.readInt();
//Parameters
@@ -362,8 +307,6 @@
processingTimestamp = (Date)in.readObject();
completedTimestamp = (Date)in.readObject();
- fetchSize = in.readInt();
- cursorType = in.readInt();
debugLog = (String)in.readObject();
annotations = (Collection)in.readObject();
isUpdateResult = in.readBoolean();
@@ -395,7 +338,6 @@
ExternalizeUtil.writeCollection(out, schemas);
out.writeInt(firstRow);
out.writeInt(lastRow);
- out.writeBoolean(partialResultsFlag);
out.writeInt(finalRow);
// Parameters
@@ -403,8 +345,6 @@
out.writeObject(processingTimestamp);
out.writeObject(completedTimestamp);
- out.writeInt(fetchSize);
- out.writeInt(cursorType);
out.writeObject(debugLog);
out.writeObject(annotations);
out.writeBoolean(isUpdateResult);
Modified: trunk/client-jdbc/src/main/java/com/metamatrix/jdbc/BatchResults.java
===================================================================
--- trunk/client-jdbc/src/main/java/com/metamatrix/jdbc/BatchResults.java 2009-08-13 03:32:14 UTC (rev 1239)
+++ trunk/client-jdbc/src/main/java/com/metamatrix/jdbc/BatchResults.java 2009-08-13 14:58:47 UTC (rev 1240)
@@ -67,22 +67,24 @@
}
}
+
+ static final int DEFAULT_SAVED_BATCHES = 3;
- private static final int SAVED_BATCHES = 3;
-
- private ArrayList<Batch> batches = new ArrayList<Batch>(SAVED_BATCHES + 1);
+ private ArrayList<Batch> batches = new ArrayList<Batch>();
private int currentRowNumber;
private int lastRowNumber = -1;
private int highestRowNumber;
private BatchFetcher batchFetcher;
+ private int savedBatches = DEFAULT_SAVED_BATCHES;
public BatchResults(List[] batch, int beginRow, int endRow, boolean isLast) {
this.setBatch(new Batch(batch, beginRow, endRow, isLast));
}
- public BatchResults(BatchFetcher batchFetcher, Batch batch) {
+ public BatchResults(BatchFetcher batchFetcher, Batch batch, int savedBatches) {
this.batchFetcher = batchFetcher;
+ this.savedBatches = savedBatches;
this.setBatch(batch);
}
@@ -197,6 +199,9 @@
}
private void requestBatchAndWait(int beginRow) throws SQLException{
+ if (batches.size() == savedBatches) {
+ batches.remove(savedBatches - 1);
+ }
setBatch(batchFetcher.requestBatch(beginRow));
}
@@ -207,9 +212,6 @@
}
highestRowNumber = Math.max(batch.getEndRow(), highestRowNumber);
this.batches.add(0, batch);
- if (batches.size() > SAVED_BATCHES) {
- batches.remove(SAVED_BATCHES);
- }
}
public boolean hasNext() throws SQLException {
Modified: trunk/client-jdbc/src/main/java/com/metamatrix/jdbc/MMDatabaseMetaData.java
===================================================================
--- trunk/client-jdbc/src/main/java/com/metamatrix/jdbc/MMDatabaseMetaData.java 2009-08-13 03:32:14 UTC (rev 1239)
+++ trunk/client-jdbc/src/main/java/com/metamatrix/jdbc/MMDatabaseMetaData.java 2009-08-13 14:58:47 UTC (rev 1240)
@@ -124,8 +124,6 @@
final private static class RUNTIME_MODEL{
public final static String VIRTUAL_MODEL_NAME = "System"; //$NON-NLS-1$
- public final static String ODBC_SYSTEM_MODEL_NAME = "System.ODBC"; //$NON-NLS-1$
- public final static String WSDL_SYSTEM_MODEL_NAME = "DataServiceSystemModel"; //$NON-NLS-1$
public final static String JDBC_SYSTEM_MODEL_NAME = "System.JDBC"; //$NON-NLS-1$
}
@@ -508,13 +506,10 @@
ResultsMessage resultsMsg = new ResultsMessage();
resultsMsg.setColumnNames(columnNames);
resultsMsg.setDataTypes(dataTypes);
- resultsMsg.setPartialResults(false);
resultsMsg.setFirstRow(1);
resultsMsg.setLastRow(records.size());
resultsMsg.setFinalRow(records.size());
resultsMsg.setResults((List[])records.toArray(new List[records.size()]));
- resultsMsg.setFetchSize(500);
-
return resultsMsg;
}
Modified: trunk/client-jdbc/src/main/java/com/metamatrix/jdbc/MMResultSet.java
===================================================================
--- trunk/client-jdbc/src/main/java/com/metamatrix/jdbc/MMResultSet.java 2009-08-13 03:32:14 UTC (rev 1239)
+++ trunk/client-jdbc/src/main/java/com/metamatrix/jdbc/MMResultSet.java 2009-08-13 14:58:47 UTC (rev 1240)
@@ -122,9 +122,9 @@
// server latency-related timestamp
this.processingTimestamp = resultsMsg.getProcessingTimestamp();
this.requestID = statement.getCurrentRequestID();
- this.batchResults = new BatchResults(this, getCurrentBatch(resultsMsg));
+ this.cursorType = statement.getResultSetType();
+ this.batchResults = new BatchResults(this, getCurrentBatch(resultsMsg), this.cursorType == ResultSet.TYPE_FORWARD_ONLY ? 1 : BatchResults.DEFAULT_SAVED_BATCHES);
setResultsData(resultsMsg);
- cursorType = statement.getResultSetType();
this.serverTimeZone = statement.getServerTimeZone();
if (metadata == null) {
@@ -142,7 +142,7 @@
if (this.parameters > 0) {
rmetadata = FilteredResultsMetadata.newInstance(rmetadata, resultColumns);
}
- this.fetchSize = resultsMsg.getFetchSize();
+ this.fetchSize = statement.getFetchSize();
}
public void setMaxFieldSize(int maxFieldSize) {
Modified: trunk/client-jdbc/src/main/java/com/metamatrix/jdbc/MMStatement.java
===================================================================
--- trunk/client-jdbc/src/main/java/com/metamatrix/jdbc/MMStatement.java 2009-08-13 03:32:14 UTC (rev 1239)
+++ trunk/client-jdbc/src/main/java/com/metamatrix/jdbc/MMStatement.java 2009-08-13 14:58:47 UTC (rev 1240)
@@ -662,7 +662,6 @@
*/
public void setFetchDirection(int direction) throws SQLException {
checkStatement();
- this.fetchDirection = direction;
}
/**
Modified: trunk/client-jdbc/src/test/java/com/metamatrix/jdbc/TestAllResultsImpl.java
===================================================================
--- trunk/client-jdbc/src/test/java/com/metamatrix/jdbc/TestAllResultsImpl.java 2009-08-13 03:32:14 UTC (rev 1239)
+++ trunk/client-jdbc/src/test/java/com/metamatrix/jdbc/TestAllResultsImpl.java 2009-08-13 14:58:47 UTC (rev 1240)
@@ -52,7 +52,6 @@
private static final long REQUEST_ID = 0;
private static final int TYPE_FORWARD_ONLY = ResultSet.TYPE_FORWARD_ONLY;
private static final int TYPE_SCROLL_SENSITIVE = ResultSet.TYPE_SCROLL_SENSITIVE;
- private static final int TYPE_SCROLL_INSENSITIVE = ResultSet.TYPE_SCROLL_INSENSITIVE;
private MMStatement statement;
@@ -290,15 +289,13 @@
verify(statement, times(0)).close();
}
- public void testGetFetchSize() {
- try {
- MMResultSet rs = new MMResultSet(exampleResultsMsg2(),
- statement);
- assertEquals(500, rs.getFetchSize());
- rs.close();
- } catch (SQLException se) {
- // should never happen;
- }
+ public void testGetFetchSize() throws Exception {
+ MMStatement s = mock(MMStatement.class);
+ stub(s.getFetchSize()).toReturn(500);
+ MMResultSet rs = new MMResultSet(exampleResultsMsg2(), s);
+ assertEquals(500, rs.getFetchSize());
+ rs.setFetchSize(100);
+ assertEquals(100, rs.getFetchSize());
}
// //////////////////////Functions refer to ResultSet's TYPE_FORWARD_ONLY///
@@ -778,7 +775,7 @@
MetaMatrixProcessingException, SQLException {
ClientSideDQP dqp = mock(ClientSideDQP.class);
stub(statement.getDQP()).toReturn(dqp);
-
+ stub(statement.getFetchSize()).toReturn(fetchSize);
for (int i = batchLength; i < totalLength; i += batchLength) {
//forward requests
ResultsFuture<ResultsMessage> nextBatch = mock(ResultsFuture.class);
@@ -843,14 +840,12 @@
private MMResultSet helpGetResultSetImpl(int type)
throws SQLException {
ResultsMessage rsMsg = exampleResultsMsg2();
- rsMsg.setCursorType(type);
MMResultSet rs = new MMResultSet(rsMsg, statement);
return rs;
}
private MMResultSet helpGetNoResults(int type) throws SQLException {
ResultsMessage rsMsg = exampleResultsMsg3();
- rsMsg.setCursorType(type);
MMResultSet rs = new MMResultSet(rsMsg, statement);
return rs;
}
@@ -870,8 +865,6 @@
resultsMsg.setFinalRow(results.length);
resultsMsg.setLastRow(results.length);
resultsMsg.setFirstRow(1);
- resultsMsg.setFetchSize(500);
- resultsMsg.setCursorType(TYPE_SCROLL_INSENSITIVE);
return resultsMsg;
}
@@ -891,9 +884,6 @@
resultsMsg.setFinalRow(results.length);
resultsMsg.setLastRow(results.length);
resultsMsg.setFirstRow(1);
- resultsMsg.setFetchSize(500);
- resultsMsg.setCursorType(TYPE_SCROLL_INSENSITIVE);
-
return resultsMsg;
}
@@ -911,14 +901,11 @@
resultsMsg.setResults(results);
resultsMsg.setColumnNames(new String[] { "IntKey" }); //$NON-NLS-1$
resultsMsg.setDataTypes(new String[] { MMJDBCSQLTypeInfo.INTEGER });
- resultsMsg.setPartialResults(false);
resultsMsg.setFirstRow(begin);
if (lastBatch) {
resultsMsg.setFinalRow(begin + results.length - 1);
}
resultsMsg.setLastRow(begin + results.length - 1);
- resultsMsg.setFetchSize(fetchSize);
- resultsMsg.setCursorType(TYPE_SCROLL_INSENSITIVE);
return resultsMsg;
}
@@ -942,12 +929,9 @@
resultsMsg.setResults(new List[] {Arrays.asList(new Timestamp(0))});
resultsMsg.setColumnNames(new String[] { "TS" }); //$NON-NLS-1$
resultsMsg.setDataTypes(new String[] { MMJDBCSQLTypeInfo.TIMESTAMP });
- resultsMsg.setPartialResults(false);
resultsMsg.setFirstRow(1);
resultsMsg.setFinalRow(1);
resultsMsg.setLastRow(1);
- resultsMsg.setFetchSize(1);
- resultsMsg.setCursorType(TYPE_SCROLL_INSENSITIVE);
MMResultSet rs = new MMResultSet(resultsMsg, statement);
assertTrue(rs.next());
//assumes the mock statement is setup with GMT-5 server and GMT-6 client
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/CachedRequestWorkItem.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/CachedRequestWorkItem.java 2009-08-13 03:32:14 UTC (rev 1239)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/CachedRequestWorkItem.java 2009-08-13 14:58:47 UTC (rev 1240)
@@ -83,7 +83,6 @@
if(isFinal){
response.setFinalRow(cResult.getFinalRow());
}
- response.setPartialResults(!isFinal);
this.resultsCursor.resultsSent();
this.resultsReceiver.receiveResults(response);
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java 2009-08-13 03:32:14 UTC (rev 1239)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java 2009-08-13 14:58:47 UTC (rev 1240)
@@ -447,13 +447,6 @@
response.setUpdateResult(this.returnsUpdateCount);
// set final row
response.setFinalRow(finalRowCount);
- // Results are partial if the rowcount is not yet known,
- // or if the last row of this batch is less than the row count
- boolean isPartialResultSet = finalRowCount < 0
- || batch.getEndRow() < finalRowCount;
-
- // set parital result
- response.setPartialResults(isPartialResultSet);
// send any schemas associated with the results
response.setSchemas(this.schemas);
Modified: trunk/engine/src/test/java/com/metamatrix/dqp/message/TestResultsMessage.java
===================================================================
--- trunk/engine/src/test/java/com/metamatrix/dqp/message/TestResultsMessage.java 2009-08-13 03:32:14 UTC (rev 1239)
+++ trunk/engine/src/test/java/com/metamatrix/dqp/message/TestResultsMessage.java 2009-08-13 14:58:47 UTC (rev 1240)
@@ -50,7 +50,6 @@
DataTypeManager.DefaultDataTypes.BIG_INTEGER,
DataTypeManager.DefaultDataTypes.BIG_INTEGER,
DataTypeManager.DefaultDataTypes.BIG_INTEGER});
- message.setFetchSize(100);
message.setFinalRow(200);
message.setFirstRow(1);
message.setLastRow(100);
@@ -58,7 +57,6 @@
parameters.add(new ParameterInfo(ParameterInfo.IN, 0));
parameters.add(new ParameterInfo(ParameterInfo.RESULT_SET, 5));
message.setParameters(parameters);
- message.setPartialResults(false);
Map planDescs = new HashMap();
planDescs.put("key1", "val1"); //$NON-NLS-1$ //$NON-NLS-2$
planDescs.put("key2", "val2"); //$NON-NLS-1$ //$NON-NLS-2$
@@ -103,7 +101,6 @@
assertEquals(DataTypeManager.DefaultDataTypes.BIG_INTEGER, copy.getDataTypes()[2]);
assertEquals(DataTypeManager.DefaultDataTypes.BIG_INTEGER, copy.getDataTypes()[3]);
- assertEquals(100, copy.getFetchSize());
assertEquals(200, copy.getFinalRow());
assertEquals(1, copy.getFirstRow());
assertEquals(100, copy.getLastRow());
@@ -117,8 +114,6 @@
assertEquals(ParameterInfo.RESULT_SET, info2.getType());
assertEquals(5, info2.getNumColumns());
- assertFalse(copy.isPartialResults());
-
assertNotNull(copy.getPlanDescription());
assertEquals(4, copy.getPlanDescription().size());
assertTrue(copy.getPlanDescription().containsKey("key1")); //$NON-NLS-1$
15 years, 5 months
teiid SVN: r1239 - in trunk: client-jdbc/src/main/java/com/metamatrix/jdbc and 8 other directories.
by teiid-commits@lists.jboss.org
Author: shawkins
Date: 2009-08-12 23:32:14 -0400 (Wed, 12 Aug 2009)
New Revision: 1239
Modified:
trunk/client-jdbc/src/main/java/com/metamatrix/jdbc/BatchFetcher.java
trunk/client-jdbc/src/main/java/com/metamatrix/jdbc/BatchResults.java
trunk/client-jdbc/src/main/java/com/metamatrix/jdbc/MMResultSet.java
trunk/client-jdbc/src/test/java/com/metamatrix/jdbc/TestAllResultsImpl.java
trunk/client-jdbc/src/test/java/com/metamatrix/jdbc/TestBatchResults.java
trunk/client/src/main/java/com/metamatrix/dqp/client/ClientSideDQP.java
trunk/engine/src/main/java/com/metamatrix/common/buffer/BufferManager.java
trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/BufferManagerImpl.java
trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/ManagedBatch.java
trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/TupleSourceImpl.java
trunk/engine/src/main/java/com/metamatrix/query/processor/relational/PartitionedSortJoin.java
trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SortNode.java
trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SortUtility.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/LobChunkStream.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/LobWorkItem.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
trunk/engine/src/test/java/com/metamatrix/common/buffer/impl/TestBufferManagerImpl.java
trunk/engine/src/test/java/com/metamatrix/query/processor/relational/NodeTestUtil.java
trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java
Log:
TEIID-775, TEIID-767: change cursor requests to use a begin row and a fetch size; simplify the buffer manager so that it neither reshapes pinned batches nor requires an end row.
Modified: trunk/client/src/main/java/com/metamatrix/dqp/client/ClientSideDQP.java
===================================================================
--- trunk/client/src/main/java/com/metamatrix/dqp/client/ClientSideDQP.java 2009-08-12 23:58:48 UTC (rev 1238)
+++ trunk/client/src/main/java/com/metamatrix/dqp/client/ClientSideDQP.java 2009-08-13 03:32:14 UTC (rev 1239)
@@ -39,7 +39,7 @@
ResultsFuture<ResultsMessage> executeRequest(long reqID, RequestMessage message) throws MetaMatrixProcessingException, MetaMatrixComponentException;
- ResultsFuture<ResultsMessage> processCursorRequest(long reqID, int batchFirst, int batchLast) throws MetaMatrixProcessingException;
+ ResultsFuture<ResultsMessage> processCursorRequest(long reqID, int batchFirst, int fetchSize) throws MetaMatrixProcessingException;
ResultsFuture<?> closeRequest(long requestID) throws MetaMatrixProcessingException, MetaMatrixComponentException;
Modified: trunk/client-jdbc/src/main/java/com/metamatrix/jdbc/BatchFetcher.java
===================================================================
--- trunk/client-jdbc/src/main/java/com/metamatrix/jdbc/BatchFetcher.java 2009-08-12 23:58:48 UTC (rev 1238)
+++ trunk/client-jdbc/src/main/java/com/metamatrix/jdbc/BatchFetcher.java 2009-08-13 03:32:14 UTC (rev 1239)
@@ -30,5 +30,5 @@
* @since 4.3
*/
public interface BatchFetcher {
- Batch requestBatch(int beginRow, int endRow) throws SQLException;
+ Batch requestBatch(int beginRow) throws SQLException;
}
Modified: trunk/client-jdbc/src/main/java/com/metamatrix/jdbc/BatchResults.java
===================================================================
--- trunk/client-jdbc/src/main/java/com/metamatrix/jdbc/BatchResults.java 2009-08-12 23:58:48 UTC (rev 1238)
+++ trunk/client-jdbc/src/main/java/com/metamatrix/jdbc/BatchResults.java 2009-08-13 03:32:14 UTC (rev 1239)
@@ -76,16 +76,13 @@
private int lastRowNumber = -1;
private int highestRowNumber;
private BatchFetcher batchFetcher;
- private int fetchSize;
- public BatchResults(List[] batch, int beginRow, int endRow, boolean isLast, int fetchSize) {
- this.fetchSize = fetchSize;
+ public BatchResults(List[] batch, int beginRow, int endRow, boolean isLast) {
this.setBatch(new Batch(batch, beginRow, endRow, isLast));
}
- public BatchResults(BatchFetcher batchFetcher, int fetchSize, Batch batch) {
+ public BatchResults(BatchFetcher batchFetcher, Batch batch) {
this.batchFetcher = batchFetcher;
- this.fetchSize = fetchSize;
this.setBatch(batch);
}
@@ -97,20 +94,12 @@
if (currentRowNumber == 0 || (lastRowNumber != -1 && currentRowNumber > lastRowNumber)) {
return null;
}
- int closestMax = currentRowNumber + fetchSize - 1;
- int closestMin = Math.max(1, currentRowNumber - fetchSize + 1);
for (int i = 0; i < batches.size(); i++) {
Batch batch = batches.get(i);
if (currentRowNumber < batch.getBeginRow()) {
- if (i != SAVED_BATCHES - 1) {
- closestMax = Math.min(batch.getBeginRow(), closestMax);
- }
continue;
}
if (currentRowNumber > batch.getEndRow()) {
- if (i != SAVED_BATCHES - 1) {
- closestMin = Math.max(batch.getEndRow(), closestMin);
- }
continue;
}
if (i != 0) {
@@ -118,17 +107,13 @@
}
return batch.getRow(currentRowNumber);
}
- if (closestMax - currentRowNumber >= currentRowNumber - closestMin) {
- requestBatchAndWait(currentRowNumber, closestMax);
- } else {
- requestBatchAndWait(currentRowNumber, closestMin);
- }
+ requestBatchAndWait(currentRowNumber);
Batch batch = batches.get(0);
return batch.getRow(currentRowNumber);
}
private void requestNextBatch() throws SQLException {
- requestBatchAndWait(highestRowNumber + 1, highestRowNumber + fetchSize);
+ requestBatchAndWait(highestRowNumber + 1);
}
public boolean next() throws SQLException{
@@ -165,10 +150,6 @@
this.batchFetcher = batchFetcher;
}
- public int getFetchSize() {
- return fetchSize;
- }
-
public boolean absolute(int row) throws SQLException {
return absolute(row, 0);
}
@@ -215,8 +196,8 @@
return currentRowNumber;
}
- private void requestBatchAndWait(int beginRow, int endRow) throws SQLException{
- setBatch(batchFetcher.requestBatch(beginRow, endRow));
+ private void requestBatchAndWait(int beginRow) throws SQLException{
+ setBatch(batchFetcher.requestBatch(beginRow));
}
private void setBatch(Batch batch) {
Modified: trunk/client-jdbc/src/main/java/com/metamatrix/jdbc/MMResultSet.java
===================================================================
--- trunk/client-jdbc/src/main/java/com/metamatrix/jdbc/MMResultSet.java 2009-08-12 23:58:48 UTC (rev 1238)
+++ trunk/client-jdbc/src/main/java/com/metamatrix/jdbc/MMResultSet.java 2009-08-13 03:32:14 UTC (rev 1239)
@@ -101,6 +101,7 @@
private Map updatedPlanDescription;
private ResultsMessage resultsMsg;
private int maxFieldSize;
+ private int fetchSize;
/**
* Constructor.
@@ -121,7 +122,7 @@
// server latency-related timestamp
this.processingTimestamp = resultsMsg.getProcessingTimestamp();
this.requestID = statement.getCurrentRequestID();
- this.batchResults = new BatchResults(this, resultsMsg.getFetchSize(), getCurrentBatch(resultsMsg));
+ this.batchResults = new BatchResults(this, getCurrentBatch(resultsMsg));
setResultsData(resultsMsg);
cursorType = statement.getResultSetType();
this.serverTimeZone = statement.getServerTimeZone();
@@ -141,6 +142,7 @@
if (this.parameters > 0) {
rmetadata = FilteredResultsMetadata.newInstance(rmetadata, resultColumns);
}
+ this.fetchSize = resultsMsg.getFetchSize();
}
public void setMaxFieldSize(int maxFieldSize) {
@@ -230,7 +232,7 @@
* have been reset by the server.
*/
public int getFetchSize() throws SQLException {
- return this.batchResults.getFetchSize();
+ return this.fetchSize;
}
/**
@@ -341,11 +343,11 @@
return updatedPlanDescription;
}
- public Batch requestBatch(int beginRow, int endRow) throws SQLException{
- logger.fine("CursorResultsImpl.requestBatch] thread name: " + Thread.currentThread().getName() + " requestID: " + requestID + " beginRow: " + beginRow + " endinRow: " + endRow ); //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$ //$NON-NLS-4$
+ public Batch requestBatch(int beginRow) throws SQLException{
+ logger.fine("CursorResultsImpl.requestBatch] thread name: " + Thread.currentThread().getName() + " requestID: " + requestID + " beginRow: " + beginRow ); //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$ //$NON-NLS-4$
checkClosed();
try {
- ResultsFuture<ResultsMessage> results = statement.getDQP().processCursorRequest(requestID, beginRow, endRow);
+ ResultsFuture<ResultsMessage> results = statement.getDQP().processCursorRequest(requestID, beginRow, fetchSize);
ResultsMessage currentResultMsg = results.get();
this.setResultsData(currentResultMsg);
this.updatedPlanDescription = currentResultMsg.getPlanDescription();
@@ -1516,7 +1518,16 @@
}
public void setFetchSize(int rows) throws SQLException {
- throw SqlUtil.createFeatureNotSupportedException();
+ checkClosed();
+ if ( rows < 0 ) {
+ throw new MMSQLException(JDBCPlugin.Util.getString("MMStatement.Invalid_fetch_size")); //$NON-NLS-1$
+ }
+ // sets the fetch size on this statement
+ if (rows == 0) {
+ this.fetchSize = BaseDataSource.DEFAULT_FETCH_SIZE;
+ } else {
+ this.fetchSize = rows;
+ }
}
public void updateArray(int columnIndex, Array x) throws SQLException {
@@ -1527,7 +1538,6 @@
throw SqlUtil.createFeatureNotSupportedException();
}
-
public void updateAsciiStream(int columnIndex, InputStream x, int length)
throws SQLException {
throw SqlUtil.createFeatureNotSupportedException();
Modified: trunk/client-jdbc/src/test/java/com/metamatrix/jdbc/TestAllResultsImpl.java
===================================================================
--- trunk/client-jdbc/src/test/java/com/metamatrix/jdbc/TestAllResultsImpl.java 2009-08-12 23:58:48 UTC (rev 1238)
+++ trunk/client-jdbc/src/test/java/com/metamatrix/jdbc/TestAllResultsImpl.java 2009-08-13 03:32:14 UTC (rev 1239)
@@ -783,13 +783,13 @@
//forward requests
ResultsFuture<ResultsMessage> nextBatch = mock(ResultsFuture.class);
stub(nextBatch.get()).toReturn(exampleResultsMsg4(i + 1, Math.min(batchLength, totalLength - i), fetchSize, i + batchLength >= totalLength));
- stub(dqp.processCursorRequest(REQUEST_ID, i + 1, i + fetchSize)).toReturn(nextBatch);
+ stub(dqp.processCursorRequest(REQUEST_ID, i + 1, fetchSize)).toReturn(nextBatch);
if (i + batchLength < totalLength) {
//backward requests
ResultsFuture<ResultsMessage> previousBatch = mock(ResultsFuture.class);
stub(previousBatch.get()).toReturn(exampleResultsMsg4(i - batchLength + 1, i, fetchSize, false));
- stub(dqp.processCursorRequest(REQUEST_ID, i, i - fetchSize + 1)).toReturn(previousBatch);
+ stub(dqp.processCursorRequest(REQUEST_ID, i, fetchSize)).toReturn(previousBatch);
}
}
Modified: trunk/client-jdbc/src/test/java/com/metamatrix/jdbc/TestBatchResults.java
===================================================================
--- trunk/client-jdbc/src/test/java/com/metamatrix/jdbc/TestBatchResults.java 2009-08-12 23:58:48 UTC (rev 1238)
+++ trunk/client-jdbc/src/test/java/com/metamatrix/jdbc/TestBatchResults.java 2009-08-13 03:32:14 UTC (rev 1239)
@@ -41,7 +41,7 @@
private int totalRows = 50;
private boolean throwException;
- List batchCalls = new ArrayList<int[]>();
+ List<Integer> batchCalls = new ArrayList<Integer>();
public MockBatchFetcher() {
@@ -51,12 +51,16 @@
this.totalRows = totalRows;
}
- public Batch requestBatch(int beginRow, int endRow) throws SQLException {
- batchCalls.add(new int[] {beginRow, endRow});
+ public Batch requestBatch(int beginRow) throws SQLException {
+ batchCalls.add(beginRow);
if (throwException) {
throw new SQLException();
}
boolean isLast = false;
+ int endRow = beginRow + 9;
+ if (beginRow%10==0) {
+ endRow = beginRow - 9;
+ }
if(beginRow > endRow) {
if(endRow < 1) {
endRow = 1;
@@ -98,14 +102,14 @@
public void testGetCurrentRow1() throws Exception{
//empty batch
- BatchResults batchResults = new BatchResults(createEmptyBatch(), 0, 0, true, 100);
+ BatchResults batchResults = new BatchResults(createEmptyBatch(), 0, 0, true);
assertNull(batchResults.getCurrentRow());
batchResults.next();
assertNull(batchResults.getCurrentRow());
}
public void testGetCurrentRow2() throws Exception{
- BatchResults batchResults = new BatchResults(createBatch(1, 10), 1, 10, true, 100);
+ BatchResults batchResults = new BatchResults(createBatch(1, 10), 1, 10, true);
assertNull(batchResults.getCurrentRow());
batchResults.next();
List expectedResult = new ArrayList();
@@ -115,30 +119,30 @@
public void testHasNext1() throws Exception{
//empty batch
- BatchResults batchResults = new BatchResults(createEmptyBatch(), 0, 0, true, 100);
+ BatchResults batchResults = new BatchResults(createEmptyBatch(), 0, 0, true);
assertFalse(batchResults.hasNext());
}
public void testHasNext2() throws Exception{
//one row batch
- BatchResults batchResults = new BatchResults(createBatch(1, 1), 1, 1, true, 100);
+ BatchResults batchResults = new BatchResults(createBatch(1, 1), 1, 1, true);
assertTrue(batchResults.hasNext());
}
public void testHasNext3() throws Exception{
- BatchResults batchResults = new BatchResults(createBatch(1, 10), 1, 10, true, 100);
+ BatchResults batchResults = new BatchResults(createBatch(1, 10), 1, 10, true);
assertTrue(batchResults.hasNext());
}
public void testNext1() throws Exception{
//empty batch
- BatchResults batchResults = new BatchResults(createEmptyBatch(), 0, 0, true, 100);
+ BatchResults batchResults = new BatchResults(createEmptyBatch(), 0, 0, true);
assertFalse(batchResults.next());
}
public void testNext2() throws Exception{
//one row batch
- BatchResults batchResults = new BatchResults(createBatch(1, 1), 1, 1, true, 100);
+ BatchResults batchResults = new BatchResults(createBatch(1, 1), 1, 1, true);
assertTrue(batchResults.next());
List expectedResult = new ArrayList();
expectedResult.add(new Integer(1));
@@ -148,7 +152,7 @@
public void testNext3() throws Exception{
//one row batch, multiple batches
- BatchResults batchResults = new BatchResults(createBatch(1, 1), 1, 1, false, 100);
+ BatchResults batchResults = new BatchResults(createBatch(1, 1), 1, 1, false);
batchResults.setBatchFetcher(new MockBatchFetcher());
assertTrue(batchResults.next());
assertTrue(batchResults.next());
@@ -158,7 +162,7 @@
}
public void testNext4() throws Exception{
- BatchResults batchResults = new BatchResults(createBatch(1, 10), 1, 10, false, 100);
+ BatchResults batchResults = new BatchResults(createBatch(1, 10), 1, 10, false);
batchResults.setBatchFetcher(new MockBatchFetcher());
int i;
for(i=0; i<10; i++) {
@@ -177,13 +181,13 @@
public void testHasPrevious1() throws Exception{
//empty batch
- BatchResults batchResults = new BatchResults(createEmptyBatch(), 0, 0, true, 100);
+ BatchResults batchResults = new BatchResults(createEmptyBatch(), 0, 0, true);
assertFalse(batchResults.hasPrevious());
}
public void testHasPrevious2() throws Exception{
//one row batch
- BatchResults batchResults = new BatchResults(createBatch(1, 1), 1, 1, true, 100);
+ BatchResults batchResults = new BatchResults(createBatch(1, 1), 1, 1, true);
assertFalse(batchResults.hasPrevious());
batchResults.next();
assertFalse(batchResults.hasPrevious());
@@ -193,13 +197,13 @@
public void testPrevious1() throws Exception{
//empty batch
- BatchResults batchResults = new BatchResults(createEmptyBatch(), 0, 0, true, 100);
+ BatchResults batchResults = new BatchResults(createEmptyBatch(), 0, 0, true);
assertFalse(batchResults.previous());
}
public void testPrevious2() throws Exception{
//one row batch
- BatchResults batchResults = new BatchResults(createBatch(1, 1), 1, 1, true, 100);
+ BatchResults batchResults = new BatchResults(createBatch(1, 1), 1, 1, true);
assertTrue(batchResults.next());
assertFalse(batchResults.previous());
List expectedResult = new ArrayList();
@@ -212,7 +216,7 @@
public void testPrevious3() throws Exception{
//one row batch, multiple batches
- BatchResults batchResults = new BatchResults(createBatch(1, 1), 1, 1, false, 100);
+ BatchResults batchResults = new BatchResults(createBatch(1, 1), 1, 1, false);
batchResults.setBatchFetcher(new MockBatchFetcher());
assertFalse(batchResults.previous());
assertTrue(batchResults.next());
@@ -232,7 +236,7 @@
}
public void testPrevious4() throws Exception{
- BatchResults batchResults = new BatchResults(createBatch(1, 10), 1, 10, false, 100);
+ BatchResults batchResults = new BatchResults(createBatch(1, 10), 1, 10, false);
batchResults.setBatchFetcher(new MockBatchFetcher());
int i;
for(i=0; i<=10; i++) {
@@ -248,14 +252,14 @@
public void testAbsolute1() throws Exception{
//empty batch
- BatchResults batchResults = new BatchResults(createEmptyBatch(), 0, 0, true, 100);
+ BatchResults batchResults = new BatchResults(createEmptyBatch(), 0, 0, true);
assertFalse(batchResults.absolute(0));
assertFalse(batchResults.absolute(1));
}
public void testAbsolute2() throws Exception{
//one row batch
- BatchResults batchResults = new BatchResults(createBatch(1, 1), 1, 1, true, 100);
+ BatchResults batchResults = new BatchResults(createBatch(1, 1), 1, 1, true);
batchResults.setBatchFetcher(new MockBatchFetcher());
assertFalse(batchResults.absolute(0));
assertTrue(batchResults.absolute(1));
@@ -266,7 +270,7 @@
}
public void testAbsolute3() throws Exception{
- BatchResults batchResults = new BatchResults(createBatch(1, 10), 1, 10, false, 100);
+ BatchResults batchResults = new BatchResults(createBatch(1, 10), 1, 10, false);
batchResults.setBatchFetcher(new MockBatchFetcher(200));
assertFalse(batchResults.absolute(0));
assertTrue(batchResults.absolute(11));
@@ -286,7 +290,7 @@
//move backwards with absolute
public void testAbsolute4() throws Exception{
//one row batch
- BatchResults batchResults = new BatchResults(createBatch(1, 1), 1, 1, false, 100);
+ BatchResults batchResults = new BatchResults(createBatch(1, 1), 1, 1, false);
batchResults.setBatchFetcher(new MockBatchFetcher());
assertTrue(batchResults.absolute(10));
assertTrue(batchResults.absolute(2));
@@ -297,7 +301,7 @@
public void testAbsolute5() throws Exception{
//one row batch
- BatchResults batchResults = new BatchResults(createBatch(1, 1), 1, 1, false, 100);
+ BatchResults batchResults = new BatchResults(createBatch(1, 1), 1, 1, false);
batchResults.setBatchFetcher(new MockBatchFetcher());
assertTrue(batchResults.absolute(-1));
List expectedResult = new ArrayList();
@@ -308,7 +312,7 @@
}
public void testCurrentRowNumber() throws Exception {
- BatchResults batchResults = new BatchResults(createBatch(1, 1), 1, 1, true, 100);
+ BatchResults batchResults = new BatchResults(createBatch(1, 1), 1, 1, true);
assertEquals(0, batchResults.getCurrentRowNumber());
batchResults.next();
assertEquals(1, batchResults.getCurrentRowNumber());
@@ -319,7 +323,7 @@
}
public void testSetException() throws Exception {
- BatchResults batchResults = new BatchResults(createBatch(1, 1), 1, 1, false, 100);
+ BatchResults batchResults = new BatchResults(createBatch(1, 1), 1, 1, false);
MockBatchFetcher batchFetcher = new MockBatchFetcher();
batchResults.setBatchFetcher(batchFetcher);
batchFetcher.throwException();
@@ -332,7 +336,7 @@
}
public void testBatching() throws Exception {
- BatchResults batchResults = new BatchResults(createBatch(1, 10), 1, 10, false, 10);
+ BatchResults batchResults = new BatchResults(createBatch(1, 10), 1, 10, false);
MockBatchFetcher batchFetcher = new MockBatchFetcher(60);
batchResults.setBatchFetcher(batchFetcher);
@@ -346,30 +350,29 @@
}
// verify batch calls
- checkResults(new int[][] {
+ checkResults(new int[] {
// going forwards - end > begin
- new int[] { 11, 20 },
- new int[] { 21, 30 },
- new int[] { 31, 40 },
- new int[] { 41, 50 },
+ 11,
+ 21,
+ 31,
+ 41,
// going backwards - begin > end
// last 3 batches were saved, only need the first 2 again
- new int[] { 20, 11 },
- new int[] { 10, 1 },
+ 20,
+ 10,
}, batchFetcher.batchCalls);
assertTrue(batchResults.absolute(50));
assertEquals(new Integer(50), batchResults.getCurrentRow().get(0));
}
- private void checkResults(int[][] expectedCalls, List<int[]> batchCalls) {
+ private void checkResults(int[] expectedCalls, List<Integer> batchCalls) {
assertEquals(expectedCalls.length, batchCalls.size());
for(int i=0; i<batchCalls.size(); i++) {
- int[] range = batchCalls.get(i);
- int[] expected = expectedCalls[i];
- assertEquals("On call " + i + " expected different begin", expected[0], range[0]);
- assertEquals("On call " + i + " expected different end", expected[1], range[1]);
+ int range = batchCalls.get(i);
+ int expected = expectedCalls[i];
+ assertEquals("On call " + i + " expected different begin", expected, range);
}
}
Modified: trunk/engine/src/main/java/com/metamatrix/common/buffer/BufferManager.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/common/buffer/BufferManager.java 2009-08-12 23:58:48 UTC (rev 1238)
+++ trunk/engine/src/main/java/com/metamatrix/common/buffer/BufferManager.java 2009-08-13 03:32:14 UTC (rev 1239)
@@ -147,26 +147,24 @@
*
* @param tupleSourceID Tuple source identifier
* @param beginRow First row index to return
- * @param maxEndRow Maximum last row index to return, may be less actually returned
* @return Batch of rows starting from beginRow
* @throws TupleSourceNotFoundException if tuple source could not be found
* @throws MetaMatrixComponentException indicating a non-business-related
* exception (such as a communication exception)
* @throws MemoryNotAvailableException If memory was not available for the pin
*/
- TupleBatch pinTupleBatch(TupleSourceID tupleSourceID, int beginRow, int maxEndRow)
+ TupleBatch pinTupleBatch(TupleSourceID tupleSourceID, int beginRow)
throws TupleSourceNotFoundException, MemoryNotAvailableException, MetaMatrixComponentException;
/**
* Unpins a range of rows from the given tuple source
* @param tupleSourceID Tuple source identifier
* @param firstRow First row to unpin
- * @param lastRow Last row to unpin (inclusive)
* @throws TupleSourceNotFoundException if tuple source could not be found
* @throws MetaMatrixComponentException indicating a non-business-related
* exception (such as a communication exception)
*/
- void unpinTupleBatch(TupleSourceID tupleSourceID, int firstRow, int lastRow)
+ void unpinTupleBatch(TupleSourceID tupleSourceID, int firstRow)
throws TupleSourceNotFoundException, MetaMatrixComponentException;
/**
@@ -278,14 +276,16 @@
* @throws TupleSourceNotFoundException
* @throws MetaMatrixComponentException
*/
- public Streamable<?> getStreamable(TupleSourceID id, String referenceId)
+ Streamable<?> getStreamable(TupleSourceID id, String referenceId)
throws TupleSourceNotFoundException, MetaMatrixComponentException;
/**
- * Assign the tuplesource as the persistent stream for the streamable
+ * Assign the {@link TupleSource} as the persistent stream for the {@link Streamable}
* @param id
* @param s
* @throws TupleSourceNotFoundException
*/
- public void setPersistentTupleSource(TupleSourceID id, Streamable<? extends Object> s) throws TupleSourceNotFoundException;
+ void setPersistentTupleSource(TupleSourceID id, Streamable<? extends Object> s)
+ throws TupleSourceNotFoundException;
+
}
Modified: trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/BufferManagerImpl.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/BufferManagerImpl.java 2009-08-12 23:58:48 UTC (rev 1238)
+++ trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/BufferManagerImpl.java 2009-08-13 03:32:14 UTC (rev 1239)
@@ -460,8 +460,6 @@
// Look up info
TupleSourceInfo info = getTupleSourceInfo(tupleSourceID, true);
- // if there are lobs in source then we need to keep manage then
- // in a separate tuple sources.
if (info.lobsInSource()) {
correctLobReferences(info, tupleBatch);
}
@@ -477,7 +475,9 @@
synchronized(info) {
if(info.isRemoved()) {
- releaseMemory(bytes, info.getGroupInfo());
+ if(location != ManagedBatch.PERSISTENT) {
+ releaseMemory(bytes, info.getGroupInfo());
+ }
throw new TupleSourceNotFoundException(QueryExecPlugin.Util.getString("BufferManagerImpl.tuple_source_not_found", tupleSourceID)); //$NON-NLS-1$
}
@@ -510,22 +510,22 @@
info.setRowCount(tupleBatch.getEndRow());
}
}
-
+
/**
- * Pin a tuple source in memory and return it. This batch must be unpinned by
+ * Pin a tuple batch in memory and return it. This batch must be unpinned by
* passing the identical tuple source ID and beginning row.
+ * NOTE: the returned {@link TupleBatch} will have the exact same bounds as the {@link ManagedBatch}
* @param tupleSourceID Tuple source identifier
* @param beginRow Beginning row
- * @param maxEndRow Max end row to return
* @throws TupleSourceNotFoundException If tuple source not found
* @throws MetaMatrixComponentException If an internal server error occurred
* @throws MemoryNotAvailableException If memory was not available for the pin
*/
- public TupleBatch pinTupleBatch(TupleSourceID tupleSourceID, int beginRow, int maxEndRow)
+ public TupleBatch pinTupleBatch(TupleSourceID tupleSourceID, int beginRow)
throws TupleSourceNotFoundException, MemoryNotAvailableException, MetaMatrixComponentException {
if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.TRACE)) {
- LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, new Object[]{"Pinning tupleBatch for", tupleSourceID, "beginRow:", beginRow, "maxEndRow:", maxEndRow}); //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$
+ LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, new Object[]{"Pinning tupleBatch for", tupleSourceID, "beginRow:", beginRow}); //$NON-NLS-1$ //$NON-NLS-2$
}
this.pinRequests.incrementAndGet();
@@ -537,98 +537,63 @@
synchronized (info) {
mbatch = info.getBatch(beginRow);
}
- if(mbatch == null) {
- return new TupleBatch(beginRow, Collections.EMPTY_LIST);
- }
-
- int endRow = 0;
- int pass = 0;
- //if the client request previous batch, the end row
- //is smaller than the begin row
- if(beginRow > maxEndRow) {
- endRow = Math.min(beginRow, mbatch.getEndRow());
- beginRow = Math.max(maxEndRow, mbatch.getBeginRow());
- }else {
- endRow = Math.min(maxEndRow, mbatch.getEndRow());
- }
- int count = endRow - beginRow + 1;
- if (count == 0) {
+ if (mbatch == null || beginRow == mbatch.getEndRow() + 1) {
return new TupleBatch(beginRow, Collections.EMPTY_LIST);
}
long memoryRequiredByBatch = mbatch.getSize();
TupleBatch memoryBatch = null;
- while(pass < 2) {
- if(pass == 1) {
- if(memoryAvailability == BufferManagerImpl.MEMORY_EXCEED_MAX) {
- clean(memoryRequiredByBatch, null);
- }else {
- //exceed session limit
- clean(memoryRequiredByBatch, info.getGroupInfo());
- }
- }
+ for (int pass = 0; pass < 2; pass++) {
+ if (memoryAvailability == BufferManagerImpl.MEMORY_EXCEED_MAX) {
+ clean(memoryRequiredByBatch, null);
+ } else if (memoryAvailability == BufferManagerImpl.MEMORY_EXCEED_SESSION_MAX) {
+ clean(memoryRequiredByBatch, info.getGroupInfo());
+ }
synchronized(info) {
if(mbatch.getLocation() == ManagedBatch.PINNED) {
// Load batch from memory - already pinned
memoryBatch = mbatch.getBatch();
-
- } else if(mbatch.getLocation() == ManagedBatch.UNPINNED) {
+ break;
+ }
+ if(mbatch.getLocation() == ManagedBatch.UNPINNED) {
// Already in memory - just move from unpinned to pinned
this.unpinned.remove(mbatch);
pin(mbatch);
// Load batch from memory
memoryBatch = mbatch.getBatch();
-
- } else {
- memoryRequiredByBatch = mbatch.getSize();
-
- // Try to reserve some memory
- if((memoryAvailability = reserveMemory(memoryRequiredByBatch, info.getGroupInfo())) != BufferManagerImpl.MEMORY_AVAILABLE) {
- if(pass == 0) {
- // Break and try to clean - it is important to break out of the synchronized block
- // here so that the clean does not cause a deadlock on this TupleSourceInfo
- pass++;
- continue;
- }
+ break;
+ }
+ memoryRequiredByBatch = mbatch.getSize();
+
+ // Try to reserve some memory
+ if((memoryAvailability = reserveMemory(memoryRequiredByBatch, info.getGroupInfo())) != BufferManagerImpl.MEMORY_AVAILABLE) {
+ continue;
+ }
- // Failed to reserve the memory even after a clean, so record the failure and fail the pin
- this.pinFailures.incrementAndGet();
+ this.pinnedFromDisk.incrementAndGet();
- // Couldn't reserve memory, so throw exception
- throw new MemoryNotAvailableException(QueryExecPlugin.Util.getString("BufferManagerImpl.no_memory_available")); //$NON-NLS-1$
- }
+ // Memory was reserved, so move from persistent to memory and pin
+ int internalBeginRow = mbatch.getBeginRow();
+ memoryBatch = diskMgr.getBatch(tupleSourceID, internalBeginRow, info.getTypes());
- this.pinnedFromDisk.incrementAndGet();
-
- // Memory was reserved, so move from persistent to memory and pin
- int internalBeginRow = mbatch.getBeginRow();
- memoryBatch = diskMgr.getBatch(tupleSourceID, internalBeginRow, info.getTypes());
-
- mbatch.setBatch(memoryBatch);
- if (info.lobsInSource()) {
- correctLobReferences(info, memoryBatch);
- }
- pin(mbatch);
+ mbatch.setBatch(memoryBatch);
+ if (info.lobsInSource()) {
+ correctLobReferences(info, memoryBatch);
}
+ pin(mbatch);
}
-
- break;
}
-
- // Batch should now be pinned in memory, so grab it and build a correctly
- // sized batch to return
- if(beginRow == memoryBatch.getBeginRow() && count == memoryBatch.getRowCount()) {
- return memoryBatch;
+
+ if (memoryBatch == null) {
+ // Failed to reserve the memory even after a clean, so record the failure and fail the pin
+ this.pinFailures.incrementAndGet();
+ throw new MemoryNotAvailableException(QueryExecPlugin.Util.getString("BufferManagerImpl.no_memory_available")); //$NON-NLS-1$
}
- int firstOffset = beginRow - memoryBatch.getBeginRow();
- List[] memoryRows = memoryBatch.getAllTuples();
- List[] rows = new List[count];
- System.arraycopy(memoryRows, firstOffset, rows, 0, count);
- return new TupleBatch(beginRow, rows);
+ return memoryBatch;
}
private void pin(ManagedBatch mbatch) {
@@ -647,7 +612,7 @@
* @throws TupleSourceNotFoundException If tuple source not found
* @throws MetaMatrixComponentException If an internal server error occurred
*/
- public void unpinTupleBatch(TupleSourceID tupleSourceID, int beginRow, int endRow)
+ public void unpinTupleBatch(TupleSourceID tupleSourceID, int beginRow)
throws TupleSourceNotFoundException, MetaMatrixComponentException {
if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.TRACE)) {
@@ -891,10 +856,8 @@
// Look up info
TupleSourceInfo info = getTupleSourceInfo(tupleSourceID, true);
- short location = ManagedBatch.PERSISTENT;
synchronized(info) {
-
List<LobChunk> data = new ArrayList<LobChunk>();
data.add(streamChunk);
TupleBatch batch = new TupleBatch(beginRow, new List[] {data});
@@ -902,7 +865,7 @@
// Update tuple source state (we could calculate the size of stream if need to)
ManagedBatch managedBatch = new ManagedBatch(tupleSourceID, beginRow, batch.getEndRow(), 0);
- managedBatch.setLocation(location);
+ managedBatch.setLocation(ManagedBatch.PERSISTENT);
// Update info with new rows
info.addBatch(managedBatch);
@@ -932,7 +895,7 @@
for (ManagedBatch managedBatch : pinnedByThread) {
try {
//TODO: add trace logging about the batch that is being unpinned
- unpinTupleBatch(managedBatch.getTupleSourceID(), managedBatch.getBeginRow(), managedBatch.getEndRow());
+ unpinTupleBatch(managedBatch.getTupleSourceID(), managedBatch.getBeginRow());
} catch (TupleSourceNotFoundException err) {
} catch (MetaMatrixComponentException err) {
e = err;
Modified: trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/ManagedBatch.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/ManagedBatch.java 2009-08-12 23:58:48 UTC (rev 1238)
+++ trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/ManagedBatch.java 2009-08-13 03:32:14 UTC (rev 1239)
@@ -192,7 +192,7 @@
return true;
}
- if(obj == null || ! (obj instanceof ManagedBatch)) {
+ if(! (obj instanceof ManagedBatch)) {
return false;
}
Modified: trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/TupleSourceImpl.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/TupleSourceImpl.java 2009-08-12 23:58:48 UTC (rev 1238)
+++ trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/TupleSourceImpl.java 2009-08-13 03:32:14 UTC (rev 1239)
@@ -40,7 +40,6 @@
private final BufferManagerImpl bufferManagerImpl;
private TupleSourceID tupleSourceID;
private List<?> schema;
- private int batchSize;
private WeakReference<TupleBatch> currentBatch;
private int currentRow = 1;
private int mark = 1;
@@ -50,7 +49,6 @@
this.bufferManagerImpl = bufferManagerImpl;
this.tupleSourceID = tupleSourceID;
this.schema = schema;
- this.batchSize = batchSize;
}
@Override
@@ -122,7 +120,7 @@
TupleBatch batch = getCurrentBatch();
if(batch != null ) {
try {
- this.bufferManagerImpl.unpinTupleBatch(this.tupleSourceID, batch.getBeginRow(), batch.getEndRow());
+ this.bufferManagerImpl.unpinTupleBatch(this.tupleSourceID, batch.getBeginRow());
} catch (TupleSourceNotFoundException e) {
throw new MetaMatrixComponentException(e);
} finally {
@@ -143,7 +141,7 @@
}
try{
- batch = this.bufferManagerImpl.pinTupleBatch(this.tupleSourceID, currentRow, (currentRow + batchSize -1));
+ batch = this.bufferManagerImpl.pinTupleBatch(this.tupleSourceID, currentRow);
currentBatch = new WeakReference<TupleBatch>(batch);
} catch (MemoryNotAvailableException e) {
/* Defect 18499 - ProcessWorker doesn't know how to handle MemoryNotAvailableException properly,
Modified: trunk/engine/src/main/java/com/metamatrix/query/processor/relational/PartitionedSortJoin.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/processor/relational/PartitionedSortJoin.java 2009-08-12 23:58:48 UTC (rev 1238)
+++ trunk/engine/src/main/java/com/metamatrix/query/processor/relational/PartitionedSortJoin.java 2009-08-13 03:32:14 UTC (rev 1239)
@@ -106,7 +106,7 @@
this.matchEnd = -1;
}
- //TODO: save partial work
+ //TODO: save partial work or combine with the sort operation
public void computeBatchBounds(SourceState state) throws TupleSourceNotFoundException, MetaMatrixComponentException {
if (endTuples != null) {
return;
@@ -117,12 +117,12 @@
while (beginRow <= state.getRowCount()) {
TupleBatch batch = null;
try {
- batch = this.joinNode.getBufferManager().pinTupleBatch(state.getTupleSourceID(), beginRow, beginRow + this.joinNode.getBatchSize() - 1);
+ batch = this.joinNode.getBufferManager().pinTupleBatch(state.getTupleSourceID(), beginRow);
if (batch.getRowCount() == 0) {
break;
}
beginRow = batch.getEndRow() + 1;
- this.joinNode.getBufferManager().unpinTupleBatch(state.getTupleSourceID(), batch.getBeginRow(), batch.getEndRow());
+ this.joinNode.getBufferManager().unpinTupleBatch(state.getTupleSourceID(), batch.getBeginRow());
if (!bounds.isEmpty()) {
overlap.add(comp.compare(bounds.get(bounds.size() - 1), batch.getTuple(batch.getBeginRow())) == 0);
}
Modified: trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SortNode.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SortNode.java 2009-08-12 23:58:48 UTC (rev 1238)
+++ trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SortNode.java 2009-08-13 03:32:14 UTC (rev 1239)
@@ -148,9 +148,8 @@
return terminationBatch;
}
int beginPinned = this.outputBeginRow;
- int endPinned = this.outputBeginRow+getBatchSize()-1;
try {
- TupleBatch outputBatch = getBufferManager().pinTupleBatch(outputID, beginPinned, endPinned);
+ TupleBatch outputBatch = getBufferManager().pinTupleBatch(outputID, beginPinned);
this.outputBeginRow += outputBatch.getRowCount();
@@ -164,7 +163,7 @@
} catch(MemoryNotAvailableException e) {
throw BlockedOnMemoryException.INSTANCE;
} finally {
- getBufferManager().unpinTupleBatch(outputID, beginPinned, endPinned);
+ getBufferManager().unpinTupleBatch(outputID, beginPinned);
}
}
Modified: trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SortUtility.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SortUtility.java 2009-08-12 23:58:48 UTC (rev 1238)
+++ trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SortUtility.java 2009-08-13 03:32:14 UTC (rev 1239)
@@ -164,7 +164,7 @@
while(!doneReading) {
try {
// Load and pin batch
- TupleBatch batch = bufferManager.pinTupleBatch(sourceID, sortPhaseRow, sortPhaseRow + batchSize - 1);
+ TupleBatch batch = bufferManager.pinTupleBatch(sourceID, sortPhaseRow);
if (batch.getRowCount() == 0) {
if (bufferManager.getStatus(sourceID) == TupleSourceStatus.FULL) {
@@ -205,7 +205,7 @@
// Clean up - unpin rows
for (int[] bounds : pinned) {
- bufferManager.unpinTupleBatch(sourceID, bounds[0], bounds[1]);
+ bufferManager.unpinTupleBatch(sourceID, bounds[0]);
}
}
@@ -245,7 +245,7 @@
for(; sortedIndex<activeTupleIDs.size(); sortedIndex++) {
TupleSourceID activeID = activeTupleIDs.get(sortedIndex);
try {
- TupleBatch sortedBatch = bufferManager.pinTupleBatch(activeID, 1, this.batchSize);
+ TupleBatch sortedBatch = bufferManager.pinTupleBatch(activeID, 1);
workingBatches.add(sortedBatch);
} catch(MemoryNotAvailableException e) {
break;
@@ -379,10 +379,9 @@
TupleSourceID tsID = unpinWorkingBatch(batchIndex, currentBatch);
int beginRow = workingPointers[batchIndex];
- int endRow = beginRow + this.batchSize - 1;
try {
- TupleBatch newBatch = bufferManager.pinTupleBatch(tsID, beginRow, endRow);
+ TupleBatch newBatch = bufferManager.pinTupleBatch(tsID, beginRow);
if(newBatch.getRowCount() == 0) {
// Done with this working batch
workingBatches.set(batchIndex, null);
@@ -400,8 +399,7 @@
MetaMatrixComponentException {
TupleSourceID tsID = activeTupleIDs.get(batchIndex);
int lastBeginRow = currentBatch.getBeginRow();
- int lastEndRow = currentBatch.getEndRow();
- bufferManager.unpinTupleBatch(tsID, lastBeginRow, lastEndRow);
+ bufferManager.unpinTupleBatch(tsID, lastBeginRow);
return tsID;
}
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java 2009-08-12 23:58:48 UTC (rev 1238)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java 2009-08-13 03:32:14 UTC (rev 1239)
@@ -269,14 +269,14 @@
}
public ResultsFuture<ResultsMessage> processCursorRequest(long reqID,
- int batchFirst, int batchLast) throws MetaMatrixProcessingException {
+ int batchFirst, int fetchSize) throws MetaMatrixProcessingException {
if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.DETAIL)) {
- LogManager.logDetail(LogConstants.CTX_DQP, "DQP process cursor request from " + batchFirst + " to " + batchLast); //$NON-NLS-1$//$NON-NLS-2$
+ LogManager.logDetail(LogConstants.CTX_DQP, "DQP process cursor request from " + batchFirst); //$NON-NLS-1$
}
DQPWorkContext workContext = DQPWorkContext.getWorkContext();
ResultsFuture<ResultsMessage> resultsFuture = new ResultsFuture<ResultsMessage>();
RequestWorkItem workItem = getRequestWorkItem(workContext.getRequestID(reqID));
- workItem.requestMore(batchFirst, batchLast, resultsFuture.getResultsReceiver());
+ workItem.requestMore(batchFirst, batchFirst + Math.min(fetchSize, maxFetchSize) - 1, resultsFuture.getResultsReceiver());
return resultsFuture;
}
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/LobChunkStream.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/LobChunkStream.java 2009-08-12 23:58:48 UTC (rev 1238)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/LobChunkStream.java 2009-08-13 03:32:14 UTC (rev 1239)
@@ -26,7 +26,6 @@
import java.nio.charset.Charset;
import java.sql.SQLException;
-import com.metamatrix.common.buffer.BufferManager;
import com.metamatrix.common.lob.ByteLobChunkStream;
import com.metamatrix.common.lob.LobChunk;
import com.metamatrix.common.lob.LobChunkProducer;
@@ -45,7 +44,7 @@
LobChunkProducer internalStream = null;
- public LobChunkStream(Streamable<?> streamable, int chunkSize, BufferManager bufferMgr)
+ public LobChunkStream(Streamable<?> streamable, int chunkSize)
throws IOException {
try {
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/LobWorkItem.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/LobWorkItem.java 2009-08-12 23:58:48 UTC (rev 1238)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/LobWorkItem.java 2009-08-13 03:32:14 UTC (rev 1239)
@@ -120,7 +120,7 @@
// get the reference object in the buffer manager, and try to stream off
// the original sources.
Streamable<?> streamable = dqpCore.getBufferManager().getStreamable(parent.resultsID, referenceStreamId);
- return new LobChunkStream(streamable, chunkSize, dqpCore.getBufferManager());
+ return new LobChunkStream(streamable, chunkSize);
}
synchronized void setResultsReceiver(ResultsReceiver<LobChunk> resultsReceiver) {
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java 2009-08-12 23:58:48 UTC (rev 1238)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java 2009-08-13 03:32:14 UTC (rev 1239)
@@ -59,7 +59,6 @@
import com.metamatrix.common.xa.XATransactionException;
import com.metamatrix.core.MetaMatrixCoreException;
import com.metamatrix.core.log.MessageLevel;
-import com.metamatrix.core.util.Assertion;
import com.metamatrix.dqp.DQPPlugin;
import com.metamatrix.dqp.exception.SourceWarning;
import com.metamatrix.dqp.message.AtomicRequestID;
@@ -104,7 +103,6 @@
}
this.begin = beginRow;
this.end = endRow;
- Assertion.assertTrue(end - begin >= 0);
this.resultsRequested = true;
}
@@ -427,8 +425,19 @@
try {
if (batch == null || batch.getBeginRow() > this.resultsCursor.begin) {
- batch = this.bufferMgr.pinTupleBatch(resultsID, resultsCursor.begin, resultsCursor.end);
+ batch = this.bufferMgr.pinTupleBatch(resultsID, resultsCursor.begin);
pinned = true;
+ //TODO: support fetching more than 1 batch
+ int count = this.resultsCursor.end - this.resultsCursor.begin + 1;
+ if (batch.getRowCount() > count) {
+ int beginRow = Math.min(this.resultsCursor.begin, batch.getEndRow() - count + 1);
+ int endRow = Math.min(beginRow + count - 1, batch.getEndRow());
+ int firstOffset = beginRow - batch.getBeginRow();
+ List[] memoryRows = batch.getAllTuples();
+ List[] rows = new List[count];
+ System.arraycopy(memoryRows, firstOffset, rows, 0, endRow - beginRow + 1);
+ batch = new TupleBatch(beginRow, rows);
+ }
}
int finalRowCount = doneProducingBatches?this.processor.getHighestRow():-1;
@@ -480,7 +489,7 @@
} finally {
try {
if (pinned) {
- this.bufferMgr.unpinTupleBatch(this.resultsID, batch.getBeginRow(), batch.getEndRow());
+ this.bufferMgr.unpinTupleBatch(this.resultsID, batch.getBeginRow());
}
} catch (Exception e) {
// ignore - nothing more we can do
Modified: trunk/engine/src/test/java/com/metamatrix/common/buffer/impl/TestBufferManagerImpl.java
===================================================================
--- trunk/engine/src/test/java/com/metamatrix/common/buffer/impl/TestBufferManagerImpl.java 2009-08-12 23:58:48 UTC (rev 1238)
+++ trunk/engine/src/test/java/com/metamatrix/common/buffer/impl/TestBufferManagerImpl.java 2009-08-13 03:32:14 UTC (rev 1239)
@@ -22,6 +22,8 @@
package com.metamatrix.common.buffer.impl;
+import static org.junit.Assert.*;
+
import java.io.FileInputStream;
import java.io.FileReader;
import java.io.IOException;
@@ -32,7 +34,7 @@
import java.util.Properties;
import java.util.Random;
-import junit.framework.TestCase;
+import org.junit.Test;
import com.metamatrix.api.exception.MetaMatrixComponentException;
import com.metamatrix.api.exception.MetaMatrixProcessingException;
@@ -59,20 +61,12 @@
/**
*/
-public class TestBufferManagerImpl extends TestCase {
+public class TestBufferManagerImpl {
- /**
- * Constructor for TestBufferManagerImpl.
- * @param arg0
- */
- public TestBufferManagerImpl(String arg0) {
- super(arg0);
- }
-
public static BufferManager getTestBufferManager(long bytesAvailable, StorageManager sm2) throws MetaMatrixComponentException {
// Get the properties for BufferManager
Properties bmProps = new Properties();
- bmProps.setProperty(BufferManagerPropertyNames.MEMORY_AVAILABLE, "" + bytesAvailable); //$NON-NLS-1$
+ bmProps.setProperty(BufferManagerPropertyNames.MEMORY_AVAILABLE, String.valueOf(bytesAvailable));
bmProps.setProperty(BufferManagerPropertyNames.MANAGEMENT_INTERVAL, "0"); //$NON-NLS-1$
BufferManager bufferManager = new BufferManagerImpl();
bufferManager.initialize("local", bmProps); //$NON-NLS-1$
@@ -164,14 +158,14 @@
ts.closeSource();
}
- public void testSpanStorage() throws Exception {
+ @Test public void testSpanStorage() throws Exception {
helpTestAddBatches(createFakeDatabaseStorageManager(),
1,
50,
100);
}
- public void testStandalone() throws Exception {
+ @Test public void testStandalone() throws Exception {
helpTestAddBatches(null,
10,
5,
@@ -212,7 +206,7 @@
return subRows;
}
- public void testCreateLobReference() throws Exception {
+ @Test public void testCreateLobReference() throws Exception {
final BufferManagerImpl mgr = (BufferManagerImpl)getTestBufferManager(1, createFakeDatabaseStorageManager());
XMLType xml1 = new XMLType(new SQLXMLImpl("<foo/>")); //$NON-NLS-1$
@@ -231,15 +225,24 @@
// when adding to the tuple source the reference id is assigned
// but not the persistence stream id.
- assertTrue(xml1.getReferenceStreamId() != null);
- assertTrue(xml2.getReferenceStreamId() != null);
- assertTrue(xml1.getPersistenceStreamId() == null);
- assertTrue(xml2.getPersistenceStreamId() == null);
+ assertNotNull(xml1.getReferenceStreamId());
+ assertNotNull(xml2.getReferenceStreamId());
+ assertNull(xml1.getPersistenceStreamId());
+ assertNull(xml2.getPersistenceStreamId());
assertNotNull(mgr.getStreamable(id, xml1.getReferenceStreamId()));
+
+ final TupleSourceID id1 = mgr.createTupleSource(schema, new String[] {DataTypeManager.DefaultDataTypes.XML}, "GROUP1", TupleSourceType.PROCESSOR); //$NON-NLS-1$
+
+ TupleBatch batch1 = new TupleBatch(1, new List[] {xmlList1, xmlList2});
+ mgr.addTupleBatch(id1, batch1);
+
+ //ensure that even though we've removed a later tuple source, the reference still exists
+ mgr.removeTupleSource(id1);
+ assertNotNull(mgr.getStreamable(id, xml1.getReferenceStreamId()));
}
- public void testAddStreamablePart() throws Exception {
+ @Test public void testAddStreamablePart() throws Exception {
final BufferManager mgr = getTestBufferManager(1, createFakeDatabaseStorageManager());
// save the lob
@@ -300,7 +303,7 @@
}
- public void testPinning1() throws Exception {
+ @Test public void testPinning1() throws Exception {
BufferManager mgr = getTestBufferManager(1, createFakeDatabaseStorageManager());
List schema = new ArrayList();
@@ -321,9 +324,9 @@
int readPerBatch = 1000;
for(int i=1; i<maxRows; i=i+readPerBatch) {
int end = i+readPerBatch-1;
- TupleBatch checkBatch = mgr.pinTupleBatch(tsID, i, end);
+ TupleBatch checkBatch = mgr.pinTupleBatch(tsID, i);
helpCompareBatches(exampleBatch(i, end), checkBatch);
- mgr.unpinTupleBatch(tsID, i, end);
+ mgr.unpinTupleBatch(tsID, i);
}
// Remove
@@ -331,7 +334,7 @@
}
- public void testUnpinOfUnpinnedBatch() throws Exception {
+ @Test public void testUnpinOfUnpinnedBatch() throws Exception {
BufferManager mgr = getTestBufferManager(1, createFakeDatabaseStorageManager());
List schema = new ArrayList();
@@ -352,20 +355,28 @@
int readPerBatch = 100;
for(int i=1; i<maxRows; i=i+readPerBatch) {
int end = i+readPerBatch-1;
- mgr.unpinTupleBatch(tsID, i, end);
+ mgr.unpinTupleBatch(tsID, i);
}
// Walk through by 100's pinning and unpinning
- for(int i=1; i<maxRows; i=i+readPerBatch) {
- int end = i+readPerBatch-1;
- TupleBatch checkBatch = mgr.pinTupleBatch(tsID, i, end);
- helpCompareBatches(exampleBatch(i, end), checkBatch);
- mgr.unpinTupleBatch(tsID, i, end);
- }
+ helpTestPinAndUnpin(mgr, tsID, maxRows, writePerBatch, readPerBatch);
// Remove
mgr.removeTupleSource(tsID);
}
+
+ private void helpTestPinAndUnpin(BufferManager mgr, TupleSourceID tsID,
+ int maxRows, int writePerBatch, int readPerBatch)
+ throws TupleSourceNotFoundException, MemoryNotAvailableException,
+ MetaMatrixComponentException {
+ for(int i=1; i<maxRows; i=i+readPerBatch) {
+ int end = (1+i/writePerBatch)*writePerBatch;
+ int begin = end - writePerBatch + 1;
+ TupleBatch checkBatch = mgr.pinTupleBatch(tsID, i);
+ helpCompareBatches(exampleBatch(begin, end), checkBatch);
+ mgr.unpinTupleBatch(tsID, i);
+ }
+ }
private TupleBatch exampleBigBatch(int begin, int end, int charsPerRow) {
@@ -385,7 +396,7 @@
return new TupleBatch(begin, rows);
}
- public void testDeadlockOnMultiThreadClean() throws Exception {
+ @Test public void testDeadlockOnMultiThreadClean() throws Exception {
BufferManager mgr = getTestBufferManager(1, createFakeDatabaseStorageManager());
List schema = new ArrayList();
@@ -422,7 +433,7 @@
}
}
- public void testSessionMax_Fail() throws Exception {
+ @Test public void testSessionMax_Fail() throws Exception {
BufferManager mgr = getTestBufferManager(1, createFakeDatabaseStorageManager());
TupleSourceID tsID = null;
try {
@@ -443,8 +454,8 @@
// Walk through by 1000's pinning and unpinning
int readPerBatch = 1000;
for(int i=1; i<maxRows; i=i+readPerBatch) {
- int end = i+readPerBatch-1;
- TupleBatch checkBatch = mgr.pinTupleBatch(tsID, i, end);
+ int end = (1+i/writePerBatch)*writePerBatch;
+ TupleBatch checkBatch = mgr.pinTupleBatch(tsID, i);
helpCompareBatches(exampleBatch(i, end), checkBatch);
}
fail("Should have failed"); //$NON-NLS-1$
@@ -463,7 +474,7 @@
* @throws Exception
* @since 4.3
*/
- public void testDefect_18499() throws Exception {
+ @Test public void testDefect_18499() throws Exception {
BufferManager mgr = getTestBufferManager(1, createFakeDatabaseStorageManager());
List schema = new ArrayList();
schema.add("col"); //$NON-NLS-1$
@@ -484,7 +495,7 @@
}
}
- public void testDefect18497() throws Exception {
+ @Test public void testDefect18497() throws Exception {
BufferManager mgr = getTestBufferManager(1, createFakeDatabaseStorageManager());
TupleSourceID tsID = null;
try {
@@ -504,12 +515,7 @@
// Walk through by 1000's pinning and unpinning
int readPerBatch = 1000;
- for(int i=1; i<maxRows; i=i+readPerBatch) {
- int end = i+readPerBatch-1;
- TupleBatch checkBatch = mgr.pinTupleBatch(tsID, i, end);
- helpCompareBatches(exampleBatch(i, end), checkBatch);
- mgr.unpinTupleBatch(tsID, i, end);
- }
+ helpTestPinAndUnpin(mgr, tsID, maxRows, writePerBatch, readPerBatch);
} finally {
// Remove
mgr.removeTupleSource(tsID);
@@ -518,7 +524,7 @@
}
//two threads do the cleaning at the same time
- public void testDefect19325() throws Exception{
+ @Test public void testDefect19325() throws Exception{
BufferManagerImpl mgr = (BufferManagerImpl)getTestBufferManager(1, createFakeDatabaseStorageManager());
TupleSourceID tsID = null;
List schema = new ArrayList();
@@ -554,7 +560,7 @@
}
//test many small batches
- public void testSmallBatches() throws Exception{
+ @Test public void testSmallBatches() throws Exception{
BufferManager mgr = getTestBufferManager(50, createFakeDatabaseStorageManager());
TupleSourceID tsID = null;
try {
@@ -588,7 +594,7 @@
}
//going backward
- public void testPinning2() throws Exception {
+ @Test public void testPinning2() throws Exception {
BufferManager mgr = getTestBufferManager(1, createFakeDatabaseStorageManager());
List schema = new ArrayList();
@@ -607,21 +613,11 @@
// Walk through by 100's pinning and unpinning
int readPerBatch = 100;
- for(int i=maxRows; i>=1; i=i-readPerBatch) {
- int end = i-readPerBatch+1;
- TupleBatch checkBatch = mgr.pinTupleBatch(tsID, i, end);
- helpCompareBatches(exampleBatch(end, i), checkBatch);
- mgr.unpinTupleBatch(tsID, end, i);
- }
+ helpTestPinAndUnpin(mgr, tsID, maxRows, writePerBatch, readPerBatch);
// Walk through by 2000's pinning and unpinning
readPerBatch = 2000;
- for(int i=maxRows; i>=1; i=i-readPerBatch) {
- int end = i-readPerBatch+1;
- TupleBatch checkBatch = mgr.pinTupleBatch(tsID, i, end);
- helpCompareBatches(exampleBatch(end + (readPerBatch - writePerBatch), i), checkBatch);
- mgr.unpinTupleBatch(tsID, end + (readPerBatch - writePerBatch), i);
- }
+ helpTestPinAndUnpin(mgr, tsID, maxRows, writePerBatch, readPerBatch);
// Remove
mgr.removeTupleSource(tsID);
@@ -679,8 +675,8 @@
int batch = random.nextInt(batches);
begin = 1 + (batch * rowsPerBatch);
end = begin + rowsPerBatch - 1;
- bufferMgr.pinTupleBatch(tsID, begin, end);
- bufferMgr.unpinTupleBatch(tsID, begin, end);
+ bufferMgr.pinTupleBatch(tsID, begin);
+ bufferMgr.unpinTupleBatch(tsID, begin);
} catch(MemoryNotAvailableException e) {
}
@@ -690,7 +686,7 @@
e.printStackTrace();
} finally {
try {
- bufferMgr.unpinTupleBatch(tsID, begin, end);
+ bufferMgr.unpinTupleBatch(tsID, begin);
} catch(Exception e) {
// ignore
}
Modified: trunk/engine/src/test/java/com/metamatrix/query/processor/relational/NodeTestUtil.java
===================================================================
--- trunk/engine/src/test/java/com/metamatrix/query/processor/relational/NodeTestUtil.java 2009-08-12 23:58:48 UTC (rev 1238)
+++ trunk/engine/src/test/java/com/metamatrix/query/processor/relational/NodeTestUtil.java 2009-08-13 03:32:14 UTC (rev 1239)
@@ -95,11 +95,10 @@
private boolean wasBlocked;
/**
- * @see com.metamatrix.common.buffer.impl.BufferManagerImpl#pinTupleBatch(com.metamatrix.common.buffer.TupleSourceID, int, int)
+ * @see com.metamatrix.common.buffer.impl.BufferManagerImpl#pinTupleBatch(com.metamatrix.common.buffer.TupleSourceID, int)
*/
public TupleBatch pinTupleBatch(TupleSourceID tupleSourceID,
- int beginRow,
- int maxEndRow) throws TupleSourceNotFoundException,
+ int beginRow) throws TupleSourceNotFoundException,
MemoryNotAvailableException,
MetaMatrixComponentException {
if (blockOn != null && blockOn.contains(new Integer(++pinCount))) {
@@ -109,7 +108,7 @@
wasBlocked = false;
- return super.pinTupleBatch(tupleSourceID, beginRow, maxEndRow);
+ return super.pinTupleBatch(tupleSourceID, beginRow);
}
Modified: trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java 2009-08-12 23:58:48 UTC (rev 1238)
+++ trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java 2009-08-13 03:32:14 UTC (rev 1239)
@@ -29,12 +29,15 @@
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
import org.teiid.dqp.internal.datamgr.impl.FakeTransactionService;
import org.teiid.dqp.internal.process.DQPCore;
import org.teiid.dqp.internal.process.DQPWorkContext;
import org.teiid.dqp.internal.process.DQPCore.ConnectorCapabilitiesCache;
-import junit.framework.TestCase;
+import static org.junit.Assert.*;
import com.metamatrix.api.exception.query.QueryResolverException;
import com.metamatrix.common.application.ApplicationEnvironment;
@@ -55,16 +58,11 @@
import com.metamatrix.query.unittest.FakeMetadataFactory;
-public class TestDQPCore extends TestCase {
+public class TestDQPCore {
- public TestDQPCore(String name) {
- super(name);
- }
-
private DQPCore core;
- @Override
- protected void setUp() throws Exception {
+ @Before public void setUp() throws Exception {
DQPWorkContext workContext = new DQPWorkContext();
workContext.setVdbName("bqt"); //$NON-NLS-1$
workContext.setVdbVersion("1"); //$NON-NLS-1$
@@ -96,8 +94,7 @@
core.start(new Properties());
}
- @Override
- protected void tearDown() throws Exception {
+ @After public void tearDown() throws Exception {
DQPWorkContext.setWorkContext(new DQPWorkContext());
core.stop();
}
@@ -112,84 +109,84 @@
return msg;
}
- public void testRequest1() throws Exception {
+ @Test public void testRequest1() throws Exception {
helpExecute("SELECT IntKey FROM BQT1.SmallA", "a"); //$NON-NLS-1$ //$NON-NLS-2$
}
- public void testUser1() throws Exception {
+ @Test public void testUser1() throws Exception {
String sql = "SELECT IntKey FROM BQT1.SmallA WHERE user() = 'logon'"; //$NON-NLS-1$
String userName = "logon"; //$NON-NLS-1$
helpExecute(sql, userName);
}
- public void testUser2() throws Exception {
+ @Test public void testUser2() throws Exception {
String sql = "SELECT IntKey FROM BQT1.SmallA WHERE user() LIKE 'logon'"; //$NON-NLS-1$
String userName = "logon"; //$NON-NLS-1$
helpExecute(sql, userName);
}
- public void testUser3() throws Exception {
+ @Test public void testUser3() throws Exception {
String sql = "SELECT IntKey FROM BQT1.SmallA WHERE user() IN ('logon3') AND StringKey LIKE '1'"; //$NON-NLS-1$
String userName = "logon3"; //$NON-NLS-1$
helpExecute(sql, userName);
}
- public void testUser4() throws Exception {
+ @Test public void testUser4() throws Exception {
String sql = "SELECT IntKey FROM BQT1.SmallA WHERE 'logon4' = user() AND StringKey = '1'"; //$NON-NLS-1$
String userName = "logon4"; //$NON-NLS-1$
helpExecute(sql, userName);
}
- public void testUser5() throws Exception {
+ @Test public void testUser5() throws Exception {
String sql = "SELECT IntKey FROM BQT1.SmallA WHERE user() IS NULL "; //$NON-NLS-1$
String userName = "logon"; //$NON-NLS-1$
helpExecute(sql, userName);
}
- public void testUser6() throws Exception {
+ @Test public void testUser6() throws Exception {
String sql = "SELECT IntKey FROM BQT1.SmallA WHERE user() = 'logon33' "; //$NON-NLS-1$
String userName = "logon"; //$NON-NLS-1$
helpExecute(sql, userName);
}
- public void testUser7() throws Exception {
+ @Test public void testUser7() throws Exception {
String sql = "UPDATE BQT1.SmallA SET IntKey = 2 WHERE user() = 'logon' AND StringKey = '1' "; //$NON-NLS-1$
String userName = "logon"; //$NON-NLS-1$
helpExecute(sql, userName);
}
- public void testUser8() throws Exception {
+ @Test public void testUser8() throws Exception {
String sql = "SELECT user(), StringKey FROM BQT1.SmallA WHERE IntKey = 1 "; //$NON-NLS-1$
String userName = "logon"; //$NON-NLS-1$
helpExecute(sql, userName);
}
- public void testUser9() throws Exception {
+ @Test public void testUser9() throws Exception {
String sql = "SELECT IntKey FROM BQT1.SmallA WHERE user() = StringKey AND StringKey = '1' "; //$NON-NLS-1$
String userName = "1"; //$NON-NLS-1$
helpExecute(sql, userName);
}
- public void testEnvSessionId() throws Exception {
+ @Test public void testEnvSessionId() throws Exception {
String sql = "SELECT env('sessionid') as SessionID"; //$NON-NLS-1$
String userName = "1"; //$NON-NLS-1$
ResultsMessage rm = helpExecute(sql, userName);
assertEquals("1", rm.getResults()[0].get(0)); //$NON-NLS-1$
}
- public void testEnvSessionIdMixedCase() throws Exception {
+ @Test public void testEnvSessionIdMixedCase() throws Exception {
String sql = "SELECT env('sEsSIonId') as SessionID"; //$NON-NLS-1$
String userName = "1"; //$NON-NLS-1$
ResultsMessage rm = helpExecute(sql, userName);
assertEquals("1", rm.getResults()[0].get(0)); //$NON-NLS-1$
}
- public void testTxnAutoWrap() throws Exception {
+ @Test public void testTxnAutoWrap() throws Exception {
String sql = "SELECT * FROM BQT1.SmallA"; //$NON-NLS-1$
helpExecute(sql, "a", 1, true); //$NON-NLS-1$
}
- public void testPlanOnly() throws Exception {
+ @Test public void testPlanOnly() throws Exception {
String sql = "SELECT * FROM BQT1.SmallA option planonly"; //$NON-NLS-1$
helpExecute(sql,"a"); //$NON-NLS-1$
}
@@ -198,7 +195,7 @@
* Tests whether an exception result is sent when an exception occurs
* @since 4.3
*/
- public void testPlanningException() throws Exception {
+ @Test public void testPlanningException() throws Exception {
String sql = "SELECT IntKey FROM BQT1.BadIdea "; //$NON-NLS-1$
RequestMessage reqMsg = exampleRequestMessage(sql);
@@ -211,7 +208,7 @@
}
}
- public void testCapabilitesCache() {
+ @Test public void testCapabilitesCache() {
ConnectorCapabilitiesCache cache = new ConnectorCapabilitiesCache();
DQPWorkContext workContext = new DQPWorkContext();
workContext.setVdbName("foo"); //$NON-NLS-1$
@@ -226,11 +223,11 @@
assertNull(vdbCapabilites.get("model1")); //$NON-NLS-1$
}
- public void testLookupVisibility() throws Exception {
+ @Test public void testLookupVisibility() throws Exception {
helpTestVisibilityFails("select lookup('bqt3.smalla', 'intkey', 'stringkey', '?')"); //$NON-NLS-1$
}
- public void testCancel() throws Exception {
+ @Test public void testCancel() throws Exception {
assertFalse(this.core.cancelRequest(new RequestID(1)));
}
@@ -242,7 +239,7 @@
assertEquals("[QueryValidatorException]Group does not exist: BQT3.SmallA", results.getException().toString()); //$NON-NLS-1$
}
- public void testXQueryVisibility() throws Exception {
+ @Test public void testXQueryVisibility() throws Exception {
String xquery = "<Items>\r\n" + //$NON-NLS-1$
"{\r\n" + //$NON-NLS-1$
"for $x in doc(\"select * from bqt3.smalla\")//Item\r\n" + //$NON-NLS-1$
@@ -268,7 +265,9 @@
Future<ResultsMessage> message = core.executeRequest(reqMsg.getExecutionId(), reqMsg);
ResultsMessage results = message.get(50000, TimeUnit.MILLISECONDS);
- assertNull(results.getException());
+ if (results.getException() != null) {
+ throw results.getException();
+ }
return results;
}
}
15 years, 5 months
teiid SVN: r1238 - trunk/runtime/src/test/resources/dqp.
by teiid-commits@lists.jboss.org
Author: shawkins
Date: 2009-08-12 19:58:48 -0400 (Wed, 12 Aug 2009)
New Revision: 1238
Modified:
trunk/runtime/src/test/resources/dqp/TestEmpty.vdb
Log:
TEIID-772 the configuration def file was .DEF instead of .def
Modified: trunk/runtime/src/test/resources/dqp/TestEmpty.vdb
===================================================================
(Binary files differ)
15 years, 5 months
teiid SVN: r1237 - in trunk: engine/src/main/java/com/metamatrix/platform/security/api/service and 1 other directories.
by teiid-commits@lists.jboss.org
Author: rareddy
Date: 2009-08-12 18:18:47 -0400 (Wed, 12 Aug 2009)
New Revision: 1237
Modified:
trunk/build/kit-runtime/deploy.properties
trunk/engine/src/main/java/com/metamatrix/platform/security/api/service/SessionServiceInterface.java
trunk/runtime/src/main/java/com/metamatrix/platform/security/session/service/SessionServiceImpl.java
Log:
TEIID-774
Modified: trunk/build/kit-runtime/deploy.properties
===================================================================
--- trunk/build/kit-runtime/deploy.properties 2009-08-12 19:42:56 UTC (rev 1236)
+++ trunk/build/kit-runtime/deploy.properties 2009-08-12 22:18:47 UTC (rev 1237)
@@ -88,8 +88,8 @@
#Maximum number of sessions allowed by the system
session.maxSessions=5000
-#Max allowed time before the session is timed out between two ping calls
-session.timeoutInMilli=1800000
+#Max allowed time before the session is terminated by the system (default is unlimited; the value below is 24 hours)
+#session.expirationTimeInMilli=86400000
#
# Membership Service Settings (handles the authentication of the user)
Modified: trunk/engine/src/main/java/com/metamatrix/platform/security/api/service/SessionServiceInterface.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/platform/security/api/service/SessionServiceInterface.java 2009-08-12 19:42:56 UTC (rev 1236)
+++ trunk/engine/src/main/java/com/metamatrix/platform/security/api/service/SessionServiceInterface.java 2009-08-12 22:18:47 UTC (rev 1237)
@@ -58,10 +58,10 @@
public static String NAME = "SessionService"; //$NON-NLS-1$
public static final String DEFAULT_MAX_SESSIONS = "5000"; //$NON-NLS-1$
- public static final String DEFAULT_SESSION_TIMEOUT = "600000"; //$NON-NLS-1$
+ public static final String DEFAULT_SESSION_EXPIRATION = "0"; //$NON-NLS-1$
public static final String MAX_SESSIONS = "session.maxSessions"; //$NON-NLS-1$
- public static final String SESSION_TIMEOUT = "session.timeoutInMilli"; //$NON-NLS-1$
+ public static final String SESSION_EXPIRATION = "session.expirationTimeInMilli"; //$NON-NLS-1$
/**
* Create a session for the given user authenticating against the given <code>Credentials</code>.
Modified: trunk/runtime/src/main/java/com/metamatrix/platform/security/session/service/SessionServiceImpl.java
===================================================================
--- trunk/runtime/src/main/java/com/metamatrix/platform/security/session/service/SessionServiceImpl.java 2009-08-12 19:42:56 UTC (rev 1236)
+++ trunk/runtime/src/main/java/com/metamatrix/platform/security/session/service/SessionServiceImpl.java 2009-08-12 22:18:47 UTC (rev 1237)
@@ -74,7 +74,7 @@
* Configuration state
*/
private long sessionMaxLimit;
- private long sessionTimeLimit;
+ private long sessionExpirationTimeLimit;
/*
* Injected state
@@ -99,7 +99,7 @@
if (currentTime - info.getLastPingTime() > ServerConnection.PING_INTERVAL * 5) {
LogManager.logInfo(LogConstants.CTX_SESSION, DQPEmbeddedPlugin.Util.getString( "SessionServiceImpl.keepaliveFailed", info.getSessionID())); //$NON-NLS-1$
closeSession(info.getSessionID());
- } else if (sessionTimeLimit > 0 && currentTime - info.getTimeCreated() > sessionTimeLimit) {
+ } else if (sessionExpirationTimeLimit > 0 && currentTime - info.getTimeCreated() > sessionExpirationTimeLimit) {
LogManager.logInfo(LogConstants.CTX_SESSION, DQPEmbeddedPlugin.Util.getString( "SessionServiceImpl.expireSession", info.getSessionID())); //$NON-NLS-1$
closeSession(info.getSessionID());
}
@@ -302,23 +302,7 @@
public void setMembershipService(MembershipServiceInterface membershipService) {
this.membershipService = membershipService;
}
-
- public long getSessionMaxLimit() {
- return sessionMaxLimit;
- }
-
- public void setSessionMaxLimit(long sessionMaxLimit) {
- this.sessionMaxLimit = sessionMaxLimit;
- }
-
- public long getSessionTimeLimit() {
- return sessionTimeLimit;
- }
-
- public void setSessionTimeLimit(long sessionTimeLimit) {
- this.sessionTimeLimit = sessionTimeLimit;
- }
-
+
@Inject
public void setDqpCore(DQPCore dqpCore) {
this.dqpCore = dqpCore;
@@ -327,7 +311,7 @@
@Override
public void initialize(Properties props) throws ApplicationInitializationException {
this.sessionMaxLimit = Long.parseLong(props.getProperty(MAX_SESSIONS, DEFAULT_MAX_SESSIONS));
- this.sessionTimeLimit = Long.parseLong(props.getProperty(SESSION_TIMEOUT, DEFAULT_SESSION_TIMEOUT));
+ this.sessionExpirationTimeLimit = Long.parseLong(props.getProperty(SESSION_EXPIRATION, DEFAULT_SESSION_EXPIRATION));
}
@Override
15 years, 5 months
teiid SVN: r1236 - trunk/runtime/src/test/java/com/metamatrix/dqp/embedded/services.
by teiid-commits@lists.jboss.org
Author: vhalbert(a)redhat.com
Date: 2009-08-12 15:42:56 -0400 (Wed, 12 Aug 2009)
New Revision: 1236
Modified:
trunk/runtime/src/test/java/com/metamatrix/dqp/embedded/services/TestEmbeddedVDBService.java
Log:
TEIID-772 Fixed TestEmbeddedVDBService test to look for TestEmpty.vdb
Modified: trunk/runtime/src/test/java/com/metamatrix/dqp/embedded/services/TestEmbeddedVDBService.java
===================================================================
--- trunk/runtime/src/test/java/com/metamatrix/dqp/embedded/services/TestEmbeddedVDBService.java 2009-08-12 17:39:14 UTC (rev 1235)
+++ trunk/runtime/src/test/java/com/metamatrix/dqp/embedded/services/TestEmbeddedVDBService.java 2009-08-12 19:42:56 UTC (rev 1236)
@@ -199,13 +199,14 @@
configService.addVDB(vdb, true);
assertEquals(3, vdbService.getAvailableVDBs().size());
- vdb = vdbService.getVDB("Empty", "1"); //$NON-NLS-1$ //$NON-NLS-2$
- assertEquals("Empty", vdb.getName()); //$NON-NLS-1$
+
+ vdb = vdbService.getVDB("TestEmpty", "1"); //$NON-NLS-1$ //$NON-NLS-2$
+ assertEquals("TestEmpty", vdb.getName()); //$NON-NLS-1$
assertEquals("1", vdb.getVersion()); //$NON-NLS-1$
// no bindings should be added because this function only worries
// about the vdb not bindings. Adding is for the Data service and the admin API
- assertEquals(3, configService.getConnectorBindings().size());
+ assertEquals(4, configService.getConnectorBindings().size());
}
// when we deploy the already deployed VDB it should take on the next version
15 years, 5 months
teiid SVN: r1235 - in trunk: common-core/src/main/java/com/metamatrix/common/types and 20 other directories.
by teiid-commits@lists.jboss.org
Author: shawkins
Date: 2009-08-12 13:39:14 -0400 (Wed, 12 Aug 2009)
New Revision: 1235
Added:
trunk/engine/src/main/java/com/metamatrix/common/buffer/BufferManagerLobChunkStream.java
Removed:
trunk/common-core/src/main/java/com/metamatrix/common/types/InvalidReferenceException.java
trunk/engine/src/main/java/com/metamatrix/common/buffer/LobTupleBatch.java
trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/MemoryState.java
trunk/engine/src/main/java/com/metamatrix/common/lob/
Modified:
trunk/client-jdbc/src/main/java/com/metamatrix/jdbc/MMBlob.java
trunk/client-jdbc/src/main/java/com/metamatrix/jdbc/MMClob.java
trunk/client-jdbc/src/main/java/com/metamatrix/jdbc/MMSQLXML.java
trunk/common-core/src/main/java/com/metamatrix/common/types/BlobType.java
trunk/common-core/src/main/java/com/metamatrix/common/types/ClobType.java
trunk/common-core/src/main/java/com/metamatrix/common/types/Streamable.java
trunk/common-core/src/main/java/com/metamatrix/common/types/XMLType.java
trunk/common-core/src/main/java/com/metamatrix/common/types/basic/ClobToStringTransform.java
trunk/common-core/src/main/java/com/metamatrix/common/types/basic/SQLXMLToStringTransform.java
trunk/common-core/src/main/resources/com/metamatrix/core/i18n.properties
trunk/common-core/src/test/java/com/metamatrix/common/types/TestBlobValue.java
trunk/common-core/src/test/java/com/metamatrix/common/types/TestClobValue.java
trunk/common-core/src/test/java/com/metamatrix/common/types/TestXMLValue.java
trunk/engine/src/main/java/com/metamatrix/common/buffer/BufferManager.java
trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/BufferManagerImpl.java
trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/BufferStats.java
trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/TupleGroupInfo.java
trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/TupleSourceInfo.java
trunk/engine/src/main/java/com/metamatrix/common/buffer/storage/file/FileStorageManager.java
trunk/engine/src/main/java/com/metamatrix/query/processor/QueryProcessor.java
trunk/engine/src/main/java/com/metamatrix/query/processor/dynamic/XMLSource.java
trunk/engine/src/main/java/com/metamatrix/query/processor/proc/ProcedurePlan.java
trunk/engine/src/main/java/com/metamatrix/query/processor/relational/DependentCriteriaProcessor.java
trunk/engine/src/main/java/com/metamatrix/query/processor/relational/DependentValueSource.java
trunk/engine/src/main/java/com/metamatrix/query/processor/relational/GroupingNode.java
trunk/engine/src/main/java/com/metamatrix/query/processor/relational/PartitionedSortJoin.java
trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SortUtility.java
trunk/engine/src/main/java/com/metamatrix/query/processor/xml/XMLPlan.java
trunk/engine/src/main/java/com/metamatrix/query/processor/xml/XMLUtil.java
trunk/engine/src/main/java/com/metamatrix/query/processor/xquery/XQueryPlan.java
trunk/engine/src/main/java/com/metamatrix/query/util/CommandContext.java
trunk/engine/src/main/java/com/metamatrix/query/xquery/saxon/SaxonXQueryExpression.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/LobChunkStream.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/LobWorkItem.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
trunk/engine/src/test/java/com/metamatrix/common/buffer/impl/TestBufferManagerImpl.java
trunk/engine/src/test/java/com/metamatrix/query/processor/TestProcessor.java
trunk/engine/src/test/java/com/metamatrix/query/processor/relational/TestSortNode.java
trunk/runtime/src/main/java/com/metamatrix/dqp/embedded/services/EmbeddedBufferService.java
Log:
TEIID-767 TEIID-771 simplifying the lob handling of buffermanager and merging in the memorystate logic. reviewed by RR.
Modified: trunk/client-jdbc/src/main/java/com/metamatrix/jdbc/MMBlob.java
===================================================================
--- trunk/client-jdbc/src/main/java/com/metamatrix/jdbc/MMBlob.java 2009-08-12 16:36:33 UTC (rev 1234)
+++ trunk/client-jdbc/src/main/java/com/metamatrix/jdbc/MMBlob.java 2009-08-12 17:39:14 UTC (rev 1235)
@@ -55,7 +55,7 @@
*/
public static Blob newInstance(StreamingLobChunckProducer.Factory lobChunckFactory, BlobType blob) {
if (!Boolean.getBoolean(Streamable.FORCE_STREAMING)) {
- Blob sourceBlob = blob.getSourceBlob();
+ Blob sourceBlob = blob.getReference();
if (sourceBlob != null) {
return sourceBlob;
}
Modified: trunk/client-jdbc/src/main/java/com/metamatrix/jdbc/MMClob.java
===================================================================
--- trunk/client-jdbc/src/main/java/com/metamatrix/jdbc/MMClob.java 2009-08-12 16:36:33 UTC (rev 1234)
+++ trunk/client-jdbc/src/main/java/com/metamatrix/jdbc/MMClob.java 2009-08-12 17:39:14 UTC (rev 1235)
@@ -72,7 +72,7 @@
public static Clob newInstance(StreamingLobChunckProducer.Factory lobChunckFactory, ClobType clob) throws SQLException {
if (!Boolean.getBoolean(Streamable.FORCE_STREAMING)) {
- Clob sourceClob = clob.getSourceClob();
+ Clob sourceClob = clob.getReference();
if (sourceClob != null) {
return sourceClob;
}
Modified: trunk/client-jdbc/src/main/java/com/metamatrix/jdbc/MMSQLXML.java
===================================================================
--- trunk/client-jdbc/src/main/java/com/metamatrix/jdbc/MMSQLXML.java 2009-08-12 16:36:33 UTC (rev 1234)
+++ trunk/client-jdbc/src/main/java/com/metamatrix/jdbc/MMSQLXML.java 2009-08-12 17:39:14 UTC (rev 1235)
@@ -59,7 +59,7 @@
public static SQLXML newInstance(StreamingLobChunckProducer.Factory lobChunckFactory, XMLType srcXML) throws SQLException {
if (Boolean.getBoolean(Streamable.FORCE_STREAMING)) {
- SQLXML sourceSQLXML = srcXML.getSourceSQLXML();
+ SQLXML sourceSQLXML = srcXML.getReference();
if (sourceSQLXML != null) {
return sourceSQLXML;
}
Modified: trunk/common-core/src/main/java/com/metamatrix/common/types/BlobType.java
===================================================================
--- trunk/common-core/src/main/java/com/metamatrix/common/types/BlobType.java 2009-08-12 16:36:33 UTC (rev 1234)
+++ trunk/common-core/src/main/java/com/metamatrix/common/types/BlobType.java 2009-08-12 17:39:14 UTC (rev 1235)
@@ -30,20 +30,15 @@
import java.sql.SQLException;
import javax.sql.rowset.serial.SerialBlob;
-import javax.sql.rowset.serial.SerialException;
-import com.metamatrix.core.CorePlugin;
import com.metamatrix.core.MetaMatrixRuntimeException;
-import com.metamatrix.core.util.ArgCheck;
/**
* Represent a value of type "blob", which can be streamable from client
*/
-public final class BlobType implements Streamable, Blob {
+public final class BlobType extends Streamable<Blob> implements Blob {
- private transient Blob srcBlob;
- private String streamId;
- private String persistentId;
+ private static final long serialVersionUID = 1294191629070433450L;
private long length = -1;
/**
@@ -54,8 +49,7 @@
}
public BlobType(Blob blob) {
- ArgCheck.isNotNull(blob);
- this.srcBlob = blob;
+ super(blob);
try {
this.length = blob.length();
} catch (SQLException e) {
@@ -63,52 +57,18 @@
}
}
- public Blob getSourceBlob() {
- return srcBlob;
- }
-
/**
- * @see com.metamatrix.common.types.Streamable#getReferenceStreamId()
- */
- public String getReferenceStreamId() {
- return this.streamId;
- }
-
- /**
- * @see com.metamatrix.common.types.Streamable#setReferenceStreamId(java.lang.String)
- */
- public void setReferenceStreamId(String id) {
- this.streamId = id;
- }
-
- /**
- * @see com.metamatrix.common.types.Streamable#getPersistenceStreamId()
- */
- public String getPersistenceStreamId() {
- return persistentId;
- }
-
- /**
- * @see com.metamatrix.common.types.Streamable#setPersistenceStreamId(java.lang.String)
- */
- public void setPersistenceStreamId(String id) {
- this.persistentId = id;
- }
-
- /**
* @see java.sql.Blob#getBinaryStream()
*/
public InputStream getBinaryStream() throws SQLException {
- checkReference();
- return this.srcBlob.getBinaryStream();
+ return this.reference.getBinaryStream();
}
/**
* @see java.sql.Blob#getBytes(long, int)
*/
public byte[] getBytes(long pos, int length) throws SQLException {
- checkReference();
- return this.srcBlob.getBytes(pos, length);
+ return this.reference.getBytes(pos, length);
}
/**
@@ -121,32 +81,28 @@
}
// if did not find before then do it again.
- checkReference();
- return this.srcBlob.length();
+ return this.reference.length();
}
/**
* @see java.sql.Blob#position(java.sql.Blob, long)
*/
public long position(Blob pattern, long start) throws SQLException {
- checkReference();
- return this.srcBlob.position(pattern, start);
+ return this.reference.position(pattern, start);
}
/**
* @see java.sql.Blob#position(byte[], long)
*/
public long position(byte[] pattern, long start) throws SQLException {
- checkReference();
- return this.srcBlob.position(pattern, start);
+ return this.reference.position(pattern, start);
}
/**
* @see java.sql.Blob#setBinaryStream(long)
*/
public OutputStream setBinaryStream(long pos) throws SQLException {
- checkReference();
- return this.srcBlob.setBinaryStream(pos);
+ return this.reference.setBinaryStream(pos);
}
/**
@@ -157,63 +113,23 @@
byte[] bytes,
int offset,
int len) throws SQLException {
- checkReference();
- return this.srcBlob.setBytes(pos, bytes, offset, len);
+ return this.reference.setBytes(pos, bytes, offset, len);
}
/**
* @see java.sql.Blob#setBytes(long, byte[])
*/
public int setBytes(long pos, byte[] bytes) throws SQLException {
- checkReference();
- return this.srcBlob.setBytes(pos, bytes);
+ return this.reference.setBytes(pos, bytes);
}
/**
* @see java.sql.Blob#truncate(long)
*/
public void truncate(long len) throws SQLException {
- checkReference();
- this.srcBlob.truncate(len);
+ this.reference.truncate(len);
}
- /**
- * @see java.lang.Object#equals(java.lang.Object)
- */
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
-
- if (!(o instanceof BlobType)) {
- return false;
- }
-
- BlobType other = (BlobType)o;
-
- if (this.srcBlob != null) {
- return this.srcBlob.equals(other.srcBlob);
- }
-
- return this.persistentId == other.persistentId
- && this.streamId == other.streamId;
-
- }
-
- /**
- * @see java.lang.Object#toString()
- */
- public String toString() {
- checkReference();
- return srcBlob.toString();
- }
-
- private void checkReference() {
- if (this.srcBlob == null) {
- throw new InvalidReferenceException(CorePlugin.Util.getString("BlobValue.InvalidReference")); //$NON-NLS-1$
- }
- }
-
/**
* Utility Method to convert blob into byte array
* @param blob
@@ -234,14 +150,12 @@
}
//## JDBC4.0-begin ##
public void free() throws SQLException {
- checkReference();
- this.srcBlob.free();
+ this.reference.free();
}
public InputStream getBinaryStream(long pos, long length)
throws SQLException {
- checkReference();
- return this.srcBlob.getBinaryStream(pos, length);
+ return this.reference.getBinaryStream(pos, length);
}
//## JDBC4.0-end ##
Modified: trunk/common-core/src/main/java/com/metamatrix/common/types/ClobType.java
===================================================================
--- trunk/common-core/src/main/java/com/metamatrix/common/types/ClobType.java 2009-08-12 16:36:33 UTC (rev 1234)
+++ trunk/common-core/src/main/java/com/metamatrix/common/types/ClobType.java 2009-08-12 17:39:14 UTC (rev 1235)
@@ -31,41 +31,27 @@
import java.sql.Clob;
import java.sql.SQLException;
-import javax.sql.rowset.serial.SerialBlob;
import javax.sql.rowset.serial.SerialClob;
-import javax.sql.rowset.serial.SerialException;
-import com.metamatrix.core.CorePlugin;
import com.metamatrix.core.MetaMatrixRuntimeException;
/**
* This is wrapper on top of a "clob" object, which implements the "java.sql.Clob"
* interface. This class also implements the Streamable interface
*/
-public final class ClobType implements Streamable, Clob, Sequencable {
+public final class ClobType extends Streamable<Clob> implements Clob, Sequencable {
- private transient Clob srcClob;
- private String streamId;
- private String persistentId;
+ private static final long serialVersionUID = 2753412502127824104L;
private long length = -1;
/**
* Can't construct
*/
ClobType() {
- super();
}
- public Clob getSourceClob() {
- return this.srcClob;
- }
-
public ClobType(Clob clob) {
- if (clob == null) {
- throw new IllegalArgumentException(CorePlugin.Util.getString("ClobValue.isNUll")); //$NON-NLS-1$
- }
- // this will serve as the in VM reference
- this.srcClob = clob;
+ super(clob);
try {
this.length = clob.length();
@@ -73,57 +59,26 @@
// ignore.
}
}
-
+
/**
- * @see com.metamatrix.common.types.Streamable#getReferenceStreamId()
- */
- public String getReferenceStreamId() {
- return this.streamId;
- }
-
- /**
- * @see com.metamatrix.common.types.Streamable#setReferenceStreamId(java.lang.String)
- */
- public void setReferenceStreamId(String id) {
- this.streamId = id;
- }
-
- /**
- * @see com.metamatrix.common.types.Streamable#getPersistenceStreamId()
- */
- public String getPersistenceStreamId() {
- return persistentId;
- }
-
- /**
- * @see com.metamatrix.common.types.Streamable#setPersistenceStreamId(java.lang.String)
- */
- public void setPersistenceStreamId(String id) {
- this.persistentId = id;
- }
-
- /**
* @see java.sql.Clob#getAsciiStream()
*/
public InputStream getAsciiStream() throws SQLException {
- checkReference();
- return this.srcClob.getAsciiStream();
+ return this.reference.getAsciiStream();
}
/**
* @see java.sql.Clob#getCharacterStream()
*/
public Reader getCharacterStream() throws SQLException {
- checkReference();
- return this.srcClob.getCharacterStream();
+ return this.reference.getCharacterStream();
}
/**
* @see java.sql.Clob#getSubString(long, int)
*/
public String getSubString(long pos, int length) throws SQLException {
- checkReference();
- return this.srcClob.getSubString(pos, length);
+ return this.reference.getSubString(pos, length);
}
/**
@@ -134,40 +89,35 @@
return this.length;
}
- checkReference();
- return this.srcClob.length();
+ return this.reference.length();
}
/**
* @see java.sql.Clob#position(java.sql.Clob, long)
*/
public long position(Clob searchstr, long start) throws SQLException {
- checkReference();
- return this.srcClob.position(searchstr, start);
+ return this.reference.position(searchstr, start);
}
/**
* @see java.sql.Clob#position(java.lang.String, long)
*/
public long position(String searchstr, long start) throws SQLException {
- checkReference();
- return this.srcClob.position(searchstr, start);
+ return this.reference.position(searchstr, start);
}
/**
* @see java.sql.Clob#setAsciiStream(long)
*/
public OutputStream setAsciiStream(long pos) throws SQLException {
- checkReference();
- return this.srcClob.setAsciiStream(pos);
+ return this.reference.setAsciiStream(pos);
}
/**
* @see java.sql.Clob#setCharacterStream(long)
*/
public Writer setCharacterStream(long pos) throws SQLException {
- checkReference();
- return this.srcClob.setCharacterStream(pos);
+ return this.reference.setCharacterStream(pos);
}
/**
@@ -177,62 +127,23 @@
String str,
int offset,
int len) throws SQLException {
- checkReference();
- return this.srcClob.setString(pos, str, offset, len);
+ return this.reference.setString(pos, str, offset, len);
}
/**
* @see java.sql.Clob#setString(long, java.lang.String)
*/
public int setString(long pos, String str) throws SQLException {
- checkReference();
- return this.srcClob.setString(pos, str);
+ return this.reference.setString(pos, str);
}
/**
* @see java.sql.Clob#truncate(long)
*/
public void truncate(long len) throws SQLException {
- checkReference();
- this.srcClob.truncate(len);
+ this.reference.truncate(len);
}
- /**
- * @see java.lang.Object#equals(java.lang.Object)
- */
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
-
- if (!(o instanceof ClobType)) {
- return false;
- }
-
- ClobType other = (ClobType)o;
-
- if (this.srcClob != null) {
- return this.srcClob.equals(other.srcClob);
- }
-
- return this.persistentId == other.persistentId
- && this.streamId == other.streamId;
- }
-
- /**
- * @see java.lang.Object#toString()
- */
- public String toString() {
- checkReference();
- return srcClob.toString();
- }
-
- private void checkReference() {
- if (this.srcClob == null) {
- throw new InvalidReferenceException(CorePlugin.Util.getString("ClobValue.InvalidReference")); //$NON-NLS-1$
- }
- }
-
/**
* Utility method to convert to String
* @param clob
@@ -255,7 +166,6 @@
private final static int CHAR_SEQUENCE_BUFFER_SIZE = 2 << 12;
public CharSequence getCharSequence() {
- checkReference();
return new CharSequence() {
private String buffer;
@@ -299,13 +209,11 @@
}
//## JDBC4.0-begin ##
public void free() throws SQLException {
- checkReference();
- this.srcClob.free();
+ this.reference.free();
}
public Reader getCharacterStream(long pos, long length) throws SQLException {
- checkReference();
- return this.srcClob.getCharacterStream(pos, length);
+ return this.reference.getCharacterStream(pos, length);
}
//## JDBC4.0-end ##
Deleted: trunk/common-core/src/main/java/com/metamatrix/common/types/InvalidReferenceException.java
===================================================================
--- trunk/common-core/src/main/java/com/metamatrix/common/types/InvalidReferenceException.java 2009-08-12 16:36:33 UTC (rev 1234)
+++ trunk/common-core/src/main/java/com/metamatrix/common/types/InvalidReferenceException.java 2009-08-12 17:39:14 UTC (rev 1235)
@@ -1,34 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * See the COPYRIGHT.txt file distributed with this work for information
- * regarding copyright ownership. Some portions may be licensed
- * to Red Hat, Inc. under one or more contributor license agreements.
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
- * 02110-1301 USA.
- */
-
-package com.metamatrix.common.types;
-
-
-/**
- * A exception class to define that a invalid reference has been accessed.
- */
-public class InvalidReferenceException extends RuntimeException {
- public InvalidReferenceException() {}
- public InvalidReferenceException(String msg){
- super(msg);
- }
-}
Modified: trunk/common-core/src/main/java/com/metamatrix/common/types/Streamable.java
===================================================================
--- trunk/common-core/src/main/java/com/metamatrix/common/types/Streamable.java 2009-08-12 16:36:33 UTC (rev 1234)
+++ trunk/common-core/src/main/java/com/metamatrix/common/types/Streamable.java 2009-08-12 17:39:14 UTC (rev 1235)
@@ -23,8 +23,11 @@
package com.metamatrix.common.types;
import java.io.Serializable;
+import java.lang.ref.PhantomReference;
+import com.metamatrix.core.CorePlugin;
+
/**
* A large value object which can be streamable in chunks of data each time
*
@@ -38,31 +41,71 @@
* the process worker to in case the reference object has lost its state and we
* need to reinsate the object from the disk.
*/
-public interface Streamable extends Serializable {
- static final String FORCE_STREAMING = "FORCE_STREAMING"; //$NON-NLS-1$
+public abstract class Streamable<T> implements Serializable {
+ public static final String FORCE_STREAMING = "FORCE_STREAMING"; //$NON-NLS-1$
public static final int STREAMING_BATCH_SIZE_IN_BYTES = 102400; // 100K
+
+ private String referenceStreamId;
+ private String persistenceStreamId;
+ protected transient T reference;
- /**
- * Reference Stream ID in the server
- * @return string - this is buffer managers tuple source id.
- */
- String getReferenceStreamId();
+ public Streamable() {
+
+ }
- /**
- * Reference Stream ID in the server
- * @param id this is buffer managers tuple source id.
- */
- void setReferenceStreamId(String id);
+ public Streamable(T reference) {
+ if (reference == null) {
+ throw new IllegalArgumentException(CorePlugin.Util.getString("Streamable.isNUll")); //$NON-NLS-1$
+ }
+
+ this.reference = reference;
+ }
- /**
- * Persitence Stream ID in the server
- * @return string - this is buffer managers tuple source id.
- */
- String getPersistenceStreamId();
+ public T getReference() {
+ return reference;
+ }
- /**
- * Persitence Stream ID in the server
- * @param id this is buffer managers tuple source id.
- */
- void setPersistenceStreamId(String id);
+ public void setReference(T reference) {
+ this.reference = reference;
+ }
+
+ public String getReferenceStreamId() {
+ return this.referenceStreamId;
+ }
+
+ public void setReferenceStreamId(String id) {
+ this.referenceStreamId = id;
+ }
+
+ public String getPersistenceStreamId() {
+ return persistenceStreamId;
+ }
+
+ public void setPersistenceStreamId(String id) {
+ this.persistenceStreamId = id;
+ }
+
+ @Override
+ public String toString() {
+ return reference.toString();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (!(obj instanceof Streamable<?>)) {
+ return false;
+ }
+ Streamable<?> other = (Streamable<?>)obj;
+
+ if (this.reference != null) {
+ return this.reference.equals(other.reference);
+ }
+
+ return this.persistenceStreamId == other.persistenceStreamId
+ && this.referenceStreamId == other.referenceStreamId;
+ }
+
}
Modified: trunk/common-core/src/main/java/com/metamatrix/common/types/XMLType.java
===================================================================
--- trunk/common-core/src/main/java/com/metamatrix/common/types/XMLType.java 2009-08-12 16:36:33 UTC (rev 1234)
+++ trunk/common-core/src/main/java/com/metamatrix/common/types/XMLType.java 2009-08-12 17:39:14 UTC (rev 1235)
@@ -38,125 +38,57 @@
import javax.xml.transform.Result;
import javax.xml.transform.Source;
-import com.metamatrix.core.CorePlugin;
-
/**
* This class represents the SQLXML object along with the Streamable interface. This is
* class used everywhere in the MetaMatrix framework, but clients are restricted to use
* only SQLXML interface on top of this.
*/
-public final class XMLType implements Streamable, SQLXML {
+public final class XMLType extends Streamable<SQLXML> implements SQLXML {
- private transient SQLXML srcXML;
- private String referenceStreamId;
- private String persistenceStreamId;
+ private static final long serialVersionUID = -7922647237095135723L;
public XMLType(){
}
- public SQLXML getSourceSQLXML() {
- return srcXML;
- }
-
public XMLType(SQLXML xml) {
- if (xml == null) {
- throw new IllegalArgumentException(CorePlugin.Util.getString("XMLValue.isNUll")); //$NON-NLS-1$
- }
-
- // this will serve as the in VM reference
- this.srcXML = xml;
+ super(xml);
}
-
- public String getReferenceStreamId() {
- return this.referenceStreamId;
- }
-
- public void setReferenceStreamId(String id) {
- this.referenceStreamId = id;
- }
-
- public String getPersistenceStreamId() {
- return persistenceStreamId;
- }
-
- public void setPersistenceStreamId(String id) {
- this.persistenceStreamId = id;
- }
-
+
public InputStream getBinaryStream() throws SQLException {
- checkReference();
- return this.srcXML.getBinaryStream();
+ return this.reference.getBinaryStream();
}
public Reader getCharacterStream() throws SQLException {
- checkReference();
- return this.srcXML.getCharacterStream();
+ return this.reference.getCharacterStream();
}
public <T extends Source> T getSource(Class<T> sourceClass) throws SQLException {
- checkReference();
- return this.srcXML.getSource(sourceClass);
+ return this.reference.getSource(sourceClass);
}
public String getString() throws SQLException {
- checkReference();
- return this.srcXML.getString();
+ return this.reference.getString();
}
public OutputStream setBinaryStream() throws SQLException {
- checkReference();
- return this.srcXML.setBinaryStream();
+ return this.reference.setBinaryStream();
}
public Writer setCharacterStream() throws SQLException {
- checkReference();
- return this.srcXML.setCharacterStream();
+ return this.reference.setCharacterStream();
}
public void setString(String value) throws SQLException {
- checkReference();
- this.srcXML.setString(value);
+ this.reference.setString(value);
}
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
-
- if (!(o instanceof XMLType)) {
- return false;
- }
-
- XMLType other = (XMLType)o;
-
- if (this.srcXML != null) {
- return this.srcXML.equals(other.srcXML);
- }
-
- return this.persistenceStreamId == other.persistenceStreamId
- && this.referenceStreamId == other.referenceStreamId;
- }
-
- public String toString() {
- checkReference();
- return srcXML.toString();
- }
-
- private void checkReference() {
- if (this.srcXML == null) {
- throw new InvalidReferenceException(CorePlugin.Util.getString("XMLValue.InvalidReference")); //$NON-NLS-1$
- }
- }
-
public void free() throws SQLException {
- checkReference();
- this.srcXML.free();
+ this.reference.free();
}
public <T extends Result> T setResult(Class<T> resultClass)
throws SQLException {
- checkReference();
- return this.srcXML.setResult(resultClass);
+ return this.reference.setResult(resultClass);
}
}
Modified: trunk/common-core/src/main/java/com/metamatrix/common/types/basic/ClobToStringTransform.java
===================================================================
--- trunk/common-core/src/main/java/com/metamatrix/common/types/basic/ClobToStringTransform.java 2009-08-12 16:36:33 UTC (rev 1234)
+++ trunk/common-core/src/main/java/com/metamatrix/common/types/basic/ClobToStringTransform.java 2009-08-12 17:39:14 UTC (rev 1235)
@@ -28,7 +28,6 @@
import com.metamatrix.common.types.ClobType;
import com.metamatrix.common.types.DataTypeManager;
-import com.metamatrix.common.types.InvalidReferenceException;
import com.metamatrix.common.types.TransformationException;
import com.metamatrix.core.CorePlugin;
@@ -65,9 +64,7 @@
throw new TransformationException(e, CorePlugin.Util.getString("failed_convert", new Object[] {getSourceType().getName(), getTargetType().getName()})); //$NON-NLS-1$
} catch(IOException e) {
throw new TransformationException(e, CorePlugin.Util.getString("failed_convert", new Object[] {getSourceType().getName(), getTargetType().getName()})); //$NON-NLS-1$
- } catch(InvalidReferenceException e) {
- throw new TransformationException(e, CorePlugin.Util.getString("remote_lob_access")); //$NON-NLS-1$
- }
+ }
}
/**
Modified: trunk/common-core/src/main/java/com/metamatrix/common/types/basic/SQLXMLToStringTransform.java
===================================================================
--- trunk/common-core/src/main/java/com/metamatrix/common/types/basic/SQLXMLToStringTransform.java 2009-08-12 16:36:33 UTC (rev 1234)
+++ trunk/common-core/src/main/java/com/metamatrix/common/types/basic/SQLXMLToStringTransform.java 2009-08-12 17:39:14 UTC (rev 1235)
@@ -26,7 +26,6 @@
import java.sql.SQLException;
import com.metamatrix.common.types.DataTypeManager;
-import com.metamatrix.common.types.InvalidReferenceException;
import com.metamatrix.common.types.TransformationException;
import com.metamatrix.common.types.XMLType;
import com.metamatrix.core.CorePlugin;
@@ -54,8 +53,6 @@
return new String(result, 0, read);
} catch (SQLException e) {
throw new TransformationException(e, CorePlugin.Util.getString("failed_convert", new Object[] {getSourceType().getName(), getTargetType().getName()})); //$NON-NLS-1$
- } catch(InvalidReferenceException e) {
- throw new TransformationException(e, CorePlugin.Util.getString("remote_lob_access")); //$NON-NLS-1$
} catch (IOException e) {
throw new TransformationException(e, CorePlugin.Util.getString("failed_convert", new Object[] {getSourceType().getName(), getTargetType().getName()})); //$NON-NLS-1$
}
Modified: trunk/common-core/src/main/resources/com/metamatrix/core/i18n.properties
===================================================================
--- trunk/common-core/src/main/resources/com/metamatrix/core/i18n.properties 2009-08-12 16:36:33 UTC (rev 1234)
+++ trunk/common-core/src/main/resources/com/metamatrix/core/i18n.properties 2009-08-12 17:39:14 UTC (rev 1235)
@@ -325,12 +325,8 @@
BlobImpl.Invalid_start_position=The position to begin searching, "{0}", is not valid.
-BlobValue.isNUll=Blob object argument can not be null
-BlobValue.InvalidReference=Blob Contents are not available, use the Streaming interface to get the contents.
-ClobValue.isNUll=Clob object argument can not be null
-ClobValue.InvalidReference=Clob Contents are not available, use the Streaming interface to get the contents.
-XMLValue.InvalidReference=XML Contents are not available, use the Streaming interface to get the contents.
-XMLValue.isNUll=SQLXML object argument can not be null
+Streamable.isNUll=Streamable object argument can not be null
+Streamable.InvalidReference=Streamable contents are not available, use the Streaming interface to get the contents.
WorkerPool.New_thread=Created worker thread "{0}".
WorkerPool.uncaughtException=Uncaught exception processing work
Modified: trunk/common-core/src/test/java/com/metamatrix/common/types/TestBlobValue.java
===================================================================
--- trunk/common-core/src/test/java/com/metamatrix/common/types/TestBlobValue.java 2009-08-12 16:36:33 UTC (rev 1234)
+++ trunk/common-core/src/test/java/com/metamatrix/common/types/TestBlobValue.java 2009-08-12 17:39:14 UTC (rev 1235)
@@ -68,12 +68,7 @@
assertEquals(key, read.getReferenceStreamId());
// and lost the original object
- try {
- read.getBinaryStream();
- fail("this must thrown a reference stream exception"); //$NON-NLS-1$
- } catch (InvalidReferenceException e) {
- // pass
- }
+ assertNull(read.getReference());
saved.delete();
}
Modified: trunk/common-core/src/test/java/com/metamatrix/common/types/TestClobValue.java
===================================================================
--- trunk/common-core/src/test/java/com/metamatrix/common/types/TestClobValue.java 2009-08-12 16:36:33 UTC (rev 1234)
+++ trunk/common-core/src/test/java/com/metamatrix/common/types/TestClobValue.java 2009-08-12 17:39:14 UTC (rev 1235)
@@ -68,12 +68,7 @@
assertEquals(key, read.getReferenceStreamId());
// and lost the original object
- try {
- read.getCharacterStream();
- fail("this must thrown a reference stream exception"); //$NON-NLS-1$
- } catch (InvalidReferenceException e) {
- // pass
- }
+ assertNull(read.getReference());
saved.delete();
}
Modified: trunk/common-core/src/test/java/com/metamatrix/common/types/TestXMLValue.java
===================================================================
--- trunk/common-core/src/test/java/com/metamatrix/common/types/TestXMLValue.java 2009-08-12 16:36:33 UTC (rev 1234)
+++ trunk/common-core/src/test/java/com/metamatrix/common/types/TestXMLValue.java 2009-08-12 17:39:14 UTC (rev 1235)
@@ -69,12 +69,7 @@
assertEquals(pkey, read.getPersistenceStreamId());
// and lost the original object
- try {
- read.getCharacterStream();
- fail("this must thrown a reference stream exception"); //$NON-NLS-1$
- } catch (InvalidReferenceException e) {
- // pass
- }
+ assertNull(read.getReference());
saved.delete();
}
Modified: trunk/engine/src/main/java/com/metamatrix/common/buffer/BufferManager.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/common/buffer/BufferManager.java 2009-08-12 16:36:33 UTC (rev 1234)
+++ trunk/engine/src/main/java/com/metamatrix/common/buffer/BufferManager.java 2009-08-12 17:39:14 UTC (rev 1235)
@@ -27,6 +27,7 @@
import com.metamatrix.api.exception.MetaMatrixComponentException;
import com.metamatrix.common.lob.LobChunk;
+import com.metamatrix.common.types.Streamable;
/**
* The buffer manager controls how memory is used and how data flows through
@@ -268,4 +269,23 @@
* to ensure that the memory can be freed.
*/
void releasePinnedBatches() throws MetaMatrixComponentException;
+
+ /**
+ * Return the LOB associated with the referenceId
+ * @param id
+ * @param referenceId
+ * @return
+ * @throws TupleSourceNotFoundException
+ * @throws MetaMatrixComponentException
+ */
+ public Streamable<?> getStreamable(TupleSourceID id, String referenceId)
+ throws TupleSourceNotFoundException, MetaMatrixComponentException;
+
+ /**
+ * Assign the tuplesource as the persistent stream for the streamable
+ * @param id
+ * @param s
+ * @throws TupleSourceNotFoundException
+ */
+ public void setPersistentTupleSource(TupleSourceID id, Streamable<? extends Object> s) throws TupleSourceNotFoundException;
}
Copied: trunk/engine/src/main/java/com/metamatrix/common/buffer/BufferManagerLobChunkStream.java (from rev 1231, trunk/engine/src/main/java/com/metamatrix/common/lob/BufferManagerLobChunkStream.java)
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/common/buffer/BufferManagerLobChunkStream.java (rev 0)
+++ trunk/engine/src/main/java/com/metamatrix/common/buffer/BufferManagerLobChunkStream.java 2009-08-12 17:39:14 UTC (rev 1235)
@@ -0,0 +1,67 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package com.metamatrix.common.buffer;
+
+
+import java.io.IOException;
+
+import com.metamatrix.api.exception.MetaMatrixComponentException;
+import com.metamatrix.common.lob.LobChunk;
+import com.metamatrix.common.lob.LobChunkProducer;
+import com.metamatrix.common.log.LogManager;
+import com.metamatrix.dqp.DQPPlugin;
+import com.metamatrix.dqp.util.LogConstants;
+
+public class BufferManagerLobChunkStream implements LobChunkProducer {
+ private TupleSourceID sourceId;
+ private BufferManager bufferMgr;
+ private int position = 0;
+
+ public BufferManagerLobChunkStream(String persitentId, BufferManager bufferMgr) {
+ this.sourceId = new TupleSourceID(persitentId);
+ this.bufferMgr = bufferMgr;
+ }
+
+ public LobChunk getNextChunk() throws IOException {
+ try {
+ this.position++;
+ return bufferMgr.getStreamablePart(sourceId, position);
+ } catch (TupleSourceNotFoundException e) {
+ String msg = DQPPlugin.Util.getString("BufferManagerLobChunkStream.no_tuple_source", new Object[] {sourceId}); //$NON-NLS-1$
+ LogManager.logWarning(LogConstants.CTX_BUFFER_MGR, e, msg);
+ throw new IOException(msg);
+ } catch (MetaMatrixComponentException e) {
+ String msg = DQPPlugin.Util.getString("BufferManagerLobChunkStream.error_processing", new Object[] {sourceId}); //$NON-NLS-1$
+ LogManager.logWarning(LogConstants.CTX_BUFFER_MGR, e, msg);
+ throw new IOException(msg);
+ }
+ }
+
+ /**
+ * @see com.metamatrix.common.lob.LobChunkProducer#close()
+ */
+ public void close() throws IOException {
+ // we could remove the buffer tuple here but, this is just a stream, so we need to delete
+ // that when we close the plan.
+ }
+}
\ No newline at end of file
Deleted: trunk/engine/src/main/java/com/metamatrix/common/buffer/LobTupleBatch.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/common/buffer/LobTupleBatch.java 2009-08-12 16:36:33 UTC (rev 1234)
+++ trunk/engine/src/main/java/com/metamatrix/common/buffer/LobTupleBatch.java 2009-08-12 17:39:14 UTC (rev 1235)
@@ -1,40 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * See the COPYRIGHT.txt file distributed with this work for information
- * regarding copyright ownership. Some portions may be licensed
- * to Red Hat, Inc. under one or more contributor license agreements.
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
- * 02110-1301 USA.
- */
-
-package com.metamatrix.common.buffer;
-
-import java.util.List;
-
-
-
-/**
- * A marker class file holding the lob based data in a separate batch holder.
- */
-public class LobTupleBatch extends TupleBatch {
-
- public LobTupleBatch() {
- }
-
- public LobTupleBatch(int beginRow, List listOfTupleLists) {
- super(beginRow, listOfTupleLists);
- }
-}
Modified: trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/BufferManagerImpl.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/BufferManagerImpl.java 2009-08-12 16:36:33 UTC (rev 1234)
+++ trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/BufferManagerImpl.java 2009-08-12 17:39:14 UTC (rev 1235)
@@ -25,7 +25,9 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
+import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -39,7 +41,6 @@
import com.metamatrix.api.exception.MetaMatrixComponentException;
import com.metamatrix.common.buffer.BufferManager;
import com.metamatrix.common.buffer.IndexedTupleSource;
-import com.metamatrix.common.buffer.LobTupleBatch;
import com.metamatrix.common.buffer.MemoryNotAvailableException;
import com.metamatrix.common.buffer.StorageManager;
import com.metamatrix.common.buffer.TupleBatch;
@@ -50,23 +51,28 @@
import com.metamatrix.common.types.Streamable;
import com.metamatrix.core.log.MessageLevel;
import com.metamatrix.core.util.Assertion;
+import com.metamatrix.dqp.DQPPlugin;
import com.metamatrix.dqp.util.LogConstants;
import com.metamatrix.query.execution.QueryExecPlugin;
/**
* <p>Default implementation of BufferManager. This buffer manager implementation
* assumes the usage of a StorageManager of type memory and optionally (preferred)
- * an additional StorageManager of type FILE or DISK. If no persistent manager
- * is specified, everything managed by this BufferManager is assumed to fit in
- * memory. This can be useful for testing or for small uses.</p>
- *
- * <p>Lots of state is cached in memory. The tupleSourceMap contains a map of
- * TupleSourceID --> TupleSourceInfo. Everything about a particular tuple
- * source is stored there. The memoryState contains everything pertaining to
- * memory management. The config contains all config info.</p>
+ * an additional StorageManager of type FILE.</p>
*/
public class BufferManagerImpl implements BufferManager {
-
+
+ //memory availability when reserveMemory() is called
+ static final int MEMORY_AVAILABLE = 1;
+ static final int MEMORY_EXCEED_MAX = 2; //exceed buffer manager max memory
+ static final int MEMORY_EXCEED_SESSION_MAX = 3; //exceed session max memory
+
+ private static ThreadLocal<Set<ManagedBatch>> PINNED_BY_THREAD = new ThreadLocal<Set<ManagedBatch>>() {
+ protected Set<ManagedBatch> initialValue() {
+ return new HashSet<ManagedBatch>();
+ };
+ };
+
// Initialized stuff
private String lookup;
private BufferConfig config;
@@ -76,15 +82,18 @@
// groupName (String) -> TupleGroupInfo map
private Map<String, TupleGroupInfo> groupInfos = new HashMap<String, TupleGroupInfo>();
- // Storage managers
+ // Storage manager
private StorageManager diskMgr;
// ID creator
private AtomicLong currentTuple = new AtomicLong(0);
- // Memory management
- private MemoryState memoryState;
-
+ // Keep track of memory usage
+ private volatile long memoryUsed = 0;
+
+ // Track the currently unpinned batches in an insertion-ordered set
+ private Set<ManagedBatch> unpinned = Collections.synchronizedSet(new LinkedHashSet<ManagedBatch>());
+
// Trigger to handle management and stats logging
private Timer timer;
@@ -94,6 +103,7 @@
private AtomicInteger pinnedFromDisk = new AtomicInteger(0);
private AtomicInteger cleanings = new AtomicInteger(0);
private AtomicLong totalCleaned = new AtomicLong(0);
+ private AtomicInteger pinned = new AtomicInteger();
/**
* See {@link com.metamatrix.common.buffer.BufferManagerPropertyNames} for a
@@ -108,15 +118,13 @@
// Set up config based on properties
this.config = new BufferConfig(properties);
- // Set up memory state object
- this.memoryState = new MemoryState(config);
-
// Set up alarms based on config
if(this.config.getManagementInterval() > 0) {
TimerTask mgmtTask = new TimerTask() {
public void run() {
- clean(0);
+ clean(0, null);
}
+
};
getTimer().schedule(mgmtTask, 0, this.config.getManagementInterval());
}
@@ -156,7 +164,8 @@
BufferStats stats = new BufferStats();
// Get memory info
- this.memoryState.fillStats(stats);
+ stats.memoryUsed = this.memoryUsed;
+ stats.memoryFree = config.getTotalAvailableMemory() - memoryUsed;
// Get picture of what's happening
Set<TupleSourceID> copyKeys = tupleSourceMap.keySet();
@@ -245,11 +254,8 @@
TupleSourceID newID = new TupleSourceID(String.valueOf(this.currentTuple.getAndIncrement()), this.lookup);
TupleGroupInfo tupleGroupInfo = getGroupInfo(groupName);
TupleSourceInfo info = new TupleSourceInfo(newID, schema, types, tupleGroupInfo, tupleSourceType);
- synchronized (tupleGroupInfo) {
- tupleGroupInfo.getTupleSourceIDs().add(newID);
- }
+ tupleGroupInfo.getTupleSourceIDs().add(newID);
tupleSourceMap.put(newID, info);
-
if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.DETAIL)) {
LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, new Object[]{"Creating TupleSource:", newID, "of type "+tupleSourceType}); //$NON-NLS-1$ //$NON-NLS-2$
}
@@ -289,31 +295,25 @@
ManagedBatch batch = iter.next();
switch(batch.getLocation()) {
case ManagedBatch.UNPINNED:
- memoryState.removeUnpinned(batch);
- memoryState.releaseMemory(batch.getSize(), info.getGroupInfo());
+ this.unpinned.remove(batch);
+ releaseMemory(batch.getSize(), info.getGroupInfo());
break;
case ManagedBatch.PINNED:
- memoryState.removePinned(info.getTupleSourceID(), batch.getBeginRow());
- memoryState.releaseMemory(batch.getSize(), info.getGroupInfo());
+ PINNED_BY_THREAD.get().remove(batch);
+ this.pinned.getAndDecrement();
+ releaseMemory(batch.getSize(), info.getGroupInfo());
break;
}
}
}
}
TupleGroupInfo tupleGroupInfo = info.getGroupInfo();
- synchronized (tupleGroupInfo) {
- tupleGroupInfo.getTupleSourceIDs().remove(tupleSourceID);
- }
+ tupleGroupInfo.getTupleSourceIDs().remove(tupleSourceID);
// Remove disk storage
if (this.diskMgr != null){
this.diskMgr.removeBatches(tupleSourceID);
}
-
- // remove any dependent tuple sources on this tuple source
- // lob frame work uses the parent tuple's name as group name to
- // tie it back to the original source.
- removeTupleSources(tupleSourceID.getStringID());
}
/**
@@ -338,28 +338,25 @@
return;
}
List<TupleSourceID> tupleSourceIDs = null;
- synchronized (tupleGroupInfo) {
+ synchronized (tupleGroupInfo.getTupleSourceIDs()) {
tupleSourceIDs = new ArrayList<TupleSourceID>(tupleGroupInfo.getTupleSourceIDs());
}
- // Remove them
- if(tupleSourceIDs.size() > 0) {
- MetaMatrixComponentException ex = null;
+ MetaMatrixComponentException ex = null;
- for (TupleSourceID tsID : tupleSourceIDs) {
- try {
- this.removeTupleSource(tsID);
- } catch(TupleSourceNotFoundException e) {
- // ignore and go on
- } catch(MetaMatrixComponentException e) {
- if(ex == null) {
- ex = e;
- }
- }
- }
+ for (TupleSourceID tsID : tupleSourceIDs) {
+ try {
+ this.removeTupleSource(tsID);
+ } catch(TupleSourceNotFoundException e) {
+ // ignore and go on
+ } catch(MetaMatrixComponentException e) {
+ if(ex == null) {
+ ex = e;
+ }
+ }
+ }
- if(ex != null) {
- throw ex;
- }
+ if(ex != null) {
+ throw ex;
}
}
@@ -466,7 +463,7 @@
// if there are lobs in source then we need to keep manage then
// in a separate tuple sources.
if (info.lobsInSource()) {
- createTupleSourcesForLobs(tupleSourceID, tupleBatch);
+ correctLobReferences(info, tupleBatch);
}
// Determine where to store
@@ -474,13 +471,13 @@
tupleBatch.setSize(bytes);
short location = ManagedBatch.PERSISTENT;
- if(memoryState.reserveMemory(bytes, info.getGroupInfo()) == MemoryState.MEMORY_AVAILABLE) {
+ if(reserveMemory(bytes, info.getGroupInfo()) == BufferManagerImpl.MEMORY_AVAILABLE) {
location = ManagedBatch.UNPINNED;
}
synchronized(info) {
if(info.isRemoved()) {
- memoryState.releaseMemory(bytes, info.getGroupInfo());
+ releaseMemory(bytes, info.getGroupInfo());
throw new TupleSourceNotFoundException(QueryExecPlugin.Util.getString("BufferManagerImpl.tuple_source_not_found", tupleSourceID)); //$NON-NLS-1$
}
@@ -498,14 +495,14 @@
} catch(MetaMatrixComponentException e) {
// If we were storing to memory, clean up memory we reserved
if(location != ManagedBatch.PERSISTENT) {
- memoryState.releaseMemory(bytes, info.getGroupInfo());
+ releaseMemory(bytes, info.getGroupInfo());
}
throw e;
}
// Add to memory state if in memory
if(location == ManagedBatch.UNPINNED) {
- this.memoryState.addUnpinned(managedBatch);
+ this.unpinned.add(managedBatch);
}
// Update info with new rows
@@ -533,18 +530,39 @@
this.pinRequests.incrementAndGet();
- TupleBatch memoryBatch = null;
+ TupleSourceInfo info = getTupleSourceInfo(tupleSourceID, true);
+ int memoryAvailability = BufferManagerImpl.MEMORY_AVAILABLE;
+
+ ManagedBatch mbatch = null;
+ synchronized (info) {
+ mbatch = info.getBatch(beginRow);
+ }
+ if(mbatch == null) {
+ return new TupleBatch(beginRow, Collections.EMPTY_LIST);
+ }
+
int endRow = 0;
- int count = 0;
int pass = 0;
+ //if the client requests a previous batch, the end row
+ //is smaller than the begin row
+ if(beginRow > maxEndRow) {
+ endRow = Math.min(beginRow, mbatch.getEndRow());
+ beginRow = Math.max(maxEndRow, mbatch.getBeginRow());
+ }else {
+ endRow = Math.min(maxEndRow, mbatch.getEndRow());
+ }
+ int count = endRow - beginRow + 1;
+ if (count == 0) {
+ return new TupleBatch(beginRow, Collections.EMPTY_LIST);
+ }
+ long memoryRequiredByBatch = mbatch.getSize();
- TupleSourceInfo info = getTupleSourceInfo(tupleSourceID, true);
- long memoryRequiredByBatch = 0;
- int memoryAvailability = MemoryState.MEMORY_AVAILABLE;
+ TupleBatch memoryBatch = null;
+
while(pass < 2) {
if(pass == 1) {
- if(memoryAvailability == MemoryState.MEMORY_EXCEED_MAX) {
- clean(memoryRequiredByBatch);
+ if(memoryAvailability == BufferManagerImpl.MEMORY_EXCEED_MAX) {
+ clean(memoryRequiredByBatch, null);
}else {
//exceed session limit
clean(memoryRequiredByBatch, info.getGroupInfo());
@@ -552,28 +570,23 @@
}
synchronized(info) {
- ManagedBatch mbatch = info.getBatch(beginRow);
- if(mbatch == null) {
- return new TupleBatch(beginRow, Collections.EMPTY_LIST);
-
- } else if(mbatch.getLocation() == ManagedBatch.PINNED) {
+ if(mbatch.getLocation() == ManagedBatch.PINNED) {
// Load batch from memory - already pinned
memoryBatch = mbatch.getBatch();
} else if(mbatch.getLocation() == ManagedBatch.UNPINNED) {
// Already in memory - just move from unpinned to pinned
- mbatch.setLocation(ManagedBatch.PINNED);
- this.memoryState.removeUnpinned(mbatch);
- this.memoryState.addPinned(mbatch);
+ this.unpinned.remove(mbatch);
+ pin(mbatch);
// Load batch from memory
memoryBatch = mbatch.getBatch();
- } else if(mbatch.getLocation() == ManagedBatch.PERSISTENT) {
+ } else {
memoryRequiredByBatch = mbatch.getSize();
// Try to reserve some memory
- if((memoryAvailability = memoryState.reserveMemory(memoryRequiredByBatch, info.getGroupInfo())) != MemoryState.MEMORY_AVAILABLE) {
+ if((memoryAvailability = reserveMemory(memoryRequiredByBatch, info.getGroupInfo())) != BufferManagerImpl.MEMORY_AVAILABLE) {
if(pass == 0) {
// Break and try to clean - it is important to break out of the synchronized block
// here so that the clean does not cause a deadlock on this TupleSourceInfo
@@ -595,22 +608,11 @@
memoryBatch = diskMgr.getBatch(tupleSourceID, internalBeginRow, info.getTypes());
mbatch.setBatch(memoryBatch);
- mbatch.setLocation(ManagedBatch.PINNED);
- this.memoryState.addPinned(mbatch);
+ if (info.lobsInSource()) {
+ correctLobReferences(info, memoryBatch);
+ }
+ pin(mbatch);
}
-
- //if the client request previous batch, the end row
- //is smaller than the begin row
- if(beginRow > maxEndRow) {
- endRow = Math.min(beginRow, memoryBatch.getEndRow());
- beginRow = Math.max(maxEndRow, memoryBatch.getBeginRow());
- }else {
- endRow = Math.min(maxEndRow, memoryBatch.getEndRow());
- }
- count = endRow - beginRow + 1;
- if(count > 0) {
- mbatch.pin();
- }
}
break;
@@ -618,7 +620,7 @@
// Batch should now be pinned in memory, so grab it and build a correctly
// sized batch to return
- if(memoryBatch.getRowCount() == 0 || count == 0 || (beginRow == memoryBatch.getBeginRow() && count == memoryBatch.getRowCount())) {
+ if(beginRow == memoryBatch.getBeginRow() && count == memoryBatch.getRowCount()) {
return memoryBatch;
}
@@ -628,6 +630,15 @@
System.arraycopy(memoryRows, firstOffset, rows, 0, count);
return new TupleBatch(beginRow, rows);
}
+
+ private void pin(ManagedBatch mbatch) {
+ mbatch.setLocation(ManagedBatch.PINNED);
+ PINNED_BY_THREAD.get().add(mbatch);
+ if (!mbatch.hasPinnedRows()) {
+ pinned.getAndIncrement();
+ }
+ mbatch.pin();
+ }
/**
* Unpin a tuple source batch.
@@ -655,8 +666,9 @@
// Determine whether batch itself should be unpinned
if(! mbatch.hasPinnedRows()) {
mbatch.setLocation(ManagedBatch.UNPINNED);
- memoryState.removePinned(tupleSourceID, mbatch.getBeginRow());
- memoryState.addUnpinned(mbatch);
+ PINNED_BY_THREAD.get().remove(mbatch);
+ this.unpinned.add(mbatch);
+ pinned.getAndDecrement();
}
}
}
@@ -698,83 +710,71 @@
}
return info;
}
-
+
/**
- * Clean the memory state, using LRU. This can be done either via the background
- * cleaning thread or actively if someone wants memory and none is free.
+ * Moves unpinned batches to persistent storage to free memory. This can be done actively if someone wants memory and none is free.
*/
- protected void clean(long memoryRequired) {
- // Defect 14573 - this method needs to know how much memory is required, so that (even if we're not past the active memory
- // threshold) if the memory available is less than the memory required, we should clean up unpinned batches.
+ protected void clean(long memoryRequired, TupleGroupInfo targetGroupInfo) {
+ cleanLobTupleSource();
+
+ long released = 0;
+
long targetLevel = config.getActiveMemoryLevel();
long totalMemory = config.getTotalAvailableMemory();
- long released = 0;
-
- Iterator<ManagedBatch> unpinnedIter = this.memoryState.getAllUnpinned();
- while(unpinnedIter.hasNext() && // If there are unpinned batches in memory, AND
- // Defect 14573 - if we require more than what's available, then cleanup regardless of the threshold
- (memoryRequired > totalMemory - memoryState.getMemoryUsed() || // if the memory needed is more than what's available, or
- memoryState.getMemoryUsed() > targetLevel)){ // if we've crossed the active memory threshold, then cleanup
-
- ManagedBatch batch = unpinnedIter.next();
+
+ boolean generalCleaningDone = false;
+ List<ManagedBatch> toClean = null;
+ synchronized (unpinned) {
+ //TODO: re-implement without having to scan and compete
+ toClean = new ArrayList<ManagedBatch>(unpinned);
+ }
+ for (ManagedBatch batch : toClean) {
TupleSourceID tsID = batch.getTupleSourceID();
-
- released += releaseMemory(batch, tsID);
- }
-
- if(released > 0) {
- this.cleanings.incrementAndGet();
- this.totalCleaned.addAndGet(released);
- }
-
- }
-
- /**
- * Over memory limit for this session. Clean the memory for this session.
- * Clean the memory state, using LRU. This can be done actively if someone wants memory and none is free.
- */
- protected void clean(long memoryRequired, TupleGroupInfo targetGroupInfo) throws TupleSourceNotFoundException{
- boolean cleanForSessionSucceeded = false;
- long released = 0;
-
- Iterator<ManagedBatch> unpinnedIter = this.memoryState.getAllUnpinned();
- while(unpinnedIter.hasNext()) {
- ManagedBatch batch = unpinnedIter.next();
- TupleSourceID tsID = batch.getTupleSourceID();
- TupleSourceInfo tsInfo = getTupleSourceInfo(tsID, false);
+ TupleSourceInfo tsInfo = this.tupleSourceMap.get(tsID);
if(tsInfo == null) {
//may be removed by another thread
continue;
}
- if(!tsInfo.getGroupInfo().equals(targetGroupInfo)) {
- //continue if they are not the same tuple group
- continue;
+
+ long currentMemoryUsed = memoryUsed;
+ if (!generalCleaningDone && (memoryRequired <= totalMemory - currentMemoryUsed && // if the memory needed fits in what's available, and
+ currentMemoryUsed <= targetLevel)) { // we are at or below the active memory threshold, then general cleaning is done
+ generalCleaningDone = true;
}
+ if (generalCleaningDone) {
+ if (targetGroupInfo == null) {
+ break;
+ }
+ if (targetGroupInfo == tsInfo.getGroupInfo()) {
+ //if the memory needed is more than what is available for the session, then cleanup. Otherwise, break the loop.
+ if(memoryRequired <= config.getMaxAvailableSession() - targetGroupInfo.getGroupMemoryUsed()) {
+ break;
+ }
+ }
+ }
- long groupMemoryUsed = memoryState.getGroupMemoryUsed(targetGroupInfo);
- //if the memory needed is more than what is available for the session, then cleanup. Otherwise, break the loop.
- if(memoryRequired <= config.getMaxAvailableSession() - groupMemoryUsed) {
- cleanForSessionSucceeded = true;
- break;
- }
-
- released += releaseMemory(batch, tsID);
+ released += releaseMemory(batch, tsInfo);
}
if(released > 0) {
this.cleanings.incrementAndGet();
this.totalCleaned.addAndGet(released);
}
-
- if(!cleanForSessionSucceeded) {
- //if we cannot clean enough memory for this session, it fails
- return;
+ }
+
+ //TODO: run asynch
+ private void cleanLobTupleSource() {
+ String tupleSourceId = TupleSourceInfo.getStaleLobTupleSource();
+ if (tupleSourceId != null) {
+ try {
+ removeTupleSource(new TupleSourceID(tupleSourceId));
+ } catch (TupleSourceNotFoundException e) {
+ } catch (MetaMatrixComponentException e) {
+ LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, e, "Exception removing stale lob tuple source"); //$NON-NLS-1$
+ }
}
+ }
- //make sure it is not over the buffer manager memory limit
- clean(memoryRequired);
- }
-
/**
* Release the memory for the given unpinned batch.
* @param batch Batch to be released from memory
@@ -782,48 +782,32 @@
* @return The size of memory released in bytes
* @since 4.3
*/
- private long releaseMemory(ManagedBatch batch, TupleSourceID tsID) {
+ private long releaseMemory(ManagedBatch batch, TupleSourceInfo info) {
// Find info and lock on it
- try {
- TupleSourceInfo info = getTupleSourceInfo(tsID, false);
- if(info == null) {
+ synchronized(info) {
+ if(info.isRemoved() || batch.getLocation() != ManagedBatch.UNPINNED) {
return 0;
}
- synchronized(info) {
- if(info.isRemoved()) {
- return 0;
- }
+ // This batch is still unpinned - move to persistent storage
+ TupleBatch dataBatch = batch.getBatch();
- // Re-get the batch and check that it still exists and is unpinned
- batch = info.getBatch(batch.getBeginRow());
- if(batch == null || batch.getLocation() != ManagedBatch.UNPINNED) {
- return 0;
- }
+ try {
+ diskMgr.addBatch(info.getTupleSourceID(), dataBatch, info.getTypes());
+ } catch(MetaMatrixComponentException e) {
+ // Can't move
+ return 0;
+ }
- // This batch is still unpinned - move to persistent storage
- TupleBatch dataBatch = batch.getBatch();
+ batch.setBatch(null);
- try {
- diskMgr.addBatch(tsID, dataBatch, info.getTypes());
- } catch(MetaMatrixComponentException e) {
- // Can't move
- return 0;
- }
+ // Update memory
+ batch.setLocation(ManagedBatch.PERSISTENT);
+ this.unpinned.remove(batch);
+ releaseMemory(batch.getSize(), info.getGroupInfo());
- batch.setBatch(null);
-
- // Update memory
- batch.setLocation(ManagedBatch.PERSISTENT);
- memoryState.removeUnpinned(batch);
- memoryState.releaseMemory(batch.getSize(), info.getGroupInfo());
-
- return batch.getSize();
- }
- } catch(TupleSourceNotFoundException e) {
- // ignore, go to next batch
- return 0;
- }
+ return batch.getSize();
+ }
}
/**
@@ -855,114 +839,49 @@
}
/**
- * If a tuple batch is being added with Lobs, then maintain the LOB
- * objects in a separate TupleSource than from the original, so that
- * the original can only serilize the id, but the otherone can serialize
- * the contents.
+ * If a tuple batch is being added with Lobs, then references to
+ * the lobs will be held on the {@link TupleSourceInfo}
* @param batch
*/
- private void createTupleSourcesForLobs(TupleSourceID parentId, TupleBatch batch)
- throws MetaMatrixComponentException, TupleSourceNotFoundException {
-
- TupleSourceInfo info = getTupleSourceInfo(parentId, false);
+ @SuppressWarnings("unchecked")
+ private void correctLobReferences(TupleSourceInfo info, TupleBatch batch) {
List parentSchema = info.getTupleSchema();
List[] rows = batch.getAllTuples();
-
+ int columns = parentSchema.size();
// walk through the results and find all the lobs
for (int row = 0; row < rows.length; row++) {
-
- int col = 0;
- for (Iterator i = rows[row].iterator(); i.hasNext();) {
- Object anObj = i.next();
+ for (int col = 0; col < columns; col++) {
+ Object anObj = rows[row].get(col);
- if (anObj instanceof Streamable) {
- // once lob is found check to see if this has already been assigned
- // a streming id or not; if one is not assigned create one and assign it
- // to the lob; if one is already assigned just return;
- // this will prohibit calling lob on itself into this routine.
- Streamable lob = (Streamable)anObj;
-
- if (lob.getReferenceStreamId() == null || lobIsNotKnownInTupleSourceMap( lob, parentId) ) {
- List schema = new ArrayList();
- schema.add(parentSchema.get(col));
-
- TupleSourceID id = createTupleSource(schema, new String[] {info.getTypes()[col]}, parentId.getStringID(), TupleSourceType.PROCESSOR);
- lob.setReferenceStreamId(id.getStringID());
-
- List results = new ArrayList();
- results.add(lob);
-
- List listOfRows = new ArrayList();
- listOfRows.add(results);
-
- // these batches are wrapped in a special marker batch tag
- // which are saved from forcing them to disk.
- LobTupleBatch separateBatch = new LobTupleBatch(1, listOfRows);
- separateBatch.setTerminationFlag(true);
-
- // now save this as separate tuple source.
- addTupleBatch(id, separateBatch);
- } else {
- // this means the XML object being moved from one tuple to another tuple
- // i.e. one plan to another plan. So update the group info.
-
- // First update the reference tuple source.
- if (!lob.getReferenceStreamId().equals(parentId.getStringID())) {
- TupleGroupInfo groupInfo = getGroupInfo(parentId.getStringID());
- TupleSourceID id = new TupleSourceID(lob.getReferenceStreamId());
- TupleSourceInfo lobInfo = getTupleSourceInfo(id, false);
- reassignGroup(groupInfo, lobInfo);
-
- // if the lob moving parent has a assosiated persistent
- // tuple source, then move that one to same parent too.
- if (lob.getPersistenceStreamId() != null) {
- id = new TupleSourceID(lob.getPersistenceStreamId());
- lobInfo = getTupleSourceInfo(id, false);
- reassignGroup(groupInfo, lobInfo);
- }
- }
- }
+ if (!(anObj instanceof Streamable<?>)) {
+ continue;
}
- col++;
+ Streamable lob = (Streamable)anObj;
+ info.addLobReference(lob);
+ if (lob.getReference() == null) {
+ lob.setReference(info.getLobReference(lob.getReferenceStreamId()).getReference());
+ }
}
}
}
-
- private void reassignGroup(TupleGroupInfo groupInfo, TupleSourceInfo lobInfo) {
- TupleGroupInfo tupleGroupInfo = lobInfo.getGroupInfo();
- synchronized (tupleGroupInfo) {
- tupleGroupInfo.getTupleSourceIDs().remove(lobInfo.getTupleSourceID());
- }
- lobInfo.setGroupInfo(groupInfo);
- synchronized (groupInfo) {
- groupInfo.getTupleSourceIDs().add(lobInfo.getTupleSourceID());
- }
- }
- private boolean lobIsNotKnownInTupleSourceMap( Streamable lob, TupleSourceID parentId) throws TupleSourceNotFoundException {
- /*
- * The need for this defensive feature arises because there are multiple uses of the TupleSourceMap which
- * are somewhat inconsistent with one another. In the case of LOBs we use the parent/child group feature
- * of tuplesources to associate a parent tuplesource containing metadata about the LOB with a second
- * tuplesource that contains the LOB. When such a group is no longer needed (for example, see SubqueryProcessorUtility.close()),
- * removing the child tupleSources has the unfortunate side effect of leaving the actual LOBs with references to
- * tuplesources that no longer exist, and are therefore no longer in the tupleSourceMap.
- *
- * This test ensures that such orphaned LOBs will be treated correctly (TEIID-54).
- *
- */
- if (!lob.getReferenceStreamId().equals(parentId.getStringID())) {
- TupleSourceID id = new TupleSourceID(lob.getReferenceStreamId());
- TupleSourceInfo lobInfo = getTupleSourceInfo(id, false);
-
- if ( lobInfo == null ) {
- return true; // is not known
- }
- return false; // is known
- }
- return false; // don't care if known
+ @Override
+ public Streamable<?> getStreamable(TupleSourceID id, String referenceId) throws TupleSourceNotFoundException, MetaMatrixComponentException {
+ TupleSourceInfo tsInfo = getTupleSourceInfo(id, true);
+ Streamable<?> s = tsInfo.getLobReference(referenceId);
+ if (s == null) {
+ throw new MetaMatrixComponentException(DQPPlugin.Util.getString("ProcessWorker.wrongdata")); //$NON-NLS-1$
+ }
+ return s;
}
+ @Override
+ public void setPersistentTupleSource(TupleSourceID id, Streamable<?> s) throws TupleSourceNotFoundException {
+ cleanLobTupleSource();
+ TupleSourceInfo tsInfo = getTupleSourceInfo(id, true);
+ s.setPersistenceStreamId(id.getStringID());
+ tsInfo.setContainingLobReference(s);
+ }
/**
* @see com.metamatrix.common.buffer.BufferManager#addStreamablePart(com.metamatrix.common.buffer.TupleSourceID, com.metamatrix.common.lob.LobChunk, int)
@@ -976,7 +895,7 @@
synchronized(info) {
- List data = new ArrayList();
+ List<LobChunk> data = new ArrayList<LobChunk>();
data.add(streamChunk);
TupleBatch batch = new TupleBatch(beginRow, new List[] {data});
this.diskMgr.addBatch(tupleSourceID, batch, info.getTypes());
@@ -1008,44 +927,62 @@
* @see com.metamatrix.common.buffer.BufferManager#releasePinnedBatches()
*/
public void releasePinnedBatches() throws MetaMatrixComponentException {
- Map<TupleSourceID, Map<Integer, ManagedBatch>> threadPinned = memoryState.getPinnedByCurrentThread();
- if (threadPinned == null) {
- return;
- }
- for (Iterator<Map.Entry<TupleSourceID, Map<Integer, ManagedBatch>>> i = threadPinned.entrySet().iterator(); i.hasNext();) {
- Map.Entry<TupleSourceID, Map<Integer, ManagedBatch>> entry = i.next();
- i.remove();
- TupleSourceID tsid = entry.getKey();
- Map<Integer, ManagedBatch> pinnedBatches = entry.getValue();
- try {
- for (Iterator<ManagedBatch> j = pinnedBatches.values().iterator(); j.hasNext();) {
- ManagedBatch batch = j.next();
-
- //TODO: add trace logging about the batch that is being unpinned
- unpinTupleBatch(tsid, batch.getBeginRow(), batch.getEndRow());
- }
- } catch (TupleSourceNotFoundException err) {
- continue;
- }
- }
+ MetaMatrixComponentException e = null;
+ List<ManagedBatch> pinnedByThread = new ArrayList<ManagedBatch>(PINNED_BY_THREAD.get());
+ for (ManagedBatch managedBatch : pinnedByThread) {
+ try {
+ //TODO: add trace logging about the batch that is being unpinned
+ unpinTupleBatch(managedBatch.getTupleSourceID(), managedBatch.getBeginRow(), managedBatch.getEndRow());
+ } catch (TupleSourceNotFoundException err) {
+ } catch (MetaMatrixComponentException err) {
+ e = err;
+ }
+ }
+ if (e != null) {
+ throw e;
+ }
}
/**
- * for testing purposes
+ * Check for whether the specified amount of memory can be reserved,
+ * and if so reserve it. This is done in the same method so that the
+ * memory is not taken away by a different thread between checking and
+ * reserving - standard "test and set" behavior.
+ * @param bytes Bytes requested
+ * @return One of MEMORY_AVAILABLE, MEMORY_EXCEED_MAX, or MEMORY_EXCEED_SESSION_MAX
*/
- public int getPinnedCount() {
- Map<TupleSourceID, Map<Integer, ManagedBatch>> pinned = memoryState.getAllPinned();
-
- int count = 0;
-
- if (pinned == null) {
- return count;
+ private synchronized int reserveMemory(long bytes, TupleGroupInfo groupInfo) {
+ //check session limit first
+ long sessionMax = config.getMaxAvailableSession();
+ if(sessionMax - groupInfo.getGroupMemoryUsed() < bytes) {
+ return BufferManagerImpl.MEMORY_EXCEED_SESSION_MAX;
}
- for (Iterator<Map<Integer, ManagedBatch>> i = pinned.values().iterator(); i.hasNext();) {
- count += i.next().size();
+ //then check the total memory limit
+ long max = config.getTotalAvailableMemory();
+ if(max - memoryUsed < bytes) {
+ return BufferManagerImpl.MEMORY_EXCEED_MAX;
}
- return count;
+ groupInfo.reserveMemory(bytes);
+ memoryUsed += bytes;
+
+ return BufferManagerImpl.MEMORY_AVAILABLE;
}
+
+ /**
+ * Release memory
+ * @param bytes Bytes to release
+ */
+ private synchronized void releaseMemory(long bytes, TupleGroupInfo groupInfo) {
+ groupInfo.releaseMemory(bytes);
+ memoryUsed -= bytes;
+ }
+
+ /**
+ * for testing purposes
+ */
+ public int getPinnedCount() {
+ return pinned.get();
+ }
}
Modified: trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/BufferStats.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/BufferStats.java 2009-08-12 16:36:33 UTC (rev 1234)
+++ trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/BufferStats.java 2009-08-12 17:39:14 UTC (rev 1235)
@@ -54,7 +54,7 @@
public long totalCleaned;
// Pinned batch details
- public List pinnedManagedBatches = new LinkedList();
+ public List<ManagedBatch> pinnedManagedBatches = new LinkedList<ManagedBatch>();
/**
* Constructor for BufferStats.
@@ -90,7 +90,7 @@
LogManager.logInfo(LogConstants.CTX_BUFFER_MGR, " avgCleaned = " + avgCleaned); //$NON-NLS-1$
if ( LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.TRACE) ) {
- HashMap stackTraces = new HashMap();
+ HashMap<List<String>, Integer> stackTraces = new HashMap<List<String>, Integer>();
if ( pinnedManagedBatches.isEmpty() ) {
return;
@@ -102,12 +102,10 @@
int stackNumber = 1;
// pinned batch details
- Iterator it = pinnedManagedBatches.iterator();
- while ( it.hasNext() ) {
- ManagedBatch batch = (ManagedBatch)it.next();
+ for (ManagedBatch batch : pinnedManagedBatches) {
LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, " TupleSourceID: " + batch.getTupleSourceID() + " Begin: " + batch.getBeginRow() + " End: " + batch.getEndRow()); //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$
- Integer stackKey = (Integer)stackTraces.get(batch.getCallStack());
+ Integer stackKey = stackTraces.get(batch.getCallStack());
boolean isFirst = false;
@@ -119,7 +117,7 @@
LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, " Pinned at: " + batch.getCallStackTimeStamp() + " by call# " + stackKey); //$NON-NLS-1$ //$NON-NLS-2$
if (isFirst) {
- for (Iterator j = batch.getCallStack().iterator(); j.hasNext();) {
+ for (Iterator<String> j = batch.getCallStack().iterator(); j.hasNext();) {
LogManager.logTrace( LogConstants.CTX_BUFFER_MGR, " " + j.next() ); //$NON-NLS-1$
}
}
Deleted: trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/MemoryState.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/MemoryState.java 2009-08-12 16:36:33 UTC (rev 1234)
+++ trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/MemoryState.java 2009-08-12 17:39:14 UTC (rev 1235)
@@ -1,260 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * See the COPYRIGHT.txt file distributed with this work for information
- * regarding copyright ownership. Some portions may be licensed
- * to Red Hat, Inc. under one or more contributor license agreements.
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
- * 02110-1301 USA.
- */
-
-package com.metamatrix.common.buffer.impl;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import com.metamatrix.common.buffer.TupleSourceID;
-import com.metamatrix.common.log.LogManager;
-import com.metamatrix.core.log.MessageLevel;
-import com.metamatrix.dqp.util.LogConstants;
-
-/**
- * <p>This class represents the memory state of the BufferManagerImpl. The
- * critical thing to know is what batches are in memory but not being
- * used and what batches in memory but being used. The access patterns
- * for these two types of information are very different, so they are stored
- * in very different data structures.</p>
- *
- * <p>The unpinned batches are stored in a linked list, ordered by a least
- * recently used timestamp. This is optimized for cleanup. It's not very good
- * for finding stuff but fortunately, we don't need to most of the time.</p>
- *
- * <p>The pinned batches are stored in a Map, keyed by TupleSourceID. The value
- * is another map, keyed by beginRow, with a ManagedBatch as the value. This
- * is really good for finding batches quickly, which is exactly what we need
- * to do with pinned batches.</p>
- *
- * <p>All methods on this class are synchronized to preserve state.</p>
- */
-class MemoryState {
-
- private static ThreadLocal<Map<TupleSourceID, Map<Integer, ManagedBatch>>> PINNED_BY_THREAD = new ThreadLocal<Map<TupleSourceID, Map<Integer, ManagedBatch>>>();
-
- //memory availability when reserveMemory() is called
- static final int MEMORY_AVAILABLE = 1;
- static final int MEMORY_EXCEED_MAX = 2; //exceed buffer manager max memory
- static final int MEMORY_EXCEED_SESSION_MAX = 3; //exceed session max memory
-
- // Configuration, used to get available memory info
- private BufferConfig config;
-
- // Keep track of how memory we are using
- private volatile long memoryUsed = 0;
-
- // Track the currently pinned stuff by TupleSourceID for easy lookup
- private Map<TupleSourceID, Map<Integer, ManagedBatch>> pinned = new HashMap<TupleSourceID, Map<Integer, ManagedBatch>>();
-
- // Track the currently unpinned stuff in a sorted list, sorted by access time
- private Set<ManagedBatch> unpinned = Collections.synchronizedSet(new LinkedHashSet<ManagedBatch>());
-
- /**
- * Constructor for MemoryState, based on config.
- * @param config Configuration
- */
- public MemoryState(BufferConfig config) {
- this.config = config;
- }
-
- /**
- * Fill the stats object with stats about memory
- * @param stats Stats info to be filled
- */
- public void fillStats(BufferStats stats) {
- stats.memoryUsed = this.memoryUsed;
- stats.memoryFree = config.getTotalAvailableMemory() - memoryUsed;
- }
-
- /**
- * Get the amount of memory currently being used in bytes
- * @return Used memory, in bytes
- */
- public long getMemoryUsed() {
- return this.memoryUsed;
- }
-
- /**
- * Check for whether the specified amount of memory can be reserved,
- * and if so reserve it. This is done in the same method so that the
- * memory is not taken away by a different thread between checking and
- * reserving - standard "test and set" behavior.
- * @param bytes Bytes requested
- * @return One of MEMORY_AVAILABLE, MEMORY_EXCEED_MAX, or MEMORY_EXCEED_SESSION_MAX
- */
- public synchronized int reserveMemory(long bytes, TupleGroupInfo groupInfo) {
- //check session limit first
- long sessionMax = config.getMaxAvailableSession();
- if(sessionMax - groupInfo.getGroupMemoryUsed() < bytes) {
- return MEMORY_EXCEED_SESSION_MAX;
- }
-
- //then check the total memory limit
- long max = config.getTotalAvailableMemory();
- if(max - memoryUsed < bytes) {
- return MEMORY_EXCEED_MAX;
- }
-
- /* NOTE1
- * Since the groupInfo call is being made in the synchronized block for the entire buffer,
- * groupInfo doesn't need additional locking.
- */
- groupInfo.reserveMemory(bytes);
- memoryUsed += bytes;
-
- return MEMORY_AVAILABLE;
- }
-
- /**
- * Release memory
- * @param bytes Bytes to release
- */
- public synchronized void releaseMemory(long bytes, TupleGroupInfo groupInfo) {
- // see NOTE1
- groupInfo.releaseMemory(bytes);
- memoryUsed -= bytes;
- }
-
- /**
- * Get the amount of memory currently being used for the specified group in bytes
- * @param groupInfo TupleGroupInfo
- * @return Used memory, in bytes
- */
- public synchronized long getGroupMemoryUsed(TupleGroupInfo groupInfo) {
- // see NOTE1
- return groupInfo.getGroupMemoryUsed();
- }
-
- /**
- * Add a pinned batch
- * @param batch Pinned batch to add
- */
- public void addPinned(ManagedBatch batch) {
- synchronized (this) {
- addPinnedInternal(pinned, batch);
- }
- Map<TupleSourceID, Map<Integer, ManagedBatch>> theadPinned = PINNED_BY_THREAD.get();
- if (theadPinned == null) {
- theadPinned = new HashMap<TupleSourceID, Map<Integer, ManagedBatch>>();
- PINNED_BY_THREAD.set(theadPinned);
- }
- addPinnedInternal(theadPinned, batch);
- if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.TRACE )) {
- batch.captureCallStack();
- }
- }
-
- private void addPinnedInternal(Map<TupleSourceID, Map<Integer, ManagedBatch>> pinnedMap, ManagedBatch batch) {
- TupleSourceID tsID = batch.getTupleSourceID();
- Map<Integer, ManagedBatch> tsPinned = pinnedMap.get(tsID);
- if(tsPinned == null) {
- tsPinned = new HashMap<Integer, ManagedBatch>();
- pinnedMap.put(tsID, tsPinned);
- }
-
- // Add batch, indexed by beginRow
- tsPinned.put(new Integer(batch.getBeginRow()), batch);
- }
-
- /**
- * Remove a pinned batch, if not found do nothing and return null
- * @param tsID Tuple source id
- * @param beginRow Beginning row
- * @return Removed batch or null if not found
- */
- public ManagedBatch removePinned(TupleSourceID tsID, int beginRow) {
- ManagedBatch result = null;
- synchronized (this) {
- result = removePinnedInternal(pinned, tsID, beginRow);
- }
- if (result != null) {
- Map<TupleSourceID, Map<Integer, ManagedBatch>> theadPinned = PINNED_BY_THREAD.get();
- if (theadPinned != null) {
- removePinnedInternal(theadPinned, tsID, beginRow);
- }
- }
- return result;
- }
-
- private ManagedBatch removePinnedInternal(Map<TupleSourceID, Map<Integer, ManagedBatch>> pinnedMap, TupleSourceID tsID,
- int beginRow) {
- Map<Integer, ManagedBatch> tsPinned = pinnedMap.get(tsID);
- if(tsPinned != null) {
- ManagedBatch mbatch = tsPinned.remove(new Integer(beginRow));
-
- if(tsPinned.size() == 0) {
- pinnedMap.remove(tsID);
- }
-
- return mbatch;
- }
- return null;
- }
-
- /**
- * Add an unpinned batch
- * @param batch Unpinned batch to add
- */
- public void addUnpinned(ManagedBatch batch) {
- unpinned.add(batch);
- }
-
- /**
- * Remove an unpinned batch
- * @param batch Batch to remove
- */
- public void removeUnpinned(ManagedBatch batch) {
- unpinned.remove(batch);
- }
-
- /**
- * Get an iterator on all unpinned batches, typically for clean up
- * purposes. This iterator is "safe" in that it is based on a copy
- * of the real list and will not be invalidated by changes to the original
- * list. However, this means that it also may contain batches that
- * are no longer in the unpinned list, so the user of this iterator
- * should check that each batch is still unpinned.
- * @return Safe (but possibly out of date) iterator on unpinned batches
- */
- public Iterator<ManagedBatch> getAllUnpinned() {
- synchronized (unpinned) {
- List<ManagedBatch> copy = new ArrayList<ManagedBatch>(unpinned);
- return copy.iterator();
- }
- }
-
- public synchronized Map<TupleSourceID, Map<Integer, ManagedBatch>> getAllPinned() {
- return new HashMap<TupleSourceID, Map<Integer, ManagedBatch>>(pinned);
- }
-
- public Map<TupleSourceID, Map<Integer, ManagedBatch>> getPinnedByCurrentThread() {
- return PINNED_BY_THREAD.get();
- }
-
-}
Modified: trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/TupleGroupInfo.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/TupleGroupInfo.java 2009-08-12 16:36:33 UTC (rev 1234)
+++ trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/TupleGroupInfo.java 2009-08-12 17:39:14 UTC (rev 1235)
@@ -22,6 +22,7 @@
package com.metamatrix.common.buffer.impl;
+import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
@@ -38,8 +39,8 @@
private String groupName;
/** The bytes of memory used by this tuple group*/
- private long memoryUsed;
- private Set<TupleSourceID> tupleSourceIDs = new HashSet<TupleSourceID>();
+ private volatile long memoryUsed;
+ private Set<TupleSourceID> tupleSourceIDs = Collections.synchronizedSet(new HashSet<TupleSourceID>());
TupleGroupInfo(String groupName) {
this.groupName = groupName;
@@ -54,17 +55,14 @@
}
long getGroupMemoryUsed() {
- // no locking required. See MemoryState.NOTE1
return memoryUsed;
}
long reserveMemory(long bytes) {
- // no locking required. See MemoryState.NOTE1
return memoryUsed += bytes;
}
long releaseMemory(long bytes) {
- // no locking required. See MemoryState.NOTE1
return memoryUsed -= bytes;
}
}
Modified: trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/TupleSourceInfo.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/TupleSourceInfo.java 2009-08-12 16:36:33 UTC (rev 1234)
+++ trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/TupleSourceInfo.java 2009-08-12 17:39:14 UTC (rev 1235)
@@ -22,30 +22,55 @@
package com.metamatrix.common.buffer.impl;
-import java.util.*;
+import java.lang.ref.PhantomReference;
+import java.lang.ref.ReferenceQueue;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicLong;
import com.metamatrix.common.buffer.TupleSourceID;
import com.metamatrix.common.buffer.BufferManager.TupleSourceStatus;
import com.metamatrix.common.buffer.BufferManager.TupleSourceType;
import com.metamatrix.common.types.DataTypeManager;
+import com.metamatrix.common.types.Streamable;
/**
* Describe a TupleSource and all important information about it.
*/
public class TupleSourceInfo {
+ private static final AtomicLong LOB_ID = new AtomicLong();
+ private static final ReferenceQueue<Streamable<?>> LOB_QUEUE = new ReferenceQueue<Streamable<?>>();
+
+ private static class LobReference extends PhantomReference<Streamable<?>> {
+
+ String persistentStreamId;
+
+ public LobReference(Streamable<?> lob) {
+ super(lob, LOB_QUEUE);
+ this.persistentStreamId = lob.getPersistenceStreamId();
+ }
+ }
+
private TupleSourceType type; // Type of TupleSource, as defined in BufferManager constants
private TupleSourceID tsID;
private List schema;
private String[] types;
private int rowCount;
- private TupleSourceStatus status;
+ private TupleSourceStatus status = TupleSourceStatus.ACTIVE;
private TupleGroupInfo groupInfo;
private boolean removed = false;
private TreeMap<Integer, ManagedBatch> batches = new TreeMap<Integer, ManagedBatch>();
-
+ private Map<String, Streamable<?>> lobReferences; //references to contained lobs
private boolean lobs;
+ @SuppressWarnings("unused")
+ private LobReference containingLobReference; //reference to containing lob
+
/**
* Construct a TupleSourceInfo given information about it.
* @param tsID Identifier
@@ -58,16 +83,51 @@
this.schema = schema;
this.types = types;
this.groupInfo = groupInfo;
- this.status = TupleSourceStatus.ACTIVE;
- this.rowCount = 0;
this.type = type;
this.lobs = checkForLobs();
}
+ public void setContainingLobReference(Streamable<?> s) {
+ this.containingLobReference = new LobReference(s);
+ }
+
+ public void addLobReference(Streamable<Object> lob) {
+ String id = lob.getReferenceStreamId();
+ if (id == null) {
+ id = String.valueOf(LOB_ID.getAndIncrement());
+ lob.setReferenceStreamId(id);
+ }
+ if (this.lobReferences == null) {
+ this.lobReferences = Collections.synchronizedMap(new HashMap<String, Streamable<?>>());
+ }
+ this.lobReferences.put(id, lob);
+ }
+
+ public static String getStaleLobTupleSource() {
+ LobReference ref = (LobReference)LOB_QUEUE.poll();
+ if (ref == null) {
+ return null;
+ }
+ return ref.persistentStreamId;
+ }
+
+ public Streamable<?> getLobReference(String id) {
+ if (this.lobReferences == null) {
+ return null;
+ }
+ return this.lobReferences.get(id);
+ }
+
public void addBatch(ManagedBatch batch) {
batches.put(batch.getBeginRow(), batch);
}
+ /**
+ * Returns the batch containing the begin row or null
+ * if it doesn't exist
+ * @param beginRow
+ * @return
+ */
public ManagedBatch getBatch(int beginRow) {
Map.Entry<Integer, ManagedBatch> entry = batches.floorEntry(beginRow);
if (entry != null && entry.getValue().getEndRow() >= beginRow) {
@@ -184,19 +244,16 @@
return "TupleSourceInfo[" + this.tsID + "]"; //$NON-NLS-1$ //$NON-NLS-2$
}
-
private boolean checkForLobs() {
- boolean lob = false;
- if (types != null) {
- for (int i = 0; i < types.length; i++) {
- lob |= DataTypeManager.isLOB(types[i]);
+ if (types == null) {
+ // assume the worst
+ return true;
+ }
+ for (int i = 0; i < types.length; i++) {
+ if (DataTypeManager.isLOB(types[i]) || types[i] == DataTypeManager.DefaultDataTypes.OBJECT) {
+ return true;
}
}
- else {
- // if incase the user did not specify the types, then make
- // them walk the batch; pay the penalty of performence
- return true;
- }
- return lob;
+ return false;
}
}
Modified: trunk/engine/src/main/java/com/metamatrix/common/buffer/storage/file/FileStorageManager.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/common/buffer/storage/file/FileStorageManager.java 2009-08-12 16:36:33 UTC (rev 1234)
+++ trunk/engine/src/main/java/com/metamatrix/common/buffer/storage/file/FileStorageManager.java 2009-08-12 17:39:14 UTC (rev 1235)
@@ -41,7 +41,6 @@
import com.metamatrix.api.exception.MetaMatrixComponentException;
import com.metamatrix.common.buffer.BufferManagerPropertyNames;
-import com.metamatrix.common.buffer.LobTupleBatch;
import com.metamatrix.common.buffer.StorageManager;
import com.metamatrix.common.buffer.TupleBatch;
import com.metamatrix.common.buffer.TupleSourceID;
@@ -172,16 +171,6 @@
public void addBatch(TupleSourceID sourceID, TupleBatch batch, String[] types)
throws MetaMatrixComponentException {
- /* Right now we do not support the saving of the lobs to the disk.
- * by throwing an exception the memory is never released for lobs, which is same
- * as keeping them in a map. This is not going to be memory hog because, the actual
- * lob (clob or blob) are backed by connector, xml is backed by already persisted
- * tuple source. Here we are only saving the referenes to the actual objects.
- */
- if (batch instanceof LobTupleBatch) {
- throw new MetaMatrixComponentException(QueryExecPlugin.Util.getString("FileStorageManager.can_not_save_lobs")); //$NON-NLS-1$
- }
-
// Defect 13342 - addBatch method now creates spill files if the total bytes exceeds the max file size limit
TupleSourceInfo tsInfo = getTupleSourceInfo(sourceID, true);
synchronized (tsInfo) {
Modified: trunk/engine/src/main/java/com/metamatrix/query/processor/QueryProcessor.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/processor/QueryProcessor.java 2009-08-12 16:36:33 UTC (rev 1234)
+++ trunk/engine/src/main/java/com/metamatrix/query/processor/QueryProcessor.java 2009-08-12 17:39:14 UTC (rev 1235)
@@ -181,7 +181,10 @@
}
throw new MetaMatrixComponentException(e);
} finally {
- bufferMgr.releasePinnedBatches();
+ //iff this is the root command context release any pinned (Unclosed tuplesources)
+ if (this.context.getParent() == null) {
+ bufferMgr.releasePinnedBatches();
+ }
}
if(done || requestClosed) {
Modified: trunk/engine/src/main/java/com/metamatrix/query/processor/dynamic/XMLSource.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/processor/dynamic/XMLSource.java 2009-08-12 16:36:33 UTC (rev 1234)
+++ trunk/engine/src/main/java/com/metamatrix/query/processor/dynamic/XMLSource.java 2009-08-12 17:39:14 UTC (rev 1235)
@@ -24,7 +24,6 @@
import java.io.StringReader;
import java.util.List;
-import java.util.Properties;
import javax.xml.transform.Source;
import javax.xml.transform.stream.StreamSource;
@@ -32,10 +31,7 @@
import com.metamatrix.api.exception.MetaMatrixProcessingException;
import com.metamatrix.common.buffer.BufferManager;
import com.metamatrix.common.buffer.TupleSource;
-import com.metamatrix.common.buffer.TupleSourceID;
-import com.metamatrix.common.types.InvalidReferenceException;
import com.metamatrix.common.types.XMLType;
-import com.metamatrix.query.processor.xml.XMLUtil;
/**
@@ -59,12 +55,7 @@
// as processing excceptions.
if (value instanceof XMLType) {
XMLType xml = (XMLType)value;
- try {
- return xml.getSource(null);
- } catch (InvalidReferenceException e) {
- xml = XMLUtil.getFromBufferManager(bufferMgr, new TupleSourceID(xml.getPersistenceStreamId()), new Properties());
- return xml.getSource(null);
- }
+ return xml.getSource(null);
}
return new StreamSource(new StringReader((String)value));
}
Modified: trunk/engine/src/main/java/com/metamatrix/query/processor/proc/ProcedurePlan.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/processor/proc/ProcedurePlan.java 2009-08-12 16:36:33 UTC (rev 1234)
+++ trunk/engine/src/main/java/com/metamatrix/query/processor/proc/ProcedurePlan.java 2009-08-12 17:39:14 UTC (rev 1235)
@@ -561,7 +561,6 @@
String rsKey = rsName.toUpperCase();
CursorState state = this.cursorStates.remove(rsKey);
if(state != null) {
- state.ts.closeSource();
try {
this.bufferMgr.removeTupleSource(state.tsID);
} catch (TupleSourceNotFoundException e) {
Modified: trunk/engine/src/main/java/com/metamatrix/query/processor/relational/DependentCriteriaProcessor.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/processor/relational/DependentCriteriaProcessor.java 2009-08-12 16:36:33 UTC (rev 1234)
+++ trunk/engine/src/main/java/com/metamatrix/query/processor/relational/DependentCriteriaProcessor.java 2009-08-12 17:39:14 UTC (rev 1235)
@@ -36,7 +36,6 @@
import com.metamatrix.api.exception.MetaMatrixComponentException;
import com.metamatrix.api.exception.query.ExpressionEvaluationException;
import com.metamatrix.common.buffer.BlockedException;
-import com.metamatrix.common.buffer.TupleSource;
import com.metamatrix.common.buffer.TupleSourceNotFoundException;
import com.metamatrix.query.rewriter.QueryRewriter;
import com.metamatrix.query.sql.lang.AbstractSetCriteria;
@@ -88,12 +87,6 @@
sortSymbols.add(dependentSetStates.get(i).valueExpression);
}
DependentValueSource originalVs = (DependentValueSource)dependentNode.getContext().getVariableContext().getGlobalValue(valueSource);
- TupleSource ts;
- try {
- ts = dependentNode.getBufferManager().getTupleSource(originalVs.getTupleSourceID());
- } catch (TupleSourceNotFoundException e) {
- throw new MetaMatrixComponentException(e);
- }
this.sortUtility = new SortUtility(originalVs.getTupleSourceID(), sortSymbols, sortDirection, true, dependentNode.getBufferManager(), dependentNode.getConnectionID());
}
dvs = new DependentValueSource(sortUtility.sort(), dependentNode.getBufferManager());
Modified: trunk/engine/src/main/java/com/metamatrix/query/processor/relational/DependentValueSource.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/processor/relational/DependentValueSource.java 2009-08-12 16:36:33 UTC (rev 1234)
+++ trunk/engine/src/main/java/com/metamatrix/query/processor/relational/DependentValueSource.java 2009-08-12 17:39:14 UTC (rev 1235)
@@ -103,6 +103,7 @@
result.add(value);
}
}
+ its.closeSource();
if (cachedSets == null) {
cachedSets = new HashMap<Expression, HashSet<Object>>();
}
Modified: trunk/engine/src/main/java/com/metamatrix/query/processor/relational/GroupingNode.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/processor/relational/GroupingNode.java 2009-08-12 16:36:33 UTC (rev 1234)
+++ trunk/engine/src/main/java/com/metamatrix/query/processor/relational/GroupingNode.java 2009-08-12 17:39:14 UTC (rev 1235)
@@ -368,7 +368,7 @@
currentGroupTuple = null;
groupBegin++;
}
-
+ this.groupTupleSource.closeSource();
if(rowCount != 0 || sortElements == null) {
// Close last group
List row = new ArrayList(functions.length);
Modified: trunk/engine/src/main/java/com/metamatrix/query/processor/relational/PartitionedSortJoin.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/processor/relational/PartitionedSortJoin.java 2009-08-12 16:36:33 UTC (rev 1234)
+++ trunk/engine/src/main/java/com/metamatrix/query/processor/relational/PartitionedSortJoin.java 2009-08-12 17:39:14 UTC (rev 1235)
@@ -299,6 +299,7 @@
matchEnd = -1;
partitionedTuple = null;
}
+ currentSource.closeSource();
currentSource = null;
currentPartition++;
}
Modified: trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SortUtility.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SortUtility.java 2009-08-12 16:36:33 UTC (rev 1234)
+++ trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SortUtility.java 2009-08-12 17:39:14 UTC (rev 1235)
@@ -363,6 +363,7 @@
} finally {
tc.saveBatch();
}
+ outTs.closeSource();
outTs = null;
}
}
Modified: trunk/engine/src/main/java/com/metamatrix/query/processor/xml/XMLPlan.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/processor/xml/XMLPlan.java 2009-08-12 16:36:33 UTC (rev 1234)
+++ trunk/engine/src/main/java/com/metamatrix/query/processor/xml/XMLPlan.java 2009-08-12 17:39:14 UTC (rev 1235)
@@ -71,7 +71,6 @@
import com.metamatrix.common.lob.LobChunk;
import com.metamatrix.common.log.LogManager;
import com.metamatrix.common.types.DataTypeManager;
-import com.metamatrix.common.types.InvalidReferenceException;
import com.metamatrix.common.types.SQLXMLImpl;
import com.metamatrix.common.types.Streamable;
import com.metamatrix.common.types.XMLType;
@@ -274,7 +273,7 @@
// if this is the first chunk, then create a tuple source id for this sequence of chunks
if (!this.docInProgress) {
this.docInProgress = true;
- this.docInProgressTupleSourceId = XMLUtil.createXMLTupleSource(this.bufferMgr, this.resultsTupleSourceId.getStringID());
+ this.docInProgressTupleSourceId = XMLUtil.createXMLTupleSource(this.bufferMgr, this.getContext().getConnectionID());
this.chunkPosition = 1;
}
@@ -287,8 +286,8 @@
// we want this to be naturally feed by chunks whether inside
// or out side the processor
- xml = new XMLType();
- xml.setPersistenceStreamId(this.docInProgressTupleSourceId.getStringID());
+ xml = new XMLType(XMLUtil.getFromBufferManager(bufferMgr, this.docInProgressTupleSourceId, getProperties()));
+ this.bufferMgr.setPersistentTupleSource(this.docInProgressTupleSourceId, xml);
//reset current document state.
this.docInProgress = false;
@@ -356,12 +355,7 @@
Reader source = null;
try {
- try {
- source = xmlDoc.getCharacterStream();
- } catch (InvalidReferenceException e) {
- xmlDoc = XMLUtil.getFromBufferManager(this.bufferMgr, new TupleSourceID(xmlDoc.getPersistenceStreamId()), props);
- source = xmlDoc.getCharacterStream();
- }
+ source = xmlDoc.getCharacterStream();
// Validate against schema
if(this.shouldValidate) {
Modified: trunk/engine/src/main/java/com/metamatrix/query/processor/xml/XMLUtil.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/processor/xml/XMLUtil.java 2009-08-12 16:36:33 UTC (rev 1234)
+++ trunk/engine/src/main/java/com/metamatrix/query/processor/xml/XMLUtil.java 2009-08-12 17:39:14 UTC (rev 1235)
@@ -32,18 +32,17 @@
import com.metamatrix.api.exception.MetaMatrixComponentException;
import com.metamatrix.common.buffer.BufferManager;
+import com.metamatrix.common.buffer.BufferManagerLobChunkStream;
import com.metamatrix.common.buffer.TupleSourceID;
import com.metamatrix.common.buffer.TupleSourceNotFoundException;
import com.metamatrix.common.buffer.BufferManager.TupleSourceStatus;
import com.metamatrix.common.buffer.BufferManager.TupleSourceType;
-import com.metamatrix.common.lob.BufferManagerLobChunkStream;
import com.metamatrix.common.lob.ByteLobChunkStream;
import com.metamatrix.common.lob.LobChunk;
import com.metamatrix.common.lob.LobChunkInputStream;
import com.metamatrix.common.types.DataTypeManager;
import com.metamatrix.common.types.SQLXMLImpl;
import com.metamatrix.common.types.XMLReaderFactory;
-import com.metamatrix.common.types.XMLType;
import com.metamatrix.query.sql.symbol.ElementSymbol;
@@ -95,15 +94,8 @@
* This will reconstruct the XML object from the buffer manager from given
* buffer manager id.
*/
- public static XMLType getFromBufferManager(final BufferManager bufferMgr, final TupleSourceID sourceId, Properties props) {
- SQLXML sqlXML = new SQLXMLImpl(new BufferMangerXMLReaderFactory(bufferMgr, sourceId), props);
-
- // this is object to be sent to the client. The reference
- // id will be set by the buffer manager.
- XMLType xml = new XMLType(sqlXML);
- xml.setPersistenceStreamId(sourceId.getStringID());
-
- return xml;
+ public static SQLXML getFromBufferManager(final BufferManager bufferMgr, final TupleSourceID sourceId, Properties props) {
+ return new SQLXMLImpl(new BufferMangerXMLReaderFactory(bufferMgr, sourceId), props);
}
/**
Modified: trunk/engine/src/main/java/com/metamatrix/query/processor/xquery/XQueryPlan.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/processor/xquery/XQueryPlan.java 2009-08-12 16:36:33 UTC (rev 1234)
+++ trunk/engine/src/main/java/com/metamatrix/query/processor/xquery/XQueryPlan.java 2009-08-12 17:39:14 UTC (rev 1235)
@@ -160,21 +160,15 @@
List rows = new ArrayList(1);
List row = new ArrayList(1);
- // this may be little confusing, but the top layer is not immediately going
- // to disk for saving; when it does it only saves the streaming id, not the
- // contents. The below one saves immediately to disk, and when client refers to top
- // id, processor know about the *saved* and gets the contents from it.
+ TupleSourceID savedId = XMLUtil.saveToBufferManager(this.bufferMgr, this.getContext().getConnectionID(), srcXML, this.chunkSize);
+
+ //for large documents use the buffermanager version instead
+ if (this.bufferMgr.getFinalRowCount(savedId) > 1) {
+ srcXML = XMLUtil.getFromBufferManager(this.bufferMgr, savedId, getFormatProperties());
+ }
- // the one which saves to disk
- TupleSourceID savedId = XMLUtil.saveToBufferManager(this.bufferMgr, this.resultsTupleSourceId.getStringID(), srcXML, this.chunkSize);
-
- // here we have 2 options; create xml from original source or from buffer
- // manager. since buffer manager is slow we will choose the first option.
- // incase this xml used in processor it will be faster; if it used in the
- // client using steaming will be slow.
XMLType xml = new XMLType(srcXML);
- xml.setPersistenceStreamId(savedId.getStringID());
- //XMLValue xml = XMLUtil.getFromBufferManager(this.bufferMgr, savedId, this.chunkSize, getFormatProperties());
+ this.bufferMgr.setPersistentTupleSource(savedId, xml);
// now build the top batch with information from the saved one.
row.add(xml);
Modified: trunk/engine/src/main/java/com/metamatrix/query/util/CommandContext.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/util/CommandContext.java 2009-08-12 16:36:33 UTC (rev 1234)
+++ trunk/engine/src/main/java/com/metamatrix/query/util/CommandContext.java 2009-08-12 17:39:14 UTC (rev 1235)
@@ -30,18 +30,14 @@
import com.metamatrix.api.exception.MetaMatrixComponentException;
import com.metamatrix.api.exception.query.QueryProcessingException;
-import com.metamatrix.common.util.PropertiesUtils;
import com.metamatrix.core.util.ArgCheck;
import com.metamatrix.query.QueryPlugin;
import com.metamatrix.query.eval.SecurityFunctionEvaluator;
import com.metamatrix.query.execution.QueryExecPlugin;
import com.metamatrix.query.optimizer.relational.PlanToProcessConverter;
import com.metamatrix.query.processor.QueryProcessor;
-import com.metamatrix.query.sql.symbol.ContextReference;
import com.metamatrix.query.sql.symbol.ElementSymbol;
import com.metamatrix.query.sql.symbol.Expression;
-import com.metamatrix.query.sql.util.ValueIterator;
-import com.metamatrix.query.sql.util.ValueIteratorSource;
import com.metamatrix.query.sql.util.VariableContext;
/**
@@ -159,6 +155,10 @@
public CommandContext() {
}
+ public CommandContext getParent() {
+ return parent;
+ }
+
public boolean isSessionFunctionEvaluated() {
if (parent != null) {
return parent.isSessionFunctionEvaluated();
Modified: trunk/engine/src/main/java/com/metamatrix/query/xquery/saxon/SaxonXQueryExpression.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/xquery/saxon/SaxonXQueryExpression.java 2009-08-12 16:36:33 UTC (rev 1234)
+++ trunk/engine/src/main/java/com/metamatrix/query/xquery/saxon/SaxonXQueryExpression.java 2009-08-12 17:39:14 UTC (rev 1235)
@@ -44,10 +44,7 @@
import com.metamatrix.api.exception.MetaMatrixComponentException;
import com.metamatrix.api.exception.MetaMatrixProcessingException;
import com.metamatrix.common.types.SQLXMLImpl;
-import com.metamatrix.core.util.StringUtil;
import com.metamatrix.query.QueryPlugin;
-import com.metamatrix.query.eval.Evaluator;
-import com.metamatrix.query.sql.symbol.Expression;
import com.metamatrix.query.util.XMLFormatConstants;
import com.metamatrix.query.xquery.XQueryExpression;
import com.metamatrix.query.xquery.XQuerySQLEvaluator;
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/LobChunkStream.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/LobChunkStream.java 2009-08-12 16:36:33 UTC (rev 1234)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/LobChunkStream.java 2009-08-12 17:39:14 UTC (rev 1235)
@@ -27,17 +27,14 @@
import java.sql.SQLException;
import com.metamatrix.common.buffer.BufferManager;
-import com.metamatrix.common.lob.BufferManagerLobChunkStream;
import com.metamatrix.common.lob.ByteLobChunkStream;
import com.metamatrix.common.lob.LobChunk;
import com.metamatrix.common.lob.LobChunkProducer;
import com.metamatrix.common.lob.ReaderInputStream;
import com.metamatrix.common.types.BlobType;
import com.metamatrix.common.types.ClobType;
-import com.metamatrix.common.types.InvalidReferenceException;
import com.metamatrix.common.types.Streamable;
import com.metamatrix.common.types.XMLType;
-import com.metamatrix.dqp.DQPPlugin;
/**
* A Lob Stream builder class. Given the Lob object this object can build
@@ -48,7 +45,7 @@
LobChunkProducer internalStream = null;
- public LobChunkStream(Streamable streamable, int chunkSize, BufferManager bufferMgr)
+ public LobChunkStream(Streamable<?> streamable, int chunkSize, BufferManager bufferMgr)
throws IOException {
try {
@@ -64,14 +61,6 @@
BlobType blob = (BlobType)streamable;
this.internalStream = new ByteLobChunkStream(blob.getBinaryStream(), chunkSize);
}
- } catch (InvalidReferenceException e) {
- // if the lob did not have a persistent id, there is no way for us to re-create the
- // object. so throw an error.
- if (streamable.getPersistenceStreamId() == null) {
- throw new IOException(DQPPlugin.Util.getString("LobStream.noreference")); //$NON-NLS-1$
- }
- // otherwise read directly from the buffer manager.
- this.internalStream = new BufferManagerLobChunkStream(streamable.getPersistenceStreamId(), bufferMgr);
} catch(SQLException e) {
IOException ex = new IOException();
ex.initCause(e);
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/LobWorkItem.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/LobWorkItem.java 2009-08-12 16:36:33 UTC (rev 1234)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/LobWorkItem.java 2009-08-12 17:39:14 UTC (rev 1235)
@@ -23,14 +23,9 @@
package org.teiid.dqp.internal.process;
import java.io.IOException;
-import java.util.List;
import com.metamatrix.api.exception.MetaMatrixComponentException;
-import com.metamatrix.common.CommonPlugin;
import com.metamatrix.common.buffer.BlockedOnMemoryException;
-import com.metamatrix.common.buffer.MemoryNotAvailableException;
-import com.metamatrix.common.buffer.TupleBatch;
-import com.metamatrix.common.buffer.TupleSourceID;
import com.metamatrix.common.buffer.TupleSourceNotFoundException;
import com.metamatrix.common.comm.api.ResultsReceiver;
import com.metamatrix.common.lob.LobChunk;
@@ -73,7 +68,7 @@
// If no previous stream is not found for this request create one and
// save for future
if (stream == null) {
- stream = createLobStream(new TupleSourceID(streamId));
+ stream = createLobStream(streamId);
}
// now get the chunk from stream
@@ -119,35 +114,13 @@
* Create a object which can create a sequence of LobChunk objects on a given
* LOB object
*/
- private LobChunkStream createLobStream(TupleSourceID referenceStreamId)
+ private LobChunkStream createLobStream(String referenceStreamId)
throws BlockedOnMemoryException, MetaMatrixComponentException, IOException, TupleSourceNotFoundException {
// get the reference object in the buffer manager, and try to stream off
// the original sources.
- TupleBatch batch = null;
- try {
- batch = dqpCore.getBufferManager().pinTupleBatch(referenceStreamId, 1, 1);
- List[] tuples = batch.getAllTuples();
-
- if (tuples != null && tuples.length > 0) {
- Object anObj = tuples[0].get(0);
- if (anObj instanceof Streamable) {
- Streamable streamable = (Streamable)anObj;
- return new LobChunkStream(streamable, chunkSize, dqpCore.getBufferManager());
- }
- }
- throw new MetaMatrixComponentException(DQPPlugin.Util.getString("ProcessWorker.wrongdata")); //$NON-NLS-1$
- } catch (MemoryNotAvailableException e) {
- throw BlockedOnMemoryException.INSTANCE;
- } finally {
- try {
- if (batch != null) {
- dqpCore.getBufferManager().unpinTupleBatch(referenceStreamId, batch.getBeginRow(), batch.getEndRow());
- }
- } catch (MetaMatrixComponentException e) {
- LogManager.logDetail(LogConstants.CTX_DQP, e, "Call to unpin failed during lob stream creation"); //$NON-NLS-1$
- }
- }
+ Streamable<?> streamable = dqpCore.getBufferManager().getStreamable(parent.resultsID, referenceStreamId);
+ return new LobChunkStream(streamable, chunkSize, dqpCore.getBufferManager());
}
synchronized void setResultsReceiver(ResultsReceiver<LobChunk> resultsReceiver) {
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java 2009-08-12 16:36:33 UTC (rev 1234)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java 2009-08-12 17:39:14 UTC (rev 1235)
@@ -141,7 +141,7 @@
protected Command originalCommand;
private AnalysisRecord analysisRecord;
private TransactionContext transactionContext;
- private TupleSourceID resultsID;
+ protected TupleSourceID resultsID;
private Collection schemas; // These are schemas associated with XML results
private boolean returnsUpdateCount;
Modified: trunk/engine/src/test/java/com/metamatrix/common/buffer/impl/TestBufferManagerImpl.java
===================================================================
--- trunk/engine/src/test/java/com/metamatrix/common/buffer/impl/TestBufferManagerImpl.java 2009-08-12 16:36:33 UTC (rev 1234)
+++ trunk/engine/src/test/java/com/metamatrix/common/buffer/impl/TestBufferManagerImpl.java 2009-08-12 17:39:14 UTC (rev 1235)
@@ -236,19 +236,7 @@
assertTrue(xml1.getPersistenceStreamId() == null);
assertTrue(xml2.getPersistenceStreamId() == null);
- TupleSourceInfo info = mgr.getTupleSourceInfo(new TupleSourceID(xml1.getReferenceStreamId()), true);
- // make sure the group name of the reference lob, is same as part batch id
- assertEquals(id.getStringID(), info.getGroupInfo().getGroupName());
-
- // now delete the parent tuple source, this should delete the
- // all the kids with same name
- mgr.removeTupleSource(id);
-
- try {
- mgr.getTupleSource(new TupleSourceID(xml2.getReferenceStreamId()));
- fail("this is already should have been cleaned up by above one"); //$NON-NLS-1$
- } catch (TupleSourceNotFoundException e) {
- }
+ assertNotNull(mgr.getStreamable(id, xml1.getReferenceStreamId()));
}
public void testAddStreamablePart() throws Exception {
Modified: trunk/engine/src/test/java/com/metamatrix/query/processor/TestProcessor.java
===================================================================
--- trunk/engine/src/test/java/com/metamatrix/query/processor/TestProcessor.java 2009-08-12 16:36:33 UTC (rev 1234)
+++ trunk/engine/src/test/java/com/metamatrix/query/processor/TestProcessor.java 2009-08-12 17:39:14 UTC (rev 1235)
@@ -50,6 +50,7 @@
import com.metamatrix.common.buffer.BufferManagerFactory;
import com.metamatrix.common.buffer.TupleSource;
import com.metamatrix.common.buffer.TupleSourceID;
+import com.metamatrix.common.buffer.TupleSourceNotFoundException;
import com.metamatrix.common.buffer.impl.BufferManagerImpl;
import com.metamatrix.common.types.DataTypeManager;
import com.metamatrix.common.types.XMLType;
@@ -215,16 +216,18 @@
}
}
- private void helpProcessException(ProcessorPlan plan, ProcessorDataManager dataManager) {
+ private void helpProcessException(ProcessorPlan plan, ProcessorDataManager dataManager) throws TupleSourceNotFoundException, MetaMatrixComponentException {
helpProcessException(plan, dataManager, null);
}
- private void helpProcessException(ProcessorPlan plan, ProcessorDataManager dataManager, String expectedErrorMessage) {
-
+ private void helpProcessException(ProcessorPlan plan, ProcessorDataManager dataManager, String expectedErrorMessage) throws TupleSourceNotFoundException, MetaMatrixComponentException {
+ TupleSourceID tsId = null;
+ BufferManager bufferMgr = null;
try {
- BufferManager bufferMgr = BufferManagerFactory.getStandaloneBufferManager();
+ bufferMgr = BufferManagerFactory.getStandaloneBufferManager();
CommandContext context = new CommandContext("0", "test", null, null, null); //$NON-NLS-1$ //$NON-NLS-2$
QueryProcessor processor = new QueryProcessor(plan, context, bufferMgr, dataManager);
+ tsId = processor.getResultsID();
processor.process();
fail("Expected error during processing, but got none."); //$NON-NLS-1$
} catch(MetaMatrixCoreException e) {
@@ -232,6 +235,8 @@
if(expectedErrorMessage != null) {
assertEquals(expectedErrorMessage, e.getMessage());
}
+ } finally {
+ bufferMgr.removeTupleSource(tsId);
}
}
@@ -2296,7 +2301,7 @@
* Tests a scalar subquery which returns more than one rows
* causes the expected Exception
*/
- @Test public void testSubqueryScalarException() {
+ @Test public void testSubqueryScalarException() throws Exception {
String sql = "SELECT e1, (SELECT e2 FROM pm2.g1) FROM pm1.g1"; //$NON-NLS-1$
// Construct data manager with data
Modified: trunk/engine/src/test/java/com/metamatrix/query/processor/relational/TestSortNode.java
===================================================================
--- trunk/engine/src/test/java/com/metamatrix/query/processor/relational/TestSortNode.java 2009-08-12 16:36:33 UTC (rev 1234)
+++ trunk/engine/src/test/java/com/metamatrix/query/processor/relational/TestSortNode.java 2009-08-12 17:39:14 UTC (rev 1235)
@@ -107,13 +107,12 @@
}
public static int getIntBatchSize() {
- List[] expected = new List[] {
- Arrays.asList(new Object[] { new Integer(0) }),
- };
+ List[] expected = new List[BATCH_SIZE];
+ Arrays.fill(expected, Arrays.asList(1));
String[] types = { "integer" }; //$NON-NLS-1$
- int size = (int)SizeUtility.getBatchSize( types, expected ) * BATCH_SIZE;
+ int size = (int)SizeUtility.getBatchSize( types, expected );
return size;
}
Modified: trunk/runtime/src/main/java/com/metamatrix/dqp/embedded/services/EmbeddedBufferService.java
===================================================================
--- trunk/runtime/src/main/java/com/metamatrix/dqp/embedded/services/EmbeddedBufferService.java 2009-08-12 16:36:33 UTC (rev 1234)
+++ trunk/runtime/src/main/java/com/metamatrix/dqp/embedded/services/EmbeddedBufferService.java 2009-08-12 17:39:14 UTC (rev 1235)
@@ -54,7 +54,6 @@
private static final String DEFAULT_MANAGEMENT_INTERVAL = "0"; //$NON-NLS-1$
private static final String DEFAULT_LOG_STATS_INTERVAL = DEFAULT_MANAGEMENT_INTERVAL;
private static final String DEFAULT_SESSION_USE_PERCENTAGE = "100"; //$NON-NLS-1$
- private static final String DEFAULT_ID_CREATOR = "com.metamatrix.common.buffer.impl.LongIDCreator"; //$NON-NLS-1$
private static final String DEFAULT_MAX_OPEN_FILES = "10"; //$NON-NLS-1$
// Instance
15 years, 5 months