[teiid-commits] teiid SVN: r3532 - in branches/7.4.x: engine/src/main/java/org/teiid/query/processor and 3 other directories.

teiid-commits at lists.jboss.org teiid-commits at lists.jboss.org
Wed Oct 5 16:22:44 EDT 2011


Author: shawkins
Date: 2011-10-05 16:22:43 -0400 (Wed, 05 Oct 2011)
New Revision: 3532

Modified:
   branches/7.4.x/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
   branches/7.4.x/engine/src/main/java/org/teiid/query/processor/QueryProcessor.java
   branches/7.4.x/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java
   branches/7.4.x/engine/src/test/java/org/teiid/query/processor/TestQueryProcessor.java
   branches/7.4.x/test-integration/common/src/test/java/org/teiid/transport/TestJDBCSocketTransport.java
Log:
TEIID-1769 fix for hangs from close

Modified: branches/7.4.x/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
===================================================================
--- branches/7.4.x/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java	2011-10-05 16:09:13 UTC (rev 3531)
+++ branches/7.4.x/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java	2011-10-05 20:22:43 UTC (rev 3532)
@@ -291,37 +291,9 @@
             LogManager.logDetail(LogConstants.CTX_DQP, "Request Thread", requestID, "- time slice expired"); //$NON-NLS-1$ //$NON-NLS-2$
             this.moreWork();
         } catch (Throwable e) {
-        	LogManager.logDetail(LogConstants.CTX_DQP, e, "Request Thread", requestID, "- error occurred"); //$NON-NLS-1$ //$NON-NLS-2$
-            
-            if (!isCanceled()) {
-            	dqpCore.logMMCommand(this, Event.ERROR, null);
-                //Case 5558: Differentiate between system level errors and
-                //processing errors.  Only log system level errors as errors, 
-                //log the processing errors as warnings only
-                if(e instanceof TeiidProcessingException) {                          
-                	Throwable cause = e;
-                	while (cause.getCause() != null && cause.getCause() != cause) {
-                		cause = cause.getCause();
-                	}
-                	StackTraceElement[] elems = cause.getStackTrace();
-                	Object elem = null;
-                	if (elems.length > 0) {
-                		elem = cause.getStackTrace()[0];
-                	} else {
-                		elem = cause.getMessage();
-                	}
-                    LogManager.logWarning(LogConstants.CTX_DQP, QueryPlugin.Util.getString("ProcessWorker.processing_error", e.getMessage(), requestID, e.getClass().getName(), elem)); //$NON-NLS-1$
-                }else {
-                    LogManager.logError(LogConstants.CTX_DQP, e, QueryPlugin.Util.getString("ProcessWorker.error", requestID)); //$NON-NLS-1$
-                }                                
-            }
-            
-            this.processingException = e;
-            this.state = ProcessingState.CLOSE;
+        	handleThrowable(e);
         } finally {
-        	if (this.state == ProcessingState.CLOSE && !isClosed) {
-        		attemptClose();
-        	} else if (isClosed) {
+        	if (isClosed) {
         		/*
         		 * since there may be a client waiting notify them of a problem
         		 */
@@ -329,11 +301,42 @@
         			this.processingException = new IllegalStateException("Request is already closed"); //$NON-NLS-1$
         		}
         		sendError();
+        	} else if (this.state == ProcessingState.CLOSE) {
+        		close();
         	}
         	suspend();
         }
     }
 
