Author: jolee
Date: 2014-01-21 10:42:47 -0500 (Tue, 21 Jan 2014)
New Revision: 4625
Modified:
branches/7.7.x/client/src/main/java/org/teiid/adminapi/impl/SessionMetadata.java
branches/7.7.x/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java
branches/7.7.x/engine/src/main/java/org/teiid/dqp/internal/process/DQPWorkContext.java
branches/7.7.x/engine/src/main/java/org/teiid/query/tempdata/GlobalTableStoreImpl.java
branches/7.7.x/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java
branches/7.7.x/engine/src/test/java/org/teiid/query/processor/TestMaterialization.java
branches/7.7.x/engine/src/test/java/org/teiid/query/processor/TestProcessor.java
branches/7.7.x/engine/src/test/java/org/teiid/query/processor/TestTempTables.java
Log:
BZ1012573 + TEIID-2795: TTL Snapshot Refresh does not fully reload Internal MV if the user
query contains LIMIT
Modified:
branches/7.7.x/client/src/main/java/org/teiid/adminapi/impl/SessionMetadata.java
===================================================================
---
branches/7.7.x/client/src/main/java/org/teiid/adminapi/impl/SessionMetadata.java 2014-01-17
20:57:20 UTC (rev 4624)
+++
branches/7.7.x/client/src/main/java/org/teiid/adminapi/impl/SessionMetadata.java 2014-01-21
15:42:47 UTC (rev 4625)
@@ -61,6 +61,7 @@
private transient LoginContext loginContext;
private transient Object securityContext;
private transient boolean embedded;
+ private transient Subject subject;
@Override
@ManagementProperty(description="Application associated with Session",
readOnly=true)
@@ -200,6 +201,9 @@
}
public void setLoginContext(LoginContext loginContext) {
+ if (loginContext != null) {
+ this.subject = loginContext.getSubject();
+ }
this.loginContext = loginContext;
}
@@ -212,7 +216,7 @@
}
public Subject getSubject() {
- return this.loginContext.getSubject();
+ return this.subject;
}
public void setEmbedded(boolean embedded) {
@@ -232,5 +236,9 @@
public void setClientHardwareAddress(String clientHardwareAddress) {
this.clientHardwareAddress = clientHardwareAddress;
}
+
+ public void setSubject(Subject subject) {
+ this.subject = subject;
+ }
}
Modified: branches/7.7.x/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java
===================================================================
---
branches/7.7.x/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java 2014-01-17
20:57:20 UTC (rev 4624)
+++
branches/7.7.x/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java 2014-01-21
15:42:47 UTC (rev 4625)
@@ -51,6 +51,7 @@
import org.teiid.cache.CacheFactory;
import org.teiid.client.DQP;
import org.teiid.client.RequestMessage;
+import org.teiid.client.RequestMessage.StatementType;
import org.teiid.client.ResultsMessage;
import org.teiid.client.lob.LobChunk;
import org.teiid.client.metadata.MetadataResult;
@@ -794,7 +795,27 @@
DataTierManagerImpl processorDataManager = new
DataTierManagerImpl(this,this.bufferService, this.config.isDetectingChangeEvents());
processorDataManager.setEventDistributor(eventDistributor);
processorDataManager.setMetadataRepository(metadataRepository);
- dataTierMgr = new TempTableDataManager(processorDataManager, this.bufferManager,
this.processWorkerPool, this.rsCache);
+ dataTierMgr = new TempTableDataManager(processorDataManager, this.bufferManager,
this.rsCache);
+ dataTierMgr.setExecutor(new TempTableDataManager.RequestExecutor() {
+
+ @Override
+ public void execute(String command, List<?> parameters) {
+ final String sessionId = DQPWorkContext.getWorkContext().getSessionId();
+ RequestMessage request = new RequestMessage(command);
+ request.setParameterValues(parameters);
+ request.setStatementType(StatementType.PREPARED);
+ ResultsFuture<ResultsMessage> result = executeRequest(0, request);
+ result.addCompletionListener(new
ResultsFuture.CompletionListener<ResultsMessage>() {
+
+ @Override
+ public void onCompletion(
+ ResultsFuture<ResultsMessage> future) {
+ terminateSession(sessionId);
+ }
+
+ });
+ }
+ });
dataTierMgr.setEventDistributor(eventDistributor);
LogManager.logDetail(LogConstants.CTX_DQP, "DQPCore started
maxThreads", this.config.getMaxThreads(), "maxActivePlans",
this.maxActivePlans, "source concurrency", this.userRequestSourceConcurrency);
//$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$
Modified:
branches/7.7.x/engine/src/main/java/org/teiid/dqp/internal/process/DQPWorkContext.java
===================================================================
---
branches/7.7.x/engine/src/main/java/org/teiid/dqp/internal/process/DQPWorkContext.java 2014-01-17
20:57:20 UTC (rev 4624)
+++
branches/7.7.x/engine/src/main/java/org/teiid/dqp/internal/process/DQPWorkContext.java 2014-01-21
15:42:47 UTC (rev 4625)
@@ -110,6 +110,7 @@
private HashMap<String, DataPolicy> policies;
private boolean useCallingThread;
private Version clientVersion = Version.SEVEN_4;
+ private boolean admin;
public DQPWorkContext() {
}
@@ -304,4 +305,12 @@
public void setClientVersion(Version clientVersion) {
this.clientVersion = clientVersion;
}
+
+ public void setAdmin(boolean admin) {
+ this.admin = admin;
+ }
+
+ public boolean isAdmin() {
+ return admin;
+ }
}
Modified:
branches/7.7.x/engine/src/main/java/org/teiid/query/tempdata/GlobalTableStoreImpl.java
===================================================================
---
branches/7.7.x/engine/src/main/java/org/teiid/query/tempdata/GlobalTableStoreImpl.java 2014-01-17
20:57:20 UTC (rev 4624)
+++
branches/7.7.x/engine/src/main/java/org/teiid/query/tempdata/GlobalTableStoreImpl.java 2014-01-21
15:42:47 UTC (rev 4625)
@@ -78,6 +78,7 @@
private Serializable loadingAddress;
private long ttl = -1;
private boolean valid;
+ private boolean asynch; //sub state of loading
protected MatTableInfo() {}
@@ -95,7 +96,9 @@
}
return true;
case LOADING:
- if (!firstPass && localAddress instanceof Comparable<?> &&
((Comparable)localAddress).compareTo(possibleLoadingAddress) < 0) {
+ if ((!firstPass && localAddress instanceof Comparable<?> &&
((Comparable)localAddress).compareTo(possibleLoadingAddress) < 0)
+ || (refresh && asynch)) {
+ this.asynch = false;
this.loadingAddress = possibleLoadingAddress; //ties go to the lowest address
return true;
}
@@ -129,6 +132,11 @@
notifyAll();
}
+ public synchronized void setAsynchLoad() {
+ assert state == MatState.LOADING;
+ asynch = true;
+ }
+
public synchronized void setTtl(long ttl) {
this.ttl = ttl;
}
Modified:
branches/7.7.x/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java
===================================================================
---
branches/7.7.x/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java 2014-01-17
20:57:20 UTC (rev 4624)
+++
branches/7.7.x/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java 2014-01-21
15:42:47 UTC (rev 4625)
@@ -27,15 +27,15 @@
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.Executor;
-import java.util.concurrent.FutureTask;
+import org.teiid.adminapi.impl.SessionMetadata;
+import org.teiid.adminapi.impl.VDBMetaData;
import org.teiid.api.exception.query.ExpressionEvaluationException;
import org.teiid.api.exception.query.QueryMetadataException;
import org.teiid.api.exception.query.QueryProcessingException;
import org.teiid.api.exception.query.QueryResolverException;
import org.teiid.api.exception.query.QueryValidatorException;
+import org.teiid.client.security.SessionToken;
import org.teiid.common.buffer.BlockedException;
import org.teiid.common.buffer.BufferManager;
import org.teiid.common.buffer.TupleBuffer;
@@ -47,9 +47,11 @@
import org.teiid.core.util.Assertion;
import org.teiid.core.util.StringUtil;
import org.teiid.dqp.internal.process.CachedResults;
+import org.teiid.dqp.internal.process.DQPWorkContext;
import org.teiid.dqp.internal.process.SessionAwareCache;
import org.teiid.dqp.internal.process.SessionAwareCache.CacheID;
import org.teiid.events.EventDistributor;
+import org.teiid.language.SQLConstants;
import org.teiid.language.SQLConstants.Reserved;
import org.teiid.logging.LogConstants;
import org.teiid.logging.LogManager;
@@ -85,25 +87,32 @@
*/
public class TempTableDataManager implements ProcessorDataManager {
+ public interface RequestExecutor {
+ void execute(String command, List<?> parameters);
+ }
+
private static final String REFRESHMATVIEWROW = ".refreshmatviewrow";
//$NON-NLS-1$
private static final String REFRESHMATVIEW = ".refreshmatview"; //$NON-NLS-1$
public static final String CODE_PREFIX = "#CODE_"; //$NON-NLS-1$
+ private static String REFRESH_SQL = SQLConstants.Reserved.CALL + ' ' +
CoreConstants.SYSTEM_ADMIN_MODEL + REFRESHMATVIEW + "(?, ?)"; //$NON-NLS-1$
private ProcessorDataManager processorDataManager;
private BufferManager bufferManager;
private SessionAwareCache<CachedResults> cache;
- private Executor executor;
+ private RequestExecutor executor;
private EventDistributor eventDistributor;
- public TempTableDataManager(ProcessorDataManager processorDataManager, BufferManager
bufferManager,
- Executor executor, SessionAwareCache<CachedResults> cache){
+ public TempTableDataManager(ProcessorDataManager processorDataManager, BufferManager
bufferManager, SessionAwareCache<CachedResults> cache){
this.processorDataManager = processorDataManager;
this.bufferManager = bufferManager;
- this.executor = executor;
this.cache = cache;
}
+ public void setExecutor(RequestExecutor executor) {
+ this.executor = executor;
+ }
+
public void setEventDistributor(EventDistributor eventDistributor) {
this.eventDistributor = eventDistributor;
}
@@ -385,19 +394,22 @@
}
}
synchronized (info) {
- try {
- info.wait(30000);
- } catch (InterruptedException e) {
- throw new TeiidComponentException(e);
+ if (!info.isUpToDate()) {
+ try {
+ info.wait(30000);
+ } catch (InterruptedException e) {
+ throw new TeiidComponentException(e);
+ }
}
}
}
if (load) {
- if (!info.isValid()) {
+ // executor == null is a test related check. executor will not be null in normal
runtime
+ if (!info.isValid() | executor == null) {
//blocking load
loadGlobalTable(context, group, tableName, globalStore);
} else {
- loadAsynch(context, group, tableName, globalStore);
+ loadAsynch(context, group, tableName, globalStore,
context.getDQPWorkContext().getVDB(), info);
}
}
table = globalStore.getTempTableStore().getOrCreateTempTable(tableName, query,
bufferManager, false, false, context);
@@ -435,16 +447,54 @@
}
private void loadAsynch(final CommandContext context,
- final GroupSymbol group, final String tableName, final GlobalTableStore globalStore)
{
- Callable<Integer> toCall = new Callable<Integer>() {
+ final GroupSymbol group, final String tableName, final GlobalTableStore globalStore,
final VDBMetaData vdb, MatTableInfo info) {
+ info.setAsynchLoad();
+ DQPWorkContext workContext = createWorkContext(context, vdb);
+ final String viewName = tableName.substring(RelationalPlanner.MAT_PREFIX.length());
+ workContext.runInContext(new Runnable() {
@Override
- public Integer call() throws Exception {
- return loadGlobalTable(context, group, tableName, globalStore);
+ public void run() {
+ executor.execute(REFRESH_SQL, Arrays.asList(viewName, Boolean.FALSE));
}
- };
- FutureTask<Integer> task = new FutureTask<Integer>(toCall);
- executor.execute(task);
+ });
}
+
+ private DQPWorkContext createWorkContext(final CommandContext context,
+ VDBMetaData vdb) {
+ SessionMetadata session = createTemporarySession(context.getUserName(),
"asynch-mat-view-load", vdb); //$NON-NLS-1$
+ session.setSubject(context.getSubject());
+ session.setSecurityDomain(context.getSession().getSecurityDomain());
+ session.setSecurityContext(((SessionMetadata)context.getSession()).getSecurityContext());
+ DQPWorkContext workContext = new DQPWorkContext();
+ workContext.setAdmin(true);
+ DQPWorkContext current = context.getDQPWorkContext();
+ workContext.setSession(session);
+ workContext.setPolicies(current.getAllowedDataPolicies());
+ workContext.setSecurityHelper(current.getSecurityHelper());
+ return workContext;
+ }
+
+ /**
+ * Create an unauthenticated session
+ * @param userName
+ * @param app
+ * @param vdb
+ * @return
+ */
+ public static SessionMetadata createTemporarySession(String userName, String app,
VDBMetaData vdb) {
+ long creationTime = System.currentTimeMillis();
+ SessionMetadata newSession = new SessionMetadata();
+ newSession.setSessionToken(new SessionToken(userName));
+ newSession.setSessionId(newSession.getSessionToken().getSessionID());
+ newSession.setUserName(userName);
+ newSession.setCreatedTime(creationTime);
+ newSession.setApplicationName(app);
+ newSession.setVDBName(vdb.getName());
+ newSession.setVDBVersion(vdb.getVersion());
+ newSession.setVdb(vdb);
+ newSession.setEmbedded(true);
+ return newSession;
+ }
private int loadGlobalTable(CommandContext context,
GroupSymbol group, final String tableName, GlobalTableStore globalStore)
Modified:
branches/7.7.x/engine/src/test/java/org/teiid/query/processor/TestMaterialization.java
===================================================================
---
branches/7.7.x/engine/src/test/java/org/teiid/query/processor/TestMaterialization.java 2014-01-17
20:57:20 UTC (rev 4624)
+++
branches/7.7.x/engine/src/test/java/org/teiid/query/processor/TestMaterialization.java 2014-01-21
15:42:47 UTC (rev 4625)
@@ -35,7 +35,6 @@
import org.teiid.common.buffer.BufferManager;
import org.teiid.common.buffer.BufferManagerFactory;
import org.teiid.core.TeiidProcessingException;
-import org.teiid.core.util.ExecutorUtils;
import org.teiid.dqp.internal.process.CachedResults;
import org.teiid.dqp.internal.process.QueryProcessorFactoryImpl;
import org.teiid.dqp.internal.process.SessionAwareCache;
@@ -45,9 +44,9 @@
import org.teiid.query.optimizer.capabilities.DefaultCapabilitiesFinder;
import org.teiid.query.optimizer.relational.RelationalPlanner;
import org.teiid.query.tempdata.GlobalTableStoreImpl;
+import org.teiid.query.tempdata.GlobalTableStoreImpl.MatTableInfo;
import org.teiid.query.tempdata.TempTableDataManager;
import org.teiid.query.tempdata.TempTableStore;
-import org.teiid.query.tempdata.GlobalTableStoreImpl.MatTableInfo;
import org.teiid.query.tempdata.TempTableStore.TransactionMode;
import org.teiid.query.unittest.RealMetadataFactory;
import org.teiid.query.util.CommandContext;
@@ -66,6 +65,7 @@
tempStore = new TempTableStore("1", TransactionMode.ISOLATE_WRITES);
//$NON-NLS-1$
BufferManager bm = BufferManagerFactory.getStandaloneBufferManager();
QueryMetadataInterface actualMetadata =
RealMetadataFactory.exampleMaterializedView();
+
globalStore = new GlobalTableStoreImpl(bm, actualMetadata);
metadata = new TempMetadataAdapter(actualMetadata, tempStore.getMetadataStore());
hdm = new HardcodedDataManager();
@@ -75,7 +75,7 @@
SessionAwareCache<CachedResults> cache = new
SessionAwareCache<CachedResults>();
cache.setBufferManager(bm);
- dataManager = new TempTableDataManager(hdm, bm, ExecutorUtils.getDirectExecutor(),
cache);
+ dataManager = new TempTableDataManager(hdm, bm, cache);
}
private void execute(String sql, List<?>... expectedResults) throws Exception {
@@ -83,6 +83,7 @@
cc.setTempTableStore(tempStore);
cc.setGlobalTableStore(globalStore);
cc.setMetadata(metadata);
+
CapabilitiesFinder finder = new DefaultCapabilitiesFinder();
previousPlan = TestProcessor.helpGetPlan(TestProcessor.helpParse(sql), metadata,
finder, cc);
cc.setQueryProcessorFactory(new
QueryProcessorFactoryImpl(BufferManagerFactory.getStandaloneBufferManager(), dataManager,
finder, null, metadata));
@@ -128,6 +129,7 @@
}
@Test public void testTtl() throws Exception {
+
execute("SELECT * from vgroup4 where x = 'one'",
Arrays.asList("one"));
assertEquals(1, hdm.getCommandHistory().size());
execute("SELECT * from vgroup4 where x is null",
Arrays.asList((String)null));
Modified:
branches/7.7.x/engine/src/test/java/org/teiid/query/processor/TestProcessor.java
===================================================================
---
branches/7.7.x/engine/src/test/java/org/teiid/query/processor/TestProcessor.java 2014-01-17
20:57:20 UTC (rev 4624)
+++
branches/7.7.x/engine/src/test/java/org/teiid/query/processor/TestProcessor.java 2014-01-21
15:42:47 UTC (rev 4625)
@@ -29,7 +29,15 @@
import java.math.BigInteger;
import java.sql.SQLException;
import java.sql.Timestamp;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.StringTokenizer;
import org.junit.Test;
import org.teiid.api.exception.query.QueryPlannerException;
@@ -46,7 +54,6 @@
import org.teiid.core.TeiidRuntimeException;
import org.teiid.core.types.DataTypeManager;
import org.teiid.core.types.XMLType;
-import org.teiid.core.util.ExecutorUtils;
import org.teiid.core.util.PropertiesUtils;
import org.teiid.dqp.internal.process.CachedResults;
import org.teiid.dqp.internal.process.PreparedPlan;
@@ -67,8 +74,8 @@
import org.teiid.query.optimizer.FakeFunctionMetadataSource;
import org.teiid.query.optimizer.QueryOptimizer;
import org.teiid.query.optimizer.TestOptimizer;
+import org.teiid.query.optimizer.TestOptimizer.ComparisonMode;
import org.teiid.query.optimizer.TestRuleRaiseNull;
-import org.teiid.query.optimizer.TestOptimizer.ComparisonMode;
import org.teiid.query.optimizer.capabilities.BasicSourceCapabilities;
import org.teiid.query.optimizer.capabilities.CapabilitiesFinder;
import org.teiid.query.optimizer.capabilities.DefaultCapabilitiesFinder;
@@ -249,7 +256,7 @@
if (!(dataManager instanceof TempTableDataManager)) {
SessionAwareCache<CachedResults> cache = new
SessionAwareCache<CachedResults>();
cache.setBufferManager(bufferMgr);
- dataManager = new TempTableDataManager(dataManager, bufferMgr,
ExecutorUtils.getDirectExecutor(), cache);
+ dataManager = new TempTableDataManager(dataManager, bufferMgr, cache);
}
if (context.getQueryProcessorFactory() == null) {
context.setQueryProcessorFactory(new QueryProcessorFactoryImpl(bufferMgr,
dataManager, new DefaultCapabilitiesFinder(), null, context.getMetadata()));
Modified:
branches/7.7.x/engine/src/test/java/org/teiid/query/processor/TestTempTables.java
===================================================================
---
branches/7.7.x/engine/src/test/java/org/teiid/query/processor/TestTempTables.java 2014-01-17
20:57:20 UTC (rev 4624)
+++
branches/7.7.x/engine/src/test/java/org/teiid/query/processor/TestTempTables.java 2014-01-21
15:42:47 UTC (rev 4625)
@@ -43,7 +43,6 @@
import org.teiid.common.buffer.BufferManager;
import org.teiid.common.buffer.BufferManagerFactory;
import org.teiid.core.TeiidProcessingException;
-import org.teiid.core.util.ExecutorUtils;
import org.teiid.dqp.internal.process.CachedResults;
import org.teiid.dqp.internal.process.SessionAwareCache;
import org.teiid.dqp.service.TransactionContext;
@@ -94,7 +93,7 @@
BufferManager bm = BufferManagerFactory.getStandaloneBufferManager();
SessionAwareCache<CachedResults> cache = new
SessionAwareCache<CachedResults>();
cache.setBufferManager(bm);
- dataManager = new TempTableDataManager(fdm, bm, ExecutorUtils.getDirectExecutor(),
cache);
+ dataManager = new TempTableDataManager(fdm, bm, cache);
}
@Test public void testRollbackNoExisting() throws Exception {