[teiid-commits] teiid SVN: r4030 - in branches/8.0.x: engine/src/main/java/org/teiid/query/tempdata and 2 other directories.

teiid-commits at lists.jboss.org teiid-commits at lists.jboss.org
Tue Apr 24 12:25:15 EDT 2012


Author: shawkins
Date: 2012-04-24 12:25:13 -0400 (Tue, 24 Apr 2012)
New Revision: 4030

Modified:
   branches/8.0.x/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java
   branches/8.0.x/engine/src/main/java/org/teiid/dqp/internal/process/DQPWorkContext.java
   branches/8.0.x/engine/src/main/java/org/teiid/dqp/internal/process/DataTierManagerImpl.java
   branches/8.0.x/engine/src/main/java/org/teiid/dqp/internal/process/Request.java
   branches/8.0.x/engine/src/main/java/org/teiid/query/tempdata/GlobalTableStoreImpl.java
   branches/8.0.x/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java
   branches/8.0.x/engine/src/test/java/org/teiid/query/processor/TestMaterialization.java
   branches/8.0.x/engine/src/test/java/org/teiid/query/processor/TestProcessor.java
   branches/8.0.x/engine/src/test/java/org/teiid/query/processor/TestTempTables.java
   branches/8.0.x/jboss-integration/src/main/java/org/teiid/jboss/TeiidOperationHandler.java
Log:
TEIID-1986 spawning a request/session for asynch refreshes

Modified: branches/8.0.x/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java
===================================================================
--- branches/8.0.x/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java	2012-04-24 16:22:56 UTC (rev 4029)
+++ branches/8.0.x/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java	2012-04-24 16:25:13 UTC (rev 4030)
@@ -49,6 +49,7 @@
 import org.teiid.client.DQP;
 import org.teiid.client.RequestMessage;
 import org.teiid.client.ResultsMessage;
+import org.teiid.client.RequestMessage.StatementType;
 import org.teiid.client.lob.LobChunk;
 import org.teiid.client.metadata.MetadataResult;
 import org.teiid.client.plan.PlanNode;
@@ -485,21 +486,6 @@
         return resultsFuture;
     }
     
