Author: shawkins
Date: 2010-04-25 21:01:27 -0400 (Sun, 25 Apr 2010)
New Revision: 2078
Modified:
trunk/client/src/main/java/org/teiid/adminapi/Request.java
trunk/client/src/main/java/org/teiid/adminapi/impl/RequestMetadata.java
trunk/client/src/main/java/org/teiid/adminapi/impl/RequestMetadataMapper.java
trunk/client/src/test/java/org/teiid/adminapi/impl/TestRequestMetadata.java
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWork.java
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWorkItem.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/AbstractWorkItem.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
trunk/engine/src/test/java/com/metamatrix/dqp/service/AutoGenDataService.java
Log:
TEIID-1033 changing state to processingstate and added threadstate
Modified: trunk/client/src/main/java/org/teiid/adminapi/Request.java
===================================================================
--- trunk/client/src/main/java/org/teiid/adminapi/Request.java 2010-04-23 21:31:03 UTC
(rev 2077)
+++ trunk/client/src/main/java/org/teiid/adminapi/Request.java 2010-04-26 01:01:27 UTC
(rev 2078)
@@ -36,11 +36,17 @@
*/
public interface Request extends AdminObject {
- public enum State {
+ public enum ProcessingState {
PROCESSING,
DONE,
CANCELED
}
+
+ public enum ThreadState {
+ RUNNING,
+ QUEUED,
+ IDLE
+ }
/**
* Get the ExecutionId for a Request
@@ -86,8 +92,13 @@
public Integer getNodeId();
/**
- * @return In the case that this is a source request this represents the node id.
Otherwise blank
+ * @return The request state
*/
- State getState();
+ ProcessingState getState();
+
+ /**
+ * @return The thread state
+ */
+ ThreadState getThreadState();
}
Modified: trunk/client/src/main/java/org/teiid/adminapi/impl/RequestMetadata.java
===================================================================
--- trunk/client/src/main/java/org/teiid/adminapi/impl/RequestMetadata.java 2010-04-23
21:31:03 UTC (rev 2077)
+++ trunk/client/src/main/java/org/teiid/adminapi/impl/RequestMetadata.java 2010-04-26
01:01:27 UTC (rev 2078)
@@ -43,7 +43,8 @@
private boolean sourceRequest;
private Integer nodeID;
private String transactionId;
- private State state;
+ private ProcessingState processingState = ProcessingState.PROCESSING;
+ private ThreadState threadState = ThreadState.RUNNING;
@Override
@ManagementProperty(description="Unique Identifier for Request",
readOnly=true)
@@ -57,14 +58,23 @@
@Override
@ManagementProperty(description="State of the Request", readOnly=true)
- public State getState() {
- return state;
+ public ProcessingState getState() {
+ return processingState;
}
- public void setState(State state) {
- this.state = state;
+ public void setState(ProcessingState state) {
+ this.processingState = state;
}
+ @Override
+ public ThreadState getThreadState() {
+ return threadState;
+ }
+
+ public void setThreadState(ThreadState threadState) {
+ this.threadState = threadState;
+ }
+
@Override
@ManagementProperty(description="Session ID", readOnly=true)
public String getSessionId() {
Modified: trunk/client/src/main/java/org/teiid/adminapi/impl/RequestMetadataMapper.java
===================================================================
---
trunk/client/src/main/java/org/teiid/adminapi/impl/RequestMetadataMapper.java 2010-04-23
21:31:03 UTC (rev 2077)
+++
trunk/client/src/main/java/org/teiid/adminapi/impl/RequestMetadataMapper.java 2010-04-26
01:01:27 UTC (rev 2078)
@@ -36,7 +36,7 @@
import org.jboss.metatype.plugins.types.MutableCompositeMetaType;
import org.jboss.metatype.spi.values.MetaMapper;
import org.teiid.adminapi.Request;
-import org.teiid.adminapi.Request.State;
+import org.teiid.adminapi.Request.ProcessingState;
public class RequestMetadataMapper extends MetaMapper<RequestMetadata> {
private static final String TRANSACTION_ID = "transactionId"; //$NON-NLS-1$
@@ -46,7 +46,8 @@
private static final String START_TIME = "startTime"; //$NON-NLS-1$
private static final String SESSION_ID = "sessionId"; //$NON-NLS-1$
private static final String EXECUTION_ID = "executionId"; //$NON-NLS-1$
- private static final String STATE = "state"; //$NON-NLS-1$
+ private static final String STATE = "processingState"; //$NON-NLS-1$
+ private static final String THREAD_STATE = "threadState"; //$NON-NLS-1$
private static final MutableCompositeMetaType metaType;
private static final MetaValueFactory metaValueFactory =
MetaValueFactory.getInstance();
@@ -59,8 +60,8 @@
metaType.addItem(SOURCE_REQUEST, SOURCE_REQUEST, SimpleMetaType.BOOLEAN_PRIMITIVE);
metaType.addItem(NODE_ID, NODE_ID, SimpleMetaType.INTEGER);
metaType.addItem(TRANSACTION_ID, TRANSACTION_ID, SimpleMetaType.STRING);
- EnumMetaType emt = new EnumMetaType(Request.State.values());
- metaType.addItem(STATE, STATE, emt);
+ metaType.addItem(STATE, STATE, new EnumMetaType(Request.ProcessingState.values()));
+ metaType.addItem(THREAD_STATE, THREAD_STATE, new
EnumMetaType(Request.ThreadState.values()));
metaType.freeze();
}
@@ -91,6 +92,7 @@
request.set(TRANSACTION_ID,SimpleValueSupport.wrap(object.getTransactionId()));
EnumMetaType emt = (EnumMetaType)composite.getType(STATE);
request.set(STATE, new EnumValueSupport(emt, object.getState()));
+ request.set(THREAD_STATE, new
EnumValueSupport((EnumMetaType)composite.getType(THREAD_STATE),
object.getThreadState()));
return request;
}
throw new IllegalArgumentException("Cannot convert RequestMetadata " +
object); //$NON-NLS-1$
@@ -112,7 +114,7 @@
request.setSourceRequest((Boolean)
metaValueFactory.unwrap(compositeValue.get(SOURCE_REQUEST)));
request.setNodeId((Integer) metaValueFactory.unwrap(compositeValue.get(NODE_ID)));
request.setTransactionId((String)
metaValueFactory.unwrap(compositeValue.get(TRANSACTION_ID)));
- request.setState((State) metaValueFactory.unwrap(compositeValue.get(STATE)));
+ request.setState((ProcessingState)
metaValueFactory.unwrap(compositeValue.get(STATE)));
return request;
}
throw new IllegalStateException("Unable to unwrap RequestMetadata " +
metaValue); //$NON-NLS-1$
Modified: trunk/client/src/test/java/org/teiid/adminapi/impl/TestRequestMetadata.java
===================================================================
--- trunk/client/src/test/java/org/teiid/adminapi/impl/TestRequestMetadata.java 2010-04-23
21:31:03 UTC (rev 2077)
+++ trunk/client/src/test/java/org/teiid/adminapi/impl/TestRequestMetadata.java 2010-04-26
01:01:27 UTC (rev 2078)
@@ -26,13 +26,13 @@
import org.jboss.metatype.api.values.MetaValue;
import org.junit.Test;
-import org.teiid.adminapi.Request.State;
+import org.teiid.adminapi.Request.ProcessingState;
public class TestRequestMetadata {
@Test public void testMapping() {
RequestMetadata request = new RequestMetadata();
- request.setState(State.PROCESSING);
+ request.setState(ProcessingState.PROCESSING);
RequestMetadataMapper rmm = new RequestMetadataMapper();
MetaValue mv = rmm.createMetaValue(rmm.getMetaType(), request);
Modified:
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWork.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWork.java 2010-04-23
21:31:03 UTC (rev 2077)
+++
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWork.java 2010-04-26
01:01:27 UTC (rev 2078)
@@ -39,5 +39,7 @@
void close();
AtomicResultsMessage execute() throws ConnectorException, BlockedException;
-
+
+ boolean isQueued();
+
}
\ No newline at end of file
Modified:
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWorkItem.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWorkItem.java 2010-04-23
21:31:03 UTC (rev 2077)
+++
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWorkItem.java 2010-04-26
01:01:27 UTC (rev 2078)
@@ -142,6 +142,11 @@
public AbstractWorkItem getParent() {
return awi;
}
+
+ @Override
+ public boolean isQueued() {
+ return this.permitMode == PermitMode.BLOCKED;
+ }
public void cancel() {
try {
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/AbstractWorkItem.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/dqp/internal/process/AbstractWorkItem.java 2010-04-23
21:31:03 UTC (rev 2077)
+++
trunk/engine/src/main/java/org/teiid/dqp/internal/process/AbstractWorkItem.java 2010-04-26
01:01:27 UTC (rev 2078)
@@ -40,7 +40,7 @@
}
private ThreadState threadState = ThreadState.MORE_WORK;
- private volatile boolean release;
+ private volatile boolean isProcessing;
public void run() {
startProcessing();
@@ -55,7 +55,12 @@
return this.threadState;
}
+ public boolean isProcessing() {
+ return isProcessing;
+ }
+
private synchronized void startProcessing() {
+ isProcessing = true;
logTrace("start processing"); //$NON-NLS-1$
if (this.threadState != ThreadState.MORE_WORK) {
throw new IllegalStateException("Must be in MORE_WORK"); //$NON-NLS-1$
@@ -64,6 +69,7 @@
}
private synchronized void endProcessing() {
+ isProcessing = false;
logTrace("end processing"); //$NON-NLS-1$
switch (this.threadState) {
case WORKING:
@@ -131,16 +137,12 @@
public abstract String toString();
+ @Override
+ public void release() {
+
+ }
+
@Override
- public void release() {
- this.release = true;
- }
-
- public boolean shouldAbortProcessing() {
- return this.release;
- }
-
- @Override
public void workAccepted(WorkEvent arg0) {
}
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java 2010-04-23
21:31:03 UTC (rev 2077)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java 2010-04-26
01:01:27 UTC (rev 2078)
@@ -42,7 +42,8 @@
import org.teiid.adminapi.Admin;
import org.teiid.adminapi.AdminException;
-import org.teiid.adminapi.Request.State;
+import org.teiid.adminapi.Request.ProcessingState;
+import org.teiid.adminapi.Request.ThreadState;
import org.teiid.adminapi.impl.RequestMetadata;
import org.teiid.adminapi.impl.WorkerPoolStatisticsMetadata;
import org.teiid.client.DQP;
@@ -247,7 +248,19 @@
req.setSessionId(holder.requestID.getConnectionID());
req.setCommand(holder.requestMsg.getCommandString());
req.setStartTime(holder.getProcessingTimestamp());
-
req.setState(holder.isCanceled()?State.CANCELED:holder.isDoneProcessing()?State.DONE:State.PROCESSING);
+
req.setState(holder.isCanceled()?ProcessingState.CANCELED:holder.isDoneProcessing()?ProcessingState.DONE:ProcessingState.PROCESSING);
+ switch (holder.getThreadState()) {
+ case DONE:
+ case IDLE:
+ req.setThreadState(ThreadState.IDLE);
+ break;
+ default:
+ if (holder.isProcessing()) {
+ req.setThreadState(ThreadState.RUNNING);
+ } else {
+ req.setThreadState(ThreadState.QUEUED);
+ }
+ }
if (holder.getTransactionContext() != null &&
holder.getTransactionContext().getTransactionType() != Scope.NONE) {
req.setTransactionId(holder.getTransactionContext().getTransactionId());
}
@@ -262,14 +275,20 @@
// add all the subrequest messages
AtomicRequestMessage arm = conInfo.getAtomicRequestMessage();
RequestMetadata info = new RequestMetadata();
-
+ if (conInfo.isQueued()) {
+ info.setThreadState(ThreadState.QUEUED);
+ } else if (conInfo.isRunning()) {
+ info.setThreadState(ThreadState.RUNNING);
+ } else {
+ info.setThreadState(ThreadState.IDLE);
+ }
info.setExecutionId(arm.getRequestID().getExecutionID());
info.setSessionId(holder.requestID.getConnectionID());
info.setCommand(arm.getCommand().toString());
info.setStartTime(arm.getProcessingTimestamp());
info.setSourceRequest(true);
info.setNodeId(arm.getAtomicRequestID().getNodeID());
-
info.setState(conInfo.isCanceled()?State.CANCELED:conInfo.isDone()?State.DONE:State.PROCESSING);
+
info.setState(conInfo.isCanceled()?ProcessingState.CANCELED:conInfo.isDone()?ProcessingState.DONE:ProcessingState.PROCESSING);
results.add(info);
}
results.add(req);
Modified:
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java 2010-04-23
21:31:03 UTC (rev 2077)
+++
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java 2010-04-26
01:01:27 UTC (rev 2078)
@@ -57,6 +57,8 @@
private boolean closed;
private volatile boolean canceled;
+ private volatile boolean running;
+
/**
* Constructor for DataTierTupleSource.
*/
@@ -87,12 +89,19 @@
return null;
}
try {
+ running = true;
receiveResults(this.cwi.more());
} catch (ConnectorException e) {
exceptionOccurred(e, true);
+ } finally {
+ running = false;
}
}
}
+
+ public boolean isQueued() {
+ return this.cwi != null && this.cwi.isQueued();
+ }
public boolean isDone() {
return this.arm != null && this.arm.getFinalRow() > 0;
@@ -105,12 +114,19 @@
Assertion.isNull(workItem.getConnectorRequest(aqr.getAtomicRequestID()));
workItem.addConnectorRequest(aqr.getAtomicRequestID(), this);
}
+ running = true;
receiveResults(this.cwi.execute());
} catch (ConnectorException e) {
exceptionOccurred(e, true);
+ } finally {
+ running = false;
}
}
+ public boolean isRunning() {
+ return running;
+ }
+
public void fullyCloseSource() {
if (!closed) {
if (cwi != null) {
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java 2010-04-23
21:31:03 UTC (rev 2077)
+++
trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java 2010-04-26
01:01:27 UTC (rev 2078)
@@ -568,7 +568,6 @@
try {
transactionService.cancelTransactions(requestID.getConnectionID(),
true);
} catch (XATransactionException err) {
- LogManager.logWarning(LogConstants.CTX_DQP, "rollback failed
for requestID=" + requestID.getConnectionID()); //$NON-NLS-1$
throw new MetaMatrixComponentException(err);
}
}
@@ -682,4 +681,13 @@
return processingTimestamp;
}
+ @Override
+ public void release() {
+ try {
+ requestCancel();
+ } catch (MetaMatrixComponentException e) {
+ LogManager.logWarning(LogConstants.CTX_DQP, e, "Failed to cancel " +
requestID); //$NON-NLS-1$
+ }
+ }
+
}
\ No newline at end of file
Modified: trunk/engine/src/test/java/com/metamatrix/dqp/service/AutoGenDataService.java
===================================================================
---
trunk/engine/src/test/java/com/metamatrix/dqp/service/AutoGenDataService.java 2010-04-23
21:31:03 UTC (rev 2077)
+++
trunk/engine/src/test/java/com/metamatrix/dqp/service/AutoGenDataService.java 2010-04-26
01:01:27 UTC (rev 2078)
@@ -104,6 +104,11 @@
public void cancel() {
}
+
+ @Override
+ public boolean isQueued() {
+ return false;
+ }
};
}