[teiid-commits] teiid SVN: r2075 - in trunk: engine/src/main/java/org/teiid/dqp/internal/process and 3 other directories.

teiid-commits at lists.jboss.org teiid-commits at lists.jboss.org
Thu Apr 22 18:03:07 EDT 2010


Author: shawkins
Date: 2010-04-22 18:03:05 -0400 (Thu, 22 Apr 2010)
New Revision: 2075

Modified:
   trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorManager.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWork.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWorkItem.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/process/AbstractWorkItem.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierManagerImpl.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java
   trunk/engine/src/test/java/com/metamatrix/dqp/service/AutoGenDataService.java
   trunk/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/TestConnectorManager.java
   trunk/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/TestConnectorWorkItem.java
   trunk/jboss-integration/src/main/java/org/teiid/jboss/deployers/ConnectionFactoryDeployer.java
Log:
TEIID-1015 adding back queuing logic for non-transactional requests

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorManager.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorManager.java	2010-04-22 21:17:22 UTC (rev 2074)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorManager.java	2010-04-22 22:03:05 UTC (rev 2075)
@@ -26,6 +26,7 @@
  */
 package org.teiid.dqp.internal.datamgr.impl;
 
+import java.util.LinkedList;
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.ConcurrentHashMap;
@@ -46,10 +47,12 @@
 import org.teiid.connector.metadata.runtime.MetadataFactory;
 import org.teiid.connector.metadata.runtime.MetadataStore;
 import org.teiid.dqp.internal.cache.DQPContextCache;
+import org.teiid.dqp.internal.datamgr.impl.ConnectorWorkItem.PermitMode;
+import org.teiid.dqp.internal.process.AbstractWorkItem;
 import org.teiid.logging.api.CommandLogMessage;
 import org.teiid.logging.api.CommandLogMessage.Event;
-import org.teiid.security.SecurityHelper;
 
+import com.metamatrix.common.buffer.BlockedException;
 import com.metamatrix.common.log.LogConstants;
 import com.metamatrix.common.log.LogManager;
 import com.metamatrix.common.log.MessageLevel;
