[teiid-commits] teiid SVN: r1704 - in trunk/engine/src: test/java/org/teiid/dqp/internal/process and 1 other directory.

teiid-commits at lists.jboss.org teiid-commits at lists.jboss.org
Thu Dec 24 00:03:41 EST 2009


Author: shawkins
Date: 2009-12-24 00:03:40 -0500 (Thu, 24 Dec 2009)
New Revision: 1704

Modified:
   trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
   trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCoreRequestHandling.java
Log:
TEIID-915 adding more graceful handling of exceptions occurring in between batch requests

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java	2009-12-23 22:55:47 UTC (rev 1703)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java	2009-12-24 05:03:40 UTC (rev 1704)
@@ -41,7 +41,6 @@
 import com.metamatrix.api.exception.MetaMatrixException;
 import com.metamatrix.api.exception.MetaMatrixProcessingException;
 import com.metamatrix.common.buffer.BlockedException;
-import com.metamatrix.common.buffer.BufferManager;
 import com.metamatrix.common.buffer.TupleBatch;
 import com.metamatrix.common.buffer.TupleBuffer;
 import com.metamatrix.common.comm.api.ResultsReceiver;
@@ -75,35 +74,6 @@
 
 public class RequestWorkItem extends AbstractWorkItem {
 	
-	protected static class ResultsCursor {
-		int begin;
-		int end;
-		boolean resultsRequested;
-		
-		/**
-		 * Ask for results.
-		 * @param beginRow
-		 * @param endRow
-		 * @param isPoll
-		 */
-		synchronized void requestResults(int beginRow, int endRow, boolean isPoll) {
-			if (this.resultsRequested) {
-				if (!isPoll) {
-					throw new IllegalStateException("Results already requested"); //$NON-NLS-1$\
-				} else if (begin != beginRow || end != endRow) {
-					throw new IllegalStateException("Polling for different results than previously requested"); //$NON-NLS-1$
-				}
-			}
-			this.begin = beginRow;
-			this.end = endRow;
-			this.resultsRequested = true;
-		}
-		
-		synchronized void resultsSent() {
-			this.resultsRequested = false;
-		}
-	}
-	
 	private enum ProcessingState {NEW, PROCESSING, CLOSE}
 	private ProcessingState state = ProcessingState.NEW;
     
@@ -117,13 +87,16 @@
     final RequestMessage requestMsg;    
     final RequestID requestID;
     protected Request request; //provides the processing plan, held on a temporary basis
-    final private BufferManager bufferMgr;
     final private int processorTimeslice;
     //protected ResultSetCache rsCache;
 	//protected CacheID cid;
 	final private TransactionService transactionService;
 	final DQPWorkContext dqpWorkContext;
+	
+	//results request
 	ResultsReceiver<ResultsMessage> resultsReceiver;
+	private int begin;
+	private int end;
         
     /*
      * obtained during new
@@ -150,16 +123,11 @@
     private volatile boolean isCanceled;
     private volatile boolean closeRequested;
     
-    /** Range of rows requested by the client */
-    protected ResultsCursor resultsCursor = new ResultsCursor();
-    
     private Map<Integer, LobWorkItem> lobStreams = Collections.synchronizedMap(new HashMap<Integer, LobWorkItem>(4));
     
     public RequestWorkItem(DQPCore dqpCore, RequestMessage requestMsg, Request request, ResultsReceiver<ResultsMessage> receiver, RequestID requestID, DQPWorkContext workContext) {
         this.requestMsg = requestMsg;
         this.requestID = requestID;
-        this.resultsCursor.requestResults(1, requestMsg.getFetchSize(), false);
-        this.bufferMgr = dqpCore.getBufferManager();
         this.processorTimeslice = dqpCore.getProcessorTimeSlice();
         /*this.rsCache = dqpCore.getRsCache();
         if (this.rsCache != null) {
@@ -168,10 +136,24 @@
         this.transactionService = dqpCore.getTransactionServiceDirect();
         this.dqpCore = dqpCore;
         this.request = request;
-        this.resultsReceiver = receiver;
         this.dqpWorkContext = workContext;
+        this.requestResults(1, requestMsg.getFetchSize(), receiver);
     }
     
+	/**
+	 * Ask for results.
+	 * @param beginRow
+	 * @param endRow
+	 */
+    synchronized void requestResults(int beginRow, int endRow, ResultsReceiver<ResultsMessage> receiver) {
+		if (this.resultsReceiver != null) {
+			throw new IllegalStateException("Results already requested"); //$NON-NLS-1$\
+		}
+		this.resultsReceiver = receiver;
+		this.begin = beginRow;
+		this.end = endRow;
+	}
+    
 	@Override
 	protected boolean isDoneProcessing() {
 		return isClosed;
@@ -257,7 +239,7 @@
                 cr.setAnalysisRecord(analysisRecord);
             }*/
 			if (this.transactionState == TransactionState.ACTIVE) {
-				boolean end = true;
+				boolean endState = true;
 				/*
 				 * TEIID-14 if we are done producing batches, then proactively close transactional 
 				 * executions even ones that were intentionally kept alive. this may 
@@ -267,10 +249,10 @@
 	        	for (DataTierTupleSource connectorRequest : this.connectorInfo.values()) {
 	        		if (connectorRequest.isTransactional()) {
 	        			connectorRequest.fullyCloseSource();
-	        			end = false;
+	        			endState = false;
 	        		}
 	            }
-				if (end) {
+				if (endState) {
 					this.transactionState = TransactionState.END;
 				}
 			}
@@ -382,71 +364,74 @@
 	 * Send results if they have been requested.  This should only be called from the processing thread.
 	 */
 	protected void sendResultsIfNeeded(TupleBatch batch) throws MetaMatrixComponentException {
-		
-		synchronized (resultsCursor) {
-			if (!this.resultsCursor.resultsRequested
-					|| (this.resultsCursor.begin > this.processor.getHighestRow() && !doneProducingBatches)
+		ResultsMessage response = null;
+		ResultsReceiver<ResultsMessage> receiver = null;
+		synchronized (this) {
+			if (this.resultsReceiver == null
+					|| (this.begin > this.processor.getHighestRow() && !doneProducingBatches)
 					|| (this.transactionState == TransactionState.ACTIVE)) {
 				return;
 			}
+			
+			if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.DETAIL)) {
+				LogManager.logDetail(LogConstants.CTX_DQP, "[RequestWorkItem.sendResultsIfNeeded] requestID: " + requestID + " resultsID: " + this.resultsBuffer + " done: " + doneProducingBatches );   //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$
+			}
+	
+	    	if (batch == null || batch.getBeginRow() > this.begin) {
+	    		batch = resultsBuffer.getBatch(begin);
+	    		//TODO: support fetching more than 1 batch
+	    		int count = this.end - this.begin + 1;
+	    		if (batch.getRowCount() > count) {
+	    			int beginRow = Math.min(this.begin, batch.getEndRow() - count + 1);
+	    			int endRow = Math.min(beginRow + count - 1, batch.getEndRow());
+	        		int firstOffset = beginRow - batch.getBeginRow();
+	                List[] memoryRows = batch.getAllTuples();
+	                List[] rows = new List[count];
+	                System.arraycopy(memoryRows, firstOffset, rows, 0, endRow - beginRow + 1);
+	                batch = new TupleBatch(beginRow, rows);
+	    		}
+	    	}
+	        int finalRowCount = doneProducingBatches?this.processor.getHighestRow():-1;
+	        
+	        response = createResultsMessage(requestMsg, batch.getAllTuples(), this.processor.getProcessorPlan().getOutputElements(), analysisRecord);
+	        response.setFirstRow(batch.getBeginRow());
+	        response.setLastRow(batch.getEndRow());
+	        response.setUpdateResult(this.returnsUpdateCount);
+	        // set final row
+	        response.setFinalRow(finalRowCount);
+	
+	        // send any schemas associated with the results
+	        response.setSchemas(this.schemas);
+	        
+	        // send any warnings with the response object
+	        List<Throwable> responseWarnings = new ArrayList<Throwable>();
+			List<Exception> currentWarnings = processor.getAndClearWarnings();
+		    if (currentWarnings != null) {
+		    	responseWarnings.addAll(currentWarnings);
+		    }
+		    synchronized (warnings) {
+	        	responseWarnings.addAll(this.warnings);
+	        	this.warnings.clear();
+		    }
+	        response.setWarnings(responseWarnings);
+	        
+	        // If it is stored procedure, set parameters
+	        if (originalCommand instanceof StoredProcedure) {
+	        	StoredProcedure proc = (StoredProcedure)originalCommand;
+	        	if (proc.returnParameters()) {
+	        		response.setParameters(getParameterInfo(proc));
+	        	}
+	        }
+	        /*
+	         * mark the results sent at this point.
+	         * communication exceptions will be treated as non-recoverable 
+	         */
+	        receiver = this.resultsReceiver;
+	        this.resultsReceiver = null;    
 		}
-		
-		if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.DETAIL)) {
-			LogManager.logDetail(LogConstants.CTX_DQP, "[RequestWorkItem.sendResultsIfNeeded] requestID: " + requestID + " resultsID: " + this.resultsBuffer + " done: " + doneProducingBatches );   //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$
-		}
 
-    	if (batch == null || batch.getBeginRow() > this.resultsCursor.begin) {
-    		batch = resultsBuffer.getBatch(resultsCursor.begin);
-    		//TODO: support fetching more than 1 batch
-    		int count = this.resultsCursor.end - this.resultsCursor.begin + 1;
-    		if (batch.getRowCount() > count) {
-    			int beginRow = Math.min(this.resultsCursor.begin, batch.getEndRow() - count + 1);
-    			int endRow = Math.min(beginRow + count - 1, batch.getEndRow());
-        		int firstOffset = beginRow - batch.getBeginRow();
-                List[] memoryRows = batch.getAllTuples();
-                List[] rows = new List[count];
-                System.arraycopy(memoryRows, firstOffset, rows, 0, endRow - beginRow + 1);
-                batch = new TupleBatch(beginRow, rows);
-    		}
-    	}
-        int finalRowCount = doneProducingBatches?this.processor.getHighestRow():-1;
+        receiver.receiveResults(response);
         
-        ResultsMessage response = createResultsMessage(requestMsg, batch.getAllTuples(), this.processor.getProcessorPlan().getOutputElements(), analysisRecord);
-        response.setFirstRow(batch.getBeginRow());
-        response.setLastRow(batch.getEndRow());
-        response.setUpdateResult(this.returnsUpdateCount);
-        // set final row
-        response.setFinalRow(finalRowCount);
-
-        // send any schemas associated with the results
-        response.setSchemas(this.schemas);
-        
-        // send any warnings with the response object
-        List<Throwable> responseWarnings = new ArrayList<Throwable>();
-		List<Exception> currentWarnings = processor.getAndClearWarnings();
-	    if (currentWarnings != null) {
-	    	responseWarnings.addAll(currentWarnings);
-	    }
-	    synchronized (warnings) {
-        	responseWarnings.addAll(this.warnings);
-        	this.warnings.clear();
-	    }
-        response.setWarnings(responseWarnings);
-        
-        // If it is stored procedure, set parameters
-        if (originalCommand instanceof StoredProcedure) {
-        	StoredProcedure proc = (StoredProcedure)originalCommand;
-        	if (proc.returnParameters()) {
-        		response.setParameters(getParameterInfo(proc));
-        	}
-        }
-
-        /*
-         * mark the results sent at this point.
-         * communication exceptions will be treated as non-recoverable 
-         */
-        this.resultsCursor.resultsSent();
-        this.resultsReceiver.receiveResults(response);
 	}
     
     public static ResultsMessage createResultsMessage(RequestMessage message, List[] batch, List columnSymbols, AnalysisRecord analysisRecord) {
@@ -490,7 +475,13 @@
 	}
 
     private void sendError() {
-		LogManager.logDetail(LogConstants.CTX_DQP, processingException, "Sedning error to client", requestID); //$NON-NLS-1$
+    	synchronized (this) {
+    		if (this.resultsReceiver == null) {
+    			LogManager.logDetail(LogConstants.CTX_DQP, processingException, "Unable to send error to client as results were already sent.", requestID); //$NON-NLS-1$
+    			return;
+    		}
+    	}
+		LogManager.logDetail(LogConstants.CTX_DQP, processingException, "Sending error to client", requestID); //$NON-NLS-1$
         ResultsMessage response = new ResultsMessage(requestMsg);
         response.setException(processingException);
         setAnalysisRecords(response, analysisRecord);
@@ -594,8 +585,7 @@
     }
     
     public void requestMore(int batchFirst, int batchLast, ResultsReceiver<ResultsMessage> receiver) {
-    	this.resultsReceiver = receiver;
-    	this.resultsCursor.requestResults(batchFirst, batchLast, false);
+    	this.requestResults(batchFirst, batchLast, receiver);
     	this.moreWork(); 
     }
     

Modified: trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCoreRequestHandling.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCoreRequestHandling.java	2009-12-23 22:55:47 UTC (rev 1703)
+++ trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCoreRequestHandling.java	2009-12-24 05:03:40 UTC (rev 1704)
@@ -113,8 +113,7 @@
         DQPCore rm = new DQPCore();
         RequestMessage r0 = new RequestMessage("foo"); //$NON-NLS-1$
         RequestID requestID = new RequestID(SESSION_STRING, 1);
-        RequestWorkItem workItem = addRequest(rm, r0, requestID, null, null);  
-        assertTrue(workItem.resultsCursor.resultsRequested);
+        addRequest(rm, r0, requestID, null, null);  
     }
     
     public void testWarnings1() {



More information about the teiid-commits mailing list