+	private void handleThrowable(Throwable e) {
+		LogManager.logDetail(LogConstants.CTX_DQP, e, "Request Thread", requestID, "- error occurred"); //$NON-NLS-1$ //$NON-NLS-2$
+		
+		if (!isCanceled()) {
+			dqpCore.logMMCommand(this, Event.ERROR, null);
+		    //Case 5558: Differentiate between system level errors and
+		    //processing errors.  Only log system level errors as errors, 
+		    //log the processing errors as warnings only
+		    if(e instanceof TeiidProcessingException) {                          
+		    	Throwable cause = e;
+		    	while (cause.getCause() != null && cause.getCause() != cause) {
+		    		cause = cause.getCause();
+		    	}
+		    	StackTraceElement[] elems = cause.getStackTrace();
+		    	Object elem = null;
+		    	if (elems.length > 0) {
+		    		elem = cause.getStackTrace()[0];
+		    	} else {
+		    		elem = cause.getMessage();
+		    	}
+		        LogManager.logWarning(LogConstants.CTX_DQP, QueryPlugin.Util.getString("ProcessWorker.processing_error", e.getMessage(), requestID, e.getClass().getName(), elem)); //$NON-NLS-1$
+		    }else {
+		        LogManager.logError(LogConstants.CTX_DQP, e, QueryPlugin.Util.getString("ProcessWorker.error", requestID)); //$NON-NLS-1$
+		    }                                
+		}
+		this.processingException = e;
+		this.state = ProcessingState.CLOSE;
+	}
+
 	private void resume() throws XATransactionException {
 		if (this.transactionState == TransactionState.ACTIVE && this.transactionContext.getTransaction() != null) {
 			this.transactionService.resume(this.transactionContext);
@@ -386,60 +389,64 @@
 	 * Client close is currently implemented as asynch.
 	 * Any errors that occur will not make it to the client, instead we just log them here.
 	 */
-	protected void attemptClose() {
+	protected void close() {
 		int rowcount = -1;
-		if (this.resultsBuffer != null) {
-			if (this.processor != null) {
-				this.processor.closeProcessing();
-			
-				if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.DETAIL)) {
-			        LogManager.logDetail(LogConstants.CTX_DQP, "Removing tuplesource for the request " + requestID); //$NON-NLS-1$
-			    }
-				rowcount = resultsBuffer.getRowCount();
-				if (this.cid == null || !this.doneProducingBatches) {
-					resultsBuffer.remove();
-				} else {
-					try {
-						this.resultsBuffer.persistLobs();
-					} catch (TeiidComponentException e) {
-						LogManager.logDetail(LogConstants.CTX_DQP, QueryPlugin.Util.getString("failed_to_cache")); //$NON-NLS-1$
+		try {
+			if (this.resultsBuffer != null) {
+				if (this.processor != null) {
+					this.processor.closeProcessing();
+				
+					if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.DETAIL)) {
+				        LogManager.logDetail(LogConstants.CTX_DQP, "Removing tuplesource for the request " + requestID); //$NON-NLS-1$
+				    }
+					rowcount = resultsBuffer.getRowCount();
+					if (this.cid == null || !this.doneProducingBatches) {
+						resultsBuffer.remove();
+					} else {
+						try {
+							this.resultsBuffer.persistLobs();
+						} catch (TeiidComponentException e) {
+							LogManager.logDetail(LogConstants.CTX_DQP, QueryPlugin.Util.getString("failed_to_cache")); //$NON-NLS-1$
+						}
 					}
+					
+					for (DataTierTupleSource connectorRequest : this.connectorInfo.values()) {
+						connectorRequest.fullyCloseSource();
+				    }
 				}
+	
+				this.resultsBuffer = null;
 				
-				for (DataTierTupleSource connectorRequest : this.connectorInfo.values()) {
-					connectorRequest.fullyCloseSource();
-			    }
+				for (LobWorkItem lobWorkItem : this.lobStreams.values()) {
+					lobWorkItem.close();
+				}
 			}
-
-			this.resultsBuffer = null;
-			
-			for (LobWorkItem lobWorkItem : this.lobStreams.values()) {
-				lobWorkItem.close();
+	
+			if (this.transactionState == TransactionState.ACTIVE) { 
+				this.transactionState = TransactionState.DONE;
+	            if (transactionContext.getTransactionType() == TransactionContext.Scope.REQUEST) {
+					try {
+		        		this.transactionService.rollback(transactionContext);
+		            } catch (XATransactionException e1) {
+		                LogManager.logWarning(LogConstants.CTX_DQP, e1, QueryPlugin.Util.getString("ProcessWorker.failed_rollback")); //$NON-NLS-1$           
+		            } 
+				} else {
+					suspend();
+				}
 			}
-		}
-
-		if (this.transactionState == TransactionState.ACTIVE) { 
-			this.transactionState = TransactionState.DONE;
-            if (transactionContext.getTransactionType() == TransactionContext.Scope.REQUEST) {
-				try {
-	        		this.transactionService.rollback(transactionContext);
-	            } catch (XATransactionException e1) {
-	                LogManager.logWarning(LogConstants.CTX_DQP, e1, QueryPlugin.Util.getString("ProcessWorker.failed_rollback")); //$NON-NLS-1$           
-	            } 
+		} catch (Throwable t) {
+			handleThrowable(t);
+		} finally {
+			isClosed = true;
+	
+			dqpCore.removeRequest(this);
+		    
+			if (this.processingException != null) {
+				sendError();			
 			} else {
-				suspend();
+		        dqpCore.logMMCommand(this, Event.END, rowcount);
 			}
 		}
-		
-		isClosed = true;
-
-		dqpCore.removeRequest(this);
-	    
-		if (this.processingException != null) {
-			sendError();			
-		} else {
-	        dqpCore.logMMCommand(this, Event.END, rowcount);
-		}
 	}
 
 	protected void processNew() throws TeiidProcessingException, TeiidComponentException {

Modified: branches/7.4.x/engine/src/main/java/org/teiid/query/processor/QueryProcessor.java
===================================================================
--- branches/7.4.x/engine/src/main/java/org/teiid/query/processor/QueryProcessor.java	2011-10-05 16:09:13 UTC (rev 3531)
+++ branches/7.4.x/engine/src/main/java/org/teiid/query/processor/QueryProcessor.java	2011-10-05 20:22:43 UTC (rev 3532)
@@ -197,11 +197,13 @@
 		this.bufferMgr.releaseBuffers(reserved);
 		reserved = 0;
         processorClosed = true;
-        try {
-        	processPlan.close();
-		} catch (TeiidComponentException e1){
-			LogManager.logDetail(LogConstants.CTX_DQP, e1, "Error closing processor"); //$NON-NLS-1$
-		}
+        if (initialized) {
+	        try {
+	        	processPlan.close();
+			} catch (TeiidComponentException e1){
+				LogManager.logDetail(LogConstants.CTX_DQP, e1, "Error closing processor"); //$NON-NLS-1$
+			}
+        }
     }
 
     @Override

Modified: branches/7.4.x/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java
===================================================================
--- branches/7.4.x/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java	2011-10-05 16:09:13 UTC (rev 3531)
+++ branches/7.4.x/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java	2011-10-05 20:22:43 UTC (rev 3532)
@@ -529,6 +529,20 @@
         assertNotNull(t.chunkFuture.get().getBytes());
     }
     
+    @Test public void testServerTimeout() throws Exception {
+    	RequestMessage reqMsg = exampleRequestMessage("select to_bytes(stringkey, 'utf-8') FROM BQT1.SmallA"); 
+        reqMsg.setTxnAutoWrapMode(RequestMessage.TXN_WRAP_OFF);
+        agds.setSleep(100);
+        String sessionid = "1";
+        String userName = "A";
+		DQPWorkContext.getWorkContext().getSession().setSessionId(String.valueOf(sessionid));
+        DQPWorkContext.getWorkContext().getSession().setUserName(userName);
+
+        Future<ResultsMessage> message = core.executeRequest(reqMsg.getExecutionId(), reqMsg);
+        core.cancelRequest(reqMsg.getExecutionId());
+        assertNotNull(message.get().getException());
+    }
+    
 	public void helpTestVisibilityFails(String sql) throws Exception {
         RequestMessage reqMsg = exampleRequestMessage(sql); 
         reqMsg.setTxnAutoWrapMode(RequestMessage.TXN_WRAP_OFF);

Modified: branches/7.4.x/engine/src/test/java/org/teiid/query/processor/TestQueryProcessor.java
===================================================================
--- branches/7.4.x/engine/src/test/java/org/teiid/query/processor/TestQueryProcessor.java	2011-10-05 16:09:13 UTC (rev 3531)
+++ branches/7.4.x/engine/src/test/java/org/teiid/query/processor/TestQueryProcessor.java	2011-10-05 20:22:43 UTC (rev 3532)
@@ -22,39 +22,30 @@
 
 package org.teiid.query.processor;
 
+import static org.junit.Assert.*;
+
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 
+import org.junit.Test;
 import org.teiid.common.buffer.BlockedException;
 import org.teiid.common.buffer.BufferManager;
 import org.teiid.common.buffer.BufferManagerFactory;
 import org.teiid.common.buffer.TupleBatch;
 import org.teiid.common.buffer.TupleBuffer;
 import org.teiid.common.buffer.TupleSource;
+import org.teiid.core.TeiidComponentException;
 import org.teiid.core.TeiidException;
-import org.teiid.query.processor.BatchCollector;
-import org.teiid.query.processor.QueryProcessor;
 import org.teiid.query.sql.symbol.ElementSymbol;
 import org.teiid.query.util.CommandContext;
 
-import junit.framework.TestCase;
-
-
 /**
  */
-public class TestQueryProcessor extends TestCase {
+public class TestQueryProcessor {
 
-    /**
-     * Constructor for TestQueryProcessor.
-     * @param name
-     */
-    public TestQueryProcessor(String name) {
-        super(name);
-    }
-    
-    public void helpTestProcessor(FakeProcessorPlan plan, long timeslice, List[] expectedResults) throws TeiidException {
+    public void helpTestProcessor(FakeProcessorPlan plan, List[] expectedResults) throws TeiidException {
         BufferManager bufferMgr = BufferManagerFactory.getStandaloneBufferManager();
         FakeDataManager dataManager = new FakeDataManager();
 
@@ -85,14 +76,14 @@
         tsID.remove();
     }
     
-    public void testNoResults() throws Exception {
+    @Test public void testNoResults() throws Exception {
         List elements = new ArrayList();
         elements.add(new ElementSymbol("a")); //$NON-NLS-1$
         FakeProcessorPlan plan = new FakeProcessorPlan(elements, null);
-        helpTestProcessor(plan, 1000, new List[0]);    
+        helpTestProcessor(plan, new List[0]);    
     }
 
-    public void testBlockNoResults() throws Exception {
+    @Test public void testBlockNoResults() throws Exception {
         List elements = new ArrayList();
         elements.add(new ElementSymbol("a")); //$NON-NLS-1$
         
@@ -103,10 +94,10 @@
         batches.add(batch);
         
         FakeProcessorPlan plan = new FakeProcessorPlan(elements, batches);
-        helpTestProcessor(plan, 1000, new List[0]);    
+        helpTestProcessor(plan, new List[0]);    
     }
     
-    public void testProcessWithOccasionalBlocks() throws Exception {
+    @Test public void testProcessWithOccasionalBlocks() throws Exception {
         List elements = new ArrayList();
         elements.add(new ElementSymbol("a")); //$NON-NLS-1$
                 
@@ -137,6 +128,16 @@
         }
         
         FakeProcessorPlan plan = new FakeProcessorPlan(elements, batches);
-        helpTestProcessor(plan, 1000, expectedResults);                    
+        helpTestProcessor(plan, expectedResults);                    
     }
+    
+    @Test public void testCloseBeforeInitialization() throws TeiidComponentException {
+        BufferManager bufferMgr = BufferManagerFactory.getStandaloneBufferManager();
+        FakeDataManager dataManager = new FakeDataManager();
+
+        CommandContext context = new CommandContext("pid", "group", null, null, 1); //$NON-NLS-1$ //$NON-NLS-2$
+
+    	QueryProcessor processor = new QueryProcessor(null, context, bufferMgr, dataManager);
+    	processor.closeProcessing();
+    }
 }

Modified: branches/7.4.x/test-integration/common/src/test/java/org/teiid/transport/TestJDBCSocketTransport.java
===================================================================
--- branches/7.4.x/test-integration/common/src/test/java/org/teiid/transport/TestJDBCSocketTransport.java	2011-10-05 16:09:13 UTC (rev 3531)
+++ branches/7.4.x/test-integration/common/src/test/java/org/teiid/transport/TestJDBCSocketTransport.java	2011-10-05 20:22:43 UTC (rev 3532)
@@ -56,7 +56,7 @@
 		
 		DQPConfiguration dqpConfig = new DQPConfiguration();
 		dqpConfig.setMaxActivePlans(2);
-		FakeServer server = new FakeServer();
+		FakeServer server = new FakeServer(dqpConfig);
 		server.setUseCallingThread(false);
 		server.deployVDB("parts", UnitTestUtil.getTestDataPath() + "/PartsSupplier.vdb");
 		



More information about the teiid-commits mailing list