Author: shawkins
Date: 2011-08-26 11:17:29 -0400 (Fri, 26 Aug 2011)
New Revision: 3425
Added:
trunk/api/src/main/java/org/teiid/Replicated.java
trunk/cache-jbosscache/src/main/java/org/teiid/replication/
trunk/cache-jbosscache/src/main/java/org/teiid/replication/jboss/
trunk/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsInputStream.java
trunk/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsObjectReplicator.java
trunk/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsOutputStream.java
trunk/engine/src/main/java/org/teiid/query/ObjectReplicator.java
trunk/engine/src/main/java/org/teiid/query/ReplicatedObject.java
trunk/engine/src/main/java/org/teiid/query/tempdata/GlobalTableStore.java
trunk/engine/src/main/java/org/teiid/query/tempdata/GlobalTableStoreImpl.java
trunk/test-integration/common/src/test/java/org/teiid/systemmodel/TestMatViewReplication.java
Removed:
trunk/cache-jbosscache/src/main/java/org/teiid/events/jboss/
Modified:
trunk/api/src/main/java/org/teiid/events/EventDistributor.java
trunk/build/kits/jboss-container/deploy/teiid/teiid-cache-manager-jboss-beans-rename-me.xml
trunk/build/kits/jboss-container/deploy/teiid/teiid-jboss-beans.xml
trunk/build/pom.xml
trunk/cache-jbosscache/pom.xml
trunk/connectors/connector-file/src/main/rar/META-INF/ra.xml
trunk/engine/src/main/java/org/teiid/common/buffer/STree.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/AccessInfo.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierManagerImpl.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/Request.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/SessionAwareCache.java
trunk/engine/src/main/java/org/teiid/query/optimizer/relational/RelationalPlanner.java
trunk/engine/src/main/java/org/teiid/query/processor/relational/RelationalNode.java
trunk/engine/src/main/java/org/teiid/query/tempdata/AlterTempTable.java
trunk/engine/src/main/java/org/teiid/query/tempdata/TempTable.java
trunk/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java
trunk/engine/src/main/java/org/teiid/query/tempdata/TempTableStore.java
trunk/engine/src/main/java/org/teiid/query/util/CommandContext.java
trunk/engine/src/test/java/org/teiid/query/optimizer/relational/TestMaterialization.java
trunk/engine/src/test/java/org/teiid/query/processor/TestMaterialization.java
trunk/engine/src/test/java/org/teiid/query/processor/TestProcessor.java
trunk/engine/src/test/java/org/teiid/query/processor/TestTempTables.java
trunk/jboss-integration/src/main/java/org/teiid/jboss/deployers/RuntimeEngineDeployer.java
trunk/jboss-integration/src/main/resources/org/teiid/jboss/i18n.properties
trunk/pom.xml
trunk/runtime/src/main/java/org/teiid/deployers/CompositeVDB.java
trunk/runtime/src/main/java/org/teiid/deployers/VDBLifeCycleListener.java
trunk/runtime/src/main/java/org/teiid/deployers/VDBRepository.java
trunk/runtime/src/test/java/org/teiid/deployers/TestCompositeVDB.java
trunk/test-integration/common/pom.xml
trunk/test-integration/common/src/test/java/org/teiid/jdbc/FakeServer.java
Log:
TEIID-1727 TEIID-1673 adding matview load coordination and replication directly into
jgroups and correcting pom warnings
Added: trunk/api/src/main/java/org/teiid/Replicated.java
===================================================================
--- trunk/api/src/main/java/org/teiid/Replicated.java (rev 0)
+++ trunk/api/src/main/java/org/teiid/Replicated.java 2011-08-26 15:17:29 UTC (rev 3425)
@@ -0,0 +1,58 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Inherited;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Used to replicate Teiid components - this should be used in extension logic.
+ * <p>
+ * The annotation is retained at runtime and is read reflectively from the annotated method.
+ * Note that {@link Inherited} only has an effect on class-level annotations; it is kept here
+ * unchanged.
+ */
+@Target({ElementType.METHOD})
+@Retention(RetentionPolicy.RUNTIME)
+@Inherited
+@Documented
+public @interface Replicated {
+    /**
+     * @return true if members should be called asynchronously. asynch methods should be void.
+     */
+    boolean asynch() default true;
+    /**
+     * @return the timeout in milliseconds, or 0 if no timeout. affects only synch calls.
+     */
+    long timeout() default 0;
+    /**
+     * @return true if only remote members should be called. should not be used with
+     * replicateState. method should be void.
+     */
+    boolean remoteOnly() default false;
+    /**
+     * @return true if the remote members should have a partial state replication called
+     * using the first argument as the state after the local method has been invoked.
+     * should not be used with remoteOnly.
+     */
+    boolean replicateState() default false;
+
+}
\ No newline at end of file
Property changes on: trunk/api/src/main/java/org/teiid/Replicated.java
___________________________________________________________________
Added: svn:mime-type
+ text/plain
Modified: trunk/api/src/main/java/org/teiid/events/EventDistributor.java
===================================================================
--- trunk/api/src/main/java/org/teiid/events/EventDistributor.java 2011-08-25 01:22:57 UTC
(rev 3424)
+++ trunk/api/src/main/java/org/teiid/events/EventDistributor.java 2011-08-26 15:17:29 UTC
(rev 3425)
@@ -24,6 +24,7 @@
import java.util.List;
+import org.teiid.Replicated;
import org.teiid.metadata.ColumnStats;
import org.teiid.metadata.Table;
import org.teiid.metadata.TableStats;
@@ -45,6 +46,7 @@
* @param tuple
* @param delete
*/
+ @Replicated(remoteOnly=true)
void updateMatViewRow(String vdbName, int vdbVersion, String schema, String viewName,
List<?> tuple, boolean delete);
/**
@@ -54,6 +56,7 @@
* @param schema
* @param tableNames
*/
+ @Replicated(remoteOnly=true)
void dataModification(String vdbName, int vdbVersion, String schema, String...
tableNames);
/**
@@ -65,6 +68,7 @@
* @param columnName
* @param stats
*/
+ @Replicated(remoteOnly=true)
void setColumnStats(String vdbName, int vdbVersion, String schemaName,
String tableName, String columnName, ColumnStats stats);
@@ -76,6 +80,7 @@
* @param tableName
* @param stats
*/
+ @Replicated(remoteOnly=true)
void setTableStats(String vdbName, int vdbVersion, String schemaName,
String tableName, TableStats stats);
@@ -87,6 +92,7 @@
* @param name
* @param value
*/
+ @Replicated(remoteOnly=true)
void setProperty(String vdbName, int vdbVersion, String uuid, String name, String
value);
/**
@@ -99,6 +105,7 @@
* @param triggerDefinition
* @param enabled
*/
+ @Replicated(remoteOnly=true)
void setInsteadOfTriggerDefinition(String vdbName, int vdbVersion, String schema, String
viewName, Table.TriggerEvent triggerEvent, String triggerDefinition, Boolean enabled);
/**
@@ -109,6 +116,7 @@
* @param procName
* @param definition
*/
+ @Replicated(remoteOnly=true)
void setProcedureDefinition(String vdbName, int vdbVersion, String schema, String
procName, String definition);
/**
@@ -119,13 +127,7 @@
* @param viewName
* @param definition
*/
+ @Replicated(remoteOnly=true)
void setViewDefinition(String vdbName, int vdbVersion, String schema, String viewName,
String definition);
- /**
- *
- * @param vdbName
- * @param vdbVersion
- * @param viewName
- */
- void refreshMatView(String vdbName, int vdbVersion, String tableName);
}
Modified:
trunk/build/kits/jboss-container/deploy/teiid/teiid-cache-manager-jboss-beans-rename-me.xml
===================================================================
---
trunk/build/kits/jboss-container/deploy/teiid/teiid-cache-manager-jboss-beans-rename-me.xml 2011-08-25
01:22:57 UTC (rev 3424)
+++
trunk/build/kits/jboss-container/deploy/teiid/teiid-cache-manager-jboss-beans-rename-me.xml 2011-08-26
15:17:29 UTC (rev 3425)
@@ -138,14 +138,13 @@
</property>
</bean>
- <bean name="EventDistributorFactory"
class="org.teiid.events.jboss.JGroupsEventDistributor">
- <property name="jndiName">teiid/event-distributor</property>
+ <bean name="ObjectReplicator"
class="org.teiid.replication.jboss.JGroupsObjectReplicator">
+ <property name="jndiName">teiid/replicator</property>
<property name="channelFactory">
<inject bean="JChannelFactory" />
</property>
- <property
name="clusterName">${jboss.partition.name:DefaultPartition}-teiid-events</property>
+ <property
name="clusterName">${jboss.partition.name:DefaultPartition}-teiid-rep</property>
<property
name="multiplexerStack">${jboss.default.jgroups.stack:udp}</property>
- <property
name="localEventDistributorName">teiid/engine-deployer</property>
</bean>
</deployment>
\ No newline at end of file
Modified: trunk/build/kits/jboss-container/deploy/teiid/teiid-jboss-beans.xml
===================================================================
--- trunk/build/kits/jboss-container/deploy/teiid/teiid-jboss-beans.xml 2011-08-25
01:22:57 UTC (rev 3424)
+++ trunk/build/kits/jboss-container/deploy/teiid/teiid-jboss-beans.xml 2011-08-26
15:17:29 UTC (rev 3425)
@@ -131,7 +131,7 @@
<!-- Maximum size of lob allowed through ODBC connection in bytes (default
5MB) -->
<property name="maxODBCLobSizeAllowed">5242880</property>
<!-- The JNDI name of the Teiid Event Distributor -->
- <property
name="eventDistributorName">teiid/event-distributor</property>
+ <property
name="objectReplicatorName">teiid/replicator</property>
<!-- Set to true for the engine to detect local change events. Should be
disabled if using external change data capture tools. (default true) -->
<property name="detectingChangeEvents">true</property>
</bean>
Modified: trunk/build/pom.xml
===================================================================
--- trunk/build/pom.xml 2011-08-25 01:22:57 UTC (rev 3424)
+++ trunk/build/pom.xml 2011-08-26 15:17:29 UTC (rev 3425)
@@ -12,7 +12,7 @@
<dependency>
<groupId>org.jboss.teiid</groupId>
<artifactId>teiid-client-jdk15</artifactId>
- <version>${version}</version>
+ <version>${project.version}</version>
</dependency>
<dependency>
<groupId>net.sf.retrotranslator</groupId>
@@ -119,9 +119,9 @@
<mainClass>net.sf.retrotranslator.transformer.Retrotranslator</mainClass>
<arguments>
<argument>-srcjar</argument>
-
<argument>${pom.basedir}/target/teiid-${pom.version}-client.jar</argument>
+
<argument>${project.basedir}/target/teiid-${project.version}-client.jar</argument>
<argument>-destjar</argument>
-
<argument>${pom.basedir}/target/teiid-${pom.version}-client-jdk15.jar</argument>
+
<argument>${project.basedir}/target/teiid-${project.version}-client-jdk15.jar</argument>
<argument>-embed</argument>
<argument>org.teiid.retroruntime</argument>
</arguments>
Modified: trunk/cache-jbosscache/pom.xml
===================================================================
--- trunk/cache-jbosscache/pom.xml 2011-08-25 01:22:57 UTC (rev 3424)
+++ trunk/cache-jbosscache/pom.xml 2011-08-26 15:17:29 UTC (rev 3425)
@@ -30,6 +30,10 @@
<artifactId>jbosscache-core</artifactId>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </dependency>
<dependency>
<groupId>org.jboss.man</groupId>
<artifactId>jboss-managed</artifactId>
Added:
trunk/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsInputStream.java
===================================================================
---
trunk/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsInputStream.java
(rev 0)
+++
trunk/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsInputStream.java 2011-08-26
15:17:29 UTC (rev 3425)
@@ -0,0 +1,111 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.replication.jboss;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * An {@link InputStream} whose content is handed over one chunk at a time through
+ * {@link #receive(byte[])} and consumed through {@link #read()}.
+ * <p>
+ * At most one chunk is held at a time: {@code receive} blocks while the previous chunk has not
+ * been fully read, and {@code read} blocks (up to {@link #TIME_OUT} milliseconds) while no chunk
+ * is available. An {@code index} of -1 marks the stream as finished, either because it was
+ * closed or because a {@code null} chunk was received.
+ * <p>
+ * All state changes and waits are performed while holding {@code lock}, and every wait
+ * re-checks its condition in a loop so that a missed or spurious wakeup cannot cause a false
+ * timeout or an overwritten chunk.
+ */
+public class JGroupsInputStream extends InputStream {
+
+    static long TIME_OUT = 15000; //TODO make configurable
+
+    /** the chunk currently being read, or null if the reader is waiting for the next one */
+    private volatile byte[] buf;
+    /** position of the next byte in buf, or -1 once the stream is finished */
+    private volatile int index=0;
+    private final ReentrantLock lock = new ReentrantLock();
+    /** signalled when a new chunk (or the end marker) has been stored */
+    private final Condition write = lock.newCondition();
+    /** signalled when the current chunk has been consumed or the stream was closed */
+    private final Condition doneReading = lock.newCondition();
+
+    /**
+     * Returns the next byte, waiting for a chunk to arrive if necessary.
+     *
+     * @return the next byte as a value from 0 to 255, or -1 once the stream is finished
+     * @throws IOException wrapping a {@link TimeoutException} if no chunk arrived within
+     * {@link #TIME_OUT} milliseconds, or wrapping the {@link InterruptedException} if the wait
+     * was interrupted (the thread's interrupt status is restored in that case)
+     */
+    @Override
+    public int read() throws IOException {
+        lock.lock();
+        try {
+            long remainingNanos = TimeUnit.MILLISECONDS.toNanos(TIME_OUT);
+            while (true) {
+                if (index < 0) {
+                    return -1;
+                }
+                if (buf != null) {
+                    if (index < buf.length) {
+                        return buf[index++] & 0xff;
+                    }
+                    //chunk exhausted - release it so that the writer can supply the next one
+                    buf = null;
+                    index = 0;
+                    doneReading.signal();
+                    //each chunk gets a full timeout period, as before
+                    remainingNanos = TimeUnit.MILLISECONDS.toNanos(TIME_OUT);
+                }
+                if (remainingNanos <= 0) {
+                    throw new IOException(new TimeoutException());
+                }
+                remainingNanos = write.awaitNanos(remainingNanos);
+            }
+        } catch(InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new IOException(e);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Marks the stream as finished, discards any unread chunk and wakes a writer that may be
+     * blocked in {@link #receive(byte[])}.
+     */
+    @Override
+    public void close() {
+        lock.lock();
+        try {
+            buf = null;
+            index = -1;
+            doneReading.signal();
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Supplies the next chunk to the reader, waiting until the previous chunk has been consumed.
+     * The call is ignored if the stream is already finished.
+     *
+     * @param bytes the next chunk, or null to signal the end of the stream
+     * @throws InterruptedException if interrupted while waiting for the reader
+     */
+    public void receive(byte[] bytes) throws InterruptedException {
+        lock.lock();
+        try {
+            //loop to guard against spurious wakeups overwriting an unread chunk
+            while (index != -1 && buf != null) {
+                doneReading.await();
+            }
+            if (index == -1) {
+                return;
+            }
+            buf = bytes;
+            if (bytes == null) {
+                index = -1;
+            }
+            write.signal();
+        } finally {
+            lock.unlock();
+        }
+    }
+
+}
\ No newline at end of file
Property changes on:
trunk/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsInputStream.java
___________________________________________________________________
Added: svn:mime-type
+ text/plain
Added:
trunk/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsObjectReplicator.java
===================================================================
---
trunk/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsObjectReplicator.java
(rev 0)
+++
trunk/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsObjectReplicator.java 2011-08-26
15:17:29 UTC (rev 3425)
@@ -0,0 +1,446 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.replication.jboss;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Vector;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+
+import org.jgroups.Address;
+import org.jgroups.Channel;
+import org.jgroups.ChannelFactory;
+import org.jgroups.ExtendedReceiverAdapter;
+import org.jgroups.Message;
+import org.jgroups.View;
+import org.jgroups.blocks.GroupRequest;
+import org.jgroups.blocks.MethodCall;
+import org.jgroups.blocks.MethodLookup;
+import org.jgroups.blocks.RpcDispatcher;
+import org.jgroups.util.Promise;
+import org.jgroups.util.RspList;
+import org.jgroups.util.Util;
+import org.teiid.Replicated;
+import org.teiid.logging.LogConstants;
+import org.teiid.logging.LogManager;
+import org.teiid.query.ObjectReplicator;
+import org.teiid.query.ReplicatedObject;
+
+public class JGroupsObjectReplicator implements ObjectReplicator, Serializable {
+
+ private static final long serialVersionUID = -6851804958313095166L;
+ private static final String CREATE_STATE = "createState"; //$NON-NLS-1$
+ private static final String BUILD_STATE = "buildState"; //$NON-NLS-1$
+ private static final String FINISH_STATE = "finishState"; //$NON-NLS-1$
+
+ /**
+  * Task that passes a streamed state, identified by its state id, to the target object and
+  * closes the stream afterwards. Failures are logged rather than propagated.
+  */
+ private final class StreamingRunner implements Runnable {
+     private final Object object;
+     private final String stateId;
+     private final JGroupsInputStream is;
+
+     private StreamingRunner(Object object, String stateId, JGroupsInputStream is) {
+         this.is = is;
+         this.stateId = stateId;
+         this.object = object;
+     }
+
+     @Override
+     public void run() {
+         try {
+             //the cast stays inside the try so that a wrong target type is logged as well
+             final ReplicatedObject target = (ReplicatedObject)object;
+             target.setState(stateId, is);
+             LogManager.logDetail(LogConstants.CTX_RUNTIME, "state set " + stateId); //$NON-NLS-1$
+         } catch (Exception e) {
+             LogManager.logError(LogConstants.CTX_RUNTIME, e, "error setting state " + stateId); //$NON-NLS-1$
+         } finally {
+             is.close();
+         }
+     }
+ }
+
+ /**
+  * Invocation handler behind a replicated proxy. Calls to methods registered in the method
+  * map are forwarded to the cluster through the {@link RpcDispatcher}; all other calls, and
+  * all calls made while no remote members are known, go to the wrapped object directly.
+  * The handler also receives view changes and the initial state transfer callbacks.
+  */
+ private final static class ReplicatedInvocationHandler<S> extends ExtendedReceiverAdapter implements
+         InvocationHandler, Serializable {
+
+     private static final long serialVersionUID = -2943462899945966103L;
+     private final S object;
+     private RpcDispatcher disp;
+     private final HashMap<Method, Short> methodMap;
+     protected Vector<Address> remoteMembers = new Vector<Address>();
+     protected final transient Promise<Boolean> state_promise=new Promise<Boolean>();
+
+     private ReplicatedInvocationHandler(S object,
+             HashMap<Method, Short> methodMap) {
+         this.object = object;
+         this.methodMap = methodMap;
+     }
+
+     public void setDisp(RpcDispatcher disp) {
+         this.disp = disp;
+     }
+
+     /**
+      * Dispatches a proxy call.
+      * <ul>
+      * <li>not replicated, or no remote members: the call is made on the local object, except
+      * that a remoteOnly method returns null without doing anything</li>
+      * <li>replicateState: the local method is invoked, then the state named by the first
+      * argument is streamed to the remote members</li>
+      * <li>otherwise: the call is sent through the dispatcher; for synchronous calls a boolean
+      * result is true only if every response is true, and a Collection result is the union of
+      * all responses</li>
+      * </ul>
+      *
+      * @throws RuntimeException if the replicated call fails; the original failure is the cause
+      */
+     @Override
+     public Object invoke(Object proxy, Method method, Object[] args)
+             throws Throwable {
+         Short methodNum = methodMap.get(method);
+         if (methodNum == null || remoteMembers.isEmpty()) {
+             if (methodNum != null) {
+                 Replicated annotation = method.getAnnotation(Replicated.class);
+                 if (annotation != null && annotation.remoteOnly()) {
+                     return null;
+                 }
+             }
+             return invokeLocally(method, args);
+         }
+         try {
+             Replicated annotation = method.getAnnotation(Replicated.class);
+             if (annotation.replicateState()) {
+                 Object result = invokeLocally(method, args);
+                 Vector<Address> dests = null;
+                 synchronized (remoteMembers) {
+                     dests = new Vector<Address>(remoteMembers);
+                 }
+                 ReplicatedObject ro = (ReplicatedObject)object;
+                 String stateId = (String)args[0];
+                 LogManager.logDetail(LogConstants.CTX_RUNTIME, object, "replicating state", stateId); //$NON-NLS-1$
+                 //the three streaming methods occupy the last three ids of the method map
+                 JGroupsOutputStream oStream = new JGroupsOutputStream(disp, dests, stateId, (short)(methodMap.size() - 3));
+                 try {
+                     ro.getState(stateId, oStream);
+                 } finally {
+                     oStream.close();
+                 }
+                 LogManager.logTrace(LogConstants.CTX_RUNTIME, object, "sent state", stateId); //$NON-NLS-1$
+                 return result;
+             }
+             MethodCall call=new MethodCall(methodNum, args);
+             //dests stays null unless remoteOnly - presumably the dispatcher then addresses
+             //every member including this one; confirm against the RpcDispatcher documentation
+             Vector<Address> dests = null;
+             if (annotation.remoteOnly()) {
+                 synchronized (remoteMembers) {
+                     dests = new Vector<Address>(remoteMembers);
+                 }
+             }
+             RspList responses = disp.callRemoteMethods(dests, call, annotation.asynch()?GroupRequest.GET_NONE:GroupRequest.GET_ALL, annotation.timeout());
+             if (annotation.asynch()) {
+                 return null;
+             }
+             Vector<Object> results = responses.getResults();
+             if (method.getReturnType() == boolean.class) {
+                 for (Object o : results) {
+                     if (!Boolean.TRUE.equals(o)) {
+                         return false;
+                     }
+                 }
+                 return true;
+             } else if (method.getReturnType() == Collection.class) {
+                 ArrayList<Object> result = new ArrayList<Object>();
+                 for (Object o : results) {
+                     result.addAll((Collection<?>)o);
+                 }
+                 //return the merged elements, not the raw per-member response list
+                 return result;
+             }
+             return null;
+         } catch(Exception e) {
+             throw new RuntimeException(method + " " + Arrays.toString(args) + " failed", e); //$NON-NLS-1$ //$NON-NLS-2$
+         }
+     }
+
+     /**
+      * Invokes the method on the wrapped object, unwrapping the reflection exception so that
+      * the target method's own exception is thrown.
+      */
+     private Object invokeLocally(Method method, Object[] args) throws Throwable {
+         try {
+             return method.invoke(object, args);
+         } catch (InvocationTargetException e) {
+             throw e.getCause();
+         }
+     }
+
+     /**
+      * Replaces the remote member list with the members of the new view, excluding the local
+      * address. Members that are no longer present are first reported to a
+      * {@link ReplicatedObject} through droppedMembers.
+      */
+     @Override
+     public void viewAccepted(View newView) {
+         if (newView.getMembers() != null) {
+             synchronized (remoteMembers) {
+                 //after this only the members missing from the new view remain
+                 remoteMembers.removeAll(newView.getMembers());
+                 if (object instanceof ReplicatedObject && !remoteMembers.isEmpty()) {
+                     ((ReplicatedObject)object).droppedMembers(new HashSet<Serializable>(remoteMembers));
+                 }
+                 remoteMembers.clear();
+                 remoteMembers.addAll(newView.getMembers());
+                 remoteMembers.remove(this.disp.getChannel().getLocalAddress());
+             }
+         }
+     }
+
+     /**
+      * Loads the initial state into the wrapped object and publishes the outcome through
+      * state_promise (TRUE on success, FALSE on failure). The stream is always closed.
+      */
+     @Override
+     public void setState(InputStream istream) {
+         LogManager.logDetail(LogConstants.CTX_RUNTIME, object, "loading initial state"); //$NON-NLS-1$
+         try {
+             ((ReplicatedObject)object).setState(istream);
+             state_promise.setResult(Boolean.TRUE);
+         } catch (Exception e) {
+             state_promise.setResult(Boolean.FALSE);
+             LogManager.logError(LogConstants.CTX_RUNTIME, e, "error loading initial state"); //$NON-NLS-1$
+         } finally {
+             Util.close(istream);
+         }
+     }
+
+     /**
+      * Writes the initial state of the wrapped object to the stream. Failures are logged and
+      * the stream is always closed.
+      */
+     @Override
+     public void getState(OutputStream ostream) {
+         LogManager.logDetail(LogConstants.CTX_RUNTIME, object, "getting initial state"); //$NON-NLS-1$
+         try {
+             ((ReplicatedObject)object).getState(ostream);
+         } catch (Exception e) {
+             LogManager.logError(LogConstants.CTX_RUNTIME, e, "error getting initial state"); //$NON-NLS-1$
+         } finally {
+             Util.close(ostream);
+         }
+     }
+ }
+
+ /**
+  * Signatures of the three internal streaming calls: create a state stream, add a chunk of
+  * bytes to it, and finish it. The methods are looked up reflectively by name
+  * (CREATE_STATE, BUILD_STATE, FINISH_STATE), so names and parameter types must not change.
+  */
+ private interface Streaming {
+ void createState(String id);
+ void buildState(String id, byte[] bytes);
+ void finishState(String id);
+ }
+
+ private transient ChannelFactory channelFactory;
+ private String multiplexerStack;
+ private String clusterName;
+ private String jndiName;
+ //TODO: this should be configurable, or use a common executor
+ private transient Executor executor = Executors.newCachedThreadPool();
+
+ /** @return the factory used to create the multiplexer channels */
+ public ChannelFactory getChannelFactory() {
+     return this.channelFactory;
+ }
+
+ /** @param channelFactory the factory used to create the multiplexer channels */
+ public void setChannelFactory(ChannelFactory channelFactory) {
+     this.channelFactory = channelFactory;
+ }
+
+ /** @return the JNDI name this replicator is bound under, may be null */
+ public String getJndiName() {
+     return this.jndiName;
+ }
+
+ /** @param jndiName the JNDI name to bind this replicator under */
+ public void setJndiName(String jndiName) {
+     this.jndiName = jndiName;
+ }
+
+ /** @return the name of the multiplexer stack passed to the channel factory */
+ public String getMultiplexerStack() {
+     return this.multiplexerStack;
+ }
+
+ /** @param multiplexerStack the name of the multiplexer stack passed to the channel factory */
+ public void setMultiplexerStack(String multiplexerStack) {
+     this.multiplexerStack = multiplexerStack;
+ }
+
+ /** @return the name of the cluster that channels connect to */
+ public String getClusterName() {
+     return this.clusterName;
+ }
+
+ /** @param clusterName the name of the cluster that channels connect to */
+ public void setClusterName(String clusterName) {
+     this.clusterName = clusterName;
+ }
+
+ /**
+  * Binds this replicator into JNDI under the configured name. Nothing is done when no
+  * channel factory or no JNDI name has been set.
+  *
+  * @throws Exception if the binding fails
+  */
+ public void start() throws Exception {
+     //without a channel factory there is no need to distribute events
+     if (this.channelFactory == null || this.jndiName == null) {
+         return;
+     }
+     final InitialContext context = new InitialContext();
+     org.jboss.util.naming.Util.bind(context, this.jndiName, this);
+ }
+
+ /**
+  * Removes the JNDI binding for the configured name, if one is set. Unbinding is best
+  * effort: a naming failure does not prevent shutdown, but it is logged at detail level
+  * instead of being silently discarded.
+  */
+ public void stop() {
+     if (jndiName == null) {
+         return;
+     }
+     try {
+         final InitialContext ic = new InitialContext();
+         org.jboss.util.naming.Util.unbind(ic, jndiName);
+     } catch (final NamingException ne) {
+         LogManager.logDetail(LogConstants.CTX_RUNTIME, "could not unbind", jndiName, ne); //$NON-NLS-1$
+     }
+ }
+
+ /**
+  * Stops replication for a proxy previously returned by replicate: the proxy's dispatcher
+  * is stopped and its channel is closed.
+  *
+  * @param object a replicated proxy
+  */
+ public void stop(Object object) {
+     final InvocationHandler invocationHandler = Proxy.getInvocationHandler(object);
+     final RpcDispatcher dispatcher = ((ReplicatedInvocationHandler<?>) invocationHandler).disp;
+     //fetch the channel before stopping the dispatcher, as the original order did
+     final Channel channel = dispatcher.getChannel();
+     dispatcher.stop();
+     channel.close();
+ }
+
+ /**
+  * Creates a proxy for the given interface that replicates calls to its
+  * {@link Replicated} methods over a multiplexer channel.
+  * <p>
+  * Every annotated interface method is assigned a numeric id in iteration order; the three
+  * internal streaming methods are appended and therefore always own the last three ids. The
+  * dispatcher's handle method is overridden so that those three ids feed a
+  * {@link JGroupsInputStream} per (state id, sender) instead of invoking a method on the
+  * target object.
+  * <p>
+  * After connecting, a {@link ReplicatedObject} is given the local address and an initial
+  * state transfer is requested.
+  *
+  * @param mux_id the id passed to the channel factory for the multiplexer channel
+  * @param iface the interface implemented by the returned proxy
+  * @param object the local target object
+  * @param startTimeout the timeout used for requesting and for loading the initial state
+  * (presumably milliseconds - confirm against the channel documentation)
+  * @return the replicated proxy
+  * @throws Exception if the channel cannot be created or connected
+  */
+ @SuppressWarnings("unchecked")
+ @Override
+ public <T, S> T replicate(String mux_id,
+         Class<T> iface, final S object, long startTimeout) throws Exception {
+     Channel channel = this.channelFactory.createMultiplexerChannel(this.multiplexerStack, mux_id);
+     Method[] methods = iface.getMethods();
+
+     final HashMap<Method, Short> methodMap = new HashMap<Method, Short>();
+     final ArrayList<Method> methodList = new ArrayList<Method>();
+
+     for (Method method : methods) {
+         if (method.getAnnotation(Replicated.class) == null) {
+             continue;
+         }
+         methodList.add(method);
+         methodMap.put(method, (short)(methodList.size() - 1));
+     }
+
+     //add in streaming methods
+     Method createState = JGroupsObjectReplicator.Streaming.class.getMethod(CREATE_STATE, new Class<?>[] {String.class});
+     methodList.add(createState);
+     methodMap.put(createState, (short)(methodList.size() - 1));
+     Method buildState = JGroupsObjectReplicator.Streaming.class.getMethod(BUILD_STATE, new Class<?>[] {String.class, byte[].class});
+     methodList.add(buildState);
+     methodMap.put(buildState, (short)(methodList.size() - 1));
+     Method finishState = JGroupsObjectReplicator.Streaming.class.getMethod(FINISH_STATE, new Class<?>[] {String.class});
+     methodList.add(finishState);
+     methodMap.put(finishState, (short)(methodList.size() - 1));
+
+     ReplicatedInvocationHandler<S> proxy = new ReplicatedInvocationHandler<S>(object, methodMap);
+     /*
+      * TODO: could have an object implement streaming
+      * Override the normal handle method to support streaming
+      */
+     RpcDispatcher disp = new RpcDispatcher(channel, proxy, proxy, object) {
+         //open state streams keyed by (state id, sender address)
+         Map<List<?>, JGroupsInputStream> inputStreams = new ConcurrentHashMap<List<?>, JGroupsInputStream>();
+         @Override
+         public Object handle(Message req) {
+             Object body=null;
+
+             if(req == null || req.getLength() == 0) {
+                 if(log.isErrorEnabled()) log.error("message or message buffer is null"); //$NON-NLS-1$
+                 return null;
+             }
+
+             try {
+                 body=req_marshaller != null?
+                         req_marshaller.objectFromByteBuffer(req.getBuffer(), req.getOffset(), req.getLength())
+                         : req.getObject();
+             }
+             catch(Throwable e) {
+                 if(log.isErrorEnabled()) log.error("exception marshalling object", e); //$NON-NLS-1$
+                 return e;
+             }
+
+             if(!(body instanceof MethodCall)) {
+                 if(log.isErrorEnabled()) log.error("message does not contain a MethodCall object"); //$NON-NLS-1$
+
+                 // create an exception to represent this and return it
+                 return new IllegalArgumentException("message does not contain a MethodCall object") ; //$NON-NLS-1$
+             }
+
+             final MethodCall method_call=(MethodCall)body;
+
+             try {
+                 if(log.isTraceEnabled())
+                     log.trace("[sender=" + req.getSrc() + "], method_call: " + method_call); //$NON-NLS-1$ //$NON-NLS-2$
+
+                 if(method_lookup == null)
+                     throw new Exception("MethodCall uses ID=" + method_call.getId() + ", but method_lookup has not been set"); //$NON-NLS-1$ //$NON-NLS-2$
+
+                 //the last three ids are the streaming calls and are handled here directly
+                 if (method_call.getId() >= methodList.size() - 3) {
+                     Serializable address = req.getSrc();
+                     String stateId = (String)method_call.getArgs()[0];
+                     List<?> key = Arrays.asList(stateId, address);
+                     JGroupsInputStream is = inputStreams.get(key);
+                     if (method_call.getId() == methodList.size() - 3) {
+                         LogManager.logTrace(LogConstants.CTX_RUNTIME, object, "create state", stateId); //$NON-NLS-1$
+                         if (is != null) {
+                             //end a previous stream for the same key before replacing it
+                             is.receive(null);
+                         }
+                         is = new JGroupsInputStream();
+                         this.inputStreams.put(key, is);
+                         executor.execute(new StreamingRunner(object, stateId, is));
+                     } else if (method_call.getId() == methodList.size() - 2) {
+                         LogManager.logTrace(LogConstants.CTX_RUNTIME, object, "building state", stateId); //$NON-NLS-1$
+                         if (is != null) {
+                             is.receive((byte[])method_call.getArgs()[1]);
+                         }
+                     } else if (method_call.getId() == methodList.size() - 1) {
+                         LogManager.logTrace(LogConstants.CTX_RUNTIME, object, "finished state", stateId); //$NON-NLS-1$
+                         if (is != null) {
+                             is.receive(null);
+                         }
+                         this.inputStreams.remove(key);
+                     }
+                     return null;
+                 }
+
+                 Method m=method_lookup.findMethod(method_call.getId());
+                 if(m == null)
+                     throw new Exception("no method found for " + method_call.getId()); //$NON-NLS-1$
+                 method_call.setMethod(m);
+
+                 return method_call.invoke(server_obj);
+             }
+             catch(Throwable x) {
+                 return x;
+             }
+         }
+     };
+
+     proxy.setDisp(disp);
+     disp.setMethodLookup(new MethodLookup() {
+         public Method findMethod(short id) {
+             return methodList.get(id);
+         }
+     });
+
+     T replicatedProxy = (T) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class[] {iface}, proxy);
+
+     channel.connect(clusterName);
+     if (object instanceof ReplicatedObject) {
+         ((ReplicatedObject)object).setLocalAddress(channel.getLocalAddress());
+         boolean getState = channel.getState(null, startTimeout);
+         if (getState) {
+             //the promise yields null when the timeout elapses, so it must not be unboxed directly
+             Boolean loaded = proxy.state_promise.getResult(startTimeout);
+             if (Boolean.TRUE.equals(loaded)) {
+                 LogManager.logDetail(LogConstants.CTX_RUNTIME, object, "loaded"); //$NON-NLS-1$
+             } else {
+                 LogManager.logWarning(LogConstants.CTX_RUNTIME, object + " load timeout"); //$NON-NLS-1$
+             }
+         } else {
+             LogManager.logInfo(LogConstants.CTX_RUNTIME, object + " first member or timeout exceeded"); //$NON-NLS-1$
+         }
+     }
+
+     return replicatedProxy;
+ }
+
+}
Property changes on:
trunk/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsObjectReplicator.java
___________________________________________________________________
Added: svn:mime-type
+ text/plain
Added:
trunk/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsOutputStream.java
===================================================================
---
trunk/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsOutputStream.java
(rev 0)
+++
trunk/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsOutputStream.java 2011-08-26
15:17:29 UTC (rev 3425)
@@ -0,0 +1,96 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.replication.jboss;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.Vector;
+
+import org.jgroups.Address;
+import org.jgroups.blocks.GroupRequest;
+import org.jgroups.blocks.MethodCall;
+import org.jgroups.blocks.RpcDispatcher;
+import org.teiid.core.types.Streamable;
+
+/**
+ * An {@link OutputStream} that buffers written bytes and ships them to the given
+ * destinations in chunks through remote method calls on an {@link RpcDispatcher}.
+ * <p>
+ * Three remote method ids are used, all relative to <code>methodOffset</code>:
+ * <code>methodOffset</code> is called once from the constructor with the state id,
+ * <code>methodOffset + 1</code> carries the state id and one chunk of bytes, and
+ * <code>methodOffset + 2</code> is called with the state id when the stream is closed.
+ * Every call is issued with {@link GroupRequest#GET_NONE} and a timeout of 0.
+ * <p>
+ * Only the <code>closed</code> flag is volatile; the buffer and its index are not
+ * synchronized.
+ */
+public class JGroupsOutputStream extends OutputStream {
+
+    /** Size of the internal buffer and therefore the largest chunk sent in one call. */
+    static final int CHUNK_SIZE=Streamable.STREAMING_BATCH_SIZE_IN_BYTES;
+
+    protected final RpcDispatcher disp;
+    protected final Vector<Address> dests;
+    protected final String stateId;
+    protected final short methodOffset;
+
+    private volatile boolean closed=false;
+    private final byte[] buffer=new byte[CHUNK_SIZE];
+    private int index=0;
+
+    /**
+     * Creates the stream and immediately issues the remote call with id
+     * <code>methodOffset</code>, passing the state id.
+     *
+     * @param disp dispatcher used for every remote call
+     * @param dests destinations of the remote calls
+     * @param stateId identifier sent with every call
+     * @param methodOffset id of the first of the three remote methods
+     */
+    public JGroupsOutputStream(RpcDispatcher disp, Vector<Address> dests, String stateId, short methodOffset) {
+        this.disp=disp;
+        this.dests=dests;
+        this.stateId=stateId;
+        this.methodOffset = methodOffset;
+        disp.callRemoteMethods(this.dests, new MethodCall(methodOffset, new Object[] {stateId}), GroupRequest.GET_NONE, 0);
+    }
+
+    /**
+     * Flushes any buffered bytes and then issues the remote call with id
+     * <code>methodOffset + 2</code>. Calling close on an already closed stream is a no-op.
+     * <p>
+     * The stream is marked closed and the closing call is attempted even when the
+     * final flush fails, so a failed flush cannot leave the stream open or the
+     * closing call unsent.
+     *
+     * @throws IOException if the final flush fails
+     */
+    @Override
+    public void close() throws IOException {
+        if(closed) {
+            return;
+        }
+        try {
+            flush();
+        } finally {
+            // flush() rejects a closed stream, so the flag may only be set after it has run
+            closed=true;
+            try {
+                disp.callRemoteMethods(dests, new MethodCall((short)(methodOffset + 2), new Object[] {stateId}), GroupRequest.GET_NONE, 0);
+            } catch(Exception e) {
+                // deliberately best effort: a failed closing call must not fail close()
+            }
+        }
+    }
+
+    /**
+     * Sends the buffered bytes, if any, as one chunk using the remote call with id
+     * <code>methodOffset + 1</code> and resets the buffer.
+     *
+     * @throws IOException if the stream is closed or the remote call throws
+     */
+    @Override
+    public void flush() throws IOException {
+        checkClosed();
+        try {
+            if(index == 0) {
+                return;
+            }
+            // copy only the filled part; the buffer itself is reused for later writes
+            disp.callRemoteMethods(dests, new MethodCall((short)(methodOffset + 1), new Object[] {stateId, Arrays.copyOf(buffer, index)}), GroupRequest.GET_NONE, 0);
+            index=0;
+        } catch(Exception e) {
+            throw new IOException(e);
+        }
+    }
+
+    /** Throws an {@link IOException} if {@link #close()} has already completed. */
+    private void checkClosed() throws IOException {
+        if(closed) {
+            throw new IOException("output stream is closed"); //$NON-NLS-1$
+        }
+    }
+
+    /**
+     * Buffers a single byte, flushing first when the buffer is full.
+     *
+     * @param b the byte to write (only the low eight bits are kept)
+     * @throws IOException if the stream is closed or a required flush fails
+     */
+    @Override
+    public void write(int b) throws IOException {
+        checkClosed();
+        if(index >= buffer.length) {
+            flush();
+        }
+        buffer[index++]=(byte)b;
+    }
+
+}
\ No newline at end of file
Property changes on:
trunk/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsOutputStream.java
___________________________________________________________________
Added: svn:mime-type
+ text/plain
Modified: trunk/connectors/connector-file/src/main/rar/META-INF/ra.xml
===================================================================
--- trunk/connectors/connector-file/src/main/rar/META-INF/ra.xml 2011-08-25 01:22:57 UTC
(rev 3424)
+++ trunk/connectors/connector-file/src/main/rar/META-INF/ra.xml 2011-08-26 15:17:29 UTC
(rev 3425)
@@ -1,10 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
-<connector
xmlns="http://java.sun.com/xml/ns/j2ee"
-
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-
xsi:schemaLocation="http://java.sun.com/xml/ns/j2ee
-
http://java.sun.com/xml/ns/j2ee/connector_1_5.xsd"
- version="1.5">
+<connector version="1.5">
<vendor-name>Red Hat Middleware LLC</vendor-name>
<eis-type>Teiid Text Connector</eis-type>
@@ -55,7 +51,7 @@
<config-property>
<description>{$display:"Allow Parent
Paths"}</description>
<config-property-name>AllowParentPaths</config-property-name>
-
<config-property-type>java.lang.Boolean</config-property-type>
+ <config-property-type>boolean</config-property-type>
<config-property-value>true</config-property-value>
</config-property>
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/STree.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/STree.java 2011-08-25 01:22:57 UTC
(rev 3424)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/STree.java 2011-08-26 15:17:29 UTC
(rev 3425)
@@ -23,6 +23,8 @@
package org.teiid.common.buffer;
import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
@@ -31,6 +33,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
+import org.teiid.client.BatchSerializer;
import org.teiid.common.buffer.SPage.SearchResult;
import org.teiid.core.TeiidComponentException;
import org.teiid.query.processor.relational.ListNestedSortComparator;
@@ -88,6 +91,29 @@
this.keytypes = Arrays.copyOf(types, keyLength);
}
+ /**
+  * Writes the row count followed by the values of every page reachable from
+  * <code>header[0]</code> through the <code>next</code> links, one batch per page.
+  *
+  * @param oos the stream to write to
+  * @throws TeiidComponentException if thrown while obtaining a page's values
+  * @throws IOException if writing to the stream fails
+  */
+ public void writeValuesTo(ObjectOutputStream oos) throws TeiidComponentException, IOException {
+     SPage page = header[0];
+     oos.writeInt(this.rowCount.get());
+     do {
+         TupleBatch batch = page.getValues();
+         BatchSerializer.writeBatch(oos, types, batch.getAllTuples());
+         // advance to the following page; without this the first page is written forever
+         page = page.next;
+     } while (page != null);
+ }
+
+ /**
+  * Reads an expected row count from the stream and then keeps reading batches,
+  * inserting each tuple in {@link InsertMode#ORDERED} mode, until
+  * {@link #getRowCount()} reaches that count.
+  *
+  * @param ois the stream to read from
+  * @throws IOException if reading from the stream fails
+  * @throws ClassNotFoundException if thrown while reading a batch
+  * @throws TeiidComponentException if thrown while inserting a tuple
+  */
+ public void readValuesFrom(ObjectInputStream ois) throws IOException, ClassNotFoundException, TeiidComponentException {
+     final int expectedRows = ois.readInt();
+     // computed once from the announced size and passed to every insert
+     final int heightHint = getExpectedHeight(expectedRows);
+     while (getRowCount() < expectedRows) {
+         for (List tuple : BatchSerializer.readBatch(ois, types)) {
+             insert(tuple, InsertMode.ORDERED, heightHint);
+         }
+     }
+ }
+
protected SPage findChildTail(SPage page) {
if (page == null) {
page = header[header.length - 1];
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/AccessInfo.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/AccessInfo.java 2011-08-25
01:22:57 UTC (rev 3424)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/AccessInfo.java 2011-08-26
15:17:29 UTC (rev 3425)
@@ -41,7 +41,7 @@
import org.teiid.query.metadata.TempMetadataID;
import org.teiid.query.metadata.TransformationMetadata;
import org.teiid.query.optimizer.relational.RelationalPlanner;
-import org.teiid.query.tempdata.TempTableStore;
+import org.teiid.query.tempdata.GlobalTableStore;
import org.teiid.query.util.CommandContext;
/**
@@ -117,17 +117,17 @@
}
VDBMetaData vdb = DQPWorkContext.getWorkContext().getVDB();
TransformationMetadata tm = vdb.getAttachment(TransformationMetadata.class);
- TempTableStore globalStore = vdb.getAttachment(TempTableStore.class);
+ GlobalTableStore globalStore = vdb.getAttachment(GlobalTableStore.class);
if (!externalNames.isEmpty()) {
this.objectsAccessed = new HashSet<Object>(externalNames.size());
for (List<String> key : this.externalNames) {
if (key.size() == 1) {
String matTableName = key.get(0);
- TempMetadataID id = globalStore.getMetadataStore().getTempGroupID(matTableName);
+ TempMetadataID id =
globalStore.getTempTableStore().getMetadataStore().getTempGroupID(matTableName);
if (id == null) {
//if the id is null, then create a local instance
String viewFullName =
matTableName.substring(RelationalPlanner.MAT_PREFIX.length());
- id = globalStore.getGlobalTempTableMetadataId(tm.getGroupID(viewFullName), tm);
+ id = globalStore.getGlobalTempTableMetadataId(tm.getGroupID(viewFullName));
}
this.objectsAccessed.add(id);
} else {
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java 2011-08-25
01:22:57 UTC (rev 3424)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java 2011-08-26
15:17:29 UTC (rev 3425)
@@ -28,7 +28,6 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.FutureTask;
@@ -43,11 +42,10 @@
import org.teiid.adminapi.Request.ThreadState;
import org.teiid.adminapi.impl.CacheStatisticsMetadata;
import org.teiid.adminapi.impl.RequestMetadata;
-import org.teiid.adminapi.impl.VDBMetaData;
import org.teiid.adminapi.impl.WorkerPoolStatisticsMetadata;
import org.teiid.cache.CacheConfiguration;
-import org.teiid.cache.CacheConfiguration.Policy;
import org.teiid.cache.CacheFactory;
+import org.teiid.cache.CacheConfiguration.Policy;
import org.teiid.client.DQP;
import org.teiid.client.RequestMessage;
import org.teiid.client.ResultsMessage;
@@ -59,29 +57,24 @@
import org.teiid.client.xa.XidImpl;
import org.teiid.common.buffer.BufferManager;
import org.teiid.core.TeiidComponentException;
-import org.teiid.core.TeiidException;
import org.teiid.core.TeiidProcessingException;
import org.teiid.core.TeiidRuntimeException;
import org.teiid.core.types.Streamable;
-import org.teiid.dqp.internal.process.SessionAwareCache.CacheID;
import org.teiid.dqp.internal.process.ThreadReuseExecutor.PrioritizedRunnable;
import org.teiid.dqp.message.AtomicRequestMessage;
import org.teiid.dqp.message.RequestID;
import org.teiid.dqp.service.BufferService;
import org.teiid.dqp.service.TransactionContext;
-import org.teiid.dqp.service.TransactionContext.Scope;
import org.teiid.dqp.service.TransactionService;
+import org.teiid.dqp.service.TransactionContext.Scope;
import org.teiid.events.EventDistributor;
import org.teiid.logging.CommandLogMessage;
-import org.teiid.logging.CommandLogMessage.Event;
import org.teiid.logging.LogConstants;
import org.teiid.logging.LogManager;
import org.teiid.logging.MessageLevel;
+import org.teiid.logging.CommandLogMessage.Event;
import org.teiid.metadata.MetadataRepository;
import org.teiid.query.QueryPlugin;
-import org.teiid.query.metadata.QueryMetadataInterface;
-import org.teiid.query.metadata.TempMetadataAdapter;
-import org.teiid.query.optimizer.relational.RelationalPlanner;
import org.teiid.query.tempdata.TempTableDataManager;
import org.teiid.query.tempdata.TempTableStore;
@@ -738,96 +731,12 @@
DataTierManagerImpl processorDataManager = new
DataTierManagerImpl(this,this.bufferService, this.config.isDetectingChangeEvents());
processorDataManager.setEventDistributor(eventDistributor);
processorDataManager.setMetadataRepository(metadataRepository);
- dataTierMgr = new TempTableDataManager(processorDataManager, this.bufferManager,
this.processWorkerPool, this.rsCache, this.matTables, this.cacheFactory);
+ dataTierMgr = new TempTableDataManager(processorDataManager, this.bufferManager,
this.processWorkerPool, this.rsCache);
dataTierMgr.setEventDistributor(eventDistributor);
LogManager.logDetail(LogConstants.CTX_DQP, "DQPCore started
maxThreads", this.config.getMaxThreads(), "maxActivePlans",
this.maxActivePlans, "source concurrency", this.userRequestSourceConcurrency);
//$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$
}
- public void synchronizeInternalMaterializedViews(final ContextProvider contextProvider)
{
- if (!cacheFactory.isReplicated() || matTables == null) {
- return;
- }
- Set<CacheID> keys = this.matTables.replicatableKeys();
- for (final CacheID key:keys) {
- if (key.getSql().startsWith(RelationalPlanner.MAT_PREFIX)) {
- refreshMatView(contextProvider, key.getVDBKey().getName(),
key.getVDBKey().getVersion(),
key.getSql().substring(RelationalPlanner.MAT_PREFIX.length()));
- }
- }
- }
-
- public void refreshMatView(final ContextProvider contextProvider, final String vdbName,
final int vdbVersion, final String viewName) {
- if (!cacheFactory.isReplicated() || matTables == null) {
- return;
- }
-
- final DQPWorkContext context = contextProvider.getContext(vdbName, vdbVersion);
-
- final VDBMetaData vdb = context.getVDB();
- if (vdb == null) {
- return;
- }
-
- final TempTableStore globalStore = vdb.getAttachment(TempTableStore.class);
- if (globalStore == null) {
- return;
- }
- DQPWorkContext.setWorkContext(context);
-
- Runnable work = new Runnable() {
- @Override
- public void run() {
- QueryMetadataInterface metadata = vdb.getAttachment(QueryMetadataInterface.class);
- TempTableStore tempStore = new TempTableStore("internal"); //$NON-NLS-1$
- TempMetadataAdapter tma = new TempMetadataAdapter(metadata,
tempStore.getMetadataStore());
- try {
- dataTierMgr.refreshMatView(vdb.getName(), vdb.getVersion(), viewName, tma,
globalStore);
- } catch (TeiidException e) {
- LogManager.logError(LogConstants.CTX_DQP, e,
QueryPlugin.Util.getString("error_refresh", viewName )); //$NON-NLS-1$
- }
- }
- };
- addWork(work);
- }
-
- public void updateMatViewRow(final ContextProvider contextProvider, final String
vdbName, final int vdbVersion, final String schema,
- final String viewName, final List<?> tuple, final boolean delete) {
-
- if (!cacheFactory.isReplicated() || matTables == null) {
- return;
- }
-
- final DQPWorkContext context = contextProvider.getContext(vdbName, vdbVersion);
-
- final VDBMetaData vdb = context.getVDB();
- if (vdb == null) {
- return;
- }
-
- final TempTableStore globalStore = vdb.getAttachment(TempTableStore.class);
- if (globalStore == null) {
- return;
- }
-
- Runnable work = new Runnable() {
- @Override
- public void run() {
- context.runInContext(new Runnable() {
- @Override
- public void run() {
- try {
- dataTierMgr.updateMatViewRow(globalStore, RelationalPlanner.MAT_PREFIX + (schema +
'.' + viewName).toUpperCase(), tuple, delete);
- } catch (TeiidException e) {
- LogManager.logError(LogConstants.CTX_DQP, e,
QueryPlugin.Util.getString("DQPCore.unable_to_process_event")); //$NON-NLS-1$
- }
- }
- });
- }
- };
- addWork(work);
-
- }
-
public void setBufferService(BufferService service) {
this.bufferService = service;
}
Modified:
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierManagerImpl.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierManagerImpl.java 2011-08-25
01:22:57 UTC (rev 3424)
+++
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierManagerImpl.java 2011-08-26
15:17:29 UTC (rev 3425)
@@ -89,8 +89,8 @@
import org.teiid.query.sql.symbol.Constant;
import org.teiid.query.sql.symbol.GroupSymbol;
import org.teiid.query.sql.visitor.GroupCollectorVisitor;
-import org.teiid.query.tempdata.TempTableStore;
-import org.teiid.query.tempdata.TempTableStore.MatTableInfo;
+import org.teiid.query.tempdata.GlobalTableStore;
+import org.teiid.query.tempdata.GlobalTableStoreImpl.MatTableInfo;
import org.teiid.query.util.CommandContext;
/**
@@ -243,13 +243,13 @@
Integer cardinaltity = null;
Boolean valid = null;
if (table.getMaterializedTable() == null) {
- TempTableStore globalStore = context.getGlobalTableStore();
+ GlobalTableStore globalStore = context.getGlobalTableStore();
matTableName = RelationalPlanner.MAT_PREFIX+table.getFullName().toUpperCase();
MatTableInfo info = globalStore.getMatTableInfo(matTableName);
valid = info.isValid();
state = info.getState().name();
updated = info.getUpdateTime()==-1?null:new Timestamp(info.getUpdateTime());
- TempMetadataID id = globalStore.getMetadataStore().getTempGroupID(matTableName);
+ TempMetadataID id =
globalStore.getTempTableStore().getMetadataStore().getTempGroupID(matTableName);
if (id != null) {
cardinaltity = id.getCardinality();
}
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/Request.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/Request.java 2011-08-25
01:22:57 UTC (rev 3424)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/Request.java 2011-08-26
15:17:29 UTC (rev 3425)
@@ -83,6 +83,7 @@
import org.teiid.query.sql.symbol.Reference;
import org.teiid.query.sql.visitor.GroupCollectorVisitor;
import org.teiid.query.sql.visitor.ReferenceCollectorVisitor;
+import org.teiid.query.tempdata.GlobalTableStore;
import org.teiid.query.tempdata.TempTableStore;
import org.teiid.query.util.CommandContext;
import org.teiid.query.util.ContextProperties;
@@ -128,7 +129,7 @@
protected Command userCommand;
protected boolean returnsUpdateCount;
- private TempTableStore globalTables;
+ private GlobalTableStore globalTables;
private SessionAwareCache<PreparedPlan> planCache;
private boolean resultSetCacheEnabled = true;
private int userRequestConcurrency;
@@ -185,7 +186,7 @@
VDBMetaData vdbMetadata = workContext.getVDB();
metadata = vdbMetadata.getAttachment(QueryMetadataInterface.class);
- globalTables = vdbMetadata.getAttachment(TempTableStore.class);
+ globalTables = vdbMetadata.getAttachment(GlobalTableStore.class);
if (metadata == null) {
throw new
TeiidComponentException(QueryPlugin.Util.getString("DQPCore.Unable_to_load_metadata_for_VDB_name__{0},_version__{1}",
this.vdbName, this.vdbVersion)); //$NON-NLS-1$
Modified:
trunk/engine/src/main/java/org/teiid/dqp/internal/process/SessionAwareCache.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/dqp/internal/process/SessionAwareCache.java 2011-08-25
01:22:57 UTC (rev 3424)
+++
trunk/engine/src/main/java/org/teiid/dqp/internal/process/SessionAwareCache.java 2011-08-26
15:17:29 UTC (rev 3425)
@@ -287,10 +287,6 @@
return true;
}
- public String getSql() {
- return sql;
- }
-
void setUserName(String name) {
this.userName = name;
}
Added: trunk/engine/src/main/java/org/teiid/query/ObjectReplicator.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/ObjectReplicator.java
(rev 0)
+++ trunk/engine/src/main/java/org/teiid/query/ObjectReplicator.java 2011-08-26 15:17:29
UTC (rev 3425)
@@ -0,0 +1,31 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.query;
+
+/**
+ * Service that makes an object available in replicated form and later stops
+ * replicating it.
+ */
+public interface ObjectReplicator {
+
+    /**
+     * Starts replicating the given object.
+     *
+     * @param <T> the interface type of the returned object
+     * @param <S> the type of the object being replicated
+     * @param id identifies the replicated object
+     * @param iface the interface the returned object is typed as
+     * @param object the object to replicate
+     * @param startTimeout timeout applied while starting; presumably milliseconds (confirm against the implementation)
+     * @return an object of type <code>iface</code>; presumably a proxy around <code>object</code> (confirm against the implementation)
+     * @throws Exception if replication cannot be started
+     */
+    <T, S> T replicate(String id, Class<T> iface, S object, long startTimeout) throws Exception;
+
+    /**
+     * Stops replication for the given object.
+     *
+     * @param o presumably the object returned by or passed to {@link #replicate(String, Class, Object, long)} (confirm against the implementation)
+     */
+    void stop(Object o);
+
+}
Property changes on: trunk/engine/src/main/java/org/teiid/query/ObjectReplicator.java
___________________________________________________________________
Added: svn:mime-type
+ text/plain
Added: trunk/engine/src/main/java/org/teiid/query/ReplicatedObject.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/ReplicatedObject.java
(rev 0)
+++ trunk/engine/src/main/java/org/teiid/query/ReplicatedObject.java 2011-08-26 15:17:29
UTC (rev 3425)
@@ -0,0 +1,78 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.query;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.util.Collection;
+
+/**
+ * Optional interface to be implemented by a replicated object to support full
+ * and partial state transfer, and to be told about its own address and about
+ * members that have been dropped.
+ */
+public interface ReplicatedObject {
+
+ /**
+ * Allows an application to write its full state to the provided OutputStream.
+ *
+ * @param ostream the OutputStream to write the state to
+ */
+ void getState(OutputStream ostream);
+
+ /**
+ * Allows an application to write a partial state to the provided OutputStream.
+ *
+ * @param state_id id of the partial state requested
+ * @param ostream the OutputStream to write the partial state to
+ */
+ void getState(String state_id, OutputStream ostream);
+
+ /**
+ * Allows an application to read its full state from the provided InputStream.
+ *
+ * @param istream the InputStream to read the state from
+ */
+ void setState(InputStream istream);
+
+ /**
+ * Allows an application to read a partial state from the provided InputStream.
+ *
+ * @param state_id id of the partial state requested
+ * @param istream the InputStream to read the partial state from
+ */
+ void setState(String state_id, InputStream istream);
+
+ /**
+ * Allows the replicator to set the local address from the channel.
+ *
+ * @param address the local address as reported by the channel
+ */
+ void setLocalAddress(Serializable address);
+
+ /**
+ * Called when members are dropped.
+ *
+ * @param addresses the addresses of the dropped members
+ */
+ void droppedMembers(Collection<Serializable> addresses);
+
+}
Property changes on: trunk/engine/src/main/java/org/teiid/query/ReplicatedObject.java
___________________________________________________________________
Added: svn:mime-type
+ text/plain
Modified:
trunk/engine/src/main/java/org/teiid/query/optimizer/relational/RelationalPlanner.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/query/optimizer/relational/RelationalPlanner.java 2011-08-25
01:22:57 UTC (rev 3424)
+++
trunk/engine/src/main/java/org/teiid/query/optimizer/relational/RelationalPlanner.java 2011-08-26
15:17:29 UTC (rev 3425)
@@ -1216,7 +1216,7 @@
CacheHint hint = null;
boolean isImplicitGlobal = matMetadataId == null;
if (isImplicitGlobal) {
- TempMetadataID tid =
context.getGlobalTableStore().getGlobalTempTableMetadataId(metadataID, metadata);
+ TempMetadataID tid =
context.getGlobalTableStore().getGlobalTempTableMetadataId(metadataID);
matTableName = tid.getID();
hint = tid.getCacheHint();
if (hint != null) {
Modified:
trunk/engine/src/main/java/org/teiid/query/processor/relational/RelationalNode.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/query/processor/relational/RelationalNode.java 2011-08-25
01:22:57 UTC (rev 3424)
+++
trunk/engine/src/main/java/org/teiid/query/processor/relational/RelationalNode.java 2011-08-26
15:17:29 UTC (rev 3425)
@@ -46,7 +46,6 @@
import org.teiid.query.processor.BatchCollector.BatchProducer;
import org.teiid.query.sql.symbol.AliasSymbol;
import org.teiid.query.sql.symbol.Expression;
-import org.teiid.query.sql.symbol.SingleElementSymbol;
import org.teiid.query.util.CommandContext;
@@ -358,7 +357,7 @@
/**
* Helper method for all the node that will filter the elements needed for the next
node.
*/
- public static int[] getProjectionIndexes(Map<SingleElementSymbol, Integer>
tupleElements, List<? extends Expression> projectElements) {
+ public static int[] getProjectionIndexes(Map<? extends Expression, Integer>
tupleElements, List<? extends Expression> projectElements) {
int[] result = new int[projectElements.size()];
int i = 0;
Modified: trunk/engine/src/main/java/org/teiid/query/tempdata/AlterTempTable.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/tempdata/AlterTempTable.java 2011-08-25
01:22:57 UTC (rev 3424)
+++ trunk/engine/src/main/java/org/teiid/query/tempdata/AlterTempTable.java 2011-08-26
15:17:29 UTC (rev 3425)
@@ -1,12 +1,3 @@
-package org.teiid.query.tempdata;
-
-import java.util.List;
-
-import org.teiid.query.sql.LanguageVisitor;
-import org.teiid.query.sql.lang.Command;
-import org.teiid.query.sql.symbol.ElementSymbol;
-import org.teiid.query.sql.symbol.SingleElementSymbol;
-
/*
* JBoss, Home of Professional Open Source.
* See the COPYRIGHT.txt file distributed with this work for information
@@ -29,6 +20,15 @@
* 02110-1301 USA.
*/
+package org.teiid.query.tempdata;
+
+import java.util.List;
+
+import org.teiid.query.sql.LanguageVisitor;
+import org.teiid.query.sql.lang.Command;
+import org.teiid.query.sql.symbol.ElementSymbol;
+import org.teiid.query.sql.symbol.SingleElementSymbol;
+
public class AlterTempTable extends Command {
private String tempTable;
Added: trunk/engine/src/main/java/org/teiid/query/tempdata/GlobalTableStore.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/tempdata/GlobalTableStore.java
(rev 0)
+++ trunk/engine/src/main/java/org/teiid/query/tempdata/GlobalTableStore.java 2011-08-26
15:17:29 UTC (rev 3425)
@@ -0,0 +1,67 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.query.tempdata;
+
+import java.io.Serializable;
+import java.util.List;
+
+import org.teiid.Replicated;
+import org.teiid.api.exception.query.QueryMetadataException;
+import org.teiid.api.exception.query.QueryResolverException;
+import org.teiid.api.exception.query.QueryValidatorException;
+import org.teiid.core.TeiidComponentException;
+import org.teiid.query.metadata.TempMetadataID;
+import org.teiid.query.sql.symbol.GroupSymbol;
+import org.teiid.query.tempdata.GlobalTableStoreImpl.MatTableInfo;
+
+/**
+ * Access to the global (materialized) temp tables. The methods annotated with
+ * {@link Replicated} are the ones intended to be replicated; the exact
+ * semantics of the annotation attributes are defined by {@link Replicated}.
+ */
+public interface GlobalTableStore {
+
+ /**
+ * Returns the temp metadata id of the global temp table for the given group id.
+ * NOTE(review): callers in this change use it when no id is registered yet, so it
+ * presumably creates the id on demand - confirm against the implementation.
+ */
+ TempMetadataID getGlobalTempTableMetadataId(Object groupID) throws
QueryMetadataException, TeiidComponentException, QueryResolverException,
QueryValidatorException;
+
+ /**
+ * Returns the temp metadata id for a code table described by its name, return
+ * element, key element and materialized table name.
+ */
+ TempMetadataID getCodeTableMetadataId(String codeTableName,
+ String returnElementName, String keyElementName,
+ String matTableName) throws TeiidComponentException,
+ QueryMetadataException;
+
+ /**
+ * Returns the load-state information for the named materialized table.
+ */
+ MatTableInfo getMatTableInfo(String matTableName);
+
+ /**
+ * Returns the underlying temp table store; callers use its metadata store to
+ * look up temp group ids by materialized table name.
+ */
+ TempTableStore getTempTableStore();
+
+ /**
+ * Returns the local address of this store.
+ * NOTE(review): presumably the value supplied through ReplicatedObject.setLocalAddress - confirm.
+ */
+ Serializable getLocalAddress();
+
+ /**
+ * Updates or, when <code>delete</code> is true, deletes a row of the named materialized table.
+ * NOTE(review): the meaning of the returned list is not visible here - confirm against the implementation.
+ */
+ List<?> updateMatViewRow(String matTableName, List<?> tuple, boolean delete)
throws TeiidComponentException;
+
+ /**
+ * Creates the temp table backing a materialized table for the given group.
+ */
+ TempTable createMatTable(String tableName, GroupSymbol group)
+ throws TeiidComponentException, QueryMetadataException, QueryResolverException,
QueryValidatorException;
+
+ /**
+ * Replicated notification that loading the named materialized table failed.
+ */
+ @Replicated
+ void failedLoad(String matTableName);
+
+ /**
+ * Replicated query, declared non-asynchronous with a timeout of 5000, asking whether
+ * the named materialized table needs to be loaded by <code>loadingAddress</code>.
+ * NOTE(review): the flags mirror MatTableInfo.shouldLoad in GlobalTableStoreImpl and are
+ * presumably forwarded to it - confirm.
+ */
+ @Replicated(asynch=false, timeout=5000)
+ boolean needsLoading(String matTableName, Serializable loadingAddress,
+ boolean firstPass, boolean refresh, boolean invalidate);
+
+ /**
+ * Replicated notification, declared with <code>replicateState=true</code>, that the
+ * named materialized table has been loaded into the given table.
+ */
+ @Replicated(replicateState=true)
+ void loaded(String matTableName, TempTable table);
+
+}
Property changes on:
trunk/engine/src/main/java/org/teiid/query/tempdata/GlobalTableStore.java
___________________________________________________________________
Added: svn:mime-type
+ text/plain
Added: trunk/engine/src/main/java/org/teiid/query/tempdata/GlobalTableStoreImpl.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/tempdata/GlobalTableStoreImpl.java
(rev 0)
+++
trunk/engine/src/main/java/org/teiid/query/tempdata/GlobalTableStoreImpl.java 2011-08-26
15:17:29 UTC (rev 3425)
@@ -0,0 +1,471 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.query.tempdata;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.teiid.api.exception.query.QueryMetadataException;
+import org.teiid.api.exception.query.QueryResolverException;
+import org.teiid.api.exception.query.QueryValidatorException;
+import org.teiid.common.buffer.BufferManager;
+import org.teiid.core.TeiidComponentException;
+import org.teiid.core.TeiidRuntimeException;
+import org.teiid.core.types.DataTypeManager;
+import org.teiid.language.SQLConstants;
+import org.teiid.language.SQLConstants.Reserved;
+import org.teiid.logging.LogConstants;
+import org.teiid.logging.LogManager;
+import org.teiid.query.ReplicatedObject;
+import org.teiid.query.mapping.relational.QueryNode;
+import org.teiid.query.metadata.QueryMetadataInterface;
+import org.teiid.query.metadata.TempMetadataAdapter;
+import org.teiid.query.metadata.TempMetadataID;
+import org.teiid.query.metadata.TempMetadataStore;
+import org.teiid.query.optimizer.relational.RelationalPlanner;
+import org.teiid.query.resolver.QueryResolver;
+import org.teiid.query.resolver.util.ResolverUtil;
+import org.teiid.query.sql.lang.CacheHint;
+import org.teiid.query.sql.lang.Command;
+import org.teiid.query.sql.lang.Create;
+import org.teiid.query.sql.symbol.ElementSymbol;
+import org.teiid.query.sql.symbol.GroupSymbol;
+
+public class GlobalTableStoreImpl implements GlobalTableStore, ReplicatedObject {
+
+ /**
+ * Load state of a materialized table as tracked by MatTableInfo.
+ */
+ public enum MatState {
+ // initial state of a MatTableInfo; a load is required
+ NEEDS_LOADING,
+ // a load has been claimed by some address and is in progress
+ LOADING,
+ // treated like NEEDS_LOADING by MatTableInfo.shouldLoad
+ FAILED_LOAD,
+ // loaded; reloaded only on refresh, ttl expiry, or a non-first-pass request
+ LOADED
+ }
+
+ /**
+  * Load-state bookkeeping for one materialized table: the current {@link MatState},
+  * the time of the last state change, an optional time-to-live and a validity flag.
+  * All methods synchronize on this instance, and every state change calls
+  * <code>notifyAll()</code>.
+  */
+ public class MatTableInfo {
+     /** Time of the last state change in milliseconds, or -1 if the state never changed. */
+     private long updateTime = -1;
+     private MatState state = MatState.NEEDS_LOADING;
+     /** Address recorded by the most recent successful non-first-pass load request. */
+     private Serializable loadingAddress;
+     /** Time-to-live in milliseconds; a negative value disables expiry. */
+     private long ttl = -1;
+     private boolean valid;
+
+     protected MatTableInfo() {}
+
+     /**
+      * Decides whether the caller should load the table and updates the state accordingly.
+      * <ul>
+      * <li>NEEDS_LOADING / FAILED_LOAD: always true; unless this is the first pass the
+      * address is recorded and the state becomes LOADING.</li>
+      * <li>LOADING: true only when this is not the first pass and the local address is
+      * Comparable and compares lower than the candidate, in which case the candidate
+      * replaces the recorded loading address.</li>
+      * <li>LOADED: true when this is not the first pass, a refresh is requested, or the
+      * ttl has expired; the state becomes NEEDS_LOADING on the first pass and LOADING
+      * (with the address recorded) otherwise.</li>
+      * </ul>
+      *
+      * @param possibleLoadingAddress address of the candidate loader
+      * @param firstPass true when the caller is only probing
+      * @param refresh true to force a reload of a loaded table
+      * @param invalidate true to clear the valid flag before evaluating the state
+      * @return true if the caller should proceed with loading
+      */
+     private synchronized boolean shouldLoad(Serializable possibleLoadingAddress, boolean firstPass, boolean refresh, boolean invalidate) {
+         if (invalidate) {
+             LogManager.logDetail(LogConstants.CTX_MATVIEWS, this, "invalidating"); //$NON-NLS-1$
+             valid = false;
+         }
+         switch (state) {
+         case NEEDS_LOADING:
+         case FAILED_LOAD:
+             if (!firstPass) {
+                 this.loadingAddress = possibleLoadingAddress;
+                 setState(MatState.LOADING, null);
+             }
+             return true;
+         case LOADING:
+             if (!firstPass && localAddress instanceof Comparable<?> && ((Comparable)localAddress).compareTo(possibleLoadingAddress) < 0) {
+                 this.loadingAddress = possibleLoadingAddress; //ties go to the lowest address
+                 return true;
+             }
+             return false;
+         case LOADED:
+             if (!firstPass
+                     || refresh
+                     || ttl >= 0 && System.currentTimeMillis() - updateTime - ttl > 0) {
+                 if (firstPass) {
+                     setState(MatState.NEEDS_LOADING, null);
+                 } else {
+                     this.loadingAddress = possibleLoadingAddress;
+                     setState(MatState.LOADING, null);
+                 }
+                 return true;
+             }
+             return false;
+         }
+         throw new AssertionError();
+     }
+
+     /**
+      * Moves to the given state, stamps the update time and wakes all waiters.
+      *
+      * @param state the new state
+      * @param valid the new validity, or null to leave the valid flag unchanged
+      */
+     private synchronized void setState(MatState state, Boolean valid) {
+         MatState oldState = this.state;
+         long timestamp = System.currentTimeMillis();
+         LogManager.logDetail(LogConstants.CTX_MATVIEWS, this, "setting matState to", state, valid, timestamp, "old values", oldState, this.valid); //$NON-NLS-1$ //$NON-NLS-2$
+         if (valid != null) {
+             this.valid = valid;
+         }
+         this.state = state;
+         // reuse the logged timestamp so the log entry and updateTime always agree
+         this.updateTime = timestamp;
+         notifyAll();
+     }
+
+     public synchronized void setTtl(long ttl) {
+         this.ttl = ttl;
+     }
+
+     public synchronized long getUpdateTime() {
+         return updateTime;
+     }
+
+     public synchronized MatState getState() {
+         return state;
+     }
+
+     /**
+      * @return true if the table is valid and either has no ttl (negative value) or
+      * the ttl has not yet elapsed since the last state change
+      */
+     public synchronized boolean isUpToDate() {
+         return isValid() && (ttl < 0 || System.currentTimeMillis() - updateTime - ttl <= 0);
+     }
+
+     public synchronized boolean isValid() {
+         return valid;
+     }
+
+     public synchronized long getTtl() {
+         return ttl;
+     }
+
+ }
+
+ //load state for each table, keyed by the table name passed to getMatTableInfo
+ private ConcurrentHashMap<String, MatTableInfo> matTables = new
ConcurrentHashMap<String, MatTableInfo>();
+ //store holding the global temp tables and their temp metadata
+ private TempTableStore tableStore = new TempTableStore("SYSTEM");
//$NON-NLS-1$
+ private BufferManager bufferManager;
+ private QueryMetadataInterface metadata;
+ //set through setLocalAddress; consulted by MatTableInfo.shouldLoad
+ private Serializable localAddress;
+
+ /**
+  * Creates a store that builds its tables with the given buffer manager.
+  * The supplied metadata is wrapped in a TempMetadataAdapter backed by a
+  * new, empty TempMetadataStore.
+  */
+ public GlobalTableStoreImpl(BufferManager bufferManager, QueryMetadataInterface
metadata) {
+ this.bufferManager = bufferManager;
+ this.metadata = new TempMetadataAdapter(metadata, new TempMetadataStore());
+ }
+
+ /**
+  * Returns the load-state holder for the given table, registering a new
+  * one if none has been recorded yet.
+  */
+ public synchronized MatTableInfo getMatTableInfo(final String tableName) {
+     MatTableInfo existing = matTables.get(tableName);
+     if (existing != null) {
+         return existing;
+     }
+     MatTableInfo created = new MatTableInfo();
+     matTables.put(tableName, created);
+     return created;
+ }
+
+ /**
+  * Marks the table as FAILED_LOAD unless it has already reached LOADED.
+  */
+ @Override
+ public void failedLoad(String matTableName) {
+     MatTableInfo tableInfo = getMatTableInfo(matTableName);
+     synchronized (tableInfo) {
+         if (tableInfo.state == MatState.LOADED) {
+             return;
+         }
+         tableInfo.setState(MatState.FAILED_LOAD, null);
+     }
+ }
+
+ @Override
+ public boolean needsLoading(String matTableName, Serializable loadingAddress, boolean
firstPass, boolean refresh, boolean invalidate) {
+ MatTableInfo info = getMatTableInfo(matTableName);
+ return info.shouldLoad(loadingAddress, firstPass, refresh, invalidate);
+ }
+
+ /**
+  * Returns the temp metadata id of the materialized table for the given view,
+  * defining it on first use. The table name is RelationalPlanner.MAT_PREFIX
+  * followed by the upper-cased full name of the view. The cache hint of the
+  * returned id is refreshed from the view definition on every call.
+  */
+ @Override
+ public TempMetadataID getGlobalTempTableMetadataId(Object viewId)
+ throws QueryMetadataException, TeiidComponentException, QueryResolverException,
QueryValidatorException {
+ String matViewName = metadata.getFullName(viewId);
+ String matTableName = RelationalPlanner.MAT_PREFIX+matViewName.toUpperCase();
+ GroupSymbol group = new GroupSymbol(matViewName);
+ group.setMetadataID(viewId);
+ TempMetadataID id = tableStore.tempMetadataStore.getTempGroupID(matTableName);
+ //define the table preserving the key/index information and ensure that only a single
+ //instance exists
+ if (id == null) {
+ //lock on the view's metadata object and look the id up again before defining it
+ synchronized (viewId) {
+ id = tableStore.tempMetadataStore.getTempGroupID(matTableName);
+ if (id == null) {
+ id = tableStore.tempMetadataStore.addTempGroup(matTableName,
ResolverUtil.resolveElementsInGroup(group, metadata), false, true);
+ id.setQueryNode(metadata.getVirtualPlan(viewId));
+ id.setCardinality(metadata.getCardinality(viewId));
+ id.setOriginalMetadataID(viewId);
+
+ //carry the view's primary key, unique keys and indexes over to the temp group
+ Object pk = metadata.getPrimaryKey(viewId);
+ if (pk != null) {
+ ArrayList<TempMetadataID> primaryKey = resolveIndex(metadata, id, pk);
+ id.setPrimaryKey(primaryKey);
+ }
+ Collection keys = metadata.getUniqueKeysInGroup(viewId);
+ for (Object key : keys) {
+ id.addUniqueKey(resolveIndex(metadata, id, key));
+ }
+ Collection indexes = metadata.getIndexesInGroup(viewId);
+ for (Object index : indexes) {
+ id.addIndex(resolveIndex(metadata, id, index));
+ }
+ }
+ }
+ }
+ updateCacheHint(viewId, group, id);
+ return id;
+ }
+
+ @Override
+ public TempMetadataID getCodeTableMetadataId(
+ String codeTableName, String returnElementName,
+ String keyElementName, String matTableName) throws TeiidComponentException,
+ QueryMetadataException {
+ ElementSymbol keyElement = new ElementSymbol(matTableName + ElementSymbol.SEPARATOR +
keyElementName);
+ ElementSymbol returnElement = new ElementSymbol(matTableName +
ElementSymbol.SEPARATOR + returnElementName);
+ keyElement.setType(DataTypeManager.getDataTypeClass(metadata.getElementType(metadata.getElementID(codeTableName
+ ElementSymbol.SEPARATOR + keyElementName))));
+
returnElement.setType(DataTypeManager.getDataTypeClass(metadata.getElementType(metadata.getElementID(codeTableName
+ ElementSymbol.SEPARATOR + returnElementName))));
+ TempMetadataID id =
this.getTempTableStore().getMetadataStore().getTempGroupID(matTableName);
+ if (id == null) {
+ synchronized (this) {
+ id = this.getTempTableStore().getMetadataStore().addTempGroup(matTableName,
Arrays.asList(keyElement, returnElement), false, true);
+ String queryString = Reserved.SELECT + ' ' + keyElementName + "
," + returnElementName + ' ' + Reserved.FROM + ' ' + codeTableName;
//$NON-NLS-1$
+ id.setQueryNode(new QueryNode(queryString));
+ id.setPrimaryKey(id.getElements().subList(0, 1));
+ CacheHint hint = new CacheHint(true, null);
+ id.setCacheHint(hint);
+ }
+ }
+ return id;
+ }
+
+ /**
+  * Resolves the view definition and copies the cache hint of the resulting
+  * command onto the given temp metadata id.
+  */
+ private void updateCacheHint(Object viewId, GroupSymbol group, TempMetadataID id)
+         throws TeiidComponentException, QueryMetadataException,
+         QueryResolverException, QueryValidatorException {
+     CacheHint viewHint = QueryResolver.resolveView(group, metadata.getVirtualPlan(viewId),
+             SQLConstants.Reserved.SELECT, metadata).getCommand().getCacheHint();
+     id.setCacheHint(viewHint);
+ }
+
+ /**
+  * Maps the columns of the given key/index object onto the elements of the
+  * temp group, using each column's 1-based metadata position.
+  */
+ static ArrayList<TempMetadataID> resolveIndex(
+         QueryMetadataInterface metadata, TempMetadataID id, Object pk)
+         throws TeiidComponentException, QueryMetadataException {
+     List keyColumnIds = metadata.getElementIDsInKey(pk);
+     ArrayList<TempMetadataID> resolved = new ArrayList<TempMetadataID>(keyColumnIds.size());
+     for (Object columnId : keyColumnIds) {
+         int zeroBasedPosition = metadata.getPosition(columnId) - 1;
+         resolved.add(id.getElements().get(zeroBasedPosition));
+     }
+     return resolved;
+ }
+
+ @Override
+ public void loaded(String matTableName, TempTable table) {
+ this.tableStore.swapTempTable(matTableName, table);
+ this.getMatTableInfo(matTableName).setState(MatState.LOADED, true);
+ }
+
+ /**
+  * Applies a single row update or delete to the named table.
+  *
+  * @return the result of TempTable.updateTuple, or null if the table does not exist
+  */
+ @Override
+ public List<?> updateMatViewRow(String matTableName, List<?> tuple,
+         boolean delete) throws TeiidComponentException {
+     TempTable target = tableStore.getTempTable(matTableName);
+     if (target == null) {
+         return null;
+     }
+     return target.updateTuple(tuple, delete);
+ }
+
+ /**
+  * @return the store that holds the global temp tables
+  */
+ @Override
+ public TempTableStore getTempTableStore() {
+     return tableStore;
+ }
+
+ @Override
+ public TempTable createMatTable(final String tableName, GroupSymbol group) throws
TeiidComponentException,
+ QueryMetadataException, QueryResolverException, QueryValidatorException {
+ Create create = new Create();
+ create.setTable(group);
+ List<ElementSymbol> allColumns = ResolverUtil.resolveElementsInGroup(group,
metadata);
+ create.setElementSymbolsAsColumns(allColumns);
+ Object pk = metadata.getPrimaryKey(group.getMetadataID());
+ if (pk != null) {
+ List<ElementSymbol> pkColumns = resolveIndex(metadata, allColumns, pk);
+ create.getPrimaryKey().addAll(pkColumns);
+ }
+ TempTable table = getTempTableStore().addTempTable(tableName, create, bufferManager,
false);
+ table.setUpdatable(false);
+ CacheHint hint = table.getCacheHint();
+ if (hint != null) {
+ table.setPreferMemory(hint.getPrefersMemory());
+ if (hint.getTtl() != null) {
+ getMatTableInfo(tableName).setTtl(hint.getTtl());
+ }
+ if (pk != null) {
+ table.setUpdatable(hint.isUpdatable());
+ }
+ }
+ return table;
+ }
+
+ /**
+ * Return a list of ElementSymbols for the given index/key object
+ */
+ public static List<ElementSymbol> resolveIndex(QueryMetadataInterface metadata,
List<ElementSymbol> allColumns, Object pk)
+ throws TeiidComponentException, QueryMetadataException {
+ Collection<?> pkIds = metadata.getElementIDsInKey(pk);
+ List<ElementSymbol> pkColumns = new
ArrayList<ElementSymbol>(pkIds.size());
+ for (Object col : pkIds) {
+ pkColumns.add(allColumns.get(metadata.getPosition(col)-1));
+ }
+ return pkColumns;
+ }
+
+ //begin replication methods
+
+ /**
+  * Records the address that identifies this store.
+  */
+ @Override
+ public void setLocalAddress(Serializable address) {
+     localAddress = address;
+ }
+
+ /**
+  * @return the address previously supplied to setLocalAddress, or null if none was set
+  */
+ @Override
+ public Serializable getLocalAddress() {
+     return this.localAddress;
+ }
+
+ @Override
+ public void getState(OutputStream ostream) {
+ try {
+ ObjectOutputStream oos = new ObjectOutputStream(ostream);
+ for (Map.Entry<String, TempTable> entry : tableStore.getTempTables().entrySet())
{
+ sendTable(entry.getKey(), oos, true);
+ }
+ oos.writeObject(null);
+ oos.close();
+ } catch (IOException e) {
+ throw new TeiidRuntimeException(e);
+ } catch (TeiidComponentException e) {
+ throw new TeiidRuntimeException(e);
+ }
+ }
+
+ @Override
+ public void setState(InputStream istream) {
+ try {
+ ObjectInputStream ois = new ObjectInputStream(istream);
+ while (true) {
+ String tableName = (String)ois.readObject();
+ if (tableName == null) {
+ break;
+ }
+ loadTable(tableName, ois);
+ }
+ ois.close();
+ } catch (Exception e) {
+ throw new TeiidRuntimeException(e);
+ }
+ }
+
+ @Override
+ public void getState(String stateId, OutputStream ostream) {
+ try {
+ ObjectOutputStream oos = new ObjectOutputStream(ostream);
+ sendTable(stateId, oos, false);
+ oos.close();
+ } catch (IOException e) {
+ throw new TeiidRuntimeException(e);
+ } catch (TeiidComponentException e) {
+ throw new TeiidRuntimeException(e);
+ }
+ }
+
+ /**
+  * Serializes one table: optionally its name, then the update time, loading
+  * address and state from its MatTableInfo, then the table contents. Nothing
+  * is written when the table does not exist or its info is not valid.
+  */
+ private void sendTable(String stateId, ObjectOutputStream oos, boolean writeName)
+         throws IOException, TeiidComponentException {
+     TempTable source = this.tableStore.getTempTable(stateId);
+     if (source != null) {
+         //only consult (and thereby register) the info once the table is known to exist
+         MatTableInfo tableInfo = getMatTableInfo(stateId);
+         if (tableInfo.isValid()) {
+             if (writeName) {
+                 oos.writeObject(stateId);
+             }
+             oos.writeLong(tableInfo.updateTime);
+             oos.writeObject(tableInfo.loadingAddress);
+             oos.writeObject(tableInfo.state);
+             source.writeTo(oos);
+         }
+     }
+ }
+
+ @Override
+ public void setState(String stateId, InputStream istream) {
+ try {
+ ObjectInputStream ois = new ObjectInputStream(istream);
+ loadTable(stateId, ois);
+ ois.close();
+ } catch (Exception e) {
+ MatTableInfo info = this.getMatTableInfo(stateId);
+ info.setState(MatState.FAILED_LOAD, null);
+ throw new TeiidRuntimeException(e);
+ }
+ }
+
+ /**
+  * Reads one table from the stream in the layout written by sendTable (update
+  * time, loading address, state, then the table contents) and swaps it into
+  * the store under the given name.
+  */
+ private void loadTable(String stateId, ObjectInputStream ois)
+ throws TeiidComponentException, QueryMetadataException,
+ QueryResolverException, QueryValidatorException, IOException,
+ ClassNotFoundException {
+ LogManager.logDetail(LogConstants.CTX_DQP, "loading table from remote
stream", stateId); //$NON-NLS-1$
+ long updateTime = ois.readLong();
+ Serializable loadingAddress = (Serializable) ois.readObject();
+ MatState state = (MatState)ois.readObject();
+ GroupSymbol group = new GroupSymbol(stateId);
+ if (stateId.startsWith(RelationalPlanner.MAT_PREFIX)) {
+ //materialized view table: the view name is whatever follows the prefix
+ String viewName = stateId.substring(RelationalPlanner.MAT_PREFIX.length());
+ Object viewId = this.metadata.getGroupID(viewName);
+ group.setMetadataID(getGlobalTempTableMetadataId(viewId));
+ } else {
+ //otherwise treated as a code table: after the CODE_PREFIX length the name is
+ //split from the right into <table>.<key element>.<return element>
+ String viewName = stateId.substring(TempTableDataManager.CODE_PREFIX.length());
+ int index = viewName.lastIndexOf('.');
+ String returnElementName = viewName.substring(index + 1);
+ viewName = viewName.substring(0, index);
+ index = viewName.lastIndexOf('.');
+ String keyElementName = viewName.substring(index + 1);
+ viewName = viewName.substring(0, index);
+ group.setMetadataID(getCodeTableMetadataId(viewName, returnElementName,
keyElementName, stateId));
+ }
+ TempTable tempTable = this.createMatTable(stateId, group);
+ tempTable.readFrom(ois);
+ MatTableInfo info = this.getMatTableInfo(stateId);
+ synchronized (info) {
+ this.tableStore.swapTempTable(stateId, tempTable);
+ info.setState(state, true);
+ //setState stamps the current time, so the sender's update time is restored afterwards
+ info.updateTime = updateTime;
+ info.loadingAddress = loadingAddress;
+ }
+ }
+
+ @Override
+ public void droppedMembers(Collection<Serializable> addresses) {
+ for (MatTableInfo info : this.matTables.values()) {
+ synchronized (info) {
+ if (info.getState() == MatState.LOADING
+ && addresses.contains(info.loadingAddress)) {
+ info.setState(MatState.FAILED_LOAD, null);
+ }
+ }
+ }
+ }
+
+}
Property changes on:
trunk/engine/src/main/java/org/teiid/query/tempdata/GlobalTableStoreImpl.java
___________________________________________________________________
Added: svn:mime-type
+ text/plain
Modified: trunk/engine/src/main/java/org/teiid/query/tempdata/TempTable.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/tempdata/TempTable.java 2011-08-25 01:22:57
UTC (rev 3424)
+++ trunk/engine/src/main/java/org/teiid/query/tempdata/TempTable.java 2011-08-26 15:17:29
UTC (rev 3425)
@@ -22,6 +22,9 @@
package org.teiid.query.tempdata;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -72,7 +75,7 @@
* TODO: in this implementation blocked exceptions will not happen
* allowing for subquery evaluation though would cause pauses
*/
-class TempTable {
+public class TempTable {
private final class InsertUpdateProcessor extends UpdateProcessor {
@@ -275,7 +278,7 @@
private int keyBatchSize;
private int leafBatchSize;
- private Map columnMap;
+ private Map<ElementSymbol, Integer> columnMap;
private List<Integer> notNull = new LinkedList<Integer>();
private Map<Integer, AtomicInteger> sequences;
@@ -326,8 +329,17 @@
if (keyColumns.equals(indexColumns) || (indexTables != null &&
indexTables.containsKey(indexColumns))) {
return;
}
+ TempTable indexTable = createIndexTable(indexColumns, unique);
+ //TODO: ordered insert optimization
+ TupleSource ts = createTupleSource(indexTable.getColumns(), null, null);
+ indexTable.insert(ts, indexTable.getColumns());
+ indexTable.getTree().compact();
+ }
+
+ private TempTable createIndexTable(List<ElementSymbol> indexColumns,
+ boolean unique) {
List<ElementSymbol> allColumns = new
ArrayList<ElementSymbol>(indexColumns);
- for (ElementSymbol elementSymbol : keyColumns) {
+ for (ElementSymbol elementSymbol : columns.subList(0, tree.getKeyLength())) {
if (allColumns.indexOf(elementSymbol) < 0) {
allColumns.add(elementSymbol);
}
@@ -342,10 +354,8 @@
indexTables = new LinkedHashMap<List<ElementSymbol>, TempTable>();
indexTables.put(indexColumns, indexTable);
}
- //TODO: ordered insert optimization
- TupleSource ts = createTupleSource(allColumns, null, null);
- indexTable.insert(ts, allColumns);
- indexTable.getTree().compact();
+ indexTable.setUpdatable(this.updatable);
+ return indexTable;
}
private int reserveBuffers() {
@@ -531,7 +541,7 @@
BlockedException, TeiidComponentException {
List<Object> newTuple = new ArrayList<Object>(tuple);
for (Map.Entry<ElementSymbol, Expression> entry :
update.getClauseMap().entrySet()) {
- newTuple.set((Integer)columnMap.get(entry.getKey()),
eval.evaluate(entry.getValue(), tuple));
+ newTuple.set(columnMap.get(entry.getKey()), eval.evaluate(entry.getValue(),
tuple));
}
if (primaryKeyChangePossible) {
browser.removed();
@@ -628,6 +638,44 @@
}
}
+ /**
+  * Serializes this table while holding its read lock: the tree values, then
+  * the number of index tables, and for each index its unique flag, its column
+  * count, the position of each index column in this table, and finally the
+  * index table itself (recursively).
+  */
+ void writeTo(ObjectOutputStream oos) throws TeiidComponentException, IOException {
+     this.lock.readLock().lock();
+     try {
+         this.tree.writeValuesTo(oos);
+         if (this.indexTables == null) {
+             oos.writeInt(0);
+             return;
+         }
+         oos.writeInt(this.indexTables.size());
+         for (Map.Entry<List<ElementSymbol>, TempTable> indexEntry : this.indexTables.entrySet()) {
+             List<ElementSymbol> indexColumns = indexEntry.getKey();
+             TempTable indexTable = indexEntry.getValue();
+             oos.writeBoolean(indexTable.uniqueColIndex > 0);
+             oos.writeInt(indexColumns.size());
+             for (ElementSymbol column : indexColumns) {
+                 oos.writeInt(this.columnMap.get(column));
+             }
+             indexTable.writeTo(oos);
+         }
+     } finally {
+         this.lock.readLock().unlock();
+     }
+ }
+
+ /**
+  * Restores this table from the layout produced by writeTo: the tree values
+  * followed by each index table, which is recreated from its unique flag and
+  * column positions and then read recursively.
+  */
+ void readFrom(ObjectInputStream ois) throws TeiidComponentException, IOException,
+         ClassNotFoundException {
+     this.tree.readValuesFrom(ois);
+     for (int remaining = ois.readInt(); remaining > 0; remaining--) {
+         boolean unique = ois.readBoolean();
+         int columnCount = ois.readInt();
+         ArrayList<ElementSymbol> indexColumns = new ArrayList<ElementSymbol>(columnCount);
+         for (int read = 0; read < columnCount; read++) {
+             indexColumns.add(this.columns.get(ois.readInt()));
+         }
+         this.createIndexTable(indexColumns, unique).readFrom(ois);
+     }
+ }
+
List<?> updateTuple(List<?> tuple, boolean remove) throws
TeiidComponentException {
try {
lock.writeLock().lock();
@@ -698,7 +746,7 @@
return tid.getID() + " (" + columns + ")\n"; //$NON-NLS-1$
//$NON-NLS-2$
}
- Map getColumnMap() {
+ Map<ElementSymbol, Integer> getColumnMap() {
return this.columnMap;
}
Modified: trunk/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java 2011-08-25
01:22:57 UTC (rev 3424)
+++
trunk/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java 2011-08-26
15:17:29 UTC (rev 3425)
@@ -22,10 +22,8 @@
package org.teiid.query.tempdata;
-import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Callable;
@@ -37,10 +35,6 @@
import org.teiid.api.exception.query.QueryProcessingException;
import org.teiid.api.exception.query.QueryResolverException;
import org.teiid.api.exception.query.QueryValidatorException;
-import org.teiid.cache.Cache;
-import org.teiid.cache.CacheConfiguration;
-import org.teiid.cache.CacheFactory;
-import org.teiid.cache.CacheConfiguration.Policy;
import org.teiid.common.buffer.BlockedException;
import org.teiid.common.buffer.BufferManager;
import org.teiid.common.buffer.TupleBuffer;
@@ -49,7 +43,6 @@
import org.teiid.core.TeiidComponentException;
import org.teiid.core.TeiidProcessingException;
import org.teiid.core.types.DataTypeManager;
-import org.teiid.core.util.HashCodeUtil;
import org.teiid.core.util.StringUtil;
import org.teiid.dqp.internal.process.CachedResults;
import org.teiid.dqp.internal.process.SessionAwareCache;
@@ -62,7 +55,6 @@
import org.teiid.metadata.FunctionMethod.Determinism;
import org.teiid.query.QueryPlugin;
import org.teiid.query.eval.Evaluator;
-import org.teiid.query.mapping.relational.QueryNode;
import org.teiid.query.metadata.QueryMetadataInterface;
import org.teiid.query.metadata.TempMetadataID;
import org.teiid.query.optimizer.relational.RelationalPlanner;
@@ -92,10 +84,8 @@
import org.teiid.query.sql.symbol.Expression;
import org.teiid.query.sql.symbol.GroupSymbol;
import org.teiid.query.sql.visitor.ExpressionMappingVisitor;
-import org.teiid.query.tempdata.TempTableStore.MatState;
-import org.teiid.query.tempdata.TempTableStore.MatTableInfo;
+import org.teiid.query.tempdata.GlobalTableStoreImpl.MatTableInfo;
import org.teiid.query.util.CommandContext;
-import org.teiid.vdb.runtime.VDBKey;
/**
* This proxy ProcessorDataManager is used to handle temporary tables.
@@ -107,57 +97,21 @@
private static final String REFRESHMATVIEWROW = ".refreshmatviewrow";
//$NON-NLS-1$
private static final String REFRESHMATVIEW = ".refreshmatview"; //$NON-NLS-1$
- private static final String CODE_PREFIX = "#CODE_"; //$NON-NLS-1$
+ public static final String CODE_PREFIX = "#CODE_"; //$NON-NLS-1$
private ProcessorDataManager processorDataManager;
private BufferManager bufferManager;
private SessionAwareCache<CachedResults> cache;
private Executor executor;
- private static class MatTableKey implements Serializable {
- private static final long serialVersionUID = 5481692896572663992L;
- String name;
- VDBKey vdb;
-
- @Override
- public int hashCode() {
- return HashCodeUtil.hashCode(name.hashCode(), vdb);
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj == this) {
- return true;
- }
- if (!(obj instanceof MatTableKey)) {
- return false;
- }
- MatTableKey other = (MatTableKey)obj;
- return this.name.equals(other.name) && this.vdb.equals(other.vdb);
- }
- }
-
- private static class MatTableEntry implements Serializable {
- private static final long serialVersionUID = 8559613701442751579L;
- long lastUpdate = System.currentTimeMillis();
- boolean valid;
- }
-
- private Cache<MatTableKey, MatTableEntry> tables;
- private SessionAwareCache<CachedResults> distributedCache;
private EventDistributor eventDistributor;
public TempTableDataManager(ProcessorDataManager processorDataManager, BufferManager
bufferManager,
- Executor executor, SessionAwareCache<CachedResults> cache,
SessionAwareCache<CachedResults> distibutedCache, CacheFactory cacheFactory){
+ Executor executor, SessionAwareCache<CachedResults> cache){
this.processorDataManager = processorDataManager;
this.bufferManager = bufferManager;
this.executor = executor;
this.cache = cache;
- this.distributedCache = distibutedCache;
- if (distibutedCache != null) {
- CacheConfiguration cc = new CacheConfiguration(Policy.LRU, -1, -1,
"MaterializationUpdates"); //$NON-NLS-1$
- tables = cacheFactory.get(cc.getLocation(), cc);
- }
}
public void setEventDistributor(EventDistributor eventDistributor) {
@@ -316,37 +270,21 @@
QueryValidatorException, TeiidProcessingException,
ExpressionEvaluationException {
QueryMetadataInterface metadata = context.getMetadata();
- TempTableStore globalStore = context.getGlobalTableStore();
+ GlobalTableStore globalStore = context.getGlobalTableStore();
if (StringUtil.endsWithIgnoreCase(proc.getProcedureCallableName(), REFRESHMATVIEW)) {
Object groupID = validateMatView(metadata,
(String)((Constant)proc.getParameter(1).getExpression()).getValue());
- Object matTableId =
context.getGlobalTableStore().getGlobalTempTableMetadataId(groupID, metadata);
+ Object matTableId = globalStore.getGlobalTempTableMetadataId(groupID);
String matViewName = metadata.getFullName(groupID);
String matTableName = metadata.getFullName(matTableId);
LogManager.logDetail(LogConstants.CTX_MATVIEWS, "processing refreshmatview
for", matViewName); //$NON-NLS-1$
- MatTableInfo info = globalStore.getMatTableInfo(matTableName);
- Long loadTime = null;
- boolean useCache = false;
- if (this.distributedCache != null) {
- MatTableKey key = new MatTableKey();
- key.name = matTableName;
- key.vdb = new VDBKey(context.getVdbName(),context.getVdbVersion());
- MatTableEntry entry = this.tables.get(key);
- useCache = (entry != null && entry.valid && entry.lastUpdate >
info.getUpdateTime());
- if (useCache) {
- loadTime = entry.lastUpdate;
- }
- }
boolean invalidate =
Boolean.TRUE.equals(((Constant)proc.getParameter(2).getExpression()).getValue());
- if (invalidate) {
- touchTable(context, matTableName, false, System.currentTimeMillis());
- }
- MatState oldState = info.setState(MatState.NEEDS_LOADING,
invalidate?Boolean.FALSE:null, null);
- if (oldState == MatState.LOADING) {
+ boolean needsLoading = globalStore.needsLoading(matTableName,
globalStore.getLocalAddress(), true, true, invalidate);
+ if (!needsLoading) {
return CollectionTupleSource.createUpdateCountTupleSource(-1);
}
GroupSymbol matTable = new GroupSymbol(matTableName);
matTable.setMetadataID(matTableId);
- int rowCount = loadGlobalTable(context, matTable, matTableName, matViewName,
globalStore, info, invalidate?null:loadTime, !invalidate && useCache);
+ int rowCount = loadGlobalTable(context, matTable, matTableName, globalStore);
return CollectionTupleSource.createUpdateCountTupleSource(rowCount);
} else if (StringUtil.endsWithIgnoreCase(proc.getProcedureCallableName(),
REFRESHMATVIEWROW)) {
Object groupID = validateMatView(metadata,
(String)((Constant)proc.getParameter(1).getExpression()).getValue());
@@ -364,7 +302,7 @@
if (!info.isValid()) {
return CollectionTupleSource.createUpdateCountTupleSource(-1);
}
- TempTable tempTable = globalStore.getOrCreateTempTable(matTableName, new Query(),
bufferManager, false);
+ TempTable tempTable = globalStore.getTempTableStore().getTempTable(matTableName);
if (!tempTable.isUpdatable()) {
throw new
QueryProcessingException(QueryPlugin.Util.getString("TempTableDataManager.row_refresh_updatable",
matViewName)); //$NON-NLS-1$
}
@@ -381,10 +319,11 @@
if (tuple == null) {
delete = true;
tuple = Arrays.asList(key.getValue());
+ } else {
+ tuple = new ArrayList<Object>(tuple); //ensure the list is serializable
}
- List<?> result = updateMatViewRow(globalStore, matTableName, tuple, delete);
- if (result != null && eventDistributor != null) {
- result = new ArrayList<Object>(result); //ensure the list is serializable
+ List<?> result = globalStore.updateMatViewRow(matTableName, tuple, delete);
+ if (eventDistributor != null) {
this.eventDistributor.updateMatViewRow(context.getVdbName(), context.getVdbVersion(),
metadata.getName(metadata.getModelID(groupID)), metadata.getName(groupID), tuple,
delete);
}
return CollectionTupleSource.createUpdateCountTupleSource(result != null ? 1 : 0);
@@ -392,36 +331,6 @@
return null;
}
- public List<?> updateMatViewRow(TempTableStore globalStore,
- String matTableName, List<?> tuple, boolean delete)
- throws QueryProcessingException, TeiidComponentException {
- TempTable tempTable = globalStore.getOrCreateTempTable(matTableName, new Query(),
bufferManager, false);
- return tempTable.updateTuple(tuple, delete);
- }
-
- public void refreshMatView(String vdbName, int vdbVersion, String viewName,
- QueryMetadataInterface metadata, TempTableStore globalStore)
- throws QueryProcessingException, TeiidComponentException, TeiidProcessingException {
-
- Object groupID = validateMatView(metadata, viewName);
- Object matTableId = globalStore.getGlobalTempTableMetadataId(groupID, metadata);
- String matViewName = metadata.getFullName(groupID);
- String matTableName = metadata.getFullName(matTableId);
- LogManager.logDetail(LogConstants.CTX_MATVIEWS, "processing refreshmatview
for", matViewName); //$NON-NLS-1$
- MatTableInfo info = globalStore.getMatTableInfo(matTableName);
-
- MatState oldState = info.setState(MatState.NEEDS_LOADING, Boolean.FALSE, null);
- if (oldState == MatState.LOADING) {
- return;
- }
- GroupSymbol matTable = new GroupSymbol(matTableName);
- matTable.setMetadataID(matTableId);
- CommandContext context = new CommandContext(new Object(), "internal",
"internal", vdbName, vdbVersion); //$NON-NLS-1$ //$NON-NLS-2$
- context.setMetadata(metadata);
- context.setGlobalTableStore(globalStore);
- loadGlobalTable(context, matTable, matTableName, matViewName, globalStore, info, null,
true);
- }
-
private Object validateMatView(QueryMetadataInterface metadata, String viewName) throws
TeiidComponentException,
TeiidProcessingException {
try {
@@ -444,44 +353,38 @@
if (!group.isTempGroupSymbol()) {
return null;
}
- String viewName = null;
final String tableName = group.getNonCorrelationName().toUpperCase();
boolean remapColumns = !tableName.equalsIgnoreCase(group.getName());
- TempMetadataID groupID = (TempMetadataID)group.getMetadataID();
- if (groupID.getOriginalMetadataID() != null) {
- viewName = context.getMetadata().getFullName(groupID.getOriginalMetadataID());
- }
TempTable table = null;
if (group.isGlobalTable()) {
- final TempTableStore globalStore = context.getGlobalTableStore();
+ final GlobalTableStore globalStore = context.getGlobalTableStore();
final MatTableInfo info = globalStore.getMatTableInfo(tableName);
- Long loadTime = null;
- if (this.distributedCache != null) {
- MatTableKey key = new MatTableKey();
- key.name = tableName;
- key.vdb = new VDBKey(context.getVdbName(), context.getVdbVersion());
-
- MatTableEntry entry = this.tables.get(key);
- boolean notValid = !info.isValid();
- if (entry != null && entry.lastUpdate > info.getUpdateTime()
- && info.getState() != MatState.LOADING
- //TODO: use extension metadata or a config parameter to make this skew
configurable
- && !(!notValid && entry.valid && info.getState() ==
MatState.LOADED && entry.lastUpdate < info.getUpdateTime() + 30000)) {
- //trigger a remote load due to the cache being more up to date than the local copy
- info.setState(MatState.NEEDS_LOADING, notValid?false:entry.valid, null);
- loadTime = entry.lastUpdate;
+ boolean load = false;
+ while (!info.isUpToDate()) {
+ load = globalStore.needsLoading(tableName, globalStore.getLocalAddress(), true,
false, false);
+ if (load) {
+ load = globalStore.needsLoading(tableName, globalStore.getLocalAddress(), false,
false, false);
+ if (load) {
+ break;
+ }
}
+ synchronized (info) {
+ try {
+ info.wait(30000);
+ } catch (InterruptedException e) {
+ throw new TeiidComponentException(e);
+ }
+ }
}
- boolean load = info.shouldLoad();
if (load) {
if (!info.isValid()) {
//blocking load
- loadGlobalTable(context, group, tableName, viewName, globalStore, info, loadTime,
true);
+ loadGlobalTable(context, group, tableName, globalStore);
} else {
- loadAsynch(context, group, tableName, viewName, globalStore, info, loadTime);
+ loadAsynch(context, group, tableName, globalStore);
}
}
- table = globalStore.getOrCreateTempTable(tableName, query, bufferManager, false);
+ table = globalStore.getTempTableStore().getOrCreateTempTable(tableName, query,
bufferManager, false);
context.accessedDataObject(group.getMetadataID());
} else {
table = contextStore.getOrCreateTempTable(tableName, query, bufferManager, true);
@@ -511,13 +414,11 @@
}
private void loadAsynch(final CommandContext context,
- final GroupSymbol group, final String tableName, final String viewName,
- final TempTableStore globalStore, final MatTableInfo info,
- final Long loadTime) {
+ final GroupSymbol group, final String tableName, final GlobalTableStore globalStore)
{
Callable<Integer> toCall = new Callable<Integer>() {
@Override
public Integer call() throws Exception {
- return loadGlobalTable(context, group, tableName, viewName, globalStore, info,
loadTime, true);
+ return loadGlobalTable(context, group, tableName, globalStore);
}
};
FutureTask<Integer> task = new FutureTask<Integer>(toCall);
@@ -525,89 +426,39 @@
}
private int loadGlobalTable(CommandContext context,
- GroupSymbol group, final String tableName, final String viewName,
- TempTableStore globalStore, MatTableInfo info, Long loadTime, boolean useCache)
+ GroupSymbol group, final String tableName, GlobalTableStore globalStore)
throws TeiidComponentException, TeiidProcessingException {
LogManager.logInfo(LogConstants.CTX_MATVIEWS,
QueryPlugin.Util.getString("TempTableDataManager.loading", tableName));
//$NON-NLS-1$
QueryMetadataInterface metadata = context.getMetadata();
- Create create = new Create();
- create.setTable(group);
List<ElementSymbol> allColumns = ResolverUtil.resolveElementsInGroup(group,
metadata);
- create.setElementSymbolsAsColumns(allColumns);
- Object pk = metadata.getPrimaryKey(group.getMetadataID());
- if (pk != null) {
- List<ElementSymbol> pkColumns = resolveIndex(metadata, allColumns, pk);
- create.getPrimaryKey().addAll(pkColumns);
- }
- TempTable table = globalStore.addTempTable(tableName, create, bufferManager, false);
+ TempTable table = globalStore.createMatTable(tableName, group);
table.setUpdatable(false);
- CacheHint hint = table.getCacheHint();
- boolean updatable = false;
- if (hint != null) {
- table.setPreferMemory(hint.getPrefersMemory());
- if (hint.getTtl() != null) {
- info.setTtl(hint.getTtl());
- }
- if (pk != null) {
- updatable = hint.isUpdatable();
- }
- }
int rowCount = -1;
- boolean viewFetched = false;
try {
String fullName = metadata.getFullName(group.getMetadataID());
- TupleSource ts = null;
- CacheID cid = null;
- if (distributedCache != null) {
- cid = new CacheID(new ParseInfo(), fullName, context.getVdbName(),
- context.getVdbVersion(), context.getConnectionID(), context.getUserName());
- if (useCache) {
- CachedResults cr = this.distributedCache.get(cid);
- if (cr != null) {
- ts = cr.getResults().createIndexedTupleSource();
- LogManager.logInfo(LogConstants.CTX_MATVIEWS,
QueryPlugin.Util.getString("TempTableDataManager.cache_load", tableName));
//$NON-NLS-1$
- }
- }
- }
+ String transformation = metadata.getVirtualPlan(group.getMetadataID()).getQuery();
+ QueryProcessor qp =
context.getQueryProcessorFactory().createQueryProcessor(transformation, fullName,
context);
+ qp.setNonBlocking(true);
+ qp.getContext().setDataObjects(null);
+ TupleSource ts = new BatchCollector.BatchProducerTupleSource(qp);
- List<ElementSymbol> variables = table.getColumns();
-
- if (ts == null) {
- variables = allColumns;
- //TODO: coordinate a distributed load
- //TODO: order by primary key nulls first - then have an insert ordered optimization
- String transformation = metadata.getVirtualPlan(group.getMetadataID()).getQuery();
- QueryProcessor qp =
context.getQueryProcessorFactory().createQueryProcessor(transformation, fullName,
context);
- qp.setNonBlocking(true);
- qp.getContext().setDataObjects(null);
- if (distributedCache != null) {
- CachedResults cr = new CachedResults();
- BatchCollector bc = qp.createBatchCollector();
- TupleBuffer tb = bc.collectTuples();
- cr.setResults(tb, qp.getProcessorPlan());
- touchTable(context, fullName, true, info.getUpdateTime());
- this.distributedCache.put(cid, Determinism.VDB_DETERMINISTIC, cr, info.getTtl());
- ts = tb.createIndexedTupleSource();
- viewFetched = true;
- } else {
- ts = new BatchCollector.BatchProducerTupleSource(qp);
- }
- }
-
//TODO: if this insert fails, it's unnecessary to do the undo processing
- table.insert(ts, variables);
+ table.insert(ts, allColumns);
table.getTree().compact();
rowCount = table.getRowCount();
//TODO: could pre-process indexes to remove overlap
for (Object index : metadata.getIndexesInGroup(group.getMetadataID())) {
- List<ElementSymbol> columns = resolveIndex(metadata, allColumns, index);
+ List<ElementSymbol> columns = GlobalTableStoreImpl.resolveIndex(metadata,
allColumns, index);
table.addIndex(columns, false);
}
for (Object key : metadata.getUniqueKeysInGroup(group.getMetadataID())) {
- List<ElementSymbol> columns = resolveIndex(metadata, allColumns, key);
+ List<ElementSymbol> columns = GlobalTableStoreImpl.resolveIndex(metadata,
allColumns, key);
table.addIndex(columns, true);
}
- table.setUpdatable(updatable);
+ CacheHint hint = table.getCacheHint();
+ if (hint != null && table.getPkLength() > 0) {
+ table.setUpdatable(hint.isUpdatable());
+ }
} catch (TeiidComponentException e) {
LogManager.logError(LogConstants.CTX_MATVIEWS, e,
QueryPlugin.Util.getString("TempTableDataManager.failed_load", tableName));
//$NON-NLS-1$
throw e;
@@ -616,66 +467,30 @@
throw e;
} finally {
if (rowCount == -1) {
- info.setState(MatState.FAILED_LOAD, null, null);
+ globalStore.failedLoad(tableName);
} else {
- globalStore.swapTempTable(tableName, table);
- info.setState(MatState.LOADED, true, loadTime);
- if (viewFetched & viewName != null && this.eventDistributor != null) {
- this.eventDistributor.refreshMatView(context.getVdbName(), context.getVdbVersion(),
viewName);
- }
+ globalStore.loaded(tableName, table);
LogManager.logInfo(LogConstants.CTX_MATVIEWS,
QueryPlugin.Util.getString("TempTableDataManager.loaded", tableName, rowCount));
//$NON-NLS-1$
}
}
return rowCount;
}
- private void touchTable(CommandContext context, String fullName, boolean valid, long
loadtime) {
- MatTableKey key = new MatTableKey();
- key.name = fullName;
- key.vdb = new VDBKey(context.getVdbName(), context.getVdbVersion());
- MatTableEntry matTableEntry = new MatTableEntry();
- matTableEntry.valid = valid;
- matTableEntry.lastUpdate = loadtime;
- tables.put(key, matTableEntry, null);
- }
-
- /**
- * Return a list of ElementSymbols for the given index/key object
- */
- private List<ElementSymbol> resolveIndex(QueryMetadataInterface metadata,
- List<ElementSymbol> allColumns, Object pk)
- throws TeiidComponentException, QueryMetadataException {
- Collection<?> pkIds = metadata.getElementIDsInKey(pk);
- List<ElementSymbol> pkColumns = new
ArrayList<ElementSymbol>(pkIds.size());
- for (Object col : pkIds) {
- pkColumns.add(allColumns.get(metadata.getPosition(col)-1));
- }
- return pkColumns;
- }
-
public Object lookupCodeValue(CommandContext context, String codeTableName,
String returnElementName, String keyElementName, Object keyValue)
throws BlockedException, TeiidComponentException,
TeiidProcessingException {
String matTableName = CODE_PREFIX + (codeTableName + ElementSymbol.SEPARATOR +
keyElementName + ElementSymbol.SEPARATOR + returnElementName).toUpperCase();
+ QueryMetadataInterface metadata = context.getMetadata();
+ TempMetadataID id =
context.getGlobalTableStore().getCodeTableMetadataId(codeTableName,
+ returnElementName, keyElementName, matTableName);
+
ElementSymbol keyElement = new ElementSymbol(matTableName + ElementSymbol.SEPARATOR
+ keyElementName);
ElementSymbol returnElement = new ElementSymbol(matTableName +
ElementSymbol.SEPARATOR + returnElementName);
-
- QueryMetadataInterface metadata = context.getMetadata();
-
keyElement.setType(DataTypeManager.getDataTypeClass(metadata.getElementType(metadata.getElementID(codeTableName
+ ElementSymbol.SEPARATOR + keyElementName))));
returnElement.setType(DataTypeManager.getDataTypeClass(metadata.getElementType(metadata.getElementID(codeTableName
+ ElementSymbol.SEPARATOR + returnElementName))));
- TempMetadataID id =
context.getGlobalTableStore().getMetadataStore().getTempGroupID(matTableName);
- if (id == null) {
- id = context.getGlobalTableStore().getMetadataStore().addTempGroup(matTableName,
Arrays.asList(keyElement, returnElement), false, true);
- String queryString = Reserved.SELECT + ' ' + keyElementName + "
," + returnElementName + ' ' + Reserved.FROM + ' ' + codeTableName;
//$NON-NLS-1$
- id.setQueryNode(new QueryNode(queryString));
- id.setPrimaryKey(id.getElements().subList(0, 1));
- CacheHint hint = new CacheHint(true, null);
- id.setCacheHint(hint);
- }
Query query = RelationalPlanner.createMatViewQuery(id, matTableName,
Arrays.asList(returnElement), true);
query.setCriteria(new CompareCriteria(keyElement, CompareCriteria.EQ, new
Constant(keyValue)));
Modified: trunk/engine/src/main/java/org/teiid/query/tempdata/TempTableStore.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/tempdata/TempTableStore.java 2011-08-25
01:22:57 UTC (rev 3424)
+++ trunk/engine/src/main/java/org/teiid/query/tempdata/TempTableStore.java 2011-08-26
15:17:29 UTC (rev 3425)
@@ -23,31 +23,20 @@
package org.teiid.query.tempdata;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import org.teiid.api.exception.query.QueryMetadataException;
import org.teiid.api.exception.query.QueryProcessingException;
-import org.teiid.api.exception.query.QueryResolverException;
-import org.teiid.api.exception.query.QueryValidatorException;
import org.teiid.common.buffer.BufferManager;
-import org.teiid.core.TeiidComponentException;
-import org.teiid.language.SQLConstants;
import org.teiid.logging.LogConstants;
import org.teiid.logging.LogManager;
import org.teiid.query.QueryPlugin;
-import org.teiid.query.metadata.QueryMetadataInterface;
import org.teiid.query.metadata.TempMetadataID;
import org.teiid.query.metadata.TempMetadataStore;
-import org.teiid.query.optimizer.relational.RelationalPlanner;
-import org.teiid.query.resolver.QueryResolver;
import org.teiid.query.resolver.command.TempTableResolver;
-import org.teiid.query.resolver.util.ResolverUtil;
-import org.teiid.query.sql.lang.CacheHint;
import org.teiid.query.sql.lang.Command;
import org.teiid.query.sql.lang.Create;
import org.teiid.query.sql.lang.Insert;
@@ -56,91 +45,8 @@
public class TempTableStore {
- public enum MatState {
- NEEDS_LOADING,
- LOADING,
- FAILED_LOAD,
- LOADED
- }
-
- public static class MatTableInfo {
- private long updateTime = -1;
- private MatState state = MatState.NEEDS_LOADING;
- private long ttl = -1;
- private boolean valid;
-
- synchronized boolean shouldLoad() throws TeiidComponentException {
- for (;;) {
- switch (state) {
- case NEEDS_LOADING:
- case FAILED_LOAD:
- setState(MatState.LOADING);
- return true;
- case LOADING:
- if (valid) {
- return false;
- }
- try {
- wait();
- } catch (InterruptedException e) {
- throw new TeiidComponentException(e);
- }
- continue;
- case LOADED:
- if (ttl >= 0 && System.currentTimeMillis() - updateTime - ttl > 0) {
- setState(MatState.LOADING);
- return true;
- }
- return false;
- }
- }
- }
-
- public synchronized MatState setState(MatState state, Boolean valid, Long timestamp) {
- MatState oldState = this.state;
- LogManager.logDetail(LogConstants.CTX_MATVIEWS, this, "setting matState to",
state, valid, timestamp, "old values", oldState, this.valid); //$NON-NLS-1$
//$NON-NLS-2$
- if (valid != null) {
- this.valid = valid;
- }
- setState(state);
- if (timestamp != null) {
- this.updateTime = timestamp;
- }
- notifyAll();
- return oldState;
- }
-
- private void setState(MatState state) {
- this.state = state;
- this.updateTime = System.currentTimeMillis();
- }
-
- public synchronized void setTtl(long ttl) {
- this.ttl = ttl;
- }
-
- public synchronized long getUpdateTime() {
- return updateTime;
- }
-
- public synchronized MatState getState() {
- return state;
- }
-
- public synchronized boolean isValid() {
- return valid;
- }
-
- public synchronized long getTtl() {
- return ttl;
- }
-
- }
-
- private ConcurrentHashMap<String, MatTableInfo> matTables = new
ConcurrentHashMap<String, MatTableInfo>();
-
- private TempMetadataStore tempMetadataStore = new TempMetadataStore(new
ConcurrentHashMap<String, TempMetadataID>());
- private Map<String, TempTable> groupToTupleSourceID = new
ConcurrentHashMap<String, TempTable>();
+ TempMetadataStore tempMetadataStore = new TempMetadataStore(new
ConcurrentHashMap<String, TempMetadataID>());
+ private Map<String, TempTable> tempTables = new ConcurrentHashMap<String,
TempTable>();
private String sessionID;
private TempTableStore parentTempTableStore;
@@ -148,21 +54,12 @@
this.sessionID = sessionID;
}
- public MatTableInfo getMatTableInfo(final String tableName) {
- MatTableInfo newInfo = new MatTableInfo();
- MatTableInfo info = matTables.putIfAbsent(tableName, newInfo);
- if (info == null) {
- info = newInfo;
- }
- return info;
- }
-
public void setParentTempTableStore(TempTableStore parentTempTableStore) {
this.parentTempTableStore = parentTempTableStore;
}
public boolean hasTempTable(String tempTableName) {
- return groupToTupleSourceID.containsKey(tempTableName);
+ return tempTables.containsKey(tempTableName);
}
TempTable addTempTable(String tempTableName, Create create, BufferManager buffer,
boolean add) {
@@ -182,18 +79,18 @@
}
TempTable tempTable = new TempTable(id, buffer, columns,
create.getPrimaryKey().size(), sessionID);
if (add) {
- groupToTupleSourceID.put(tempTableName, tempTable);
+ tempTables.put(tempTableName, tempTable);
}
return tempTable;
}
void swapTempTable(String tempTableName, TempTable tempTable) {
- groupToTupleSourceID.put(tempTableName, tempTable);
+ tempTables.put(tempTableName, tempTable);
}
public void removeTempTableByName(String tempTableName) {
tempMetadataStore.removeTempGroup(tempTableName);
- TempTable table = this.groupToTupleSourceID.remove(tempTableName);
+ TempTable table = this.tempTables.remove(tempTableName);
if(table != null) {
table.remove();
}
@@ -204,18 +101,22 @@
}
public void removeTempTables() {
- for (String name : groupToTupleSourceID.keySet()) {
+ for (String name : tempTables.keySet()) {
removeTempTableByName(name);
}
}
public void setUpdatable(String name, boolean updatable) {
- TempTable table = groupToTupleSourceID.get(name);
+ TempTable table = tempTables.get(name);
if (table != null) {
table.setUpdatable(updatable);
}
}
+ TempTable getTempTable(String tempTableID) {
+ return this.tempTables.get(tempTableID);
+ }
+
TempTable getOrCreateTempTable(String tempTableID, Command command, BufferManager
buffer, boolean delegate) throws QueryProcessingException{
TempTable tempTable = getTempTable(tempTableID, command, buffer, delegate);
if (tempTable != null) {
@@ -243,7 +144,7 @@
private TempTable getTempTable(String tempTableID, Command command,
BufferManager buffer, boolean delegate)
throws QueryProcessingException {
- TempTable tsID = groupToTupleSourceID.get(tempTableID);
+ TempTable tsID = tempTables.get(tempTableID);
if(tsID != null) {
return tsID;
}
@@ -254,66 +155,11 @@
}
public Set<String> getAllTempTables() {
- return new HashSet<String>(this.groupToTupleSourceID.keySet());
+ return new HashSet<String>(this.tempTables.keySet());
}
-
- public TempMetadataID getGlobalTempTableMetadataId(Object viewId, QueryMetadataInterface
metadata)
- throws QueryMetadataException, TeiidComponentException, QueryResolverException,
QueryValidatorException {
- String matViewName = metadata.getFullName(viewId);
- String matTableName = RelationalPlanner.MAT_PREFIX+matViewName.toUpperCase();
- GroupSymbol group = new GroupSymbol(matViewName);
- group.setMetadataID(viewId);
- TempMetadataID id = tempMetadataStore.getTempGroupID(matTableName);
- //define the table preserving the key/index information and ensure that only a single
instance exists
- if (id == null) {
- synchronized (viewId) {
- id = tempMetadataStore.getTempGroupID(matTableName);
- if (id == null) {
- id = tempMetadataStore.addTempGroup(matTableName,
ResolverUtil.resolveElementsInGroup(group, metadata), false, true);
- id.setQueryNode(metadata.getVirtualPlan(viewId));
- id.setCardinality(metadata.getCardinality(viewId));
- id.setOriginalMetadataID(viewId);
-
- Object pk = metadata.getPrimaryKey(viewId);
- if (pk != null) {
- ArrayList<TempMetadataID> primaryKey = resolveIndex(metadata, id, pk);
- id.setPrimaryKey(primaryKey);
- }
- Collection keys = metadata.getUniqueKeysInGroup(viewId);
- for (Object key : keys) {
- id.addUniqueKey(resolveIndex(metadata, id, key));
- }
- Collection indexes = metadata.getIndexesInGroup(viewId);
- for (Object index : indexes) {
- id.addIndex(resolveIndex(metadata, id, index));
- }
- }
- }
- }
- updateCacheHint(viewId, metadata, group, id);
- return id;
+
+ Map<String, TempTable> getTempTables() {
+ return tempTables;
}
-
- private void updateCacheHint(Object viewId,
- QueryMetadataInterface metadata, GroupSymbol group,
- TempMetadataID id) throws TeiidComponentException,
- QueryMetadataException, QueryResolverException,
- QueryValidatorException {
- Command c = QueryResolver.resolveView(group, metadata.getVirtualPlan(viewId),
SQLConstants.Reserved.SELECT, metadata).getCommand();
- CacheHint hint = c.getCacheHint();
- id.setCacheHint(hint);
- }
-
- static ArrayList<TempMetadataID> resolveIndex(
- QueryMetadataInterface metadata, TempMetadataID id, Object pk)
- throws TeiidComponentException, QueryMetadataException {
- List cols = metadata.getElementIDsInKey(pk);
- ArrayList<TempMetadataID> primaryKey = new
ArrayList<TempMetadataID>(cols.size());
- for (Object coldId : cols) {
- int pos = metadata.getPosition(coldId) - 1;
- primaryKey.add(id.getElements().get(pos));
- }
- return primaryKey;
- }
}
Modified: trunk/engine/src/main/java/org/teiid/query/util/CommandContext.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/util/CommandContext.java 2011-08-25
01:22:57 UTC (rev 3424)
+++ trunk/engine/src/main/java/org/teiid/query/util/CommandContext.java 2011-08-26
15:17:29 UTC (rev 3425)
@@ -58,6 +58,7 @@
import org.teiid.query.sql.symbol.ElementSymbol;
import org.teiid.query.sql.symbol.Expression;
import org.teiid.query.sql.util.VariableContext;
+import org.teiid.query.tempdata.GlobalTableStore;
import org.teiid.query.tempdata.TempTableStore;
/**
@@ -116,7 +117,7 @@
private BufferManager bufferManager;
- private TempTableStore globalTables;
+ private GlobalTableStore globalTables;
private SessionAwareCache<PreparedPlan> planCache;
@@ -484,11 +485,11 @@
globalState.bufferManager = bm;
}
- public TempTableStore getGlobalTableStore() {
+ public GlobalTableStore getGlobalTableStore() {
return globalState.globalTables;
}
- public void setGlobalTableStore(TempTableStore tempTableStore) {
+ public void setGlobalTableStore(GlobalTableStore tempTableStore) {
globalState.globalTables = tempTableStore;
}
Modified:
trunk/engine/src/test/java/org/teiid/query/optimizer/relational/TestMaterialization.java
===================================================================
---
trunk/engine/src/test/java/org/teiid/query/optimizer/relational/TestMaterialization.java 2011-08-25
01:22:57 UTC (rev 3424)
+++
trunk/engine/src/test/java/org/teiid/query/optimizer/relational/TestMaterialization.java 2011-08-26
15:17:29 UTC (rev 3425)
@@ -37,7 +37,7 @@
import org.teiid.query.processor.ProcessorPlan;
import org.teiid.query.processor.relational.RelationalPlan;
import org.teiid.query.sql.lang.Command;
-import org.teiid.query.tempdata.TempTableStore;
+import org.teiid.query.tempdata.GlobalTableStoreImpl;
import org.teiid.query.unittest.RealMetadataFactory;
import org.teiid.query.util.CommandContext;
@@ -151,7 +151,8 @@
Command command = helpGetCommand(userSql, metadata, null);
CommandContext cc = new CommandContext();
- cc.setGlobalTableStore(new TempTableStore("SYSTEM"));
+ GlobalTableStoreImpl gts = new GlobalTableStoreImpl(null, metadata);
+ cc.setGlobalTableStore(gts);
ProcessorPlan plan = TestOptimizer.getPlan(command, metadata, getGenericFinder(),
analysis, true, cc);
TestOptimizer.checkAtomicQueries(new String[] {"SELECT
#MAT_MATVIEW.VGROUP2.x FROM #MAT_MATVIEW.VGROUP2"}, plan);
Collection<Annotation> annotations = analysis.getAnnotations();
@@ -168,7 +169,8 @@
Command command = helpGetCommand(userSql, metadata, null);
CommandContext cc = new CommandContext();
- cc.setGlobalTableStore(new TempTableStore("SYSTEM"));
+ GlobalTableStoreImpl gts = new GlobalTableStoreImpl(null, metadata);
+ cc.setGlobalTableStore(gts);
RelationalPlan plan = (RelationalPlan)TestOptimizer.getPlan(command, metadata,
getGenericFinder(), analysis, true, cc);
assertEquals(1f, plan.getRootNode().getEstimateNodeCardinality());
TestOptimizer.checkAtomicQueries(new String[] {"SELECT
#MAT_MATVIEW.VGROUP3.x, #MAT_MATVIEW.VGROUP3.y FROM #MAT_MATVIEW.VGROUP3 WHERE
#MAT_MATVIEW.VGROUP3.x = 'foo'"}, plan);
@@ -186,7 +188,8 @@
Command command = helpGetCommand(userSql, metadata, null);
CommandContext cc = new CommandContext();
- cc.setGlobalTableStore(new TempTableStore("SYSTEM"));
+ GlobalTableStoreImpl gts = new GlobalTableStoreImpl(null, metadata);
+ cc.setGlobalTableStore(gts);
ProcessorPlan plan = TestOptimizer.getPlan(command, metadata, getGenericFinder(),
analysis, true, cc);
TestOptimizer.checkAtomicQueries(new String[] {"SELECT
#MAT_MATVIEW.VGROUP4.x FROM #MAT_MATVIEW.VGROUP4"}, plan);
Collection<Annotation> annotations = analysis.getAnnotations();
Modified: trunk/engine/src/test/java/org/teiid/query/processor/TestMaterialization.java
===================================================================
---
trunk/engine/src/test/java/org/teiid/query/processor/TestMaterialization.java 2011-08-25
01:22:57 UTC (rev 3424)
+++
trunk/engine/src/test/java/org/teiid/query/processor/TestMaterialization.java 2011-08-26
15:17:29 UTC (rev 3425)
@@ -25,24 +25,29 @@
import static org.junit.Assert.*;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Executor;
import org.junit.Before;
import org.junit.Test;
-import org.teiid.cache.DefaultCacheFactory;
import org.teiid.common.buffer.BufferManager;
import org.teiid.common.buffer.BufferManagerFactory;
import org.teiid.core.TeiidProcessingException;
import org.teiid.dqp.internal.process.CachedResults;
import org.teiid.dqp.internal.process.QueryProcessorFactoryImpl;
import org.teiid.dqp.internal.process.SessionAwareCache;
+import org.teiid.query.metadata.QueryMetadataInterface;
import org.teiid.query.metadata.TempMetadataAdapter;
import org.teiid.query.optimizer.capabilities.CapabilitiesFinder;
import org.teiid.query.optimizer.capabilities.DefaultCapabilitiesFinder;
+import org.teiid.query.optimizer.relational.RelationalPlanner;
+import org.teiid.query.tempdata.GlobalTableStoreImpl;
import org.teiid.query.tempdata.TempTableDataManager;
import org.teiid.query.tempdata.TempTableStore;
+import org.teiid.query.tempdata.GlobalTableStoreImpl.MatTableInfo;
import org.teiid.query.unittest.RealMetadataFactory;
import org.teiid.query.util.CommandContext;
@@ -52,20 +57,21 @@
private TempMetadataAdapter metadata;
private TempTableDataManager dataManager;
private TempTableStore tempStore;
- private TempTableStore globalStore;
+ private GlobalTableStoreImpl globalStore;
private ProcessorPlan previousPlan;
private HardcodedDataManager hdm;
@Before public void setUp() {
tempStore = new TempTableStore("1"); //$NON-NLS-1$
- globalStore = new TempTableStore("SYSTEM");
- metadata = new TempMetadataAdapter(RealMetadataFactory.exampleMaterializedView(),
tempStore.getMetadataStore());
+ BufferManager bm = BufferManagerFactory.getStandaloneBufferManager();
+ QueryMetadataInterface actualMetadata =
RealMetadataFactory.exampleMaterializedView();
+ globalStore = new GlobalTableStoreImpl(bm, actualMetadata);
+ metadata = new TempMetadataAdapter(actualMetadata, tempStore.getMetadataStore());
hdm = new HardcodedDataManager();
hdm.addData("SELECT matsrc.x FROM matsrc", new List[]
{Arrays.asList((String)null), Arrays.asList("one"),
Arrays.asList("two"), Arrays.asList("three")});
hdm.addData("SELECT mattable.info.e1, mattable.info.e2 FROM mattable.info",
new List[] {Arrays.asList("a", 1), Arrays.asList("a", 2)});
hdm.addData("SELECT mattable.info.e2, mattable.info.e1 FROM mattable.info",
new List[] {Arrays.asList(1, "a"), Arrays.asList(2, "a")});
- BufferManager bm = BufferManagerFactory.getStandaloneBufferManager();
SessionAwareCache<CachedResults> cache = new
SessionAwareCache<CachedResults>();
cache.setBufferManager(bm);
Executor executor = new Executor() {
@@ -74,7 +80,7 @@
command.run();
}
};
- dataManager = new TempTableDataManager(hdm, bm, executor, cache, cache, new
DefaultCacheFactory());
+ dataManager = new TempTableDataManager(hdm, bm, executor, cache);
}
private void execute(String sql, List<?>... expectedResults) throws Exception {
@@ -95,6 +101,25 @@
assertEquals(1, hdm.getCommandHistory().size());
}
+ @Test public void testReadWrite() throws Exception {
+ execute("SELECT * from vgroup3 where x = 'one'",
Arrays.asList("one", "zne"));
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ String matTableName = RelationalPlanner.MAT_PREFIX + "MATVIEW.VGROUP3";
+ this.globalStore.getState(matTableName, baos);
+ MatTableInfo matTableInfo = this.globalStore.getMatTableInfo(matTableName);
+ long time = matTableInfo.getUpdateTime();
+ this.globalStore.failedLoad(matTableName);
+ this.globalStore.setState(matTableName, new ByteArrayInputStream(baos.toByteArray()));
+ assertEquals(time, matTableInfo.getUpdateTime());
+ execute("SELECT * from vgroup3 where x = 'one'",
Arrays.asList("one", "zne"));
+
+ execute("select lookup('mattable.info', 'e1', 'e2',
5)", Arrays.asList((String)null));
+ baos = new ByteArrayOutputStream();
+ String codeTableName = "#CODE_MATTABLE.INFO.E2.E1";
+ this.globalStore.getState(codeTableName, baos);
+ this.globalStore.setState(codeTableName, new
ByteArrayInputStream(baos.toByteArray()));
+ }
+
@Test(expected=TeiidProcessingException.class) public void
testCodeTableResponseException() throws Exception {
//duplicate key
execute("select lookup('mattable.info', 'e2', 'e1',
'a')");
Modified: trunk/engine/src/test/java/org/teiid/query/processor/TestProcessor.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/query/processor/TestProcessor.java 2011-08-25
01:22:57 UTC (rev 3424)
+++ trunk/engine/src/test/java/org/teiid/query/processor/TestProcessor.java 2011-08-26
15:17:29 UTC (rev 3425)
@@ -93,6 +93,7 @@
import org.teiid.query.sql.symbol.Reference;
import org.teiid.query.sql.util.VariableContext;
import org.teiid.query.sql.visitor.ReferenceCollectorVisitor;
+import org.teiid.query.tempdata.GlobalTableStoreImpl;
import org.teiid.query.tempdata.TempTableDataManager;
import org.teiid.query.tempdata.TempTableStore;
import org.teiid.query.unittest.RealMetadataFactory;
@@ -246,7 +247,8 @@
context.setTempTableStore(new TempTableStore(context.getConnectionID()));
}
if (context.getGlobalTableStore() == null) {
- context.setGlobalTableStore(new TempTableStore("SYSTEM"));
+ GlobalTableStoreImpl gts = new GlobalTableStoreImpl(bufferMgr,
context.getMetadata());
+ context.setGlobalTableStore(gts);
}
if (!(dataManager instanceof TempTableDataManager)) {
SessionAwareCache<CachedResults> cache = new
SessionAwareCache<CachedResults>();
@@ -257,7 +259,7 @@
command.run();
}
};
- dataManager = new TempTableDataManager(dataManager, bufferMgr, executor, cache,
null, null);
+ dataManager = new TempTableDataManager(dataManager, bufferMgr, executor,
cache);
}
if (context.getQueryProcessorFactory() == null) {
context.setQueryProcessorFactory(new QueryProcessorFactoryImpl(bufferMgr,
dataManager, new DefaultCapabilitiesFinder(), null, context.getMetadata()));
Modified: trunk/engine/src/test/java/org/teiid/query/processor/TestTempTables.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/query/processor/TestTempTables.java 2011-08-25
01:22:57 UTC (rev 3424)
+++ trunk/engine/src/test/java/org/teiid/query/processor/TestTempTables.java 2011-08-26
15:17:29 UTC (rev 3425)
@@ -81,7 +81,7 @@
command.run();
}
};
- dataManager = new TempTableDataManager(fdm, bm, executor, cache, null, null);
+ dataManager = new TempTableDataManager(fdm, bm, executor, cache);
}
@Test public void testInsertWithQueryExpression() throws Exception {
Modified:
trunk/jboss-integration/src/main/java/org/teiid/jboss/deployers/RuntimeEngineDeployer.java
===================================================================
---
trunk/jboss-integration/src/main/java/org/teiid/jboss/deployers/RuntimeEngineDeployer.java 2011-08-25
01:22:57 UTC (rev 3424)
+++
trunk/jboss-integration/src/main/java/org/teiid/jboss/deployers/RuntimeEngineDeployer.java 2011-08-26
15:17:29 UTC (rev 3425)
@@ -60,10 +60,10 @@
import org.jboss.profileservice.spi.ProfileService;
import org.jboss.util.naming.Util;
import org.teiid.adminapi.Admin;
-import org.teiid.adminapi.Admin.Cache;
import org.teiid.adminapi.AdminComponentException;
import org.teiid.adminapi.AdminException;
import org.teiid.adminapi.AdminProcessingException;
+import org.teiid.adminapi.Admin.Cache;
import org.teiid.adminapi.impl.CacheStatisticsMetadata;
import org.teiid.adminapi.impl.DQPManagement;
import org.teiid.adminapi.impl.RequestMetadata;
@@ -83,13 +83,13 @@
import org.teiid.core.TeiidComponentException;
import org.teiid.core.TeiidRuntimeException;
import org.teiid.core.util.LRUCache;
+import org.teiid.deployers.CompositeVDB;
import org.teiid.deployers.ContainerLifeCycleListener;
import org.teiid.deployers.VDBLifeCycleListener;
import org.teiid.deployers.VDBRepository;
import org.teiid.deployers.VDBStatusChecker;
import org.teiid.dqp.internal.process.DQPConfiguration;
import org.teiid.dqp.internal.process.DQPCore;
-import org.teiid.dqp.internal.process.DQPCore.ContextProvider;
import org.teiid.dqp.internal.process.DQPWorkContext;
import org.teiid.dqp.internal.process.DataTierManagerImpl;
import org.teiid.dqp.internal.process.TransactionServerImpl;
@@ -110,11 +110,15 @@
import org.teiid.metadata.Procedure;
import org.teiid.metadata.Schema;
import org.teiid.metadata.Table;
+import org.teiid.metadata.TableStats;
import org.teiid.metadata.Table.TriggerEvent;
-import org.teiid.metadata.TableStats;
import org.teiid.net.TeiidURL;
+import org.teiid.query.ObjectReplicator;
import org.teiid.query.metadata.TransformationMetadata;
+import org.teiid.query.optimizer.relational.RelationalPlanner;
import org.teiid.query.processor.DdlPlan;
+import org.teiid.query.tempdata.GlobalTableStore;
+import org.teiid.query.tempdata.GlobalTableStoreImpl;
import org.teiid.security.SecurityHelper;
import org.teiid.transport.ClientServiceRegistry;
import org.teiid.transport.ClientServiceRegistryImpl;
@@ -148,12 +152,12 @@
private transient ProfileService profileService;
private transient String jndiName;
- private String eventDistributorName;
+ private transient ObjectReplicator objectReplicator;
+ private String objectReplicatorName;
private transient EventDistributor eventDistributor;
private transient EventDistributor eventDistributorProxy;
- private transient ContainerLifeCycleListener lifecycleListener;
- public RuntimeEngineDeployer() {
+ public RuntimeEngineDeployer() {
// TODO: this does not belong here
LogManager.setLogListener(new Log4jListener());
}
@@ -172,13 +176,18 @@
public void start() {
dqpCore.setTransactionService((TransactionService)LogManager.createLoggingProxy(LogConstants.CTX_TXN_LOG,
transactionServerImpl, new Class[] {TransactionService.class}, MessageLevel.DETAIL));
- if (this.eventDistributorName != null) {
+ if (this.objectReplicatorName != null) {
try {
InitialContext ic = new InitialContext();
- this.eventDistributor = (EventDistributor) ic.lookup(this.eventDistributorName);
+ this.objectReplicator = (ObjectReplicator) ic.lookup(this.objectReplicatorName);
+ try {
+ this.eventDistributor = this.objectReplicator.replicate(this.jndiName,
EventDistributor.class, this, 0);
+ } catch (Exception e) {
+ LogManager.logError(LogConstants.CTX_RUNTIME, e,
IntegrationPlugin.Util.getString("replication_failed", this)); //$NON-NLS-1$
+ }
} catch (NamingException ne) {
//log at a detail level since we may not be in the all profile
- LogManager.logDetail(LogConstants.CTX_RUNTIME, ne,
IntegrationPlugin.Util.getString("jndi_failed", new
Date(System.currentTimeMillis()).toString())); //$NON-NLS-1$
+ LogManager.logDetail(LogConstants.CTX_RUNTIME, ne,
IntegrationPlugin.Util.getString("jndi_failed", this.objectReplicatorName));
//$NON-NLS-1$
}
}
this.dqpCore.setMetadataRepository(this.vdbRepository.getMetadataRepository());
@@ -260,7 +269,7 @@
Util.bind(ic, jndiName, this) ;
} catch (final NamingException ne) {
// Add jndi_failed to bundle
- LogManager.logError(LogConstants.CTX_RUNTIME, ne,
IntegrationPlugin.Util.getString("jndi_failed", new
Date(System.currentTimeMillis()).toString())); //$NON-NLS-1$
+ LogManager.logError(LogConstants.CTX_RUNTIME, ne,
IntegrationPlugin.Util.getString("jndi_failed", jndiName)); //$NON-NLS-1$
}
}
@@ -270,12 +279,25 @@
private Set<VDBKey> recentlyRemoved = Collections.newSetFromMap(new
LRUCache<VDBKey, Boolean>(10000));
@Override
- public void removed(String name, int version) {
+ public void removed(String name, int version, CompositeVDB vdb) {
recentlyRemoved.add(new VDBKey(name, version));
+ if (objectReplicator != null) {
+ GlobalTableStore gts = vdb.getVDB().getAttachment(GlobalTableStore.class);
+ objectReplicator.stop(gts);
+ }
}
@Override
- public void added(String name, int version) {
+ public void added(String name, int version, CompositeVDB vdb) {
+ GlobalTableStore gts = new GlobalTableStoreImpl(dqpCore.getBufferManager(),
vdb.getVDB().getAttachment(TransformationMetadata.class));
+ if (objectReplicator != null) {
+ try {
+ gts = objectReplicator.replicate(name + version, GlobalTableStore.class, gts,
300000);
+ } catch (Exception e) {
+ LogManager.logError(LogConstants.CTX_RUNTIME, e,
IntegrationPlugin.Util.getString("replication_failed", gts)); //$NON-NLS-1$
+ }
+ }
+ vdb.getVDB().addAttchment(GlobalTableStore.class, gts);
if (!recentlyRemoved.remove(new VDBKey(name, version))) {
return;
}
@@ -296,8 +318,6 @@
dqpCore.clearCache(Cache.QUERY_SERVICE_RESULT_SET_CACHE.toString(), name, version);
}
});
-
- synchronizeMaterializeViews();
}
public void stop() {
@@ -332,6 +352,10 @@
this.odbcSocket = null;
}
LogManager.logInfo(LogConstants.CTX_RUNTIME,
IntegrationPlugin.Util.getString("engine_stopped", new
Date(System.currentTimeMillis()).toString())); //$NON-NLS-1$
+
+ if (this.objectReplicator != null && this.eventDistributor != null) {
+ this.objectReplicator.stop(this.eventDistributor);
+ }
}
private void createClientServices() {
@@ -672,26 +696,31 @@
return newResults;
}
- public String getEventDistributorName() {
- return eventDistributorName;
+ public String getObjectReplicatorName() {
+ return objectReplicatorName;
}
- public void setEventDistributorName(String eventDistributorName) {
- this.eventDistributorName = eventDistributorName;
+ public void setObjectReplicatorName(String eventDistributorName) {
+ this.objectReplicatorName = eventDistributorName;
}
@Override
public void updateMatViewRow(String vdbName, int vdbVersion, String schema,
String viewName, List<?> tuple, boolean delete) {
- this.dqpCore.updateMatViewRow(getcontextProvider(), vdbName, vdbVersion, schema,
viewName, tuple, delete);
+ VDBMetaData metadata = this.vdbRepository.getVDB(vdbName, vdbVersion);
+ if (metadata != null) {
+ GlobalTableStore gts = metadata.getAttachment(GlobalTableStore.class);
+ if (gts != null) {
+ try {
+ gts.updateMatViewRow((RelationalPlanner.MAT_PREFIX + schema + '.' +
viewName).toUpperCase(), tuple, delete);
+ } catch (TeiidComponentException e) {
+ LogManager.logError(LogConstants.CTX_RUNTIME, e,
IntegrationPlugin.Util.getString("replication_failed",
"updateMatViewRow")); //$NON-NLS-1$ //$NON-NLS-2$
+ }
+ }
+ }
}
@Override
- public void refreshMatView(final String vdbName, final int vdbVersion, final String
viewName) {
- this.dqpCore.refreshMatView(getcontextProvider(), vdbName, vdbVersion, viewName);
- }
-
- @Override
public void dataModification(String vdbName, int vdbVersion, String schema,
String... tableNames) {
updateModified(true, vdbName, vdbVersion, schema, tableNames);
@@ -823,38 +852,9 @@
return this.eventDistributorProxy;
}
- private void synchronizeMaterializeViews() {
- this.lifecycleListener.addListener(new
ContainerLifeCycleListener.LifeCycleEventListener() {
- @Override
- public void onStartupFinish() {
- dqpCore.synchronizeInternalMaterializedViews(getcontextProvider());
- }
- @Override
- public void onShutdownStart() {
- }
- });
- }
-
- private ContextProvider getcontextProvider() {
- return new DQPCore.ContextProvider() {
- @Override
- public DQPWorkContext getContext(final String vdbName, final int vdbVersion) {
- return new DQPWorkContext() {
- public VDBMetaData getVDB() {
- return vdbRepository.getVDB(vdbName, vdbVersion);
- }
- public String getVdbName() {
- return vdbName;
- }
- public int getVdbVersion() {
- return vdbVersion;
- }
- };
- }
- };
- }
-
+ /**
+ * @param listener
+ */
public void setContainerLifeCycleListener(ContainerLifeCycleListener listener) {
- this.lifecycleListener = listener;
}
}
Modified: trunk/jboss-integration/src/main/resources/org/teiid/jboss/i18n.properties
===================================================================
--- trunk/jboss-integration/src/main/resources/org/teiid/jboss/i18n.properties 2011-08-25
01:22:57 UTC (rev 3424)
+++ trunk/jboss-integration/src/main/resources/org/teiid/jboss/i18n.properties 2011-08-26
15:17:29 UTC (rev 3425)
@@ -42,7 +42,8 @@
class_not_found=Class {0} not found.
datasource_exists=Data source with name {0} already exists!
datasource_template_not_found=Template {0} for creating the data source is not found.
-jndi_failed=JNDI lookup failed.
+jndi_failed=JNDI lookup failed {0}.
+replication_failed=Could not replicate object {0}
distribute_failed=Deploy of the archive failed {0}
template_not_found=Template not found for {0}
admin_executing=JOPR admin {0} is executing command {1}
Modified: trunk/pom.xml
===================================================================
--- trunk/pom.xml 2011-08-25 01:22:57 UTC (rev 3424)
+++ trunk/pom.xml 2011-08-26 15:17:29 UTC (rev 3425)
@@ -14,6 +14,7 @@
<scm>
<connection>scm:svn:https://anonsvn.jboss.org/repos/teiid/trunk</connection>
<developerConnection>scm:svn:https://svn.jboss.org/repos/teiid/trunk</developerConnection>
+ <url>http://anonsvn.jboss.org/repos/teiid/trunk</url>
</scm>
<licenses>
<license>
@@ -108,6 +109,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
+ <version>2.3.2</version>
<configuration>
<source>1.6</source>
<target>1.6</target>
@@ -117,6 +119,7 @@
</plugin>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
+ <version>2.9</version>
<configuration>
<includes>
<include>**/*TestCase.java</include>
@@ -161,7 +164,7 @@
<addDefaultImplementationEntries>
true</addDefaultImplementationEntries>
</manifest>
<manifestEntries>
- <Implementation-URL>${pom.url}</Implementation-URL>
+ <Implementation-URL>${project.url}</Implementation-URL>
</manifestEntries>
</archive>
</configuration>
@@ -169,6 +172,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
+ <version>2.1.2</version>
<executions>
<execution>
<id>attach-sources</id>
@@ -190,6 +194,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
+ <version>2.8</version>
<configuration>
<aggregate>true</aggregate>
<maxmemory>512m</maxmemory>
Modified: trunk/runtime/src/main/java/org/teiid/deployers/CompositeVDB.java
===================================================================
--- trunk/runtime/src/main/java/org/teiid/deployers/CompositeVDB.java 2011-08-25 01:22:57
UTC (rev 3424)
+++ trunk/runtime/src/main/java/org/teiid/deployers/CompositeVDB.java 2011-08-26 15:17:29
UTC (rev 3425)
@@ -44,7 +44,6 @@
import org.teiid.query.metadata.QueryMetadataInterface;
import org.teiid.query.metadata.TransformationMetadata;
import org.teiid.query.metadata.TransformationMetadata.Resource;
-import org.teiid.query.tempdata.TempTableStore;
import org.teiid.vdb.runtime.VDBKey;
@@ -95,8 +94,6 @@
TransformationMetadata metadata = buildTransformationMetaData(mergedVDB,
getVisibilityMap(), getMetadataStores(), getUDF(), systemFunctions,
this.additionalStores);
mergedVDB.addAttchment(QueryMetadataInterface.class, metadata);
mergedVDB.addAttchment(TransformationMetadata.class, metadata);
- TempTableStore globalTables = new TempTableStore("SYSTEM"); //$NON-NLS-1$
- mergedVDB.addAttchment(TempTableStore.class, globalTables);
}
private static TransformationMetadata buildTransformationMetaData(VDBMetaData vdb,
LinkedHashMap<String, Resource> visibilityMap, MetadataStoreGroup stores,
UDFMetaData udf, FunctionTree systemFunctions, MetadataStore[] additionalStores) {
@@ -134,6 +131,13 @@
return this.mergedVDB;
}
+ public boolean hasChildVdb(VDBKey child) {
+ if (this.children != null) {
+ return this.children.containsKey(child);
+ }
+ return false;
+ }
+
private VDBMetaData buildVDB() {
VDBMetaData newMergedVDB = new VDBMetaData();
newMergedVDB.setName(this.vdb.getName());
Modified: trunk/runtime/src/main/java/org/teiid/deployers/VDBLifeCycleListener.java
===================================================================
--- trunk/runtime/src/main/java/org/teiid/deployers/VDBLifeCycleListener.java 2011-08-25
01:22:57 UTC (rev 3424)
+++ trunk/runtime/src/main/java/org/teiid/deployers/VDBLifeCycleListener.java 2011-08-26
15:17:29 UTC (rev 3425)
@@ -22,6 +22,6 @@
package org.teiid.deployers;
public interface VDBLifeCycleListener {
- void added(String name, int version);
- void removed(String name, int version);
+ void added(String name, int version, CompositeVDB vdb);
+ void removed(String name, int version, CompositeVDB vdb);
}
Modified: trunk/runtime/src/main/java/org/teiid/deployers/VDBRepository.java
===================================================================
--- trunk/runtime/src/main/java/org/teiid/deployers/VDBRepository.java 2011-08-25 01:22:57
UTC (rev 3424)
+++ trunk/runtime/src/main/java/org/teiid/deployers/VDBRepository.java 2011-08-26 15:17:29
UTC (rev 3425)
@@ -101,7 +101,7 @@
cvdb = new CompositeVDB(vdb, stores, visibilityMap, udf,
this.systemFunctionManager.getSystemFunctions(), cmr, this.systemStore, odbcStore);
}
this.vdbRepo.put(vdbId(vdb), cvdb);
- notifyAdd(vdb.getName(), vdb.getVersion());
+ notifyAdd(vdb.getName(), vdb.getVersion(), cvdb);
}
private void updateFromMetadataRepository(CompositeVDB cvdb) {
@@ -270,9 +270,15 @@
if (removed != null) {
// if this VDB was part of another VDB; then remove them.
for (CompositeVDB other:this.vdbRepo.values()) {
- other.removeChild(key);
+ if (other.hasChildVdb(key)) {
+ notifyRemove(other.getVDB().getName(), other.getVDB().getVersion(), other);
+
+ other.removeChild(key);
+
+ notifyAdd(other.getVDB().getName(), other.getVDB().getVersion(), other);
+ }
}
- notifyRemove(key.getName(), key.getVersion());
+ notifyRemove(key.getName(), key.getVersion(), removed);
return true;
}
return false;
@@ -303,11 +309,11 @@
throw new
AdminProcessingException(RuntimePlugin.Util.getString("vdb_not_found",
sourceVDBName, sourceVDBVersion)); //$NON-NLS-1$
}
- notifyRemove(targetVDBName, targetVDBVersion);
+ notifyRemove(targetVDBName, targetVDBVersion, target);
// merge them
target.addChild(source);
- notifyAdd(targetVDBName, targetVDBVersion);
+ notifyAdd(targetVDBName, targetVDBVersion, target);
}
// this is called by mc
@@ -333,15 +339,15 @@
this.listeners.remove(listener);
}
- private void notifyAdd(String name, int version) {
+ private void notifyAdd(String name, int version, CompositeVDB vdb) {
for(VDBLifeCycleListener l:this.listeners) {
- l.added(name, version);
+ l.added(name, version, vdb);
}
}
- private void notifyRemove(String name, int version) {
+ private void notifyRemove(String name, int version, CompositeVDB vdb) {
for(VDBLifeCycleListener l:this.listeners) {
- l.removed(name, version);
+ l.removed(name, version, vdb);
}
}
Modified: trunk/runtime/src/test/java/org/teiid/deployers/TestCompositeVDB.java
===================================================================
--- trunk/runtime/src/test/java/org/teiid/deployers/TestCompositeVDB.java 2011-08-25
01:22:57 UTC (rev 3424)
+++ trunk/runtime/src/test/java/org/teiid/deployers/TestCompositeVDB.java 2011-08-26
15:17:29 UTC (rev 3425)
@@ -149,8 +149,13 @@
@Test public void testRemoveChild() throws Exception {
CompositeVDB vdb = createCompositeVDB(RealMetadataFactory.exampleBQTStore(),
"bqt");
- vdb.removeChild(new VDBKey("foo", 1));
+ VDBKey child = new VDBKey("foo", 1);
+ vdb.removeChild(child);
assertNotNull(vdb.getVDB());
+ assertFalse(vdb.hasChildVdb(child));
+ vdb.addChild(createCompositeVDB(RealMetadataFactory.example1Store(),
"foo"));
+ assertTrue(vdb.hasChildVdb(child));
+ assertNotNull(vdb.getVDB());
}
}
Modified: trunk/test-integration/common/pom.xml
===================================================================
--- trunk/test-integration/common/pom.xml 2011-08-25 01:22:57 UTC (rev 3424)
+++ trunk/test-integration/common/pom.xml 2011-08-26 15:17:29 UTC (rev 3425)
@@ -20,6 +20,18 @@
<artifactId>h2</artifactId>
<version>1.2.147</version>
</dependency>
+ <dependency>
+ <artifactId>teiid-cache-jbosscache</artifactId>
+ <groupId>org.jboss.teiid</groupId>
+ </dependency>
+ <dependency>
+ <groupId>jgroups</groupId>
+ <artifactId>jgroups</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </dependency>
</dependencies>
</project>
\ No newline at end of file
Modified: trunk/test-integration/common/src/test/java/org/teiid/jdbc/FakeServer.java
===================================================================
--- trunk/test-integration/common/src/test/java/org/teiid/jdbc/FakeServer.java 2011-08-25
01:22:57 UTC (rev 3424)
+++ trunk/test-integration/common/src/test/java/org/teiid/jdbc/FakeServer.java 2011-08-26
15:17:29 UTC (rev 3425)
@@ -38,8 +38,10 @@
import org.teiid.cache.CacheConfiguration.Policy;
import org.teiid.client.DQP;
import org.teiid.client.security.ILogon;
+import org.teiid.deployers.CompositeVDB;
import org.teiid.deployers.MetadataStoreGroup;
import org.teiid.deployers.UDFMetaData;
+import org.teiid.deployers.VDBLifeCycleListener;
import org.teiid.deployers.VDBRepository;
import org.teiid.dqp.internal.datamgr.ConnectorManager;
import org.teiid.dqp.internal.datamgr.ConnectorManagerRepository;
@@ -55,10 +57,14 @@
import org.teiid.metadata.index.VDBMetadataFactory;
import org.teiid.net.CommunicationException;
import org.teiid.net.ConnectionException;
+import org.teiid.query.ObjectReplicator;
import org.teiid.query.function.SystemFunctionManager;
+import org.teiid.query.metadata.TransformationMetadata;
import org.teiid.query.metadata.TransformationMetadata.Resource;
import org.teiid.query.optimizer.capabilities.BasicSourceCapabilities;
import org.teiid.query.optimizer.capabilities.SourceCapabilities;
+import org.teiid.query.tempdata.GlobalTableStore;
+import org.teiid.query.tempdata.GlobalTableStoreImpl;
import org.teiid.services.SessionServiceImpl;
import org.teiid.transport.ClientServiceRegistry;
import org.teiid.transport.ClientServiceRegistryImpl;
@@ -74,14 +80,38 @@
VDBRepository repo = new VDBRepository();
private ConnectorManagerRepository cmr;
private boolean useCallingThread = true;
+ private ObjectReplicator replicator;
public FakeServer() {
this(new DQPConfiguration());
}
+ public void setReplicator(ObjectReplicator replicator) {
+ this.replicator = replicator;
+ }
+
public FakeServer(DQPConfiguration config) {
this.logon = new LogonImpl(sessionService, null);
-
+ this.repo.addListener(new VDBLifeCycleListener() {
+
+ @Override
+ public void removed(String name, int version, CompositeVDB vdb) {
+
+ }
+
+ @Override
+ public void added(String name, int version, CompositeVDB vdb) {
+ GlobalTableStore gts = new GlobalTableStoreImpl(dqp.getBufferManager(),
vdb.getVDB().getAttachment(TransformationMetadata.class));
+ if (replicator != null) {
+ try {
+ gts = replicator.replicate(name + version, GlobalTableStore.class, gts, 300000);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ vdb.getVDB().addAttchment(GlobalTableStore.class, gts);
+ }
+ });
this.repo.setSystemStore(VDBMetadataFactory.getSystem());
this.repo.setSystemFunctionManager(new SystemFunctionManager());
this.repo.odbcEnabled();
Added:
trunk/test-integration/common/src/test/java/org/teiid/systemmodel/TestMatViewReplication.java
===================================================================
--- trunk/test-integration/common/src/test/java/org/teiid/systemmodel/TestMatViewReplication.java (rev 0)
+++ trunk/test-integration/common/src/test/java/org/teiid/systemmodel/TestMatViewReplication.java 2011-08-26 15:17:29 UTC (rev 3425)
@@ -0,0 +1,136 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.systemmodel;
+
+import static org.junit.Assert.*;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.logging.Handler;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.jgroups.JChannelFactory;
+import org.junit.Test;
+import org.teiid.core.types.DataTypeManager;
+import org.teiid.core.util.UnitTestUtil;
+import org.teiid.jdbc.FakeServer;
+import org.teiid.metadata.FunctionMethod;
+import org.teiid.metadata.FunctionParameter;
+import org.teiid.metadata.FunctionMethod.Determinism;
+import org.teiid.metadata.FunctionMethod.PushDown;
+import org.teiid.replication.jboss.JGroupsObjectReplicator;
+
+@SuppressWarnings("nls")
+public class TestMatViewReplication {
+
+ private static final String MATVIEWS = "matviews";
+ private static final boolean DEBUG = false;
+
+ @Test public void testReplication() throws Exception {
+ if (DEBUG) {
+ Logger logger = Logger.getLogger("org.teiid");
+ logger.setLevel(Level.FINEST);
+ for (Handler h : logger.getHandlers()) {
+ h.setLevel(Level.FINEST);
+ }
+ /*org.apache.log4j.Logger l = LogManager.getLogger("org.jgroups");
+ l.setLevel(org.apache.log4j.Level.TRACE);
+ ConsoleAppender ca = new ConsoleAppender(new PatternLayout());
+ ca.setName("x");
+ l.addAppender(ca);*/
+ }
+
+ FakeServer server1 = createServer();
+
+ Connection c1 = server1.createConnection("jdbc:teiid:matviews");
+ Statement stmt = c1.createStatement();
+ stmt.execute("select * from TEST.RANDOMVIEW");
+ ResultSet rs = stmt.getResultSet();
+ assertTrue(rs.next());
+ double d1 = rs.getDouble(1);
+ double d2 = rs.getDouble(2);
+
+ FakeServer server2 = createServer();
+ Connection c2 = server2.createConnection("jdbc:teiid:matviews");
+ Statement stmt2 = c2.createStatement();
+ ResultSet rs2 = stmt2.executeQuery("select * from matviews where name =
'RandomView'");
+ assertTrue(rs2.next());
+ assertEquals("LOADED", rs2.getString("loadstate"));
+ assertEquals(true, rs2.getBoolean("valid"));
+ stmt2.execute("select * from TEST.RANDOMVIEW");
+ rs2 = stmt2.getResultSet();
+ assertTrue(rs2.next());
+ assertEquals(d1, rs2.getDouble(1), 0);
+ assertEquals(d2, rs2.getDouble(2), 0);
+
+ rs2 = stmt2.executeQuery("select * from (call
refreshMatView('TEST.RANDOMVIEW', false)) p");
+
+ Thread.sleep(1000);
+
+ //make sure we're still valid and the same
+ stmt.execute("select * from TEST.RANDOMVIEW");
+ rs = stmt.getResultSet();
+ assertTrue(rs.next());
+ d1 = rs.getDouble(1);
+ d2 = rs.getDouble(2);
+ stmt2.execute("select * from TEST.RANDOMVIEW");
+ rs2 = stmt2.getResultSet();
+ assertTrue(rs2.next());
+ assertEquals(d1, rs2.getDouble(1), 0);
+ assertEquals(d2, rs2.getDouble(2), 0);
+
+ //ensure a lookup is usable on each side
+ rs2 = stmt2.executeQuery("select lookup('sys.schemas', 'VDBName',
'name', 'SYS')");
+ Thread.sleep(1000);
+
+ rs = stmt.executeQuery("select lookup('sys.schemas', 'VDBName',
'name', 'SYS')");
+ rs.next();
+ assertEquals("matviews", rs.getString(1));
+
+ server1.stop();
+ server2.stop();
+ }
+
+ private FakeServer createServer() throws Exception {
+ FakeServer server = new FakeServer();
+
+ JGroupsObjectReplicator jor = new JGroupsObjectReplicator();
+ jor.setClusterName("demo");
+ jor.setMultiplexerStack("tcp");
+ JChannelFactory jcf = new JChannelFactory();
+ jcf.setMultiplexerConfig(this.getClass().getClassLoader().getResource("stacks.xml")); //$NON-NLS-1$
+ jor.setChannelFactory(jcf);
+
+ server.setReplicator(jor);
+ HashMap<String, Collection<FunctionMethod>> udfs = new
HashMap<String, Collection<FunctionMethod>>();
+ udfs.put("funcs", Arrays.asList(new FunctionMethod("pause",
null, null, PushDown.CANNOT_PUSHDOWN, TestMatViews.class.getName(), "pause",
null, new FunctionParameter("return", DataTypeManager.DefaultDataTypes.INTEGER),
false, Determinism.NONDETERMINISTIC)));
+ server.deployVDB(MATVIEWS, UnitTestUtil.getTestDataPath() +
"/matviews.vdb", udfs);
+ return server;
+ }
+
+}
Property changes on:
trunk/test-integration/common/src/test/java/org/teiid/systemmodel/TestMatViewReplication.java
___________________________________________________________________
Added: svn:mime-type
+ text/plain