@@ -70,17 +73,9 @@
 @ManagementObject(isRuntime=true, componentType=@ManagementComponent(type="teiid",subtype="connectormanager"), properties=ManagementProperties.EXPLICIT)
 public class ConnectorManager  {
 	
-	public enum ConnectorStatus {
-		NOT_INITIALIZED, INIT_FAILED, OPEN, DATA_SOURCE_UNAVAILABLE, CLOSED, UNABLE_TO_CHECK;
-	}
-	
 	public static final int DEFAULT_MAX_THREADS = 20;
 	private String connectorName;
 	    
-    private SecurityHelper securityHelper;
-    
-    private volatile ConnectorStatus state = ConnectorStatus.NOT_INITIALIZED;
-
     //services acquired in start
     private BufferService bufferService;
     
@@ -88,25 +83,42 @@
     private ConcurrentHashMap<AtomicRequestID, ConnectorWorkItem> requestStates = new ConcurrentHashMap<AtomicRequestID, ConnectorWorkItem>();
 	
 	private SourceCapabilities cachedCapabilities;
+	
+	private int currentConnections;
+	private int maxConnections;
+	private LinkedList<ConnectorWorkItem> queuedRequests = new LinkedList<ConnectorWorkItem>();
+	
+	private volatile boolean stopped;
 
     public ConnectorManager(String name) {
-    	this(name, DEFAULT_MAX_THREADS, null);
+    	this(name, DEFAULT_MAX_THREADS);
     }
 	
-    public ConnectorManager(String name, int maxThreads, SecurityHelper securityHelper) {
+    public ConnectorManager(String name, int maxThreads) {
     	if (name == null) {
     		throw new IllegalArgumentException("Connector name can not be null"); //$NON-NLS-1$
     	}
     	if (maxThreads <= 0) {
     		maxThreads = DEFAULT_MAX_THREADS;
     	}
+    	this.maxConnections = maxThreads;
     	this.connectorName = name;
-    	this.securityHelper = securityHelper;
     }
     
-    SecurityHelper getSecurityHelper() {
-		return securityHelper;
-	}
+    public synchronized void acquireConnectionLock(ConnectorWorkItem item) throws BlockedException { //takes a connection permit for the item, or queues it and throws BlockedException when all maxConnections permits are in use
+    	switch (item.getPermitMode()) { //ACQUIRED has no case: the item already holds a permit and the method simply returns
+    	case NOT_ACQUIRED: //the item has not asked for a permit yet
+    		if (currentConnections < maxConnections) { //a permit is free - take it
+	    		currentConnections++;
+	    		item.setPermitMode(PermitMode.ACQUIRED);
+	    		return;
+	    	}
+	    	queuedRequests.add(item); //no free permit - queue the item; removeState hands a released permit to the head of this queue
+	    	item.setPermitMode(PermitMode.BLOCKED);	//no break: intentionally falls through to BLOCKED
+    	case BLOCKED:
+    		throw BlockedException.INSTANCE; //the item is queued and has to wait for a permit
+    	}
+    } //NOTE(review): removeState only acts on ACQUIRED items, so an item closed while BLOCKED appears to stay in queuedRequests and could later be handed a permit it never releases - confirm
     
     public String getName() {
         return this.connectorName;
@@ -129,7 +141,6 @@
     	return factory.getMetadataStore();
 	}    
     
-    
     public SourceCapabilities getCapabilities() throws ConnectorException {
     	if (cachedCapabilities != null) {
     		return cachedCapabilities;
@@ -161,13 +172,13 @@
         }
     }
     
-    public ConnectorWork executeRequest(AtomicRequestMessage message) throws ConnectorException {
+    public ConnectorWork executeRequest(AtomicRequestMessage message, AbstractWorkItem awi) throws ConnectorException {
         // Set the connector ID to be used; if not already set. 
     	checkStatus();
     	AtomicRequestID atomicRequestId = message.getAtomicRequestID();
     	LogManager.logDetail(LogConstants.CTX_CONNECTOR, new Object[] {atomicRequestId, "Create State"}); //$NON-NLS-1$
 
-    	ConnectorWorkItem item = new ConnectorWorkItem(message, this);
+    	ConnectorWorkItem item = new ConnectorWorkItem(message, awi, this);
         Assertion.isNull(requestStates.put(atomicRequestId, item), "State already existed"); //$NON-NLS-1$
         return item;
     }
@@ -182,7 +193,18 @@
      */
     void removeState(AtomicRequestID id) {
     	LogManager.logDetail(LogConstants.CTX_CONNECTOR, new Object[] {id, "Remove State"}); //$NON-NLS-1$
-        requestStates.remove(id);    
+        ConnectorWorkItem cwi = requestStates.remove(id);
+        if (cwi != null && cwi.getPermitMode() == PermitMode.ACQUIRED) {
+        	synchronized (this) {
+	        	ConnectorWorkItem next = queuedRequests.pollFirst();
+	        	if (next == null) {
+	        		currentConnections--;
+	        		return;
+	        	}
+	        	next.setPermitMode(PermitMode.ACQUIRED);
+	        	next.getParent().moreWork();
+        	}
+        }
     }
 
     int size() {
@@ -196,28 +218,15 @@
     /**
      * initialize this <code>ConnectorManager</code>.
      */
-    public synchronized void start() {
-    	if (this.state != ConnectorStatus.NOT_INITIALIZED) {
-    		return;
-    	}
-    	this.state = ConnectorStatus.INIT_FAILED;
-        
+    public void start() {
         LogManager.logDetail(LogConstants.CTX_CONNECTOR, DQPPlugin.Util.getString("ConnectorManagerImpl.Initializing_connector", connectorName)); //$NON-NLS-1$
-
-    	this.state = ConnectorStatus.OPEN;
     }
     
     /**
      * Stop this connector.
      */
-    public void stop() {        
-        synchronized (this) {
-        	if (this.state == ConnectorStatus.CLOSED) {
-        		return;
-        	}
-            this.state= ConnectorStatus.CLOSED;
-		}
-        
+    public void stop() {    
+    	stopped = true;
         //ensure that all requests receive a response
         for (ConnectorWork workItem : this.requestStates.values()) {
     		workItem.cancel();
@@ -289,12 +298,8 @@
     	return null;
     }
     
-    public ConnectorStatus getStatus() {
-    	return this.state;
-    }
-    
     private void checkStatus() throws ConnectorException {
-    	if (this.state != ConnectorStatus.OPEN) {
+    	if (stopped) {
     		throw new ConnectorException(DQPPlugin.Util.getString("ConnectorManager.not_in_valid_state", this.connectorName)); //$NON-NLS-1$
     	}
     }

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWork.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWork.java	2010-04-22 21:17:22 UTC (rev 2074)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWork.java	2010-04-22 22:03:05 UTC (rev 2075)
@@ -24,6 +24,7 @@
 
 import org.teiid.connector.api.ConnectorException;
 
+import com.metamatrix.common.buffer.BlockedException;
 import com.metamatrix.dqp.message.AtomicResultsMessage;
 
 /**
@@ -37,6 +38,6 @@
 
 	void close();
 
-	AtomicResultsMessage execute() throws ConnectorException;
+	AtomicResultsMessage execute() throws ConnectorException, BlockedException;
 
 }
\ No newline at end of file

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWorkItem.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWorkItem.java	2010-04-22 21:17:22 UTC (rev 2074)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWorkItem.java	2010-04-22 22:03:05 UTC (rev 2075)
@@ -42,9 +42,11 @@
 import org.teiid.connector.metadata.runtime.RuntimeMetadata;
 import org.teiid.dqp.internal.datamgr.language.LanguageBridgeFactory;
 import org.teiid.dqp.internal.datamgr.metadata.RuntimeMetadataImpl;
+import org.teiid.dqp.internal.process.AbstractWorkItem;
 import org.teiid.logging.api.CommandLogMessage.Event;
 
 import com.metamatrix.api.exception.MetaMatrixProcessingException;
+import com.metamatrix.common.buffer.BlockedException;
 import com.metamatrix.common.buffer.TupleBuffer;
 import com.metamatrix.common.log.LogConstants;
 import com.metamatrix.common.log.LogManager;
@@ -64,12 +66,18 @@
 
 public class ConnectorWorkItem implements ConnectorWork {
 	
+	enum PermitMode { //connection permit state of a work item; set by ConnectorManager
+		BLOCKED, ACQUIRED, NOT_ACQUIRED //BLOCKED: queued waiting for a permit; ACQUIRED: holds a permit; NOT_ACQUIRED: initial state
+	}
+	
 	/* Permanent state members */
 	private AtomicRequestID id;
     private ConnectorManager manager;
     private AtomicRequestMessage requestMsg;
     private Connector connector;
     private QueryMetadataInterface queryMetadata;
+    private PermitMode permitMode = PermitMode.NOT_ACQUIRED;
+    private AbstractWorkItem awi;
     
     /* Created on new request */
     private Connection connection;
@@ -89,7 +97,7 @@
     
     private AtomicBoolean isCancelled = new AtomicBoolean();
     
-    ConnectorWorkItem(AtomicRequestMessage message, ConnectorManager manager) throws ConnectorException {
+    ConnectorWorkItem(AtomicRequestMessage message, AbstractWorkItem awi, ConnectorManager manager) throws ConnectorException {
         this.id = message.getAtomicRequestID();
         this.requestMsg = message;
         this.manager = manager;
@@ -116,11 +124,24 @@
         if (requestMsg.isTransactional() &&  this.connectorEnv.isXaCapable()) {
     		this.securityContext.setTransactional(true);
         }
+        this.awi = awi;
     }
     
     public AtomicRequestID getId() {
 		return id;
 	}
+    
+    public PermitMode getPermitMode() { //current permit state; the field starts as NOT_ACQUIRED
+		return permitMode;
+	}
+    
+    public void setPermitMode(PermitMode permitMode) { //NOTE(review): plain (non-volatile) field; the visible callers in ConnectorManager write it while holding the manager lock, but removeState reads it before locking - confirm this is safe
+		this.permitMode = permitMode;
+	}
+    
+    public AbstractWorkItem getParent() { //the AbstractWorkItem passed to the constructor; ConnectorManager.removeState calls moreWork() on it when this item is handed a permit
+		return awi;
+	}
 
     public void cancel() {
     	try {
@@ -196,10 +217,14 @@
 		return new ConnectorException(t);
     }
     
-	public AtomicResultsMessage execute() throws ConnectorException {
+	public AtomicResultsMessage execute() throws ConnectorException, BlockedException {
         if(isCancelled()) {
     		throw new ConnectorException("Request canceled"); //$NON-NLS-1$
     	}
+        
+        if (!this.securityContext.isTransactional()) {
+        	this.manager.acquireConnectionLock(this);
+        }
 
     	LogManager.logDetail(LogConstants.CTX_CONNECTOR, new Object[] {this.requestMsg.getAtomicRequestID(), "Processing NEW request:", this.requestMsg.getCommand()}); //$NON-NLS-1$                                     
     	try {

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/AbstractWorkItem.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/AbstractWorkItem.java	2010-04-22 21:17:22 UTC (rev 2074)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/AbstractWorkItem.java	2010-04-22 22:03:05 UTC (rev 2075)
@@ -92,7 +92,7 @@
     	return this.threadState == ThreadState.IDLE;
     }
     
-    protected void moreWork() {
+    public void moreWork() {
     	moreWork(true);
     }
     

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierManagerImpl.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierManagerImpl.java	2010-04-22 21:17:22 UTC (rev 2074)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierManagerImpl.java	2010-04-22 22:03:05 UTC (rev 2075)
@@ -346,8 +346,8 @@
 		return aqr;
 	}
 	
-	ConnectorWork executeRequest(AtomicRequestMessage aqr, String connectorName) throws ConnectorException {
-		return getCM(connectorName).executeRequest(aqr);
+	ConnectorWork executeRequest(AtomicRequestMessage aqr, AbstractWorkItem awi, String connectorName) throws ConnectorException {
+		return getCM(connectorName).executeRequest(aqr, awi);
 	}
 
     /** 

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java	2010-04-22 21:17:22 UTC (rev 2074)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java	2010-04-22 22:03:05 UTC (rev 2075)
@@ -101,7 +101,7 @@
     void open() throws MetaMatrixComponentException, MetaMatrixProcessingException {
         try {
 	        if (this.cwi == null) {
-	        	this.cwi = this.dataMgr.executeRequest(aqr, this.connectorName);
+	        	this.cwi = this.dataMgr.executeRequest(aqr, this.workItem, this.connectorName);
 	        	Assertion.isNull(workItem.getConnectorRequest(aqr.getAtomicRequestID()));
 	            workItem.addConnectorRequest(aqr.getAtomicRequestID(), this);
 	        }

Modified: trunk/engine/src/test/java/com/metamatrix/dqp/service/AutoGenDataService.java
===================================================================
--- trunk/engine/src/test/java/com/metamatrix/dqp/service/AutoGenDataService.java	2010-04-22 21:17:22 UTC (rev 2074)
+++ trunk/engine/src/test/java/com/metamatrix/dqp/service/AutoGenDataService.java	2010-04-22 22:03:05 UTC (rev 2075)
@@ -33,6 +33,7 @@
 import org.teiid.dqp.internal.datamgr.impl.ConnectorManager;
 import org.teiid.dqp.internal.datamgr.impl.ConnectorWork;
 import org.teiid.dqp.internal.datamgr.impl.ConnectorWorkItem;
+import org.teiid.dqp.internal.process.AbstractWorkItem;
 
 import com.metamatrix.common.types.DataTypeManager;
 import com.metamatrix.dqp.message.AtomicRequestMessage;
@@ -67,7 +68,7 @@
     }
 
     @Override
-    public ConnectorWork executeRequest(AtomicRequestMessage message)
+    public ConnectorWork executeRequest(AtomicRequestMessage message, AbstractWorkItem awi)
     		throws ConnectorException {
     	if (throwExceptionOnExecute) {
     		throw new ConnectorException("Connector Exception"); //$NON-NLS-1$

Modified: trunk/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/TestConnectorManager.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/TestConnectorManager.java	2010-04-22 21:17:22 UTC (rev 2074)
+++ trunk/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/TestConnectorManager.java	2010-04-22 22:03:05 UTC (rev 2075)
@@ -31,7 +31,9 @@
 import org.mockito.Mockito;
 import org.teiid.connector.api.Connector;
 import org.teiid.connector.api.ConnectorEnvironment;
+import org.teiid.dqp.internal.process.AbstractWorkItem;
 
+import com.metamatrix.common.buffer.BlockedException;
 import com.metamatrix.dqp.message.AtomicRequestID;
 import com.metamatrix.dqp.message.AtomicRequestMessage;
 import com.metamatrix.dqp.message.RequestID;
@@ -46,7 +48,7 @@
 	static ConnectorManager getConnectorManager(ConnectorEnvironment env) throws Exception {
 		final FakeConnector c = new FakeConnector();
 		c.setConnectorEnvironment(env);		
-		ConnectorManager cm = new ConnectorManager("FakeConnector") { //$NON-NLS-1$
+		ConnectorManager cm = new ConnectorManager("FakeConnector", 1) { //$NON-NLS-1$
 			Connector getConnector() {
 				return c;
 			}
@@ -70,7 +72,7 @@
     }
 
     void helpAssureOneState() throws Exception {
-    	csm.executeRequest(request);
+    	csm.executeRequest(request, Mockito.mock(AbstractWorkItem.class));
     	ConnectorWork state = csm.getState(request.getAtomicRequestID());
     	assertEquals(state, csm.getState(request.getAtomicRequestID()));
     }
@@ -102,5 +104,34 @@
 
         assertEquals("Expected size of 1", 1, csm.size()); //$NON-NLS-1$
     }
+    
+    public void testQueuing() throws Exception { //checks that requests beyond the connection limit block and are served in arrival order; assumes csm allows a single connection (getConnectorManager passes 1) - TODO confirm how csm is built
+    	ConnectorWork workItem = csm.executeRequest(request, Mockito.mock(AbstractWorkItem.class));
+    	workItem.execute(); //expected to take the only permit
+    	
+    	AbstractWorkItem awi1 = Mockito.mock(AbstractWorkItem.class);
+    	ConnectorWork workItem1 = csm.executeRequest(TestConnectorWorkItem.createNewAtomicRequestMessage(2, 1), awi1);
+    	
+    	AbstractWorkItem awi2 = Mockito.mock(AbstractWorkItem.class);
+    	ConnectorWork workItem2 = csm.executeRequest(TestConnectorWorkItem.createNewAtomicRequestMessage(3, 1), awi2);
+
+    	try {
+    		workItem1.execute(); //no permit left, so this should queue workItem1 and throw
+    		fail("expected exception"); //$NON-NLS-1$
+    	} catch (BlockedException e) { //expected
+    		
+    	}
+    	workItem.close(); //presumably releases the permit through removeState, passing it to the queued workItem1 - confirm
+    	
+    	try {
+    		workItem2.execute(); //ensure that another item cannot jump in the queue
+    		fail("expected exception"); //$NON-NLS-1$
+    	} catch (BlockedException e) { //expected
+    		
+    	}
+
+    	Mockito.verify(awi1).moreWork(); //the parent of the queued item must have been notified
+    	workItem1.execute(); //workItem1 should now hold the permit, so no BlockedException is expected
+    }
         
 }
\ No newline at end of file

Modified: trunk/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/TestConnectorWorkItem.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/TestConnectorWorkItem.java	2010-04-22 21:17:22 UTC (rev 2074)
+++ trunk/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/TestConnectorWorkItem.java	2010-04-22 22:03:05 UTC (rev 2075)
@@ -38,6 +38,7 @@
 import org.teiid.connector.api.ProcedureExecution;
 import org.teiid.connector.language.Call;
 import org.teiid.dqp.internal.datamgr.language.LanguageBridgeFactory;
+import org.teiid.dqp.internal.process.AbstractWorkItem;
 import org.teiid.dqp.internal.process.DQPWorkContext;
 
 import com.metamatrix.dqp.message.AtomicRequestMessage;
@@ -116,7 +117,8 @@
 		Command command = helpGetCommand("update bqt1.smalla set stringkey = 1 where stringkey = 2", EXAMPLE_BQT); //$NON-NLS-1$
 		AtomicRequestMessage arm = createNewAtomicRequestMessage(1, 1);
 		arm.setCommand(command);
-		ConnectorWorkItem synchConnectorWorkItem = new ConnectorWorkItem(arm, TestConnectorManager.getConnectorManager(Mockito.mock(ConnectorEnvironment.class)));
+		ConnectorWorkItem synchConnectorWorkItem = new ConnectorWorkItem(arm, Mockito.mock(AbstractWorkItem.class), 
+				TestConnectorManager.getConnectorManager(Mockito.mock(ConnectorEnvironment.class)));
 		return synchConnectorWorkItem.execute();
 	}
 	
@@ -150,7 +152,7 @@
 				return Mockito.mock(Xid.class);
 			}} );
 		
-		new ConnectorWorkItem(requestMsg, cm);
+		new ConnectorWorkItem(requestMsg, Mockito.mock(AbstractWorkItem.class), cm);
     }
     
 	@Ignore
@@ -178,7 +180,7 @@
 				return Mockito.mock(Xid.class);
 			}} );
 		
-		new ConnectorWorkItem(requestMsg, cm);
+		new ConnectorWorkItem(requestMsg, Mockito.mock(AbstractWorkItem.class), cm);
     }
 
 }

Modified: trunk/jboss-integration/src/main/java/org/teiid/jboss/deployers/ConnectionFactoryDeployer.java
===================================================================
--- trunk/jboss-integration/src/main/java/org/teiid/jboss/deployers/ConnectionFactoryDeployer.java	2010-04-22 21:17:22 UTC (rev 2074)
+++ trunk/jboss-integration/src/main/java/org/teiid/jboss/deployers/ConnectionFactoryDeployer.java	2010-04-22 22:03:05 UTC (rev 2075)
@@ -95,7 +95,7 @@
 
 
     ConnectorManager createConnectorManger(String deployedConnectorName, int maxThreads) {
-        ConnectorManager mgr = new ConnectorManager(deployedConnectorName, maxThreads, securityHelper);       
+        ConnectorManager mgr = new ConnectorManager(deployedConnectorName, maxThreads);       
         return mgr;
     }
     



More information about the teiid-commits mailing list