Author: shawkins
Date: 2010-04-22 18:03:05 -0400 (Thu, 22 Apr 2010)
New Revision: 2075
Modified:
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorManager.java
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWork.java
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWorkItem.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/AbstractWorkItem.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierManagerImpl.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java
trunk/engine/src/test/java/com/metamatrix/dqp/service/AutoGenDataService.java
trunk/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/TestConnectorManager.java
trunk/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/TestConnectorWorkItem.java
trunk/jboss-integration/src/main/java/org/teiid/jboss/deployers/ConnectionFactoryDeployer.java
Log:
TEIID-1015 adding back queuing logic for non-transactional requests
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorManager.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorManager.java 2010-04-22 21:17:22 UTC (rev 2074)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorManager.java 2010-04-22 22:03:05 UTC (rev 2075)
@@ -26,6 +26,7 @@
*/
package org.teiid.dqp.internal.datamgr.impl;
+import java.util.LinkedList;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
@@ -46,10 +47,12 @@
import org.teiid.connector.metadata.runtime.MetadataFactory;
import org.teiid.connector.metadata.runtime.MetadataStore;
import org.teiid.dqp.internal.cache.DQPContextCache;
+import org.teiid.dqp.internal.datamgr.impl.ConnectorWorkItem.PermitMode;
+import org.teiid.dqp.internal.process.AbstractWorkItem;
import org.teiid.logging.api.CommandLogMessage;
import org.teiid.logging.api.CommandLogMessage.Event;
-import org.teiid.security.SecurityHelper;
+import com.metamatrix.common.buffer.BlockedException;
import com.metamatrix.common.log.LogConstants;
import com.metamatrix.common.log.LogManager;
import com.metamatrix.common.log.MessageLevel;
@@ -70,17 +73,9 @@
@ManagementObject(isRuntime=true,
componentType=@ManagementComponent(type="teiid",subtype="connectormanager"),
properties=ManagementProperties.EXPLICIT)
public class ConnectorManager {
- public enum ConnectorStatus {
- NOT_INITIALIZED, INIT_FAILED, OPEN, DATA_SOURCE_UNAVAILABLE, CLOSED, UNABLE_TO_CHECK;
- }
-
public static final int DEFAULT_MAX_THREADS = 20;
private String connectorName;
- private SecurityHelper securityHelper;
-
- private volatile ConnectorStatus state = ConnectorStatus.NOT_INITIALIZED;
-
//services acquired in start
private BufferService bufferService;
@@ -88,25 +83,42 @@
private ConcurrentHashMap<AtomicRequestID, ConnectorWorkItem> requestStates =
new ConcurrentHashMap<AtomicRequestID, ConnectorWorkItem>();
private SourceCapabilities cachedCapabilities;
+
+ private int currentConnections;
+ private int maxConnections;
+ private LinkedList<ConnectorWorkItem> queuedRequests = new
LinkedList<ConnectorWorkItem>();
+
+ private volatile boolean stopped;
public ConnectorManager(String name) {
- this(name, DEFAULT_MAX_THREADS, null);
+ this(name, DEFAULT_MAX_THREADS);
}
- public ConnectorManager(String name, int maxThreads, SecurityHelper securityHelper)
{
+ public ConnectorManager(String name, int maxThreads) {
if (name == null) {
throw new IllegalArgumentException("Connector name can not be null");
//$NON-NLS-1$
}
if (maxThreads <= 0) {
maxThreads = DEFAULT_MAX_THREADS;
}
+ this.maxConnections = maxThreads;
this.connectorName = name;
- this.securityHelper = securityHelper;
}
- SecurityHelper getSecurityHelper() {
- return securityHelper;
- }
+ public synchronized void acquireConnectionLock(ConnectorWorkItem item) throws
BlockedException {
+ switch (item.getPermitMode()) {
+ case NOT_ACQUIRED:
+ if (currentConnections < maxConnections) {
+ currentConnections++;
+ item.setPermitMode(PermitMode.ACQUIRED);
+ return;
+ }
+ queuedRequests.add(item);
+ item.setPermitMode(PermitMode.BLOCKED);
+ case BLOCKED:
+ throw BlockedException.INSTANCE;
+ }
+ }
public String getName() {
return this.connectorName;
@@ -129,7 +141,6 @@
return factory.getMetadataStore();
}
-
public SourceCapabilities getCapabilities() throws ConnectorException {
if (cachedCapabilities != null) {
return cachedCapabilities;
@@ -161,13 +172,13 @@
}
}
- public ConnectorWork executeRequest(AtomicRequestMessage message) throws
ConnectorException {
+ public ConnectorWork executeRequest(AtomicRequestMessage message, AbstractWorkItem
awi) throws ConnectorException {
// Set the connector ID to be used; if not already set.
checkStatus();
AtomicRequestID atomicRequestId = message.getAtomicRequestID();
LogManager.logDetail(LogConstants.CTX_CONNECTOR, new Object[] {atomicRequestId,
"Create State"}); //$NON-NLS-1$
- ConnectorWorkItem item = new ConnectorWorkItem(message, this);
+ ConnectorWorkItem item = new ConnectorWorkItem(message, awi, this);
Assertion.isNull(requestStates.put(atomicRequestId, item), "State already
existed"); //$NON-NLS-1$
return item;
}
@@ -182,7 +193,18 @@
*/
void removeState(AtomicRequestID id) {
LogManager.logDetail(LogConstants.CTX_CONNECTOR, new Object[] {id, "Remove
State"}); //$NON-NLS-1$
- requestStates.remove(id);
+ ConnectorWorkItem cwi = requestStates.remove(id);
+ if (cwi != null && cwi.getPermitMode() == PermitMode.ACQUIRED) {
+ synchronized (this) {
+ ConnectorWorkItem next = queuedRequests.pollFirst();
+ if (next == null) {
+ currentConnections--;
+ return;
+ }
+ next.setPermitMode(PermitMode.ACQUIRED);
+ next.getParent().moreWork();
+ }
+ }
}
int size() {
@@ -196,28 +218,15 @@
/**
* initialize this <code>ConnectorManager</code>.
*/
- public synchronized void start() {
- if (this.state != ConnectorStatus.NOT_INITIALIZED) {
- return;
- }
- this.state = ConnectorStatus.INIT_FAILED;
-
+ public void start() {
LogManager.logDetail(LogConstants.CTX_CONNECTOR,
DQPPlugin.Util.getString("ConnectorManagerImpl.Initializing_connector",
connectorName)); //$NON-NLS-1$
-
- this.state = ConnectorStatus.OPEN;
}
/**
* Stop this connector.
*/
- public void stop() {
- synchronized (this) {
- if (this.state == ConnectorStatus.CLOSED) {
- return;
- }
- this.state= ConnectorStatus.CLOSED;
- }
-
+ public void stop() {
+ stopped = true;
//ensure that all requests receive a response
for (ConnectorWork workItem : this.requestStates.values()) {
workItem.cancel();
@@ -289,12 +298,8 @@
return null;
}
- public ConnectorStatus getStatus() {
- return this.state;
- }
-
private void checkStatus() throws ConnectorException {
- if (this.state != ConnectorStatus.OPEN) {
+ if (stopped) {
throw new
ConnectorException(DQPPlugin.Util.getString("ConnectorManager.not_in_valid_state",
this.connectorName)); //$NON-NLS-1$
}
}
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWork.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWork.java 2010-04-22 21:17:22 UTC (rev 2074)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWork.java 2010-04-22 22:03:05 UTC (rev 2075)
@@ -24,6 +24,7 @@
import org.teiid.connector.api.ConnectorException;
+import com.metamatrix.common.buffer.BlockedException;
import com.metamatrix.dqp.message.AtomicResultsMessage;
/**
@@ -37,6 +38,6 @@
void close();
- AtomicResultsMessage execute() throws ConnectorException;
+ AtomicResultsMessage execute() throws ConnectorException, BlockedException;
}
\ No newline at end of file
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWorkItem.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWorkItem.java 2010-04-22 21:17:22 UTC (rev 2074)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWorkItem.java 2010-04-22 22:03:05 UTC (rev 2075)
@@ -42,9 +42,11 @@
import org.teiid.connector.metadata.runtime.RuntimeMetadata;
import org.teiid.dqp.internal.datamgr.language.LanguageBridgeFactory;
import org.teiid.dqp.internal.datamgr.metadata.RuntimeMetadataImpl;
+import org.teiid.dqp.internal.process.AbstractWorkItem;
import org.teiid.logging.api.CommandLogMessage.Event;
import com.metamatrix.api.exception.MetaMatrixProcessingException;
+import com.metamatrix.common.buffer.BlockedException;
import com.metamatrix.common.buffer.TupleBuffer;
import com.metamatrix.common.log.LogConstants;
import com.metamatrix.common.log.LogManager;
@@ -64,12 +66,18 @@
public class ConnectorWorkItem implements ConnectorWork {
+ enum PermitMode {
+ BLOCKED, ACQUIRED, NOT_ACQUIRED
+ }
+
/* Permanent state members */
private AtomicRequestID id;
private ConnectorManager manager;
private AtomicRequestMessage requestMsg;
private Connector connector;
private QueryMetadataInterface queryMetadata;
+ private PermitMode permitMode = PermitMode.NOT_ACQUIRED;
+ private AbstractWorkItem awi;
/* Created on new request */
private Connection connection;
@@ -89,7 +97,7 @@
private AtomicBoolean isCancelled = new AtomicBoolean();
- ConnectorWorkItem(AtomicRequestMessage message, ConnectorManager manager) throws
ConnectorException {
+ ConnectorWorkItem(AtomicRequestMessage message, AbstractWorkItem awi,
ConnectorManager manager) throws ConnectorException {
this.id = message.getAtomicRequestID();
this.requestMsg = message;
this.manager = manager;
@@ -116,11 +124,24 @@
if (requestMsg.isTransactional() && this.connectorEnv.isXaCapable()) {
this.securityContext.setTransactional(true);
}
+ this.awi = awi;
}
public AtomicRequestID getId() {
return id;
}
+
+ public PermitMode getPermitMode() {
+ return permitMode;
+ }
+
+ public void setPermitMode(PermitMode permitMode) {
+ this.permitMode = permitMode;
+ }
+
+ public AbstractWorkItem getParent() {
+ return awi;
+ }
public void cancel() {
try {
@@ -196,10 +217,14 @@
return new ConnectorException(t);
}
- public AtomicResultsMessage execute() throws ConnectorException {
+ public AtomicResultsMessage execute() throws ConnectorException, BlockedException {
if(isCancelled()) {
throw new ConnectorException("Request canceled"); //$NON-NLS-1$
}
+
+ if (!this.securityContext.isTransactional()) {
+ this.manager.acquireConnectionLock(this);
+ }
LogManager.logDetail(LogConstants.CTX_CONNECTOR, new Object[]
{this.requestMsg.getAtomicRequestID(), "Processing NEW request:",
this.requestMsg.getCommand()}); //$NON-NLS-1$
try {
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/AbstractWorkItem.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/AbstractWorkItem.java 2010-04-22 21:17:22 UTC (rev 2074)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/AbstractWorkItem.java 2010-04-22 22:03:05 UTC (rev 2075)
@@ -92,7 +92,7 @@
return this.threadState == ThreadState.IDLE;
}
- protected void moreWork() {
+ public void moreWork() {
moreWork(true);
}
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierManagerImpl.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierManagerImpl.java 2010-04-22 21:17:22 UTC (rev 2074)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierManagerImpl.java 2010-04-22 22:03:05 UTC (rev 2075)
@@ -346,8 +346,8 @@
return aqr;
}
- ConnectorWork executeRequest(AtomicRequestMessage aqr, String connectorName) throws
ConnectorException {
- return getCM(connectorName).executeRequest(aqr);
+ ConnectorWork executeRequest(AtomicRequestMessage aqr, AbstractWorkItem awi, String
connectorName) throws ConnectorException {
+ return getCM(connectorName).executeRequest(aqr, awi);
}
/**
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java 2010-04-22 21:17:22 UTC (rev 2074)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java 2010-04-22 22:03:05 UTC (rev 2075)
@@ -101,7 +101,7 @@
void open() throws MetaMatrixComponentException, MetaMatrixProcessingException {
try {
if (this.cwi == null) {
- this.cwi = this.dataMgr.executeRequest(aqr, this.connectorName);
+ this.cwi = this.dataMgr.executeRequest(aqr, this.workItem,
this.connectorName);
Assertion.isNull(workItem.getConnectorRequest(aqr.getAtomicRequestID()));
workItem.addConnectorRequest(aqr.getAtomicRequestID(), this);
}
Modified: trunk/engine/src/test/java/com/metamatrix/dqp/service/AutoGenDataService.java
===================================================================
--- trunk/engine/src/test/java/com/metamatrix/dqp/service/AutoGenDataService.java 2010-04-22 21:17:22 UTC (rev 2074)
+++ trunk/engine/src/test/java/com/metamatrix/dqp/service/AutoGenDataService.java 2010-04-22 22:03:05 UTC (rev 2075)
@@ -33,6 +33,7 @@
import org.teiid.dqp.internal.datamgr.impl.ConnectorManager;
import org.teiid.dqp.internal.datamgr.impl.ConnectorWork;
import org.teiid.dqp.internal.datamgr.impl.ConnectorWorkItem;
+import org.teiid.dqp.internal.process.AbstractWorkItem;
import com.metamatrix.common.types.DataTypeManager;
import com.metamatrix.dqp.message.AtomicRequestMessage;
@@ -67,7 +68,7 @@
}
@Override
- public ConnectorWork executeRequest(AtomicRequestMessage message)
+ public ConnectorWork executeRequest(AtomicRequestMessage message, AbstractWorkItem
awi)
throws ConnectorException {
if (throwExceptionOnExecute) {
throw new ConnectorException("Connector Exception"); //$NON-NLS-1$
Modified: trunk/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/TestConnectorManager.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/TestConnectorManager.java 2010-04-22 21:17:22 UTC (rev 2074)
+++ trunk/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/TestConnectorManager.java 2010-04-22 22:03:05 UTC (rev 2075)
@@ -31,7 +31,9 @@
import org.mockito.Mockito;
import org.teiid.connector.api.Connector;
import org.teiid.connector.api.ConnectorEnvironment;
+import org.teiid.dqp.internal.process.AbstractWorkItem;
+import com.metamatrix.common.buffer.BlockedException;
import com.metamatrix.dqp.message.AtomicRequestID;
import com.metamatrix.dqp.message.AtomicRequestMessage;
import com.metamatrix.dqp.message.RequestID;
@@ -46,7 +48,7 @@
static ConnectorManager getConnectorManager(ConnectorEnvironment env) throws Exception
{
final FakeConnector c = new FakeConnector();
c.setConnectorEnvironment(env);
- ConnectorManager cm = new ConnectorManager("FakeConnector") { //$NON-NLS-1$
+ ConnectorManager cm = new ConnectorManager("FakeConnector", 1) {
//$NON-NLS-1$
Connector getConnector() {
return c;
}
@@ -70,7 +72,7 @@
}
void helpAssureOneState() throws Exception {
- csm.executeRequest(request);
+ csm.executeRequest(request, Mockito.mock(AbstractWorkItem.class));
ConnectorWork state = csm.getState(request.getAtomicRequestID());
assertEquals(state, csm.getState(request.getAtomicRequestID()));
}
@@ -102,5 +104,34 @@
assertEquals("Expected size of 1", 1, csm.size()); //$NON-NLS-1$
}
+
+ public void testQueuing() throws Exception {
+ ConnectorWork workItem = csm.executeRequest(request,
Mockito.mock(AbstractWorkItem.class));
+ workItem.execute();
+
+ AbstractWorkItem awi1 = Mockito.mock(AbstractWorkItem.class);
+ ConnectorWork workItem1 =
csm.executeRequest(TestConnectorWorkItem.createNewAtomicRequestMessage(2, 1), awi1);
+
+ AbstractWorkItem awi2 = Mockito.mock(AbstractWorkItem.class);
+ ConnectorWork workItem2 =
csm.executeRequest(TestConnectorWorkItem.createNewAtomicRequestMessage(3, 1), awi2);
+
+ try {
+ workItem1.execute();
+ fail("expected exception"); //$NON-NLS-1$
+ } catch (BlockedException e) {
+
+ }
+ workItem.close();
+
+ try {
+ workItem2.execute(); //ensure that another item cannot jump in the queue
+ fail("expected exception"); //$NON-NLS-1$
+ } catch (BlockedException e) {
+
+ }
+
+ Mockito.verify(awi1).moreWork();
+ workItem1.execute();
+ }
}
\ No newline at end of file
Modified: trunk/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/TestConnectorWorkItem.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/TestConnectorWorkItem.java 2010-04-22 21:17:22 UTC (rev 2074)
+++ trunk/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/TestConnectorWorkItem.java 2010-04-22 22:03:05 UTC (rev 2075)
@@ -38,6 +38,7 @@
import org.teiid.connector.api.ProcedureExecution;
import org.teiid.connector.language.Call;
import org.teiid.dqp.internal.datamgr.language.LanguageBridgeFactory;
+import org.teiid.dqp.internal.process.AbstractWorkItem;
import org.teiid.dqp.internal.process.DQPWorkContext;
import com.metamatrix.dqp.message.AtomicRequestMessage;
@@ -116,7 +117,8 @@
Command command = helpGetCommand("update bqt1.smalla set stringkey = 1 where
stringkey = 2", EXAMPLE_BQT); //$NON-NLS-1$
AtomicRequestMessage arm = createNewAtomicRequestMessage(1, 1);
arm.setCommand(command);
- ConnectorWorkItem synchConnectorWorkItem = new ConnectorWorkItem(arm,
TestConnectorManager.getConnectorManager(Mockito.mock(ConnectorEnvironment.class)));
+ ConnectorWorkItem synchConnectorWorkItem = new ConnectorWorkItem(arm,
Mockito.mock(AbstractWorkItem.class),
+ TestConnectorManager.getConnectorManager(Mockito.mock(ConnectorEnvironment.class)));
return synchConnectorWorkItem.execute();
}
@@ -150,7 +152,7 @@
return Mockito.mock(Xid.class);
}} );
- new ConnectorWorkItem(requestMsg, cm);
+ new ConnectorWorkItem(requestMsg, Mockito.mock(AbstractWorkItem.class), cm);
}
@Ignore
@@ -178,7 +180,7 @@
return Mockito.mock(Xid.class);
}} );
- new ConnectorWorkItem(requestMsg, cm);
+ new ConnectorWorkItem(requestMsg, Mockito.mock(AbstractWorkItem.class), cm);
}
}
Modified: trunk/jboss-integration/src/main/java/org/teiid/jboss/deployers/ConnectionFactoryDeployer.java
===================================================================
--- trunk/jboss-integration/src/main/java/org/teiid/jboss/deployers/ConnectionFactoryDeployer.java 2010-04-22 21:17:22 UTC (rev 2074)
+++ trunk/jboss-integration/src/main/java/org/teiid/jboss/deployers/ConnectionFactoryDeployer.java 2010-04-22 22:03:05 UTC (rev 2075)
@@ -95,7 +95,7 @@
ConnectorManager createConnectorManger(String deployedConnectorName, int maxThreads)
{
- ConnectorManager mgr = new ConnectorManager(deployedConnectorName, maxThreads,
securityHelper);
+ ConnectorManager mgr = new ConnectorManager(deployedConnectorName, maxThreads);
return mgr;
}