Author: shawkins
Date: 2011-10-05 16:22:43 -0400 (Wed, 05 Oct 2011)
New Revision: 3532
Modified:
branches/7.4.x/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
branches/7.4.x/engine/src/main/java/org/teiid/query/processor/QueryProcessor.java
branches/7.4.x/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java
branches/7.4.x/engine/src/test/java/org/teiid/query/processor/TestQueryProcessor.java
branches/7.4.x/test-integration/common/src/test/java/org/teiid/transport/TestJDBCSocketTransport.java
Log:
TEIID-1769 fix for hangs from close
Modified:
branches/7.4.x/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
===================================================================
---
branches/7.4.x/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java 2011-10-05
16:09:13 UTC (rev 3531)
+++
branches/7.4.x/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java 2011-10-05
20:22:43 UTC (rev 3532)
@@ -291,37 +291,9 @@
LogManager.logDetail(LogConstants.CTX_DQP, "Request Thread",
requestID, "- time slice expired"); //$NON-NLS-1$ //$NON-NLS-2$
this.moreWork();
} catch (Throwable e) {
- LogManager.logDetail(LogConstants.CTX_DQP, e, "Request Thread",
requestID, "- error occurred"); //$NON-NLS-1$ //$NON-NLS-2$
-
- if (!isCanceled()) {
- dqpCore.logMMCommand(this, Event.ERROR, null);
- //Case 5558: Differentiate between system level errors and
- //processing errors. Only log system level errors as errors,
- //log the processing errors as warnings only
- if(e instanceof TeiidProcessingException) {
- Throwable cause = e;
- while (cause.getCause() != null && cause.getCause() != cause) {
- cause = cause.getCause();
- }
- StackTraceElement[] elems = cause.getStackTrace();
- Object elem = null;
- if (elems.length > 0) {
- elem = cause.getStackTrace()[0];
- } else {
- elem = cause.getMessage();
- }
- LogManager.logWarning(LogConstants.CTX_DQP,
QueryPlugin.Util.getString("ProcessWorker.processing_error", e.getMessage(),
requestID, e.getClass().getName(), elem)); //$NON-NLS-1$
- }else {
- LogManager.logError(LogConstants.CTX_DQP, e,
QueryPlugin.Util.getString("ProcessWorker.error", requestID)); //$NON-NLS-1$
- }
- }
-
- this.processingException = e;
- this.state = ProcessingState.CLOSE;
+ handleThrowable(e);
} finally {
- if (this.state == ProcessingState.CLOSE && !isClosed) {
- attemptClose();
- } else if (isClosed) {
+ if (isClosed) {
/*
* since there may be a client waiting notify them of a problem
*/
@@ -329,11 +301,42 @@
this.processingException = new IllegalStateException("Request is already
closed"); //$NON-NLS-1$
}
sendError();
+ } else if (this.state == ProcessingState.CLOSE) {
+ close();
}
suspend();
}
}
+ private void handleThrowable(Throwable e) {
+ LogManager.logDetail(LogConstants.CTX_DQP, e, "Request Thread", requestID,
"- error occurred"); //$NON-NLS-1$ //$NON-NLS-2$
+
+ if (!isCanceled()) {
+ dqpCore.logMMCommand(this, Event.ERROR, null);
+ //Case 5558: Differentiate between system level errors and
+ //processing errors. Only log system level errors as errors,
+ //log the processing errors as warnings only
+ if(e instanceof TeiidProcessingException) {
+ Throwable cause = e;
+ while (cause.getCause() != null && cause.getCause() != cause) {
+ cause = cause.getCause();
+ }
+ StackTraceElement[] elems = cause.getStackTrace();
+ Object elem = null;
+ if (elems.length > 0) {
+ elem = cause.getStackTrace()[0];
+ } else {
+ elem = cause.getMessage();
+ }
+ LogManager.logWarning(LogConstants.CTX_DQP,
QueryPlugin.Util.getString("ProcessWorker.processing_error", e.getMessage(),
requestID, e.getClass().getName(), elem)); //$NON-NLS-1$
+ }else {
+ LogManager.logError(LogConstants.CTX_DQP, e,
QueryPlugin.Util.getString("ProcessWorker.error", requestID)); //$NON-NLS-1$
+ }
+ }
+ this.processingException = e;
+ this.state = ProcessingState.CLOSE;
+ }
+
private void resume() throws XATransactionException {
if (this.transactionState == TransactionState.ACTIVE &&
this.transactionContext.getTransaction() != null) {
this.transactionService.resume(this.transactionContext);
@@ -386,60 +389,64 @@
* Client close is currently implemented as asynch.
* Any errors that occur will not make it to the client, instead we just log them here.
*/
- protected void attemptClose() {
+ protected void close() {
int rowcount = -1;
- if (this.resultsBuffer != null) {
- if (this.processor != null) {
- this.processor.closeProcessing();
-
- if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.DETAIL)) {
- LogManager.logDetail(LogConstants.CTX_DQP, "Removing tuplesource for the
request " + requestID); //$NON-NLS-1$
- }
- rowcount = resultsBuffer.getRowCount();
- if (this.cid == null || !this.doneProducingBatches) {
- resultsBuffer.remove();
- } else {
- try {
- this.resultsBuffer.persistLobs();
- } catch (TeiidComponentException e) {
- LogManager.logDetail(LogConstants.CTX_DQP,
QueryPlugin.Util.getString("failed_to_cache")); //$NON-NLS-1$
+ try {
+ if (this.resultsBuffer != null) {
+ if (this.processor != null) {
+ this.processor.closeProcessing();
+
+ if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.DETAIL)) {
+ LogManager.logDetail(LogConstants.CTX_DQP, "Removing tuplesource for the
request " + requestID); //$NON-NLS-1$
+ }
+ rowcount = resultsBuffer.getRowCount();
+ if (this.cid == null || !this.doneProducingBatches) {
+ resultsBuffer.remove();
+ } else {
+ try {
+ this.resultsBuffer.persistLobs();
+ } catch (TeiidComponentException e) {
+ LogManager.logDetail(LogConstants.CTX_DQP,
QueryPlugin.Util.getString("failed_to_cache")); //$NON-NLS-1$
+ }
}
+
+ for (DataTierTupleSource connectorRequest : this.connectorInfo.values()) {
+ connectorRequest.fullyCloseSource();
+ }
}
+
+ this.resultsBuffer = null;
- for (DataTierTupleSource connectorRequest : this.connectorInfo.values()) {
- connectorRequest.fullyCloseSource();
- }
+ for (LobWorkItem lobWorkItem : this.lobStreams.values()) {
+ lobWorkItem.close();
+ }
}
-
- this.resultsBuffer = null;
-
- for (LobWorkItem lobWorkItem : this.lobStreams.values()) {
- lobWorkItem.close();
+
+ if (this.transactionState == TransactionState.ACTIVE) {
+ this.transactionState = TransactionState.DONE;
+ if (transactionContext.getTransactionType() ==
TransactionContext.Scope.REQUEST) {
+ try {
+ this.transactionService.rollback(transactionContext);
+ } catch (XATransactionException e1) {
+ LogManager.logWarning(LogConstants.CTX_DQP, e1,
QueryPlugin.Util.getString("ProcessWorker.failed_rollback")); //$NON-NLS-1$
+ }
+ } else {
+ suspend();
+ }
}
- }
-
- if (this.transactionState == TransactionState.ACTIVE) {
- this.transactionState = TransactionState.DONE;
- if (transactionContext.getTransactionType() ==
TransactionContext.Scope.REQUEST) {
- try {
- this.transactionService.rollback(transactionContext);
- } catch (XATransactionException e1) {
- LogManager.logWarning(LogConstants.CTX_DQP, e1,
QueryPlugin.Util.getString("ProcessWorker.failed_rollback")); //$NON-NLS-1$
- }
+ } catch (Throwable t) {
+ handleThrowable(t);
+ } finally {
+ isClosed = true;
+
+ dqpCore.removeRequest(this);
+
+ if (this.processingException != null) {
+ sendError();
} else {
- suspend();
+ dqpCore.logMMCommand(this, Event.END, rowcount);
}
}
-
- isClosed = true;
-
- dqpCore.removeRequest(this);
-
- if (this.processingException != null) {
- sendError();
- } else {
- dqpCore.logMMCommand(this, Event.END, rowcount);
- }
}
protected void processNew() throws TeiidProcessingException, TeiidComponentException {
Modified:
branches/7.4.x/engine/src/main/java/org/teiid/query/processor/QueryProcessor.java
===================================================================
---
branches/7.4.x/engine/src/main/java/org/teiid/query/processor/QueryProcessor.java 2011-10-05
16:09:13 UTC (rev 3531)
+++
branches/7.4.x/engine/src/main/java/org/teiid/query/processor/QueryProcessor.java 2011-10-05
20:22:43 UTC (rev 3532)
@@ -197,11 +197,13 @@
this.bufferMgr.releaseBuffers(reserved);
reserved = 0;
processorClosed = true;
- try {
- processPlan.close();
- } catch (TeiidComponentException e1){
- LogManager.logDetail(LogConstants.CTX_DQP, e1, "Error closing processor");
//$NON-NLS-1$
- }
+ if (initialized) {
+ try {
+ processPlan.close();
+ } catch (TeiidComponentException e1){
+ LogManager.logDetail(LogConstants.CTX_DQP, e1, "Error closing processor");
//$NON-NLS-1$
+ }
+ }
}
@Override
Modified:
branches/7.4.x/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java
===================================================================
---
branches/7.4.x/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java 2011-10-05
16:09:13 UTC (rev 3531)
+++
branches/7.4.x/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java 2011-10-05
20:22:43 UTC (rev 3532)
@@ -529,6 +529,20 @@
assertNotNull(t.chunkFuture.get().getBytes());
}
+ @Test public void testServerTimeout() throws Exception {
+ RequestMessage reqMsg = exampleRequestMessage("select to_bytes(stringkey,
'utf-8') FROM BQT1.SmallA");
+ reqMsg.setTxnAutoWrapMode(RequestMessage.TXN_WRAP_OFF);
+ agds.setSleep(100);
+ String sessionid = "1";
+ String userName = "A";
+ DQPWorkContext.getWorkContext().getSession().setSessionId(String.valueOf(sessionid));
+ DQPWorkContext.getWorkContext().getSession().setUserName(userName);
+
+ Future<ResultsMessage> message =
core.executeRequest(reqMsg.getExecutionId(), reqMsg);
+ core.cancelRequest(reqMsg.getExecutionId());
+ assertNotNull(message.get().getException());
+ }
+
public void helpTestVisibilityFails(String sql) throws Exception {
RequestMessage reqMsg = exampleRequestMessage(sql);
reqMsg.setTxnAutoWrapMode(RequestMessage.TXN_WRAP_OFF);
Modified:
branches/7.4.x/engine/src/test/java/org/teiid/query/processor/TestQueryProcessor.java
===================================================================
---
branches/7.4.x/engine/src/test/java/org/teiid/query/processor/TestQueryProcessor.java 2011-10-05
16:09:13 UTC (rev 3531)
+++
branches/7.4.x/engine/src/test/java/org/teiid/query/processor/TestQueryProcessor.java 2011-10-05
20:22:43 UTC (rev 3532)
@@ -22,39 +22,30 @@
package org.teiid.query.processor;
+import static org.junit.Assert.*;
+
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
+import org.junit.Test;
import org.teiid.common.buffer.BlockedException;
import org.teiid.common.buffer.BufferManager;
import org.teiid.common.buffer.BufferManagerFactory;
import org.teiid.common.buffer.TupleBatch;
import org.teiid.common.buffer.TupleBuffer;
import org.teiid.common.buffer.TupleSource;
+import org.teiid.core.TeiidComponentException;
import org.teiid.core.TeiidException;
-import org.teiid.query.processor.BatchCollector;
-import org.teiid.query.processor.QueryProcessor;
import org.teiid.query.sql.symbol.ElementSymbol;
import org.teiid.query.util.CommandContext;
-import junit.framework.TestCase;
-
-
/**
*/
-public class TestQueryProcessor extends TestCase {
+public class TestQueryProcessor {
- /**
- * Constructor for TestQueryProcessor.
- * @param name
- */
- public TestQueryProcessor(String name) {
- super(name);
- }
-
- public void helpTestProcessor(FakeProcessorPlan plan, long timeslice, List[]
expectedResults) throws TeiidException {
+ public void helpTestProcessor(FakeProcessorPlan plan, List[] expectedResults) throws
TeiidException {
BufferManager bufferMgr = BufferManagerFactory.getStandaloneBufferManager();
FakeDataManager dataManager = new FakeDataManager();
@@ -85,14 +76,14 @@
tsID.remove();
}
- public void testNoResults() throws Exception {
+ @Test public void testNoResults() throws Exception {
List elements = new ArrayList();
elements.add(new ElementSymbol("a")); //$NON-NLS-1$
FakeProcessorPlan plan = new FakeProcessorPlan(elements, null);
- helpTestProcessor(plan, 1000, new List[0]);
+ helpTestProcessor(plan, new List[0]);
}
- public void testBlockNoResults() throws Exception {
+ @Test public void testBlockNoResults() throws Exception {
List elements = new ArrayList();
elements.add(new ElementSymbol("a")); //$NON-NLS-1$
@@ -103,10 +94,10 @@
batches.add(batch);
FakeProcessorPlan plan = new FakeProcessorPlan(elements, batches);
- helpTestProcessor(plan, 1000, new List[0]);
+ helpTestProcessor(plan, new List[0]);
}
- public void testProcessWithOccasionalBlocks() throws Exception {
+ @Test public void testProcessWithOccasionalBlocks() throws Exception {
List elements = new ArrayList();
elements.add(new ElementSymbol("a")); //$NON-NLS-1$
@@ -137,6 +128,16 @@
}
FakeProcessorPlan plan = new FakeProcessorPlan(elements, batches);
- helpTestProcessor(plan, 1000, expectedResults);
+ helpTestProcessor(plan, expectedResults);
}
+
+ @Test public void testCloseBeforeInitialization() throws TeiidComponentException {
+ BufferManager bufferMgr = BufferManagerFactory.getStandaloneBufferManager();
+ FakeDataManager dataManager = new FakeDataManager();
+
+ CommandContext context = new CommandContext("pid", "group",
null, null, 1); //$NON-NLS-1$ //$NON-NLS-2$
+
+ QueryProcessor processor = new QueryProcessor(null, context, bufferMgr,
dataManager);
+ processor.closeProcessing();
+ }
}
Modified:
branches/7.4.x/test-integration/common/src/test/java/org/teiid/transport/TestJDBCSocketTransport.java
===================================================================
---
branches/7.4.x/test-integration/common/src/test/java/org/teiid/transport/TestJDBCSocketTransport.java 2011-10-05
16:09:13 UTC (rev 3531)
+++
branches/7.4.x/test-integration/common/src/test/java/org/teiid/transport/TestJDBCSocketTransport.java 2011-10-05
20:22:43 UTC (rev 3532)
@@ -56,7 +56,7 @@
DQPConfiguration dqpConfig = new DQPConfiguration();
dqpConfig.setMaxActivePlans(2);
- FakeServer server = new FakeServer();
+ FakeServer server = new FakeServer(dqpConfig);
server.setUseCallingThread(false);
server.deployVDB("parts", UnitTestUtil.getTestDataPath() +
"/PartsSupplier.vdb");