Author: shawkins
Date: 2012-06-29 16:04:55 -0400 (Fri, 29 Jun 2012)
New Revision: 4215
Modified:
trunk/engine/src/main/java/org/teiid/common/buffer/BlockedException.java
trunk/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
trunk/engine/src/main/java/org/teiid/query/processor/relational/EnhancedSortMergeJoinStrategy.java
trunk/engine/src/main/java/org/teiid/query/processor/relational/UnionAllNode.java
trunk/engine/src/main/java/org/teiid/query/processor/xml/RelationalPlanExecutor.java
trunk/engine/src/test/java/org/teiid/query/processor/relational/TestJoinNode.java
trunk/test-integration/common/src/test/java/org/teiid/dqp/internal/process/TestXMLTypeTranslations.java
Log:
TEIID-2089 fix for query hanging with enhanced merge join
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/BlockedException.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/BlockedException.java 2012-06-29
13:44:31 UTC (rev 4214)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/BlockedException.java 2012-06-29
20:04:55 UTC (rev 4215)
@@ -22,6 +22,8 @@
package org.teiid.common.buffer;
+import java.util.Arrays;
+
import org.teiid.core.TeiidComponentException;
import org.teiid.logging.LogConstants;
import org.teiid.logging.LogManager;
@@ -45,9 +47,20 @@
public static BlockedException block(Object... msg) {
if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR,
MessageLevel.DETAIL)) {
- LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, msg);
+ LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, msg);
}
return INSTANCE;
}
+
+ public static BlockedException blockWithTrace(Object... msg) {
+ if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.DETAIL))
{
+ BlockedException be = new BlockedException();
+ if (be.getStackTrace().length > 0) {
+ be.setStackTrace(Arrays.copyOfRange(be.getStackTrace(), 1, Math.max(0, Math.min(8,
be.getStackTrace().length))));
+ }
+ LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, be, msg);
+ }
+ return INSTANCE;
+ }
}
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java 2012-06-29
13:44:31 UTC (rev 4214)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java 2012-06-29
20:04:55 UTC (rev 4215)
@@ -318,7 +318,7 @@
if(isFinal) {
return null;
}
- throw BlockedException.block("Blocking on non-final TupleBuffer",
tupleSourceID); //$NON-NLS-1$
+ throw BlockedException.blockWithTrace("Blocking on non-final
TupleBuffer", tupleSourceID, "size", getRowCount()); //$NON-NLS-1$
//$NON-NLS-2$
}
@Override
Modified:
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java 2012-06-29
13:44:31 UTC (rev 4214)
+++
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java 2012-06-29
20:04:55 UTC (rev 4215)
@@ -261,7 +261,7 @@
} else if (this.cwi.isDataAvailable()) {
continue;
}
- throw BlockedException.block(aqr.getAtomicRequestID(), "Blocking on
DataNotAvailableException"); //$NON-NLS-1$
+ throw BlockedException.block(aqr.getAtomicRequestID(), "Blocking on
DataNotAvailableException", aqr.getAtomicRequestID()); //$NON-NLS-1$
}
receiveResults(results, partial);
}
@@ -312,7 +312,7 @@
addWork();
}
if (!futureResult.isDone()) {
- throw BlockedException.block(aqr.getAtomicRequestID(), "Blocking on source
query"); //$NON-NLS-1$
+ throw BlockedException.block(aqr.getAtomicRequestID(), "Blocking on source
query", aqr.getAtomicRequestID()); //$NON-NLS-1$
}
FutureWork<AtomicResultsMessage> currentResults = futureResult;
futureResult = null;
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java 2012-06-29
13:44:31 UTC (rev 4214)
+++
trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java 2012-06-29
20:04:55 UTC (rev 4215)
@@ -571,7 +571,8 @@
&& this.getTupleBuffer().getManagedRowCount() >=
OUTPUT_BUFFER_MAX_BATCHES * this.getTupleBuffer().getBatchSize()) {
if (!dqpCore.hasWaitingPlans(RequestWorkItem.this)) {
//requestMore will trigger more processing
- throw BlockedException.block(requestID, "Blocking due to full results
buffer."); //$NON-NLS-1$
+ throw BlockedException.block(requestID, "Blocking due to full results
TupleBuffer", //$NON-NLS-1$
+ this.getTupleBuffer().getId(), "rows",
this.getTupleBuffer().getManagedRowCount(), "batch size",
this.getTupleBuffer().getBatchSize()); //$NON-NLS-1$ //$NON-NLS-2$
}
if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.DETAIL)) {
LogManager.logDetail(LogConstants.CTX_DQP, requestID, "Exceeding buffer limit
since there are pending active plans."); //$NON-NLS-1$
Modified:
trunk/engine/src/main/java/org/teiid/query/processor/relational/EnhancedSortMergeJoinStrategy.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/query/processor/relational/EnhancedSortMergeJoinStrategy.java 2012-06-29
13:44:31 UTC (rev 4214)
+++
trunk/engine/src/main/java/org/teiid/query/processor/relational/EnhancedSortMergeJoinStrategy.java 2012-06-29
20:04:55 UTC (rev 4215)
@@ -37,6 +37,9 @@
import org.teiid.core.TeiidComponentException;
import org.teiid.core.TeiidProcessingException;
import org.teiid.core.types.DataTypeManager;
+import org.teiid.logging.LogConstants;
+import org.teiid.logging.LogManager;
+import org.teiid.logging.MessageLevel;
import org.teiid.query.optimizer.relational.rules.NewCalculateCostUtil;
import org.teiid.query.sql.lang.OrderBy;
import org.teiid.query.sql.symbol.ElementSymbol;
@@ -56,7 +59,7 @@
*/
public class EnhancedSortMergeJoinStrategy extends MergeJoinStrategy {
- private final class SingleTupleSource extends AbstractList<Object> implements
TupleSource {
+ private static final class SingleTupleSource extends AbstractList<Object>
implements TupleSource {
boolean returned;
private int[] indexes;
private List<?> keyTuple;
@@ -256,6 +259,9 @@
if (this.processingSortLeft != SortOption.NOT_SORTED &&
this.processingSortRight != SortOption.NOT_SORTED) {
super.loadRight();
super.loadLeft();
+ if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.DETAIL)) {
+ LogManager.logDetail(LogConstants.CTX_DQP, "degrading to merged join",
this.joinNode.getID()); //$NON-NLS-1$
+ }
return; //degrade to merge join
}
if (this.processingSortLeft == SortOption.NOT_SORTED) {
@@ -267,6 +273,9 @@
} else {
super.loadRight(); //sort if needed
this.notSortedSource.sort(SortOption.NOT_SORTED); //do a single sort pass
+ if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP,
MessageLevel.DETAIL)) {
+ LogManager.logDetail(LogConstants.CTX_DQP, "performing single pass sort
right", this.joinNode.getID()); //$NON-NLS-1$
+ }
}
} else if (this.processingSortRight == SortOption.NOT_SORTED) {
this.sortedSource = this.leftSource;
@@ -288,6 +297,9 @@
} else {
super.loadLeft(); //sort if needed
this.notSortedSource.sort(SortOption.NOT_SORTED); //do a single sort pass
+ if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP,
MessageLevel.DETAIL)) {
+ LogManager.logDetail(LogConstants.CTX_DQP, "performing single pass sort
left", this.joinNode.nodeToString()); //$NON-NLS-1$
+ }
}
}
}
Modified:
trunk/engine/src/main/java/org/teiid/query/processor/relational/UnionAllNode.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/query/processor/relational/UnionAllNode.java 2012-06-29
13:44:31 UTC (rev 4214)
+++
trunk/engine/src/main/java/org/teiid/query/processor/relational/UnionAllNode.java 2012-06-29
20:04:55 UTC (rev 4215)
@@ -129,7 +129,7 @@
} else if(activeSources > 0) {
// Didn't get a batch but there are active sources so we are blocked
- throw BlockedException.block(getContext().getRequestId(), "Blocking on
union source."); //$NON-NLS-1$
+ throw BlockedException.block(getContext().getRequestId(), "Blocking on
union source.", getID()); //$NON-NLS-1$
} else {
// No batch and no active sources - return empty termination batch (should
never happen but just in case)
outputBatch = new TupleBatch(outputRow, Collections.EMPTY_LIST);
Modified:
trunk/engine/src/main/java/org/teiid/query/processor/xml/RelationalPlanExecutor.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/query/processor/xml/RelationalPlanExecutor.java 2012-06-29
13:44:31 UTC (rev 4214)
+++
trunk/engine/src/main/java/org/teiid/query/processor/xml/RelationalPlanExecutor.java 2012-06-29
20:04:55 UTC (rev 4215)
@@ -132,7 +132,7 @@
insert.setTupleSource(new TempLoadTupleSource());
this.dataManager.registerRequest(this.internalProcessor.getContext(), insert,
TempMetadataAdapter.TEMP_MODEL.getName(), new RegisterRequestParameter());
if (!doneLoading) {
- throw BlockedException.block("Blocking on result set load");
//$NON-NLS-1$
+ throw BlockedException.block(resultInfo.getResultSetName(), "Blocking on
result set load"); //$NON-NLS-1$
}
internalProcessor.closeProcessing();
AlterTempTable att = new AlterTempTable(tempTable);
Modified:
trunk/engine/src/test/java/org/teiid/query/processor/relational/TestJoinNode.java
===================================================================
---
trunk/engine/src/test/java/org/teiid/query/processor/relational/TestJoinNode.java 2012-06-29
13:44:31 UTC (rev 4214)
+++
trunk/engine/src/test/java/org/teiid/query/processor/relational/TestJoinNode.java 2012-06-29
20:04:55 UTC (rev 4215)
@@ -743,25 +743,25 @@
this.rightTuples = data;
this.leftTuples = new List[17];
for (int i = 0; i < this.leftTuples.length; i++) {
- this.leftTuples[i] = Arrays.asList(i);
+ this.leftTuples[i] = Arrays.asList(i*4);
}
if (!indexDistinct) {
- this.leftTuples[1] = Arrays.asList(0);
+ this.leftTuples[3] = Arrays.asList(0);
}
this.leftTuples[11] = Arrays.asList((Integer)null);
expected = new List[] {
- Arrays.asList(13, 13),
- Arrays.asList(2, 2),
+ Arrays.asList(64, 64),
+ Arrays.asList(36, 36),
Arrays.asList(8, 8),
- Arrays.asList(14, 14),
- Arrays.asList(3, 3),
- Arrays.asList(9, 9),
- Arrays.asList(15, 15),
+ Arrays.asList(48, 48),
+ Arrays.asList(20, 20),
+ Arrays.asList(60, 60),
+ Arrays.asList(32, 32),
Arrays.asList(4, 4),
- Arrays.asList(10, 10),
Arrays.asList(16, 16),
- Arrays.asList(5, 5),
+ Arrays.asList(56, 56),
+ Arrays.asList(28, 28),
Arrays.asList(0, 0),
Arrays.asList(0, 0),
};
Modified:
trunk/test-integration/common/src/test/java/org/teiid/dqp/internal/process/TestXMLTypeTranslations.java
===================================================================
---
trunk/test-integration/common/src/test/java/org/teiid/dqp/internal/process/TestXMLTypeTranslations.java 2012-06-29
13:44:31 UTC (rev 4214)
+++
trunk/test-integration/common/src/test/java/org/teiid/dqp/internal/process/TestXMLTypeTranslations.java 2012-06-29
20:04:55 UTC (rev 4215)
@@ -86,7 +86,7 @@
})});
- List[] expected = new List[] { Arrays.asList(new Object[] {"<?xml
version=\"1.0\" encoding=\"UTF-8\"?><XSDTypesNS:test
xmlns:xsd=\"http://www.w3.org/2001/XMLSchema\"
xmlns:XSDTypesNS=\"http://www.metamatrix.com/XMLSchema/DataSets/XSDT...
})}; //$NON-NLS-1$
+ List<?>[] expected = new List[] { Arrays.asList(new Object[]
{"<?xml version=\"1.0\"
encoding=\"UTF-8\"?><XSDTypesNS:test
xmlns:xsd=\"http://www.w3.org/2001/XMLSchema\"
xmlns:XSDTypesNS=\"http://www.metamatrix.com/XMLSchema/DataSets/XSDT...
})}; //$NON-NLS-1$
doProcess(metadata,
sql,
finder, dataMgr , expected, DEBUG);