-//    /**
-//     * Cancels a node in the request. (This request is called by the 
-//     * client directly using the admin API), so if this does not support
-//     * partial results then remove the original request.
-//     * @throws MetaMatrixComponentException 
-//     */
-//    public void cancelAtomicRequest(AtomicRequestID requestID) throws MetaMatrixComponentException {                    
-//        RequestWorkItem workItem = safeGetWorkItem(requestID.getRequestID());
-//        if (workItem == null) {
-//    		LogManager.logDetail(LogConstants.CTX_DQP, "Could not cancel", requestID, "parent request does not exist"); //$NON-NLS-1$ //$NON-NLS-2$
-//        	return;
-//        }
-//        workItem.requestAtomicRequestCancel(requestID);
-//    }
-    
     RequestWorkItem getRequestWorkItem(RequestID reqID) throws TeiidProcessingException {
     	RequestWorkItem result = this.requests.get(reqID);
     	if (result == null) {
@@ -695,7 +681,27 @@
         
         DataTierManagerImpl processorDataManager = new DataTierManagerImpl(this, this.bufferManager, this.config.isDetectingChangeEvents());
         processorDataManager.setEventDistributor(eventDistributor);
-		dataTierMgr = new TempTableDataManager(processorDataManager, this.bufferManager, this.processWorkerPool, this.rsCache);
+		dataTierMgr = new TempTableDataManager(processorDataManager, this.bufferManager, this.rsCache);
+		dataTierMgr.setExecutor(new TempTableDataManager.RequestExecutor() {
+			
+			@Override
+			public void execute(String command, List<?> parameters) {
+				final String sessionId = DQPWorkContext.getWorkContext().getSessionId();
+				RequestMessage request = new RequestMessage(command);
+				request.setParameterValues(parameters);
+				request.setStatementType(StatementType.PREPARED);
+				ResultsFuture<ResultsMessage> result = executeRequest(0, request);
+				result.addCompletionListener(new ResultsFuture.CompletionListener<ResultsMessage>() {
+
+					@Override
+					public void onCompletion(
+							ResultsFuture<ResultsMessage> future) {
+						terminateSession(sessionId);
+					}
+					
+				});
+			}
+		});
         dataTierMgr.setEventDistributor(eventDistributor);
                 
         LogManager.logDetail(LogConstants.CTX_DQP, "DQPCore started maxThreads", this.config.getMaxThreads(), "maxActivePlans", this.maxActivePlans, "source concurrency", this.userRequestSourceConcurrency); //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$

Modified: branches/8.0.x/engine/src/main/java/org/teiid/dqp/internal/process/DQPWorkContext.java
===================================================================
--- branches/8.0.x/engine/src/main/java/org/teiid/dqp/internal/process/DQPWorkContext.java	2012-04-24 16:22:56 UTC (rev 4029)
+++ branches/8.0.x/engine/src/main/java/org/teiid/dqp/internal/process/DQPWorkContext.java	2012-04-24 16:25:13 UTC (rev 4030)
@@ -117,6 +117,7 @@
     private HashMap<String, DataPolicy> policies;
     private boolean useCallingThread;
     private Version clientVersion = Version.SEVEN_4;
+    private boolean admin;
     
     public DQPWorkContext() {
 	}
@@ -141,6 +142,10 @@
     public void setSecurityHelper(SecurityHelper securityHelper) {
 		this.securityHelper = securityHelper;
 	}
+    
+    public SecurityHelper getSecurityHelper() {
+		return securityHelper;
+	}
 
     /**
      * @return
@@ -150,10 +155,7 @@
     }
     
     public Subject getSubject() {
-    	if (session.getSubject() != null) {
-    		return session.getSubject();
-    	}
-    	return null;
+		return session.getSubject();
     }
     
     /**
@@ -311,4 +313,13 @@
 	public void setClientVersion(Version clientVersion) {
 		this.clientVersion = clientVersion;
 	}
+	
+	public void setAdmin(boolean admin) {
+		this.admin = admin;
+	}
+	
+	public boolean isAdmin() {
+		return admin;
+	}
+
 }

Modified: branches/8.0.x/engine/src/main/java/org/teiid/dqp/internal/process/DataTierManagerImpl.java
===================================================================
--- branches/8.0.x/engine/src/main/java/org/teiid/dqp/internal/process/DataTierManagerImpl.java	2012-04-24 16:22:56 UTC (rev 4029)
+++ branches/8.0.x/engine/src/main/java/org/teiid/dqp/internal/process/DataTierManagerImpl.java	2012-04-24 16:25:13 UTC (rev 4030)
@@ -61,19 +61,7 @@
 import org.teiid.events.EventDistributor;
 import org.teiid.logging.LogManager;
 import org.teiid.logging.MessageLevel;
-import org.teiid.metadata.AbstractMetadataRecord;
-import org.teiid.metadata.Column;
-import org.teiid.metadata.ColumnStats;
-import org.teiid.metadata.Datatype;
-import org.teiid.metadata.ForeignKey;
-import org.teiid.metadata.FunctionMethod;
-import org.teiid.metadata.KeyRecord;
-import org.teiid.metadata.MetadataRepository;
-import org.teiid.metadata.Procedure;
-import org.teiid.metadata.ProcedureParameter;
-import org.teiid.metadata.Schema;
-import org.teiid.metadata.Table;
-import org.teiid.metadata.TableStats;
+import org.teiid.metadata.*;
 import org.teiid.query.QueryPlugin;
 import org.teiid.query.metadata.CompositeMetadataStore;
 import org.teiid.query.metadata.TempMetadataID;
@@ -182,7 +170,7 @@
 			return processSystemQuery(context, command, workItem.getDqpWorkContext());
 		}
 		
-		AtomicRequestMessage aqr = createRequest(context.getProcessorID(), command, modelName, connectorBindingId, nodeID);
+		AtomicRequestMessage aqr = createRequest(workItem, command, modelName, connectorBindingId, nodeID);
 		aqr.setCommandContext(context);
 		SourceHint sh = context.getSourceHint();
 		if (sh != null) {
@@ -572,11 +560,9 @@
 		return result;
 	}
 	
-	private AtomicRequestMessage createRequest(Object processorId,
+	private AtomicRequestMessage createRequest(RequestWorkItem workItem,
 			Command command, String modelName, String connectorBindingId, int nodeID)
-			throws TeiidProcessingException, TeiidComponentException {
-		RequestWorkItem workItem = requestMgr.getRequestWorkItem((RequestID)processorId);
-		
+			throws TeiidComponentException {
 	    RequestMessage request = workItem.requestMsg;
 		// build the atomic request based on original request + context info
         AtomicRequestMessage aqr = new AtomicRequestMessage(request, workItem.getDqpWorkContext(), nodeID);

Modified: branches/8.0.x/engine/src/main/java/org/teiid/dqp/internal/process/Request.java
===================================================================
--- branches/8.0.x/engine/src/main/java/org/teiid/dqp/internal/process/Request.java	2012-04-24 16:22:56 UTC (rev 4029)
+++ branches/8.0.x/engine/src/main/java/org/teiid/dqp/internal/process/Request.java	2012-04-24 16:25:13 UTC (rev 4030)
@@ -470,7 +470,7 @@
 		if (context == null) {
 			createCommandContext(command);
 		}
-		if (this.authorizationValidator != null) {
+		if (!this.workContext.isAdmin() && this.authorizationValidator != null) {
 			return this.authorizationValidator.validate(commandStr, command, metadata, context, type);
 		}
 		return false;

Modified: branches/8.0.x/engine/src/main/java/org/teiid/query/tempdata/GlobalTableStoreImpl.java
===================================================================
--- branches/8.0.x/engine/src/main/java/org/teiid/query/tempdata/GlobalTableStoreImpl.java	2012-04-24 16:22:56 UTC (rev 4029)
+++ branches/8.0.x/engine/src/main/java/org/teiid/query/tempdata/GlobalTableStoreImpl.java	2012-04-24 16:25:13 UTC (rev 4030)
@@ -79,6 +79,7 @@
 		private Serializable loadingAddress;
 		private long ttl = -1;
 		private boolean valid;
+		private boolean asynch; //sub state of loading
 		
 		protected MatTableInfo() {}
 		
@@ -96,7 +97,9 @@
 				}
 				return true;
 			case LOADING:
-				if (!firstPass && localAddress instanceof Comparable<?> && ((Comparable)localAddress).compareTo(possibleLoadingAddress) < 0) {
+				if ((!firstPass && localAddress instanceof Comparable<?> && ((Comparable)localAddress).compareTo(possibleLoadingAddress) < 0)
+						|| (refresh && asynch)) {
+					this.asynch = false;
 					this.loadingAddress = possibleLoadingAddress; //ties go to the lowest address
 					return true;
 				}
@@ -130,6 +133,11 @@
 			notifyAll();
 		}
 		
+		public synchronized void setAsynchLoad() {
+			assert state == MatState.LOADING;
+			asynch = true;
+		}
+		
 		public synchronized void setTtl(long ttl) {
 			this.ttl = ttl;
 		}

Modified: branches/8.0.x/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java
===================================================================
--- branches/8.0.x/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java	2012-04-24 16:22:56 UTC (rev 4029)
+++ branches/8.0.x/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java	2012-04-24 16:25:13 UTC (rev 4030)
@@ -27,15 +27,15 @@
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.Executor;
-import java.util.concurrent.FutureTask;
 
+import org.teiid.adminapi.impl.SessionMetadata;
+import org.teiid.adminapi.impl.VDBMetaData;
 import org.teiid.api.exception.query.ExpressionEvaluationException;
 import org.teiid.api.exception.query.QueryMetadataException;
 import org.teiid.api.exception.query.QueryProcessingException;
 import org.teiid.api.exception.query.QueryResolverException;
 import org.teiid.api.exception.query.QueryValidatorException;
+import org.teiid.client.security.SessionToken;
 import org.teiid.common.buffer.BlockedException;
 import org.teiid.common.buffer.BufferManager;
 import org.teiid.common.buffer.TupleBuffer;
@@ -47,13 +47,14 @@
 import org.teiid.core.util.Assertion;
 import org.teiid.core.util.StringUtil;
 import org.teiid.dqp.internal.process.CachedResults;
+import org.teiid.dqp.internal.process.DQPWorkContext;
 import org.teiid.dqp.internal.process.SessionAwareCache;
 import org.teiid.dqp.internal.process.SessionAwareCache.CacheID;
 import org.teiid.events.EventDistributor;
+import org.teiid.language.SQLConstants;
 import org.teiid.language.SQLConstants.Reserved;
 import org.teiid.logging.LogConstants;
 import org.teiid.logging.LogManager;
-import org.teiid.metadata.MetadataRepository;
 import org.teiid.metadata.FunctionMethod.Determinism;
 import org.teiid.query.QueryPlugin;
 import org.teiid.query.eval.Evaluator;
@@ -85,25 +86,33 @@
  */
 public class TempTableDataManager implements ProcessorDataManager {
 	
-    private static final String REFRESHMATVIEWROW = ".refreshmatviewrow"; //$NON-NLS-1$
+	public interface RequestExecutor {
+		void execute(String command, List<?> parameters);
+	}
+	
+	private static final String REFRESHMATVIEWROW = ".refreshmatviewrow"; //$NON-NLS-1$
 	private static final String REFRESHMATVIEW = ".refreshmatview"; //$NON-NLS-1$
 	public static final String CODE_PREFIX = "#CODE_"; //$NON-NLS-1$
-	
+	private static String REFRESH_SQL = SQLConstants.Reserved.CALL + ' ' + CoreConstants.SYSTEM_ADMIN_MODEL + REFRESHMATVIEW + "(?, ?)"; //$NON-NLS-1$
+
 	private ProcessorDataManager processorDataManager;
     private BufferManager bufferManager;
 	private SessionAwareCache<CachedResults> cache;
-    private Executor executor;
+    private RequestExecutor executor;
     
     private EventDistributor eventDistributor;
-
+	
     public TempTableDataManager(ProcessorDataManager processorDataManager, BufferManager bufferManager, 
-    		Executor executor, SessionAwareCache<CachedResults> cache){
+    		SessionAwareCache<CachedResults> cache){
         this.processorDataManager = processorDataManager;
         this.bufferManager = bufferManager;
-        this.executor = executor;
         this.cache = cache;
     }
     
+    public void setExecutor(RequestExecutor executor) {
+		this.executor = executor;
+	}
+    
     public void setEventDistributor(EventDistributor eventDistributor) {
 		this.eventDistributor = eventDistributor;
 	}
@@ -396,11 +405,12 @@
 				}
 			}
 			if (load) {
-				if (!info.isValid()) {
+				if (!info.isValid() || executor == null) {
 					//blocking load
 					loadGlobalTable(context, group, tableName, globalStore);
 				} else {
-					loadAsynch(context, group, tableName, globalStore);
+					info.setAsynchLoad();
+					loadAsynch(context, tableName);
 				}
 			} 
 			table = globalStore.getTempTableStore().getOrCreateTempTable(tableName, query, bufferManager, false, false, context);
@@ -417,16 +427,25 @@
 		return table.createTupleSource(query.getProjectedSymbols(), query.getCriteria(), query.getOrderBy());
 	}
 
-	private void loadAsynch(final CommandContext context,
-			final GroupSymbol group, final String tableName, final GlobalTableStore globalStore) {
-		Callable<Integer> toCall = new Callable<Integer>() {
+	private void loadAsynch(final CommandContext context, final String tableName) {
+		SessionMetadata session = createTemporarySession(context.getUserName(), "asynch-mat-view-load", context.getDQPWorkContext().getVDB()); //$NON-NLS-1$
+		session.setSubject(context.getSubject());
+		session.setSecurityDomain(context.getSession().getSecurityDomain());
+		DQPWorkContext workContext = new DQPWorkContext();
+		workContext.setAdmin(true);
+		DQPWorkContext current = context.getDQPWorkContext();
+		workContext.setSession(session);
+		workContext.setPolicies(current.getAllowedDataPolicies());
+		workContext.setSecurityHelper(current.getSecurityHelper());
+		final String viewName = tableName.substring(RelationalPlanner.MAT_PREFIX.length());
+		workContext.runInContext(new Runnable() {
+
 			@Override
-			public Integer call() throws Exception {
-				return loadGlobalTable(context, group, tableName, globalStore);
+			public void run() {
+				executor.execute(REFRESH_SQL, Arrays.asList(viewName, Boolean.FALSE.toString()));
 			}
-		};
-		FutureTask<Integer> task = new FutureTask<Integer>(toCall);
-		executor.execute(task);
+			
+		});
 	}
 
 	private int loadGlobalTable(CommandContext context,
@@ -515,4 +534,26 @@
 	public EventDistributor getEventDistributor() {
 		return this.eventDistributor;
 	}
+
+	/**
+	 * Create an unauthenticated session
+	 * @param userName
+	 * @param app
+	 * @param vdb
+	 * @return
+	 */
+	public static SessionMetadata createTemporarySession(String userName, String app, VDBMetaData vdb) {
+		long creationTime = System.currentTimeMillis();
+	    SessionMetadata newSession = new SessionMetadata();
+	    newSession.setSessionToken(new SessionToken(userName));
+	    newSession.setSessionId(newSession.getSessionToken().getSessionID());
+	    newSession.setUserName(userName);
+	    newSession.setCreatedTime(creationTime);
+	    newSession.setApplicationName(app); 
+	    newSession.setVDBName(vdb.getName());
+	    newSession.setVDBVersion(vdb.getVersion());
+	    newSession.setVdb(vdb);
+	    newSession.setEmbedded(true);
+		return newSession;
+	}
 }

Modified: branches/8.0.x/engine/src/test/java/org/teiid/query/processor/TestMaterialization.java
===================================================================
--- branches/8.0.x/engine/src/test/java/org/teiid/query/processor/TestMaterialization.java	2012-04-24 16:22:56 UTC (rev 4029)
+++ branches/8.0.x/engine/src/test/java/org/teiid/query/processor/TestMaterialization.java	2012-04-24 16:25:13 UTC (rev 4030)
@@ -35,7 +35,6 @@
 import org.teiid.common.buffer.BufferManager;
 import org.teiid.common.buffer.BufferManagerFactory;
 import org.teiid.core.TeiidProcessingException;
-import org.teiid.core.util.ExecutorUtils;
 import org.teiid.dqp.internal.process.CachedResults;
 import org.teiid.dqp.internal.process.QueryProcessorFactoryImpl;
 import org.teiid.dqp.internal.process.SessionAwareCache;
@@ -75,7 +74,7 @@
 		
 	    SessionAwareCache<CachedResults> cache = new SessionAwareCache<CachedResults>();
 	    cache.setTupleBufferCache(bm);
-		dataManager = new TempTableDataManager(hdm, bm, ExecutorUtils.getDirectExecutor(), cache);
+		dataManager = new TempTableDataManager(hdm, bm, cache);
 	}
 	
 	private void execute(String sql, List<?>... expectedResults) throws Exception {

Modified: branches/8.0.x/engine/src/test/java/org/teiid/query/processor/TestProcessor.java
===================================================================
--- branches/8.0.x/engine/src/test/java/org/teiid/query/processor/TestProcessor.java	2012-04-24 16:22:56 UTC (rev 4029)
+++ branches/8.0.x/engine/src/test/java/org/teiid/query/processor/TestProcessor.java	2012-04-24 16:25:13 UTC (rev 4030)
@@ -46,7 +46,6 @@
 import org.teiid.core.TeiidRuntimeException;
 import org.teiid.core.types.DataTypeManager;
 import org.teiid.core.types.XMLType;
-import org.teiid.core.util.ExecutorUtils;
 import org.teiid.dqp.internal.process.CachedResults;
 import org.teiid.dqp.internal.process.PreparedPlan;
 import org.teiid.dqp.internal.process.QueryProcessorFactoryImpl;
@@ -247,7 +246,7 @@
         if (!(dataManager instanceof TempTableDataManager)) {
     	    SessionAwareCache<CachedResults> cache = new SessionAwareCache<CachedResults>();
     	    cache.setTupleBufferCache(bufferMgr);
-        	dataManager = new TempTableDataManager(dataManager, bufferMgr, ExecutorUtils.getDirectExecutor(), cache);
+        	dataManager = new TempTableDataManager(dataManager, bufferMgr, cache);
         }        
         if (context.getQueryProcessorFactory() == null) {
         	context.setQueryProcessorFactory(new QueryProcessorFactoryImpl(bufferMgr, dataManager, new DefaultCapabilitiesFinder(), null, context.getMetadata()));

Modified: branches/8.0.x/engine/src/test/java/org/teiid/query/processor/TestTempTables.java
===================================================================
--- branches/8.0.x/engine/src/test/java/org/teiid/query/processor/TestTempTables.java	2012-04-24 16:22:56 UTC (rev 4029)
+++ branches/8.0.x/engine/src/test/java/org/teiid/query/processor/TestTempTables.java	2012-04-24 16:25:13 UTC (rev 4030)
@@ -43,7 +43,6 @@
 import org.teiid.common.buffer.BufferManager;
 import org.teiid.common.buffer.BufferManagerFactory;
 import org.teiid.core.TeiidProcessingException;
-import org.teiid.core.util.ExecutorUtils;
 import org.teiid.dqp.internal.process.CachedResults;
 import org.teiid.dqp.internal.process.SessionAwareCache;
 import org.teiid.dqp.service.TransactionContext;
@@ -94,7 +93,7 @@
 	    BufferManager bm = BufferManagerFactory.getStandaloneBufferManager();
 	    SessionAwareCache<CachedResults> cache = new SessionAwareCache<CachedResults>();
 	    cache.setTupleBufferCache(bm);
-		dataManager = new TempTableDataManager(fdm, bm, ExecutorUtils.getDirectExecutor(), cache);
+		dataManager = new TempTableDataManager(fdm, bm, cache);
 	}
 	
 	@Test public void testRollbackNoExisting() throws Exception {

Modified: branches/8.0.x/jboss-integration/src/main/java/org/teiid/jboss/TeiidOperationHandler.java
===================================================================
--- branches/8.0.x/jboss-integration/src/main/java/org/teiid/jboss/TeiidOperationHandler.java	2012-04-24 16:22:56 UTC (rev 4029)
+++ branches/8.0.x/jboss-integration/src/main/java/org/teiid/jboss/TeiidOperationHandler.java	2012-04-24 16:25:13 UTC (rev 4030)
@@ -63,7 +63,6 @@
 import org.teiid.client.RequestMessage;
 import org.teiid.client.ResultsMessage;
 import org.teiid.client.plan.PlanNode;
-import org.teiid.client.security.SessionToken;
 import org.teiid.client.util.ResultsFuture;
 import org.teiid.core.TeiidComponentException;
 import org.teiid.deployers.ExtendedPropertyMetadata;
@@ -75,6 +74,7 @@
 import org.teiid.dqp.internal.process.SessionAwareCache;
 import org.teiid.logging.LogConstants;
 import org.teiid.logging.LogManager;
+import org.teiid.query.tempdata.TempTableDataManager;
 
 abstract class TeiidOperationHandler extends BaseOperationHandler<DQPCore> {
 	List<TransportService> transports = new ArrayList<TransportService>();
@@ -721,11 +721,16 @@
 		String user = "CLI ADMIN"; //$NON-NLS-1$
 		LogManager.logDetail(LogConstants.CTX_RUNTIME, IntegrationPlugin.Util.getString("admin_executing", user, command)); //$NON-NLS-1$
 		
-		SessionMetadata session = createTemporarySession(vdbName, version, user);
+        VDBMetaData vdb = this.vdbRepo.getVDB(vdbName, version);
+        if (vdb == null) {
+        	throw new OperationFailedException(new ModelNode().set(IntegrationPlugin.Util.getString("wrong_vdb")));//$NON-NLS-1$
+        }
+        final SessionMetadata session = TempTableDataManager.createTemporarySession(user, "admin-console", vdb); //$NON-NLS-1$
 
 		final long requestID =  0L;
 		
 		DQPWorkContext context = new DQPWorkContext();
+		context.setUseCallingThread(true);
 		context.setSession(session);
 		
 		try {
@@ -735,7 +740,7 @@
 					
 					long start = System.currentTimeMillis();
 					RequestMessage request = new RequestMessage(command);
-					request.setExecutionId(0L);
+					request.setExecutionId(requestID);
 					request.setRowLimit(engine.getMaxRowsFetchSize()); // this would limit the number of rows that are returned.
 					Future<ResultsMessage> message = engine.executeRequest(requestID, request);
 					ResultsMessage rm = null;
@@ -770,7 +775,19 @@
 			});
 		} catch (Throwable t) {
 			throw new OperationFailedException(new ModelNode().set(t.getMessage()));
-		} 
+		} finally {
+			try {
+				context.runInContext(new Callable<Void>() {
+					@Override
+					public Void call() throws Exception {
+						engine.terminateSession(session.getSessionId());
+						return null;
+					}
+				});
+			} catch (Throwable e) {
+				throw new OperationFailedException(new ModelNode().set(e.getMessage()));
+			}
+		}
 	}
 	
 	private void writeResults(ModelNode resultsNode, List<String> columns,  List<? extends List<?>> results) throws SQLException {
@@ -826,27 +843,6 @@
 		}
 	}
 	
-	private SessionMetadata createTemporarySession(final String vdbName, final int version, final String userName) throws OperationFailedException{
-		
-        long creationTime = System.currentTimeMillis();
-
-        // Return a new session info object
-        SessionMetadata newSession = new SessionMetadata();
-        newSession.setSessionToken(new SessionToken(userName));
-        newSession.setSessionId(newSession.getSessionToken().getSessionID());
-        newSession.setUserName(userName);
-        newSession.setCreatedTime(creationTime);
-        newSession.setApplicationName("admin-console"); //$NON-NLS-1$
-        newSession.setVDBName(vdbName);
-        newSession.setVDBVersion(version);
-        
-        VDBMetaData vdb = this.vdbRepo.getVDB(vdbName, version);
-        if (vdb == null) {
-        	throw new OperationFailedException(new ModelNode().set(IntegrationPlugin.Util.getString("wrong_vdb")));//$NON-NLS-1$
-        }
-        newSession.setVdb(vdb);
-		return newSession;
-	}	
 }
 
 class GetVDB extends BaseOperationHandler<VDBRepository>{



More information about the teiid-commits mailing list