Author: shawkins
Date: 2009-12-24 00:03:40 -0500 (Thu, 24 Dec 2009)
New Revision: 1704
Modified:
trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCoreRequestHandling.java
Log:
TEIID-915 adding more graceful handling of exceptions occurring in between batch requests
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java 2009-12-23 22:55:47 UTC (rev 1703)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java 2009-12-24 05:03:40 UTC (rev 1704)
@@ -41,7 +41,6 @@
import com.metamatrix.api.exception.MetaMatrixException;
import com.metamatrix.api.exception.MetaMatrixProcessingException;
import com.metamatrix.common.buffer.BlockedException;
-import com.metamatrix.common.buffer.BufferManager;
import com.metamatrix.common.buffer.TupleBatch;
import com.metamatrix.common.buffer.TupleBuffer;
import com.metamatrix.common.comm.api.ResultsReceiver;
@@ -75,35 +74,6 @@
public class RequestWorkItem extends AbstractWorkItem {
- protected static class ResultsCursor {
- int begin;
- int end;
- boolean resultsRequested;
-
- /**
- * Ask for results.
- * @param beginRow
- * @param endRow
- * @param isPoll
- */
- synchronized void requestResults(int beginRow, int endRow, boolean isPoll) {
- if (this.resultsRequested) {
- if (!isPoll) {
- throw new IllegalStateException("Results already requested"); //$NON-NLS-1$\
- } else if (begin != beginRow || end != endRow) {
- throw new IllegalStateException("Polling for different results than previously requested"); //$NON-NLS-1$
- }
- }
- this.begin = beginRow;
- this.end = endRow;
- this.resultsRequested = true;
- }
-
- synchronized void resultsSent() {
- this.resultsRequested = false;
- }
- }
-
private enum ProcessingState {NEW, PROCESSING, CLOSE}
private ProcessingState state = ProcessingState.NEW;
@@ -117,13 +87,16 @@
final RequestMessage requestMsg;
final RequestID requestID;
protected Request request; //provides the processing plan, held on a temporary basis
- final private BufferManager bufferMgr;
final private int processorTimeslice;
//protected ResultSetCache rsCache;
//protected CacheID cid;
final private TransactionService transactionService;
final DQPWorkContext dqpWorkContext;
+
+ //results request
ResultsReceiver<ResultsMessage> resultsReceiver;
+ private int begin;
+ private int end;
/*
* obtained during new
@@ -150,16 +123,11 @@
private volatile boolean isCanceled;
private volatile boolean closeRequested;
- /** Range of rows requested by the client */
- protected ResultsCursor resultsCursor = new ResultsCursor();
-
private Map<Integer, LobWorkItem> lobStreams = Collections.synchronizedMap(new HashMap<Integer, LobWorkItem>(4));
public RequestWorkItem(DQPCore dqpCore, RequestMessage requestMsg, Request request, ResultsReceiver<ResultsMessage> receiver, RequestID requestID, DQPWorkContext workContext) {
this.requestMsg = requestMsg;
this.requestID = requestID;
- this.resultsCursor.requestResults(1, requestMsg.getFetchSize(), false);
- this.bufferMgr = dqpCore.getBufferManager();
this.processorTimeslice = dqpCore.getProcessorTimeSlice();
/*this.rsCache = dqpCore.getRsCache();
if (this.rsCache != null) {
@@ -168,10 +136,24 @@
this.transactionService = dqpCore.getTransactionServiceDirect();
this.dqpCore = dqpCore;
this.request = request;
- this.resultsReceiver = receiver;
this.dqpWorkContext = workContext;
+ this.requestResults(1, requestMsg.getFetchSize(), receiver);
}
+ /**
+ * Ask for results.
+ * @param beginRow
+ * @param endRow
+ */
+ synchronized void requestResults(int beginRow, int endRow, ResultsReceiver<ResultsMessage> receiver) {
+ if (this.resultsReceiver != null) {
+ throw new IllegalStateException("Results already requested"); //$NON-NLS-1$\
+ }
+ this.resultsReceiver = receiver;
+ this.begin = beginRow;
+ this.end = endRow;
+ }
+
@Override
protected boolean isDoneProcessing() {
return isClosed;
@@ -257,7 +239,7 @@
cr.setAnalysisRecord(analysisRecord);
}*/
if (this.transactionState == TransactionState.ACTIVE) {
- boolean end = true;
+ boolean endState = true;
/*
* TEIID-14 if we are done producing batches, then proactively close transactional
* executions even ones that were intentionally kept alive. this may
@@ -267,10 +249,10 @@
for (DataTierTupleSource connectorRequest : this.connectorInfo.values()) {
if (connectorRequest.isTransactional()) {
connectorRequest.fullyCloseSource();
- end = false;
+ endState = false;
}
}
- if (end) {
+ if (endState) {
this.transactionState = TransactionState.END;
}
}
@@ -382,71 +364,74 @@
* Send results if they have been requested. This should only be called from the processing thread.
*/
protected void sendResultsIfNeeded(TupleBatch batch) throws MetaMatrixComponentException {
-
- synchronized (resultsCursor) {
- if (!this.resultsCursor.resultsRequested
- || (this.resultsCursor.begin > this.processor.getHighestRow() && !doneProducingBatches)
+ ResultsMessage response = null;
+ ResultsReceiver<ResultsMessage> receiver = null;
+ synchronized (this) {
+ if (this.resultsReceiver == null
+ || (this.begin > this.processor.getHighestRow() && !doneProducingBatches)
|| (this.transactionState == TransactionState.ACTIVE)) {
return;
}
+
+ if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.DETAIL)) {
+ LogManager.logDetail(LogConstants.CTX_DQP, "[RequestWorkItem.sendResultsIfNeeded] requestID: " + requestID + " resultsID: " + this.resultsBuffer + " done: " + doneProducingBatches ); //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$
+ }
+
+ if (batch == null || batch.getBeginRow() > this.begin) {
+ batch = resultsBuffer.getBatch(begin);
+ //TODO: support fetching more than 1 batch
+ int count = this.end - this.begin + 1;
+ if (batch.getRowCount() > count) {
+ int beginRow = Math.min(this.begin, batch.getEndRow() - count + 1);
+ int endRow = Math.min(beginRow + count - 1, batch.getEndRow());
+ int firstOffset = beginRow - batch.getBeginRow();
+ List[] memoryRows = batch.getAllTuples();
+ List[] rows = new List[count];
+ System.arraycopy(memoryRows, firstOffset, rows, 0, endRow - beginRow + 1);
+ batch = new TupleBatch(beginRow, rows);
+ }
+ }
+ int finalRowCount = doneProducingBatches?this.processor.getHighestRow():-1;
+
+ response = createResultsMessage(requestMsg, batch.getAllTuples(), this.processor.getProcessorPlan().getOutputElements(), analysisRecord);
+ response.setFirstRow(batch.getBeginRow());
+ response.setLastRow(batch.getEndRow());
+ response.setUpdateResult(this.returnsUpdateCount);
+ // set final row
+ response.setFinalRow(finalRowCount);
+
+ // send any schemas associated with the results
+ response.setSchemas(this.schemas);
+
+ // send any warnings with the response object
+ List<Throwable> responseWarnings = new ArrayList<Throwable>();
+ List<Exception> currentWarnings = processor.getAndClearWarnings();
+ if (currentWarnings != null) {
+ responseWarnings.addAll(currentWarnings);
+ }
+ synchronized (warnings) {
+ responseWarnings.addAll(this.warnings);
+ this.warnings.clear();
+ }
+ response.setWarnings(responseWarnings);
+
+ // If it is stored procedure, set parameters
+ if (originalCommand instanceof StoredProcedure) {
+ StoredProcedure proc = (StoredProcedure)originalCommand;
+ if (proc.returnParameters()) {
+ response.setParameters(getParameterInfo(proc));
+ }
+ }
+ /*
+ * mark the results sent at this point.
+ * communication exceptions will be treated as non-recoverable
+ */
+ receiver = this.resultsReceiver;
+ this.resultsReceiver = null;
}
-
- if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.DETAIL)) {
- LogManager.logDetail(LogConstants.CTX_DQP, "[RequestWorkItem.sendResultsIfNeeded] requestID: " + requestID + " resultsID: " + this.resultsBuffer + " done: " + doneProducingBatches ); //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$
- }
- if (batch == null || batch.getBeginRow() > this.resultsCursor.begin) {
- batch = resultsBuffer.getBatch(resultsCursor.begin);
- //TODO: support fetching more than 1 batch
- int count = this.resultsCursor.end - this.resultsCursor.begin + 1;
- if (batch.getRowCount() > count) {
- int beginRow = Math.min(this.resultsCursor.begin, batch.getEndRow() - count + 1);
- int endRow = Math.min(beginRow + count - 1, batch.getEndRow());
- int firstOffset = beginRow - batch.getBeginRow();
- List[] memoryRows = batch.getAllTuples();
- List[] rows = new List[count];
- System.arraycopy(memoryRows, firstOffset, rows, 0, endRow - beginRow + 1);
- batch = new TupleBatch(beginRow, rows);
- }
- }
- int finalRowCount = doneProducingBatches?this.processor.getHighestRow():-1;
+ receiver.receiveResults(response);
- ResultsMessage response = createResultsMessage(requestMsg, batch.getAllTuples(), this.processor.getProcessorPlan().getOutputElements(), analysisRecord);
- response.setFirstRow(batch.getBeginRow());
- response.setLastRow(batch.getEndRow());
- response.setUpdateResult(this.returnsUpdateCount);
- // set final row
- response.setFinalRow(finalRowCount);
-
- // send any schemas associated with the results
- response.setSchemas(this.schemas);
-
- // send any warnings with the response object
- List<Throwable> responseWarnings = new ArrayList<Throwable>();
- List<Exception> currentWarnings = processor.getAndClearWarnings();
- if (currentWarnings != null) {
- responseWarnings.addAll(currentWarnings);
- }
- synchronized (warnings) {
- responseWarnings.addAll(this.warnings);
- this.warnings.clear();
- }
- response.setWarnings(responseWarnings);
-
- // If it is stored procedure, set parameters
- if (originalCommand instanceof StoredProcedure) {
- StoredProcedure proc = (StoredProcedure)originalCommand;
- if (proc.returnParameters()) {
- response.setParameters(getParameterInfo(proc));
- }
- }
-
- /*
- * mark the results sent at this point.
- * communication exceptions will be treated as non-recoverable
- */
- this.resultsCursor.resultsSent();
- this.resultsReceiver.receiveResults(response);
}
public static ResultsMessage createResultsMessage(RequestMessage message, List[] batch, List columnSymbols, AnalysisRecord analysisRecord) {
@@ -490,7 +475,13 @@
}
private void sendError() {
- LogManager.logDetail(LogConstants.CTX_DQP, processingException, "Sedning error to client", requestID); //$NON-NLS-1$
+ synchronized (this) {
+ if (this.resultsReceiver == null) {
+ LogManager.logDetail(LogConstants.CTX_DQP, processingException, "Unable to send error to client as results were already sent.", requestID); //$NON-NLS-1$
+ return;
+ }
+ }
+ LogManager.logDetail(LogConstants.CTX_DQP, processingException, "Sending error to client", requestID); //$NON-NLS-1$
ResultsMessage response = new ResultsMessage(requestMsg);
response.setException(processingException);
setAnalysisRecords(response, analysisRecord);
@@ -594,8 +585,7 @@
}
public void requestMore(int batchFirst, int batchLast, ResultsReceiver<ResultsMessage> receiver) {
- this.resultsReceiver = receiver;
- this.resultsCursor.requestResults(batchFirst, batchLast, false);
+ this.requestResults(batchFirst, batchLast, receiver);
this.moreWork();
}
Modified: trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCoreRequestHandling.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCoreRequestHandling.java 2009-12-23 22:55:47 UTC (rev 1703)
+++ trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCoreRequestHandling.java 2009-12-24 05:03:40 UTC (rev 1704)
@@ -113,8 +113,7 @@
DQPCore rm = new DQPCore();
RequestMessage r0 = new RequestMessage("foo"); //$NON-NLS-1$
RequestID requestID = new RequestID(SESSION_STRING, 1);
- RequestWorkItem workItem = addRequest(rm, r0, requestID, null, null);
- assertTrue(workItem.resultsCursor.resultsRequested);
+ addRequest(rm, r0, requestID, null, null);
}
public void testWarnings1() {