teiid SVN: r1777 - in trunk: client/src/main/java/com/metamatrix/dqp/message and 17 other directories.
by teiid-commits@lists.jboss.org
Author: shawkins
Date: 2010-01-24 22:26:22 -0500 (Sun, 24 Jan 2010)
New Revision: 1777
Added:
trunk/engine/src/main/java/com/metamatrix/common/buffer/BatchManager.java
Removed:
trunk/engine/src/main/java/com/metamatrix/common/buffer/ManagedBatch.java
Modified:
trunk/client-jdbc/src/main/java/com/metamatrix/jdbc/MMStatement.java
trunk/client-jdbc/src/test/java/com/metamatrix/jdbc/TestMMDataSource.java
trunk/client/src/main/java/com/metamatrix/common/batch/BatchSerializer.java
trunk/client/src/main/java/com/metamatrix/dqp/message/RequestMessage.java
trunk/common-core/src/main/java/com/metamatrix/common/types/DataTypeManager.java
trunk/connectors/connector-jdbc/src/main/java/org/teiid/connector/jdbc/JDBCBaseExecution.java
trunk/connectors/connector-jdbc/src/main/resources/connector-jdbc.xml
trunk/engine/src/main/java/com/metamatrix/common/buffer/BufferManager.java
trunk/engine/src/main/java/com/metamatrix/common/buffer/FileStore.java
trunk/engine/src/main/java/com/metamatrix/common/buffer/TupleBatch.java
trunk/engine/src/main/java/com/metamatrix/common/buffer/TupleBuffer.java
trunk/engine/src/main/java/com/metamatrix/common/buffer/TupleSource.java
trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/BufferManagerImpl.java
trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/FileStorageManager.java
trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/MemoryStorageManager.java
trunk/engine/src/main/java/com/metamatrix/query/processor/BatchIterator.java
trunk/engine/src/main/java/com/metamatrix/query/processor/CollectionTupleSource.java
trunk/engine/src/main/java/com/metamatrix/query/processor/relational/DuplicateFilter.java
trunk/engine/src/main/java/com/metamatrix/query/processor/relational/GroupingNode.java
trunk/engine/src/main/java/com/metamatrix/query/processor/relational/PartitionedSortJoin.java
trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SortNode.java
trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SortUtility.java
trunk/engine/src/main/java/com/metamatrix/query/processor/xml/XMLUtil.java
trunk/engine/src/main/java/com/metamatrix/query/tempdata/TempTableStoreImpl.java
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWorkItem.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/PreparedStatementRequest.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
trunk/engine/src/test/java/com/metamatrix/query/processor/FakeTupleSource.java
trunk/engine/src/test/java/com/metamatrix/query/processor/TestVirtualDepJoin.java
trunk/engine/src/test/java/com/metamatrix/query/processor/relational/NodeTestUtil.java
trunk/engine/src/test/java/com/metamatrix/query/processor/relational/TestBatchedUpdateNode.java
trunk/engine/src/test/java/com/metamatrix/query/processor/relational/TestProjectIntoNode.java
trunk/runtime/src/main/java/com/metamatrix/dqp/embedded/services/EmbeddedBufferService.java
trunk/runtime/src/test/java/org/teiid/transport/TestSocketRemoting.java
Log:
TEIID-913 introducing a forward only buffermanager and taking steps to reduce memory consumption, including a canonical value map and streaming of batches to and from disk.
Modified: trunk/client/src/main/java/com/metamatrix/common/batch/BatchSerializer.java
===================================================================
--- trunk/client/src/main/java/com/metamatrix/common/batch/BatchSerializer.java 2010-01-22 22:49:05 UTC (rev 1776)
+++ trunk/client/src/main/java/com/metamatrix/common/batch/BatchSerializer.java 2010-01-25 03:26:22 UTC (rev 1777)
@@ -151,7 +151,7 @@
readIsNullData(in, isNull);
for (int i = 0; i < batch.length; i++) {
if (!isNullObject(isNull, i)) {
- batch[i].set(col, readObject(in));
+ batch[i].set(col, DataTypeManager.getCanonicalValue(readObject(in)));
}
}
}
Modified: trunk/client/src/main/java/com/metamatrix/dqp/message/RequestMessage.java
===================================================================
--- trunk/client/src/main/java/com/metamatrix/dqp/message/RequestMessage.java 2010-01-22 22:49:05 UTC (rev 1776)
+++ trunk/client/src/main/java/com/metamatrix/dqp/message/RequestMessage.java 2010-01-25 03:26:22 UTC (rev 1777)
@@ -39,7 +39,7 @@
static final long serialVersionUID = 2258063872049251854L;
- public static final int DEFAULT_FETCH_SIZE = 2000;
+ public static final int DEFAULT_FETCH_SIZE = 2048;
private String[] commands;
private boolean isBatchedUpdate;
Modified: trunk/client-jdbc/src/main/java/com/metamatrix/jdbc/MMStatement.java
===================================================================
--- trunk/client-jdbc/src/main/java/com/metamatrix/jdbc/MMStatement.java 2010-01-22 22:49:05 UTC (rev 1776)
+++ trunk/client-jdbc/src/main/java/com/metamatrix/jdbc/MMStatement.java 2010-01-25 03:26:22 UTC (rev 1777)
@@ -729,12 +729,6 @@
String partial = getExecutionProperty(ExecutionProperties.PROP_PARTIAL_RESULTS_MODE);
res.setPartialResults(Boolean.valueOf(partial).booleanValue());
- // Get fetch size
- res.setFetchSize(fetchSize);
-
- // Get cursor type
- res.setCursorType(this.resultSetType);
-
// Get xml validation mode
String validate = getExecutionProperty(ExecutionProperties.PROP_XML_VALIDATION);
if(validate == null) {
@@ -846,8 +840,8 @@
sqlOptions.toUpperCase().indexOf(ExecutionProperties.SQL_OPTION_SHOWPLAN.toUpperCase()) >= 0) {
reqMsg.setShowPlan(true);
}
-
- reqMsg.setFetchSize(getFetchSize());
+ reqMsg.setCursorType(this.resultSetType);
+ reqMsg.setFetchSize(this.fetchSize);
reqMsg.setStyleSheet(this.styleSheet);
reqMsg.setRowLimit(this.maxRows);
Modified: trunk/client-jdbc/src/test/java/com/metamatrix/jdbc/TestMMDataSource.java
===================================================================
--- trunk/client-jdbc/src/test/java/com/metamatrix/jdbc/TestMMDataSource.java 2010-01-22 22:49:05 UTC (rev 1776)
+++ trunk/client-jdbc/src/test/java/com/metamatrix/jdbc/TestMMDataSource.java 2010-01-25 03:26:22 UTC (rev 1777)
@@ -613,7 +613,7 @@
ds.setServerName("hostName"); //$NON-NLS-1$
ds.setDatabaseName("vdbName"); //$NON-NLS-1$
ds.setPortNumber(1);
- assertEquals("jdbc:teiid:vdbName@mm://hostname:1;fetchSize=2000;ApplicationName=JDBC;serverURL=mm://hostname:1;a=b;VirtualDatabaseName=vdbName;foo=bar", ds.buildURL()); //$NON-NLS-1$
+ assertEquals("jdbc:teiid:vdbName@mm://hostname:1;fetchSize=2048;ApplicationName=JDBC;serverURL=mm://hostname:1;a=b;VirtualDatabaseName=vdbName;foo=bar", ds.buildURL()); //$NON-NLS-1$
}
public void testInvalidDataSource() {
Modified: trunk/common-core/src/main/java/com/metamatrix/common/types/DataTypeManager.java
===================================================================
--- trunk/common-core/src/main/java/com/metamatrix/common/types/DataTypeManager.java 2010-01-22 22:49:05 UTC (rev 1776)
+++ trunk/common-core/src/main/java/com/metamatrix/common/types/DataTypeManager.java 2010-01-25 03:26:22 UTC (rev 1777)
@@ -23,6 +23,7 @@
package com.metamatrix.common.types;
import java.io.IOException;
+import java.lang.ref.WeakReference;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.sql.Blob;
@@ -42,6 +43,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.WeakHashMap;
import javax.xml.transform.Source;
@@ -61,6 +63,7 @@
import com.metamatrix.common.types.basic.NumberToLongTransform;
import com.metamatrix.common.types.basic.NumberToShortTransform;
import com.metamatrix.common.types.basic.ObjectToAnyTransform;
+import com.metamatrix.common.util.PropertiesUtils;
import com.metamatrix.core.CorePlugin;
import com.metamatrix.core.ErrorMessageKeys;
import com.metamatrix.core.MetaMatrixRuntimeException;
@@ -81,6 +84,11 @@
* </p>
*/
public class DataTypeManager {
+
+ private static final int MAX_VALUE_MAP_SIZE = 10000;
+ private static boolean USE_VALUE_CACHE = PropertiesUtils.getBooleanProperty(System.getProperties(), "org.teiid.useValueCache", true); //$NON-NLS-1$
+
+ private static Map<Class<?>, Map<Object, WeakReference<Object>>> valueMaps = new HashMap<Class<?>, Map<Object, WeakReference<Object>>>();
public static final int MAX_STRING_LENGTH = 4000;
@@ -418,19 +426,33 @@
*/
static void loadDataTypes() {
DataTypeManager.addDataType(DefaultDataTypes.BOOLEAN, DefaultDataClasses.BOOLEAN);
+ valueMaps.put(DefaultDataClasses.BOOLEAN, Collections.synchronizedMap(new WeakHashMap<Object, WeakReference<Object>>()));
DataTypeManager.addDataType(DefaultDataTypes.BYTE, DefaultDataClasses.BYTE);
+ valueMaps.put(DefaultDataClasses.BYTE, Collections.synchronizedMap(new WeakHashMap<Object, WeakReference<Object>>()));
DataTypeManager.addDataType(DefaultDataTypes.SHORT, DefaultDataClasses.SHORT);
+ valueMaps.put(DefaultDataClasses.SHORT, Collections.synchronizedMap(new WeakHashMap<Object, WeakReference<Object>>()));
DataTypeManager.addDataType(DefaultDataTypes.CHAR, DefaultDataClasses.CHAR);
+ valueMaps.put(DefaultDataClasses.CHAR, Collections.synchronizedMap(new WeakHashMap<Object, WeakReference<Object>>()));
DataTypeManager.addDataType(DefaultDataTypes.INTEGER, DefaultDataClasses.INTEGER);
+ valueMaps.put(DefaultDataClasses.INTEGER, Collections.synchronizedMap(new WeakHashMap<Object, WeakReference<Object>>()));
DataTypeManager.addDataType(DefaultDataTypes.LONG, DefaultDataClasses.LONG);
+ valueMaps.put(DefaultDataClasses.LONG, Collections.synchronizedMap(new WeakHashMap<Object, WeakReference<Object>>()));
DataTypeManager.addDataType(DefaultDataTypes.BIG_INTEGER, DefaultDataClasses.BIG_INTEGER);
+ valueMaps.put(DefaultDataClasses.BIG_INTEGER, Collections.synchronizedMap(new WeakHashMap<Object, WeakReference<Object>>()));
DataTypeManager.addDataType(DefaultDataTypes.FLOAT, DefaultDataClasses.FLOAT);
+ valueMaps.put(DefaultDataClasses.FLOAT, Collections.synchronizedMap(new WeakHashMap<Object, WeakReference<Object>>()));
DataTypeManager.addDataType(DefaultDataTypes.DOUBLE, DefaultDataClasses.DOUBLE);
+ valueMaps.put(DefaultDataClasses.DOUBLE, Collections.synchronizedMap(new WeakHashMap<Object, WeakReference<Object>>()));
DataTypeManager.addDataType(DefaultDataTypes.BIG_DECIMAL, DefaultDataClasses.BIG_DECIMAL);
+ valueMaps.put(DefaultDataClasses.BIG_DECIMAL, Collections.synchronizedMap(new WeakHashMap<Object, WeakReference<Object>>()));
DataTypeManager.addDataType(DefaultDataTypes.DATE, DefaultDataClasses.DATE);
+ valueMaps.put(DefaultDataClasses.DATE, Collections.synchronizedMap(new WeakHashMap<Object, WeakReference<Object>>()));
DataTypeManager.addDataType(DefaultDataTypes.TIME, DefaultDataClasses.TIME);
+ valueMaps.put(DefaultDataClasses.TIME, Collections.synchronizedMap(new WeakHashMap<Object, WeakReference<Object>>()));
DataTypeManager.addDataType(DefaultDataTypes.TIMESTAMP, DefaultDataClasses.TIMESTAMP);
+ valueMaps.put(DefaultDataClasses.TIMESTAMP, Collections.synchronizedMap(new WeakHashMap<Object, WeakReference<Object>>()));
DataTypeManager.addDataType(DefaultDataTypes.STRING, DefaultDataClasses.STRING);
+ valueMaps.put(DefaultDataClasses.STRING, Collections.synchronizedMap(new WeakHashMap<Object, WeakReference<Object>>()));
DataTypeManager.addDataType(DefaultDataTypes.CLOB, DefaultDataClasses.CLOB);
DataTypeManager.addDataType(DefaultDataTypes.XML, DefaultDataClasses.XML);
DataTypeManager.addDataType(DefaultDataTypes.OBJECT, DefaultDataClasses.OBJECT);
@@ -686,7 +708,8 @@
Object[] params = new Object[] { sourceType, targetClass, value};
throw new TransformationException(CorePlugin.Util.getString("ObjectToAnyTransform.Invalid_value", params)); //$NON-NLS-1$
}
- return (T) transform.transform(value);
+ T result = (T) transform.transform(value);
+ return getCanonicalValue(result);
}
public static boolean isNonComparable(String type) {
@@ -699,4 +722,29 @@
public static <S> void addSourceTransform(Class<S> sourceClass, SourceTransform<S, ?> transform) {
sourceConverters.put(sourceClass, transform);
}
+
+ @SuppressWarnings("unchecked")
+ public static <T> T getCanonicalValue(T value) {
+ if (USE_VALUE_CACHE) {
+ if (value == null) {
+ return null;
+ }
+ Map<Object, WeakReference<Object>> valueMap = valueMaps.get(value.getClass());
+ if (valueMap == null) {
+ return value;
+ }
+ WeakReference<Object> valueReference = valueMap.get(value);
+ Object canonicalValue = null;
+ if (valueReference != null) {
+ canonicalValue = valueReference.get();
+ }
+ if (canonicalValue != null) {
+ return (T)canonicalValue;
+ }
+ if (valueMap.size() <= MAX_VALUE_MAP_SIZE) {
+ valueMap.put(value, new WeakReference<Object>(value));
+ }
+ }
+ return value;
+ }
}
Modified: trunk/connectors/connector-jdbc/src/main/java/org/teiid/connector/jdbc/JDBCBaseExecution.java
===================================================================
--- trunk/connectors/connector-jdbc/src/main/java/org/teiid/connector/jdbc/JDBCBaseExecution.java 2010-01-22 22:49:05 UTC (rev 1776)
+++ trunk/connectors/connector-jdbc/src/main/java/org/teiid/connector/jdbc/JDBCBaseExecution.java 2010-01-25 03:26:22 UTC (rev 1777)
@@ -80,7 +80,10 @@
this.logger = logger;
this.context = context;
- fetchSize = PropertiesUtils.getIntProperty(props, JDBCPropertyNames.FETCH_SIZE, context.getBatchSize());
+ fetchSize = PropertiesUtils.getIntProperty(props, JDBCPropertyNames.FETCH_SIZE, 0);
+ if (fetchSize == 0) {
+ fetchSize = context.getBatchSize();
+ }
maxResultRows = PropertiesUtils.getIntProperty(props, ConnectorPropertyNames.MAX_RESULT_ROWS, -1);
//if the connector work needs to throw an exception, set the size plus 1
if (maxResultRows > 0 && PropertiesUtils.getBooleanProperty(props, ConnectorPropertyNames.EXCEPTION_ON_MAX_ROWS, false)) {
Modified: trunk/connectors/connector-jdbc/src/main/resources/connector-jdbc.xml
===================================================================
--- trunk/connectors/connector-jdbc/src/main/resources/connector-jdbc.xml 2010-01-22 22:49:05 UTC (rev 1776)
+++ trunk/connectors/connector-jdbc/src/main/resources/connector-jdbc.xml 2010-01-25 03:26:22 UTC (rev 1777)
@@ -17,6 +17,7 @@
<PropertyDefinition Name="ConnectionSource" DisplayName="Connection Source Class" ShortDescription="Driver, DataSource, or XADataSource class name" IsRequired="true" />
<PropertyDefinition Name="TrimStrings" DisplayName="Trim string flag" ShortDescription="Right Trim fixed character types returned as Strings - note that the native type must be char or nchar and the source must support the rtrim function." DefaultValue="false" PropertyType="Boolean" IsExpert="true" />
<PropertyDefinition Name="UseCommentsInSourceQuery" DisplayName="Use informational comments in Source Queries" ShortDescription="This will embed /*comment*/ style comment with session/request id in source SQL query for informational purposes" DefaultValue="false" PropertyType="Boolean" IsExpert="true"/>
+ <PropertyDefinition Name="FetchSize" DisplayName="Statement Fetch Size" ShortDescription="Statement Fetch Size" DefaultValue="0" PropertyType="Integer" IsExpert="true"/>
</ComponentType>
<ComponentType Name="Oracle Connector" ComponentTypeCode="2" Deployable="true" Deprecated="false" Monitorable="false" SuperComponentType="JDBC Connector" ParentComponentType="Connectors" LastChangedBy="ConfigurationStartup" CreatedBy="ConfigurationStartup">
<PropertyDefinition Name="ConnectionSource" DisplayName="Connection Source Class" ShortDescription="Driver, DataSource, or XADataSource class name" DefaultValue="oracle.jdbc.driver.OracleDriver" IsRequired="true" />
Added: trunk/engine/src/main/java/com/metamatrix/common/buffer/BatchManager.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/common/buffer/BatchManager.java (rev 0)
+++ trunk/engine/src/main/java/com/metamatrix/common/buffer/BatchManager.java 2010-01-25 03:26:22 UTC (rev 1777)
@@ -0,0 +1,41 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package com.metamatrix.common.buffer;
+
+import com.metamatrix.api.exception.MetaMatrixComponentException;
+
+public interface BatchManager {
+
+ public interface ManagedBatch {
+
+ TupleBatch getBatch(boolean cache, String[] types) throws MetaMatrixComponentException;
+
+ void remove();
+
+ }
+
+ ManagedBatch createManagedBatch(TupleBatch batch) throws MetaMatrixComponentException;
+
+ void remove();
+
+}
Property changes on: trunk/engine/src/main/java/com/metamatrix/common/buffer/BatchManager.java
___________________________________________________________________
Name: svn:mime-type
+ text/plain
Modified: trunk/engine/src/main/java/com/metamatrix/common/buffer/BufferManager.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/common/buffer/BufferManager.java 2010-01-22 22:49:05 UTC (rev 1776)
+++ trunk/engine/src/main/java/com/metamatrix/common/buffer/BufferManager.java 2010-01-25 03:26:22 UTC (rev 1777)
@@ -48,48 +48,17 @@
FINAL
}
- /**
- * Optional property - the max size of a batch sent between connector and query service.
- * Making batches larger reduces communication overhead between connector and query service
- * but increases the granularity of memory management on those batches. This value should
- * be a positive integer and defaults to 1000.
- */
- public static final String CONNECTOR_BATCH_SIZE = "metamatrix.buffer.connectorBatchSize"; //$NON-NLS-1$
- /**
- * Optional property - the max size of a batch sent internally within the query processor.
- * In general, these batches should be smaller than the connector batch size as there are
- * no communication costs with these batches. Smaller batches typically allow a user to
- * get their first results quicker and allow fine-grained buffer management on intermediate
- * results. This value should be a positive integer and defaults to 100.
- */
- public static final String PROCESSOR_BATCH_SIZE = "metamatrix.buffer.processorBatchSize"; //$NON-NLS-1$
- /**
- * Optional property - this value specifies the location to store temporary buffers too
- * large to fit in memory. Temporary buffer files will be created and destroyed in this
- * directory. This value should be a string specifying an absolute directory path.
- */
- public static final String BUFFER_STORAGE_DIRECTORY = "metamatrix.buffer.storageDirectory"; //$NON-NLS-1$
- /**
- * Optional property - this value specifies how many open file descriptors should be cached
- * in the storage directory. Increasing this value in heavy load may improve performance
- * but will use more file descriptors, which are a limited system resource.
- */
- public static final String MAX_OPEN_FILES = "metamatrix.buffer.maxOpenFiles"; //$NON-NLS-1$
- /**
- * Optional property - this value specifies the maximum size in megabytes that a buffer file can reach.
- * The default is 2048 MB (i.e. 2GB).
- */
- public static final String MAX_FILE_SIZE = "metamatrix.buffer.maxFileSize"; //$NON-NLS-1$
- /**
- * Optional property - the max number of batches to process at once in algorithms such as sorting.
- */
- public static final String MAX_PROCESSING_BATCHES = "metamatrix.buffer.maxProcessingBatches"; //$NON-NLS-1$
-
public static int DEFAULT_CONNECTOR_BATCH_SIZE = 2048;
public static int DEFAULT_PROCESSOR_BATCH_SIZE = 1024;
public static int DEFAULT_MAX_PROCESSING_BATCHES = 8;
+
+ /**
+ * The BufferManager may maintain at least this many batch references in memory.
+ *
+ * Up to 2x this value may be held by soft references.
+ */
public static int DEFAULT_RESERVE_BUFFERS = 64;
-
+
/**
* Get the batch size to use during query processing.
* @return Batch size (# of rows)
Modified: trunk/engine/src/main/java/com/metamatrix/common/buffer/FileStore.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/common/buffer/FileStore.java 2010-01-22 22:49:05 UTC (rev 1776)
+++ trunk/engine/src/main/java/com/metamatrix/common/buffer/FileStore.java 2010-01-25 03:26:22 UTC (rev 1777)
@@ -59,13 +59,22 @@
@Override
public void write(byte[] b, int off, int len) throws IOException {
- if (count + len <= buffer.length) {
- System.arraycopy(b, off, buffer, count, len);
- count += len;
+ if (len > buffer.length) {
+ flushBuffer();
+ writeDirect(b, off, len);
return;
}
+ int bufferedLength = Math.min(len, buffer.length - count);
+ if (count < buffer.length) {
+ System.arraycopy(b, off, buffer, count, bufferedLength);
+ count += bufferedLength;
+ if (bufferedLength == len) {
+ return;
+ }
+ }
flushBuffer();
- writeDirect(b, off, len);
+ System.arraycopy(b, off + bufferedLength, buffer, count, len - bufferedLength);
+ count += len - bufferedLength;
}
private void writeDirect(byte[] b, int off, int len) throws IOException {
@@ -77,7 +86,7 @@
}
}
- private void flushBuffer() throws IOException {
+ public void flushBuffer() throws IOException {
if (count > 0) {
writeDirect(buffer, 0, count);
count = 0;
@@ -119,6 +128,9 @@
}
}
+ private boolean removed;
+ private long len;
+
public void setCleanupReference(Object o) {
REFERENCES.add(new CleanupReference(o, this));
for (int i = 0; i < 10; i++) {
@@ -131,8 +143,10 @@
}
}
- private boolean removed;
-
+ public synchronized long getLength() {
+ return len;
+ }
+
public int read(long fileOffset, byte[] b, int offSet, int length)
throws MetaMatrixComponentException {
if (removed) {
@@ -155,18 +169,21 @@
} while (n < length);
}
- public long write(byte[] bytes) throws MetaMatrixComponentException {
- return write(bytes, 0, bytes.length);
+ public void write(byte[] bytes) throws MetaMatrixComponentException {
+ write(bytes, 0, bytes.length);
}
- public long write(byte[] bytes, int offset, int length) throws MetaMatrixComponentException {
+ public synchronized long write(byte[] bytes, int offset, int length) throws MetaMatrixComponentException {
if (removed) {
throw new MetaMatrixComponentException("already removed"); //$NON-NLS-1$
}
- return writeDirect(bytes, offset, length);
+ writeDirect(bytes, offset, length);
+ long result = len;
+ len += length;
+ return result;
}
- protected abstract long writeDirect(byte[] bytes, int offset, int length) throws MetaMatrixComponentException;
+ protected abstract void writeDirect(byte[] bytes, int offset, int length) throws MetaMatrixComponentException;
public void remove() {
if (!this.removed) {
@@ -177,9 +194,9 @@
protected abstract void removeDirect();
- public InputStream createInputStream() {
+ public InputStream createInputStream(final long start) {
return new InputStream() {
- private long offset;
+ private long offset = start;
@Override
public int read() throws IOException {
Deleted: trunk/engine/src/main/java/com/metamatrix/common/buffer/ManagedBatch.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/common/buffer/ManagedBatch.java 2010-01-22 22:49:05 UTC (rev 1776)
+++ trunk/engine/src/main/java/com/metamatrix/common/buffer/ManagedBatch.java 2010-01-25 03:26:22 UTC (rev 1777)
@@ -1,78 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * See the COPYRIGHT.txt file distributed with this work for information
- * regarding copyright ownership. Some portions may be licensed
- * to Red Hat, Inc. under one or more contributor license agreements.
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
- * 02110-1301 USA.
- */
-
-package com.metamatrix.common.buffer;
-
-import java.lang.ref.SoftReference;
-
-class ManagedBatch {
-
- private int beginRow;
- private SoftReference<TupleBatch> batchReference;
- private long offset;
- private int length;
-
- /**
- * Constructor for ManagedBatch.
- */
- public ManagedBatch(TupleBatch batch) {
- this.beginRow = batch.getBeginRow();
- this.batchReference = new SoftReference<TupleBatch>(batch);
- }
-
- /**
- * Get the begin row, must be >= 1
- * @return Begin row
- */
- public int getBeginRow() {
- return this.beginRow;
- }
-
- public String toString() {
- return "ManagedBatch[" + beginRow + "]"; //$NON-NLS-1$ //$NON-NLS-2$
- }
-
- public TupleBatch getBatch() {
- return this.batchReference.get();
- }
-
- public void setBatchReference(TupleBatch batch) {
- this.batchReference = new SoftReference<TupleBatch>(batch);
- }
-
- public int getLength() {
- return length;
- }
-
- public long getOffset() {
- return offset;
- }
-
- public void setLength(int length) {
- this.length = length;
- }
-
- public void setOffset(long offset) {
- this.offset = offset;
- }
-
-}
Modified: trunk/engine/src/main/java/com/metamatrix/common/buffer/TupleBatch.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/common/buffer/TupleBatch.java 2010-01-22 22:49:05 UTC (rev 1776)
+++ trunk/engine/src/main/java/com/metamatrix/common/buffer/TupleBatch.java 2010-01-25 03:26:22 UTC (rev 1777)
@@ -26,6 +26,7 @@
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
+import java.io.Serializable;
import java.util.List;
import com.metamatrix.common.batch.BatchSerializer;
@@ -40,8 +41,10 @@
* This object is immutable and Serializable;
*/
public class TupleBatch implements Externalizable {
-
- private int rowOffset;
+
+ private static final long serialVersionUID = 6304443387337336957L;
+
+ private int rowOffset;
private List[] tuples;
// Optional state
@@ -140,6 +143,10 @@
this.terminationFlag = terminationFlag;
}
+ public String[] getDataTypes() {
+ return types;
+ }
+
public void setDataTypes(String[] types) {
this.types = types;
}
Modified: trunk/engine/src/main/java/com/metamatrix/common/buffer/TupleBuffer.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/common/buffer/TupleBuffer.java 2010-01-22 22:49:05 UTC (rev 1776)
+++ trunk/engine/src/main/java/com/metamatrix/common/buffer/TupleBuffer.java 2010-01-25 03:26:22 UTC (rev 1777)
@@ -22,35 +22,29 @@
package com.metamatrix.common.buffer;
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.lang.ref.SoftReference;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
+import java.util.ListIterator;
import java.util.Map;
import java.util.TreeMap;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import com.metamatrix.api.exception.MetaMatrixComponentException;
import com.metamatrix.common.types.DataTypeManager;
import com.metamatrix.common.types.Streamable;
-import com.metamatrix.core.util.AccessibleByteArrayOutputStream;
import com.metamatrix.core.util.Assertion;
import com.metamatrix.dqp.DQPPlugin;
-import com.metamatrix.query.execution.QueryExecPlugin;
+import com.metamatrix.query.sql.symbol.Expression;
public class TupleBuffer {
class TupleSourceImpl implements IndexedTupleSource {
- private SoftReference<TupleBatch> currentBatch;
private int currentRow = 1;
private int mark = 1;
private List<?> currentTuple;
+ private TupleBatch batch;
@Override
public int getCurrentIndex() {
@@ -80,47 +74,30 @@
private List<?> getCurrentTuple() throws MetaMatrixComponentException,
BlockedException {
- TupleBatch batch = getBatch();
- if(batch.getRowCount() == 0) {
- // Check if last
- if(isFinal) {
- currentBatch = null;
- return null;
- }
- throw BlockedException.INSTANCE;
- }
-
- return batch.getTuple(currentRow);
+ if (currentRow <= rowCount) {
+ if (forwardOnly) {
+ if (batch == null || currentRow > batch.getEndRow()) {
+ batch = getBatch(currentRow);
+ }
+ return batch.getTuple(currentRow);
+ }
+ //TODO: determine if we should directly hold a soft reference here
+ return getRow(currentRow);
+ }
+ if(isFinal) {
+ return null;
+ }
+ throw BlockedException.INSTANCE;
}
@Override
public void closeSource()
throws MetaMatrixComponentException{
- currentBatch = null;
+ batch = null;
mark = 1;
reset();
}
- // Retrieves the necessary batch based on the currentRow
- TupleBatch getBatch() throws MetaMatrixComponentException{
- TupleBatch batch = null;
- if (currentBatch != null) {
- batch = currentBatch.get();
- }
- if (batch != null) {
- if (currentRow <= batch.getEndRow() && currentRow >= batch.getBeginRow()) {
- return batch;
- }
- currentBatch = null;
- }
-
- batch = TupleBuffer.this.getBatch(currentRow);
- if (batch != null) {
- currentBatch = new SoftReference<TupleBatch>(batch);
- }
- return batch;
- }
-
@Override
public boolean hasNext() throws MetaMatrixComponentException {
if (this.currentTuple != null) {
@@ -149,34 +126,56 @@
this.currentTuple = null;
}
}
+
+ @Override
+ public int available() {
+ return 0;
+ }
}
+
+ /**
+ * Gets the data type names for each of the input expressions, in order.
+ * @param expressions List of Expressions
+ * @return
+ * @since 4.2
+ */
+ public static String[] getTypeNames(List expressions) {
+ if (expressions == null) {
+ return null;
+ }
+ String[] types = new String[expressions.size()];
+ for (ListIterator i = expressions.listIterator(); i.hasNext();) {
+ Expression expr = (Expression)i.next();
+ types[i.previousIndex()] = DataTypeManager.getDataTypeName(expr.getType());
+ }
+ return types;
+ }
private static final AtomicLong LOB_ID = new AtomicLong();
//construction state
- private StorageManager manager;
+ private BatchManager manager;
private String tupleSourceID;
private List<?> schema;
private String[] types;
private int batchSize;
- private FileStore store;
-
private int rowCount;
private boolean isFinal;
- private TreeMap<Integer, ManagedBatch> batches = new TreeMap<Integer, ManagedBatch>();
+ private TreeMap<Integer, BatchManager.ManagedBatch> batches = new TreeMap<Integer, BatchManager.ManagedBatch>();
private ArrayList<List<?>> batchBuffer;
- private AtomicInteger referenceCount = new AtomicInteger(1);
+ private boolean removed;
+ private boolean forwardOnly;
//lob management
private Map<String, Streamable<?>> lobReferences; //references to contained lobs
private boolean lobs = true;
- public TupleBuffer(StorageManager manager, String id, List<?> schema, String[] types, int batchSize) {
+ public TupleBuffer(BatchManager manager, String id, List<?> schema, int batchSize) {
this.manager = manager;
this.tupleSourceID = id;
this.schema = schema;
- this.types = types;
+ this.types = getTypeNames(schema);
this.batchSize = batchSize;
if (types != null) {
int i = 0;
@@ -215,33 +214,16 @@
void saveBatch(boolean finalBatch) throws MetaMatrixComponentException {
Assertion.assertTrue(!this.isRemoved());
- if (batchBuffer == null || batchBuffer.isEmpty()) {
+ if (batchBuffer == null || batchBuffer.isEmpty() || batchBuffer.size() < Math.max(1, batchSize / 32)) {
return;
}
- List<?> rows = batchBuffer==null?Collections.emptyList():batchBuffer;
- TupleBatch writeBatch = new TupleBatch(rowCount - rows.size() + 1, rows);
+ TupleBatch writeBatch = new TupleBatch(rowCount - batchBuffer.size() + 1, batchBuffer);
if (finalBatch) {
writeBatch.setTerminationFlag(true);
}
- ManagedBatch mbatch = new ManagedBatch(writeBatch);
- if (this.store == null) {
- this.store = this.manager.createFileStore(this.tupleSourceID);
- this.store.setCleanupReference(this);
- }
- AccessibleByteArrayOutputStream baos = null;
- try {
- baos = new AccessibleByteArrayOutputStream(1024);
- ObjectOutputStream oos = new ObjectOutputStream(baos);
- writeBatch.setDataTypes(types);
- writeBatch.writeExternal(oos);
- oos.flush();
- oos.close();
- } catch(IOException e) {
- throw new MetaMatrixComponentException(e, QueryExecPlugin.Util.getString("FileStorageManager.batch_error")); //$NON-NLS-1$
- }
- mbatch.setLength(baos.getCount());
- mbatch.setOffset(this.store.write(baos.getBuffer(), 0, baos.getCount()));
- this.batches.put(mbatch.getBeginRow(), mbatch);
+ writeBatch.setDataTypes(types);
+ BatchManager.ManagedBatch mbatch = manager.createManagedBatch(writeBatch);
+ this.batches.put(writeBatch.getBeginRow(), mbatch);
batchBuffer = null;
}
@@ -252,7 +234,23 @@
}
this.isFinal = true;
}
+
+ List<?> getRow(int row) throws MetaMatrixComponentException { //returns the tuple stored for the given row number
+ if (this.batchBuffer != null && row > rowCount - this.batchBuffer.size()) { //the row is one of the last batchBuffer.size() rows, which are still held in batchBuffer
+ return this.batchBuffer.get(row - rowCount + this.batchBuffer.size() - 1); //translate the row number into an index into batchBuffer
+ }
+ TupleBatch batch = getBatch(row); //otherwise the row is looked up through getBatch
+ return batch.getTuple(row);
+ }
+ /**
+ * Get the batch containing the given row.
+ * NOTE: the returned batch may be empty or may begin with a row other
+ * than the one specified.
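+ * @param row the row number to look up
+ * @return the batch found for the row; when row is greater than the current row count, an empty batch beginning at rowCount + 1
+ * @throws MetaMatrixComponentException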
+ */
public TupleBatch getBatch(int row) throws MetaMatrixComponentException {
if (row > rowCount) {
TupleBatch batch = new TupleBatch(rowCount + 1, new List[] {});
@@ -262,58 +260,44 @@
return batch;
}
if (this.batchBuffer != null && row > rowCount - this.batchBuffer.size()) {
- return new TupleBatch(rowCount - this.batchBuffer.size() + 1, batchBuffer);
+ TupleBatch result = new TupleBatch(rowCount - this.batchBuffer.size() + 1, batchBuffer);
+ if (forwardOnly) {
+ this.batchBuffer = null;
+ }
+ return result;
}
if (this.batchBuffer != null && !this.batchBuffer.isEmpty()) {
+ //this is just a sanity check to ensure we're not holding too many
+ //hard references to batches.
saveBatch(isFinal);
}
- Map.Entry<Integer, ManagedBatch> entry = batches.floorEntry(row);
+ Map.Entry<Integer, BatchManager.ManagedBatch> entry = batches.floorEntry(row);
Assertion.isNotNull(entry);
- ManagedBatch batch = entry.getValue();
- TupleBatch result = batch.getBatch();
- if (result != null) {
- return result;
+ BatchManager.ManagedBatch batch = entry.getValue();
+ TupleBatch result = batch.getBatch(!forwardOnly, types);
+ if (lobs && result.getDataTypes() == null) {
+ correctLobReferences(result.getAllTuples());
}
- try {
- byte[] bytes = new byte[batch.getLength()];
- this.store.readFully(batch.getOffset(), bytes, 0, bytes.length);
- ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
- ObjectInputStream ois = new ObjectInputStream(bais);
-
- result = new TupleBatch();
- result.setDataTypes(types);
- result.readExternal(ois);
- } catch(IOException e) {
- throw new MetaMatrixComponentException(e, QueryExecPlugin.Util.getString("FileStoreageManager.error_reading", tupleSourceID)); //$NON-NLS-1$
- } catch (ClassNotFoundException e) {
- throw new MetaMatrixComponentException(e, QueryExecPlugin.Util.getString("FileStoreageManager.error_reading", tupleSourceID)); //$NON-NLS-1$
- }
- if (lobs) {
- correctLobReferences(result.getAllTuples());
+ result.setDataTypes(types);
+ if (forwardOnly) {
+ batches.remove(entry.getKey());
}
- batch.setBatchReference(result);
return result;
}
public void remove() {
- int count = this.referenceCount.getAndDecrement();
- if (count == 0) {
- if (this.store != null) {
- this.store.remove();
- this.store = null;
- }
+ if (!removed) {
+ this.manager.remove();
if (this.batchBuffer != null) {
this.batchBuffer = null;
}
+ for (BatchManager.ManagedBatch batch : this.batches.values()) {
+ batch.remove();
+ }
this.batches.clear();
}
}
- public boolean addReference() {
- int count = this.referenceCount.addAndGet(1);
- return count > 1;
- }
-
public int getRowCount() {
return rowCount;
}
@@ -383,6 +367,10 @@
}
}
+ public void setForwardOnly(boolean forwardOnly) { //when true, getBatch gives up what it returns: the in-memory row buffer is dropped, managed batches are fetched without caching and their entries are removed from batches
+ this.forwardOnly = forwardOnly;
+ }
+
/**
* Create a new iterator for this buffer
* @return
@@ -393,11 +381,11 @@
@Override
public String toString() {
- return this.tupleSourceID.toString();
+ return this.tupleSourceID;
}
public boolean isRemoved() {
- return this.referenceCount.get() <= 0;
+ return removed;
}
}
Modified: trunk/engine/src/main/java/com/metamatrix/common/buffer/TupleSource.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/common/buffer/TupleSource.java 2010-01-22 22:49:05 UTC (rev 1776)
+++ trunk/engine/src/main/java/com/metamatrix/common/buffer/TupleSource.java 2010-01-25 03:26:22 UTC (rev 1777)
@@ -60,5 +60,11 @@
*/
void closeSource()
throws MetaMatrixComponentException;
+
+ /**
+ * Returns an estimate of the number of rows that can be read without blocking.
+ * @return the estimated number of rows, 0 if no rows are known to be readable
+ */
+ int available();
}
Modified: trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/BufferManagerImpl.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/BufferManagerImpl.java 2010-01-22 22:49:05 UTC (rev 1776)
+++ trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/BufferManagerImpl.java 2010-01-25 03:26:22 UTC (rev 1777)
@@ -22,9 +22,20 @@
package com.metamatrix.common.buffer.impl;
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.lang.ref.Reference;
+import java.lang.ref.SoftReference;
+import java.lang.ref.WeakReference;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
import java.util.List;
-import java.util.ListIterator;
import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
@@ -32,10 +43,14 @@
import javax.xml.transform.Source;
import com.metamatrix.api.exception.MetaMatrixComponentException;
+import com.metamatrix.common.buffer.BatchManager;
import com.metamatrix.common.buffer.BufferManager;
import com.metamatrix.common.buffer.FileStore;
import com.metamatrix.common.buffer.StorageManager;
+import com.metamatrix.common.buffer.TupleBatch;
import com.metamatrix.common.buffer.TupleBuffer;
+import com.metamatrix.common.buffer.BatchManager.ManagedBatch;
+import com.metamatrix.common.buffer.FileStore.FileStoreOutputStream;
import com.metamatrix.common.log.LogManager;
import com.metamatrix.common.types.DataTypeManager;
import com.metamatrix.common.types.InputStreamFactory;
@@ -49,34 +64,185 @@
import com.metamatrix.core.log.MessageLevel;
import com.metamatrix.core.util.Assertion;
import com.metamatrix.dqp.util.LogConstants;
+import com.metamatrix.query.execution.QueryExecPlugin;
import com.metamatrix.query.processor.xml.XMLUtil;
-import com.metamatrix.query.sql.symbol.Expression;
/**
* <p>Default implementation of BufferManager.</p>
* Responsible for creating/tracking TupleBuffers and providing access to the StorageManager
*/
public class BufferManagerImpl implements BufferManager, StorageManager {
+
+ private static final int IO_BUFFER_SIZE = 1 << 14;
+
+ private final class ManagedBatchImpl implements ManagedBatch { //one TupleBatch that is either held directly, reachable through a soft/weak reference, or re-read from the FileStore
+ final private String id;
+ final private FileStore store;
+
+ private long offset = -1; //where this batch starts in the store; assigned by persist
+ private boolean persistent; //set by persist once a write has been attempted
+ private volatile TupleBatch pBatch; //hard reference; cleared by persist
+ private Reference<TupleBatch> batchReference; //soft or weak reference created by persist
+
+ public ManagedBatchImpl(String id, FileStore store, TupleBatch batch) throws MetaMatrixComponentException {
+ LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, "Add batch to BufferManager", batchAdded.incrementAndGet()); //$NON-NLS-1$
+ this.id = id;
+ this.store = store;
+ this.pBatch = batch;
+ if (batch.getBeginRow() == 1) { //only a batch beginning at row 1 starts out as an active batch
+ activeBatches.add(this);
+ } else {
+ this.persist(false); //every other batch is written out right away and kept only through a weak reference
+ }
+ persistBatchReferences(); //trim the shared sets if they are over their limits
+ }
+ @Override
+ public TupleBatch getBatch(boolean cache, String[] types) throws MetaMatrixComponentException { //cache: keep tracking the returned batch in this manager; types: data types set on the batch while it is read from the store
+ readAttempts.getAndIncrement();
+ synchronized (this) {
+ if (this.batchReference != null && this.pBatch == null) { //already persisted: try the soft/weak reference first
+ TupleBatch result = this.batchReference.get();
+ if (result != null) {
+ if (!cache) { //not caching: drop the reference and its softCache entry
+ softCache.remove(this);
+ this.batchReference.clear();
+ }
+ return result;
+ }
+ }
+
+ TupleBatch batch = this.pBatch;
+ if (batch != null){
+ activeBatches.remove(this); //removing and re-adding moves this entry to the end of the insertion-ordered set
+ if (cache) {
+ activeBatches.add(this);
+ }
+ return batch;
+ }
+ }
+ persistBatchReferences(); //trim the shared sets before reading from the store
+ LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, "Reading batch from disk", readCount.incrementAndGet()); //$NON-NLS-1$
+ synchronized (this) {
+ //Resurrect from disk
+ try {
+ ObjectInputStream ois = new ObjectInputStream(new BufferedInputStream(store.createInputStream(this.offset), IO_BUFFER_SIZE));
+ TupleBatch batch = new TupleBatch();
+ batch.setDataTypes(types); //set only for the duration of readExternal
+ batch.readExternal(ois);
+ batch.setDataTypes(null); //NOTE(review): TupleBuffer.getBatch checks for null data types before correcting lob references - this looks like the marker for a batch just read from the store, confirm
+ if (cache) {
+ this.pBatch = batch;
+ activeBatches.add(this);
+ }
+ return batch;
+ } catch(IOException e) {
+ throw new MetaMatrixComponentException(e, QueryExecPlugin.Util.getString("FileStoreageManager.error_reading", id)); //$NON-NLS-1$
+ } catch (ClassNotFoundException e) {
+ throw new MetaMatrixComponentException(e, QueryExecPlugin.Util.getString("FileStoreageManager.error_reading", id)); //$NON-NLS-1$
+ }
+ }
+ }
+
+ public void persistBatchReferences() throws MetaMatrixComponentException {
+ persistOneBatch(softCache, reserveBatches * 2, false); //an over-limit softCache entry is downgraded to a weak reference
+ persistOneBatch(activeBatches, reserveBatches, true); //an over-limit active batch is persisted and kept through a soft reference
+ }
+
+ private void persistOneBatch(Set<ManagedBatchImpl> set, int requiredSize, boolean createSoft) throws MetaMatrixComponentException {
+ ManagedBatchImpl mb = null;
+ synchronized (set) {
+ if (set.size() > requiredSize) { //at most one entry is evicted per call: the first one in iteration order
+ Iterator<ManagedBatchImpl> iter = set.iterator();
+ mb = iter.next();
+ iter.remove();
+ }
+ }
+ try {
+ if (mb != null) {
+ mb.persist(createSoft);
+ }
+ } catch (MetaMatrixComponentException e) {
+ if (mb == this) { //only a failure persisting this batch is propagated; a failure for another batch is just logged
+ throw e;
+ }
+ LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, e, "Error persisting batch, attempts to read that batch later will result in an exception"); //$NON-NLS-1$
+ }
+ }
+
+ public synchronized void persist(boolean createSoft) throws MetaMatrixComponentException { //createSoft: keep a soft reference (and a softCache entry) instead of a weak reference
+ try {
+ TupleBatch batch = pBatch;
+ if (batch != null) {
+ if (!persistent) { //the batch is written at most once
+ LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, "Writing batch to disk", writeCount.incrementAndGet()); //$NON-NLS-1$
+ synchronized (store) {
+ offset = store.getLength(); //remember where this batch starts; NOTE(review): assumes the output stream appends at the current length - confirm
+ FileStoreOutputStream fsos = store.createOutputStream(IO_BUFFER_SIZE);
+ ObjectOutputStream oos = new ObjectOutputStream(fsos);
+ batch.writeExternal(oos);
+ oos.flush();
+ oos.close();
+ fsos.flushBuffer();
+ }
+ }
+ if (createSoft) {
+ this.batchReference = new SoftReference<TupleBatch>(batch);
+ softCache.add(this);
+ } else {
+ this.batchReference = new WeakReference<TupleBatch>(batch);
+ }
+ }
+ } catch (IOException e) {
+ throw new MetaMatrixComponentException(e);
+ } finally { //runs on failure too: the hard reference is dropped and no further write is attempted
+ persistent = true;
+ pBatch = null;
+ }
+ }
+
+ public void remove() { //forgets this batch in both shared sets and clears its references; nothing is done to the store here
+ activeBatches.remove(this);
+ softCache.remove(this);
+ pBatch = null;
+ if (batchReference != null) {
+ batchReference.clear();
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "ManagedBatch " + id + " " + pBatch; //$NON-NLS-1$ //$NON-NLS-2$
+ }
+ }
+
// Configuration
private int connectorBatchSize = BufferManager.DEFAULT_CONNECTOR_BATCH_SIZE;
private int processorBatchSize = BufferManager.DEFAULT_PROCESSOR_BATCH_SIZE;
private int maxProcessingBatches = BufferManager.DEFAULT_MAX_PROCESSING_BATCHES;
private int reserveBatches = BufferManager.DEFAULT_RESERVE_BUFFERS;
private int maxReserveBatches = BufferManager.DEFAULT_RESERVE_BUFFERS;
+
private ReentrantLock lock = new ReentrantLock(true);
private Condition batchesFreed = lock.newCondition();
+ private Set<ManagedBatchImpl> activeBatches = Collections.synchronizedSet(new LinkedHashSet<ManagedBatchImpl>());
+ private Set<ManagedBatchImpl> softCache = Collections.synchronizedSet(new LinkedHashSet<ManagedBatchImpl>());
+
private StorageManager diskMgr;
- private AtomicLong currentTuple = new AtomicLong(0);
-
+ private AtomicLong currentTuple = new AtomicLong();
+ private AtomicInteger batchAdded = new AtomicInteger();
+ private AtomicInteger readCount = new AtomicInteger();
+ private AtomicInteger writeCount = new AtomicInteger();
+ private AtomicInteger readAttempts = new AtomicInteger();
+
public int getMaxProcessingBatches() {
return maxProcessingBatches;
}
public void setMaxProcessingBatches(int maxProcessingBatches) {
- this.maxProcessingBatches = maxProcessingBatches;
+ this.maxProcessingBatches = Math.max(2, maxProcessingBatches); //values below 2 are raised to 2
}
/**
@@ -118,11 +284,33 @@
}
@Override
- public TupleBuffer createTupleBuffer(List elements, String groupName,
+ public TupleBuffer createTupleBuffer(final List elements, String groupName,
TupleSourceType tupleSourceType)
throws MetaMatrixComponentException {
- String newID = String.valueOf(this.currentTuple.getAndIncrement());
- TupleBuffer tupleBuffer = new TupleBuffer(this, newID, elements, getTypeNames(elements), getProcessorBatchSize());
+ final String newID = String.valueOf(this.currentTuple.getAndIncrement());
+
+ BatchManager batchManager = new BatchManager() {
+ private FileStore store;
+
+ @Override
+ public ManagedBatch createManagedBatch(TupleBatch batch)
+ throws MetaMatrixComponentException {
+ if (this.store == null) {
+ this.store = createFileStore(newID);
+ this.store.setCleanupReference(this);
+ }
+ return new ManagedBatchImpl(newID, store, batch);
+ }
+
+ @Override
+ public void remove() {
+ if (this.store != null) {
+ this.store.remove();
+ this.store = null;
+ }
+ }
+ };
+ TupleBuffer tupleBuffer = new TupleBuffer(batchManager, newID, elements, getProcessorBatchSize());
if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.DETAIL)) {
LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, new Object[]{"Creating TupleBuffer:", newID, "of type "+tupleSourceType}); //$NON-NLS-1$ //$NON-NLS-2$
}
@@ -139,7 +327,7 @@
@Override
public void initialize(Properties props) throws MetaMatrixComponentException {
- PropertiesUtils.setBeanProperties(this, props, "metamatrix.buffer"); //$NON-NLS-1$
+ PropertiesUtils.setBeanProperties(this, props, "org.teiid.buffer"); //$NON-NLS-1$
DataTypeManager.addSourceTransform(Source.class, new SourceTransform<Source, XMLType>() {
@Override
public XMLType transform(Source value) {
@@ -158,24 +346,6 @@
});
}
- /**
- * Gets the data type names for each of the input expressions, in order.
- * @param expressions List of Expressions
- * @return
- * @since 4.2
- */
- private static String[] getTypeNames(List expressions) {
- if (expressions == null) {
- return null;
- }
- String[] types = new String[expressions.size()];
- for (ListIterator i = expressions.listIterator(); i.hasNext();) {
- Expression expr = (Expression)i.next();
- types[i.previousIndex()] = DataTypeManager.getDataTypeName(expr.getType());
- }
- return types;
- }
-
@Override
public void releaseBuffers(int count) {
lock.lock();
@@ -209,6 +379,10 @@
lock.unlock();
}
}
+
+ public void setMaxReserveBatches(int maxReserveBatches) {
+ this.maxReserveBatches = maxReserveBatches;
+ }
public void shutdown() {
}
Modified: trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/FileStorageManager.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/FileStorageManager.java 2010-01-22 22:49:05 UTC (rev 1776)
+++ trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/FileStorageManager.java 2010-01-25 03:26:22 UTC (rev 1777)
@@ -33,7 +33,6 @@
import java.util.TreeMap;
import com.metamatrix.api.exception.MetaMatrixComponentException;
-import com.metamatrix.common.buffer.BufferManager;
import com.metamatrix.common.buffer.FileStore;
import com.metamatrix.common.buffer.StorageManager;
import com.metamatrix.common.log.LogManager;
@@ -113,7 +112,7 @@
}
}
- public synchronized long writeDirect(byte[] bytes, int offset, int length) throws MetaMatrixComponentException {
+ public void writeDirect(byte[] bytes, int offset, int length) throws MetaMatrixComponentException {
Map.Entry<Long, FileInfo> entry = this.storageFiles.lastEntry();
boolean createNew = false;
FileInfo fileInfo = null;
@@ -139,7 +138,6 @@
fileAccess.setLength(pointer + length);
fileAccess.seek(pointer);
fileAccess.write(bytes, offset, length);
- return fileOffset + pointer;
} catch(IOException e) {
throw new MetaMatrixComponentException(e, QueryExecPlugin.Util.getString("FileStoreageManager.error_reading", fileInfo.file.getAbsoluteFile())); //$NON-NLS-1$
} finally {
@@ -185,7 +183,7 @@
* @see com.metamatrix.common.buffer.BufferManager#MAX_FILE_SIZE
*/
public void initialize(Properties props) throws MetaMatrixComponentException {
- this.directory = props.getProperty(BufferManager.BUFFER_STORAGE_DIRECTORY);
+ PropertiesUtils.setBeanProperties(this, props, "org.teiid.buffer"); //$NON-NLS-1$
if(this.directory == null) {
throw new MetaMatrixComponentException(QueryExecPlugin.Util.getString("FileStoreageManager.no_directory")); //$NON-NLS-1$
}
@@ -199,14 +197,20 @@
} else if(! dirFile.mkdirs()) {
throw new MetaMatrixComponentException(QueryExecPlugin.Util.getString("FileStoreageManager.error_creating", dirFile.getAbsoluteFile())); //$NON-NLS-1$
}
-
- // Set up max number of open file descriptors
- maxOpenFiles = PropertiesUtils.getIntProperty(props, BufferManager.MAX_OPEN_FILES, DEFAULT_MAX_OPEN_FILES);
-
- // Set the max file size
- maxFileSize = PropertiesUtils.getIntProperty(props, BufferManager.MAX_FILE_SIZE, 2048) * 1024L * 1024L; // Multiply by 1MB
}
+ public void setMaxFileSize(long maxFileSize) { //the argument is in megabytes; the field keeps the size in bytes
+ this.maxFileSize = maxFileSize * 1024L * 1024L;
+ }
+
+ public void setMaxOpenFiles(int maxOpenFiles) {
+ this.maxOpenFiles = maxOpenFiles;
+ }
+
+ public void setStorageDirectory(String directory) {
+ this.directory = directory;
+ }
+
File createFile(String name, int fileNumber) throws MetaMatrixComponentException {
try {
File storageFile = File.createTempFile(FILE_PREFIX + name + "_" + String.valueOf(fileNumber) + "_", null, this.dirFile); //$NON-NLS-1$ //$NON-NLS-2$
Modified: trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/MemoryStorageManager.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/MemoryStorageManager.java 2010-01-22 22:49:05 UTC (rev 1776)
+++ trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/MemoryStorageManager.java 2010-01-25 03:26:22 UTC (rev 1777)
@@ -41,20 +41,16 @@
public FileStore createFileStore(String name) {
return new FileStore() {
private ByteBuffer buffer = ByteBuffer.allocate(2 << 15);
- private int end;
+ /**
+ * Writes length bytes of the given array, starting at offset, into the
+ * in-memory buffer at position getLength(). When the buffer is too small a
+ * larger one is allocated and the existing contents are copied into it.
+ */
@Override
- public synchronized long writeDirect(byte[] bytes, int offset, int length) throws MetaMatrixComponentException {
- if (end + length > buffer.capacity()) {
+ public void writeDirect(byte[] bytes, int offset, int length) throws MetaMatrixComponentException {
+ if (getLength() + length > buffer.capacity()) {
ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity() * 2 + length);
+ //put(ByteBuffer) transfers only the bytes from the source's current position,
+ //which is wherever the last read or write left it; start from 0 so that
+ //everything written so far is carried over
+ buffer.position(0);
newBuffer.put(buffer);
buffer = newBuffer;
}
- buffer.position(end);
+ buffer.position((int)getLength());
buffer.put(bytes, offset, length);
- long result = end;
- end += length;
- return result;
}
@Override
@@ -65,12 +61,12 @@
@Override
public synchronized int readDirect(long fileOffset, byte[] b, int offset, int length)
throws MetaMatrixComponentException {
- if (fileOffset >= end) {
+ if (fileOffset >= getLength()) {
return -1;
}
int position = (int)fileOffset;
buffer.position(position);
- length = Math.min(length, end - position);
+ length = Math.min(length, (int)getLength() - position);
buffer.get(b, offset, length);
return length;
}
Modified: trunk/engine/src/main/java/com/metamatrix/query/processor/BatchIterator.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/processor/BatchIterator.java 2010-01-22 22:49:05 UTC (rev 1776)
+++ trunk/engine/src/main/java/com/metamatrix/query/processor/BatchIterator.java 2010-01-25 03:26:22 UTC (rev 1777)
@@ -116,4 +116,12 @@
}
}
+ @Override
+ public int available() {
+ if (currentBatch != null) {
+ return currentBatch.getEndRow() - currentRow + 1; //number of rows from currentRow through the end row of the current batch, inclusive
+ }
+ return 0; //no batch at hand, so nothing is reported as available
+ }
+
}
Modified: trunk/engine/src/main/java/com/metamatrix/query/processor/CollectionTupleSource.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/processor/CollectionTupleSource.java 2010-01-22 22:49:05 UTC (rev 1776)
+++ trunk/engine/src/main/java/com/metamatrix/query/processor/CollectionTupleSource.java 2010-01-25 03:26:22 UTC (rev 1777)
@@ -67,4 +67,12 @@
public void closeSource() {
}
+
+ @Override
+ public int available() {
+ if (tuples.hasNext()) {
+ return 1; //only the next tuple is reported, however many remain
+ }
+ return 0;
+ }
}
\ No newline at end of file
Modified: trunk/engine/src/main/java/com/metamatrix/query/processor/relational/DuplicateFilter.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/processor/relational/DuplicateFilter.java 2010-01-22 22:49:05 UTC (rev 1776)
+++ trunk/engine/src/main/java/com/metamatrix/query/processor/relational/DuplicateFilter.java 2010-01-25 03:26:22 UTC (rev 1777)
@@ -106,6 +106,7 @@
if(collectionBuffer == null) {
collectionBuffer = mgr.createTupleBuffer(elements, groupName, TupleSourceType.PROCESSOR);
+ collectionBuffer.setForwardOnly(true);
}
List row = new ArrayList(1);
@@ -126,6 +127,7 @@
// Sort
sortUtility = new SortUtility(collectionBuffer.createIndexedTupleSource(), elements, sortTypes, Mode.DUP_REMOVE, mgr, groupName);
TupleBuffer sorted = sortUtility.sort();
+ sorted.setForwardOnly(true);
try {
// Add all input to proxy
TupleSource sortedSource = sorted.createIndexedTupleSource();
Modified: trunk/engine/src/main/java/com/metamatrix/query/processor/relational/GroupingNode.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/processor/relational/GroupingNode.java 2010-01-22 22:49:05 UTC (rev 1776)
+++ trunk/engine/src/main/java/com/metamatrix/query/processor/relational/GroupingNode.java 2010-01-25 03:26:22 UTC (rev 1777)
@@ -268,6 +268,14 @@
public void closeSource() throws MetaMatrixComponentException {
}
+
+ @Override
+ public int available() {
+ if (sourceBatch != null) {
+ return sourceBatch.getEndRow() - sourceRow + 1; //number of rows from sourceRow through the end row of the current source batch, inclusive
+ }
+ return 0; //no source batch at hand, so nothing is reported as available
+ }
};
}
@@ -287,6 +295,7 @@
private void sortPhase() throws BlockedException, MetaMatrixComponentException, MetaMatrixProcessingException {
this.sortBuffer = this.sortUtility.sort();
+ this.sortBuffer.setForwardOnly(true);
this.groupTupleSource = this.sortBuffer.createIndexedTupleSource();
this.phase = GROUP;
}
Modified: trunk/engine/src/main/java/com/metamatrix/query/processor/relational/PartitionedSortJoin.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/processor/relational/PartitionedSortJoin.java 2010-01-22 22:49:05 UTC (rev 1776)
+++ trunk/engine/src/main/java/com/metamatrix/query/processor/relational/PartitionedSortJoin.java 2010-01-25 03:26:22 UTC (rev 1777)
@@ -42,21 +42,10 @@
*/
public class PartitionedSortJoin extends MergeJoinStrategy {
- /**
- * This is a compromise between the max size of the smaller side
- * and effective partitioning assuming that we only want to hold
- * two batches in memory during partitioning.
- *
- * TODO: apply partitioning recursively and/or have a better mechanism
- * for buffermanager reserve/release of memory
- * (would also help the sort utility)
- */
- public static final int MAX_PARTITIONS = 16;
-
private List[] endTuples;
private List<Boolean> overlap = new ArrayList<Boolean>();
private List<Integer> endRows = new ArrayList<Integer>();
- private List<TupleBuffer> partitionIds = new ArrayList<TupleBuffer>();
+ private List<TupleBuffer> partitions = new ArrayList<TupleBuffer>();
private int currentPartition;
private IndexedTupleSource currentSource;
private SourceState sortedSource;
@@ -73,13 +62,13 @@
@Override
public void close() {
super.close();
- for (TupleBuffer tupleSourceID : this.partitionIds) {
+ for (TupleBuffer tupleSourceID : this.partitions) {
tupleSourceID.remove();
}
this.endTuples = null;
this.overlap.clear();
this.endRows.clear();
- this.partitionIds.clear();
+ this.partitions.clear();
this.currentSource = null;
this.sortedSource = null;
this.partitionedSource = null;
@@ -128,7 +117,7 @@
protected void loadRight() throws MetaMatrixComponentException,
MetaMatrixProcessingException {
this.rightSource.getTupleBuffer();
- int maxRows = this.joinNode.getBatchSize() * MAX_PARTITIONS;
+ int maxRows = this.joinNode.getBatchSize() * getMaxProcessingBatches();
if (processingSortRight == SortOption.SORT
&& this.leftSource.getRowCount() < maxRows
&& this.leftSource.getRowCount() * 4 < this.rightSource.getRowCount()) {
@@ -161,13 +150,25 @@
}
}
}
+
+ /**
+ * Since the source to be partitioned is already loaded, then there's no
+ * chance of a blocked exception during partitioning, so double the max.
+ *
+ * TODO: partition at the same time as the load to determine size
+ *
+ * @return twice the value of getMaxProcessingBatches() on the join node's buffer manager
+ */
+ private int getMaxProcessingBatches() {
+ return 2 * this.joinNode.getBufferManager().getMaxProcessingBatches();
+ }
private void partitionSource(boolean left) throws MetaMatrixComponentException,
MetaMatrixProcessingException {
if (partitioned) {
return;
}
- if (endTuples.length > MAX_PARTITIONS + 1) {
+ if (endTuples.length > getMaxProcessingBatches() + 1) {
if (left) {
this.processingSortLeft = SortOption.SORT;
} else {
@@ -176,13 +177,13 @@
return;
}
if (endTuples.length < 2) {
- partitionIds.add(this.partitionedSource.getTupleBuffer());
+ partitions.add(this.partitionedSource.getTupleBuffer());
} else {
- if (partitionIds.isEmpty()) {
+ if (partitions.isEmpty()) {
for (int i = 0; i < endTuples.length; i++) {
TupleBuffer tc = this.partitionedSource.createSourceTupleBuffer();
- tc.setBatchSize(Math.max(1, this.joinNode.getBatchSize()/4));
- this.partitionIds.add(tc);
+ tc.setBatchSize(Math.max(1, this.joinNode.getBatchSize()));
+ this.partitions.add(tc);
}
}
while (this.partitionedSource.getIterator().hasNext()) {
@@ -191,15 +192,18 @@
if (index < 0) {
index = -index - 1;
}
- if (index > this.partitionIds.size() -1) {
+ if (index > this.partitions.size() -1) {
continue;
}
while (index > 0 && this.overlap.get(index - 1)
&& compare(tuple, this.endTuples[index - 1], this.partitionedSource.getExpressionIndexes(), this.sortedSource.getExpressionIndexes()) == 0) {
index--;
}
- this.partitionIds.get(index).addTuple(tuple);
+ this.partitions.get(index).addTuple(tuple);
}
+ for (TupleBuffer partition : this.partitions) {
+ partition.saveBatch();
+ }
this.partitionedSource.getIterator().setPosition(1);
}
partitioned = true;
@@ -214,12 +218,12 @@
if (endRows.isEmpty()) {
return null; //no rows on the sorted side
}
- while (currentPartition < partitionIds.size()) {
+ while (currentPartition < partitions.size()) {
if (currentSource == null) {
- if (!this.partitionIds.isEmpty()) {
- this.partitionIds.get(currentPartition).close();
+ if (!this.partitions.isEmpty()) {
+ this.partitions.get(currentPartition).close();
}
- currentSource = partitionIds.get(currentPartition).createIndexedTupleSource();
+ currentSource = partitions.get(currentPartition).createIndexedTupleSource();
}
int beginIndex = currentPartition>0?endRows.get(currentPartition - 1)+1:1;
@@ -251,7 +255,7 @@
}
}
if (matchEnd == batch.length - 1 && currentPartition < overlap.size() && overlap.get(currentPartition)) {
- this.partitionIds.get(currentPartition + 1).addTuple(partitionedTuple);
+ this.partitions.get(currentPartition + 1).addTuple(partitionedTuple);
}
}
while (matchBegin <= matchEnd) {
Modified: trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SortNode.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SortNode.java 2010-01-22 22:49:05 UTC (rev 1776)
+++ trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SortNode.java 2010-01-25 03:26:22 UTC (rev 1777)
@@ -104,6 +104,8 @@
private TupleBatch outputPhase() throws BlockedException, MetaMatrixComponentException, MetaMatrixProcessingException {
if (!this.output.isFinal()) {
this.phase = SORT;
+ } else {
+ this.output.setForwardOnly(true);
}
List<?> tuple = null;
try {
Modified: trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SortUtility.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SortUtility.java 2010-01-22 22:49:05 UTC (rev 1776)
+++ trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SortUtility.java 2010-01-25 03:26:22 UTC (rev 1777)
@@ -82,7 +82,7 @@
}
//constructor state
- private TupleSource sourceID;
+ private TupleSource source;
private Mode mode;
private BufferManager bufferManager;
private String groupName;
@@ -105,11 +105,11 @@
public SortUtility(TupleSource sourceID, List sortElements, List<Boolean> sortTypes, Mode mode, BufferManager bufferMgr,
String groupName) {
- this.sourceID = sourceID;
+ this.source = sourceID;
this.mode = mode;
this.bufferManager = bufferMgr;
this.groupName = groupName;
- this.schema = this.sourceID.getSchema();
+ this.schema = this.source.getSchema();
int distinctIndex = sortElements != null? sortElements.size() - 1:0;
if (mode != Mode.SORT) {
if (sortElements == null) {
@@ -159,6 +159,7 @@
private TupleBuffer createTupleBuffer() throws MetaMatrixComponentException {
TupleBuffer tb = bufferManager.createTupleBuffer(this.schema, this.groupName, TupleSourceType.PROCESSOR);
+ tb.setForwardOnly(true);
return tb;
}
@@ -178,16 +179,23 @@
try {
int maxRows = bufferManager.getMaxProcessingBatches() * bufferManager.getProcessorBatchSize();
while(!doneReading) {
+ //attempt to reserve more working memory if there are additional rows available before blocking
if (workingTuples.size() == maxRows) {
- int reserved = bufferManager.reserveBuffers(1, false);
+ if (source.available() < 1) {
+ break;
+ }
+ int reserved = bufferManager.reserveBuffers(1, false);
if (reserved == 0) {
break;
}
totalReservedBuffers += 1;
- maxRows += bufferManager.getProcessorBatchSize();
+ maxRows += bufferManager.getProcessorBatchSize();
}
try {
- List<?> tuple = sourceID.nextTuple();
+ if (totalReservedBuffers > 0 && source.available() == 0) {
+ break;
+ }
+ List<?> tuple = source.nextTuple();
if (tuple == null) {
doneReading = true;
@@ -293,6 +301,7 @@
// Close sorted source (all others have been removed)
if (doneReading) {
activeTupleBuffers.get(0).close();
+ activeTupleBuffers.get(0).setForwardOnly(false);
if (this.output != null) {
this.output.close();
}
@@ -302,6 +311,7 @@
Assertion.assertTrue(mode == Mode.DUP_REMOVE);
if (this.output == null) {
this.output = activeTupleBuffers.get(0);
+ this.output.setForwardOnly(false);
}
this.phase = INITIAL_SORT;
}
Modified: trunk/engine/src/main/java/com/metamatrix/query/processor/xml/XMLUtil.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/processor/xml/XMLUtil.java 2010-01-22 22:49:05 UTC (rev 1776)
+++ trunk/engine/src/main/java/com/metamatrix/query/processor/xml/XMLUtil.java 2010-01-25 03:26:22 UTC (rev 1777)
@@ -75,7 +75,7 @@
@Override
public InputStream getInputStream() throws IOException {
//TODO: adjust the buffer size, and/or develop a shared buffer strategy
- return new BufferedInputStream(lobBuffer.createInputStream());
+ return new BufferedInputStream(lobBuffer.createInputStream(0));
}
@Override
Modified: trunk/engine/src/main/java/com/metamatrix/query/tempdata/TempTableStoreImpl.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/tempdata/TempTableStoreImpl.java 2010-01-22 22:49:05 UTC (rev 1776)
+++ trunk/engine/src/main/java/com/metamatrix/query/tempdata/TempTableStoreImpl.java 2010-01-25 03:26:22 UTC (rev 1777)
@@ -129,6 +129,11 @@
public void closeSource() throws MetaMatrixComponentException {
}
+
+ @Override
+ public int available() {
+ return 0;
+ }
}
private BufferManager buffer;
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWorkItem.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWorkItem.java 2010-01-22 22:49:05 UTC (rev 1776)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWorkItem.java 2010-01-25 03:26:22 UTC (rev 1777)
@@ -84,7 +84,7 @@
private ICommand translatedCommand;
private Class<?>[] schema;
private List<Integer> convertToRuntimeType;
- private List<Integer> convertToDesiredRuntimeType;
+ private boolean[] convertToDesiredRuntimeType;
/* End state information */
protected boolean lastBatch;
@@ -309,12 +309,12 @@
Command command = this.requestMsg.getCommand();
List<SingleElementSymbol> symbols = this.requestMsg.getCommand().getProjectedSymbols();
this.schema = new Class[symbols.size()];
- this.convertToDesiredRuntimeType = new ArrayList<Integer>(symbols.size());
+ this.convertToDesiredRuntimeType = new boolean[symbols.size()];
this.convertToRuntimeType = new ArrayList<Integer>(symbols.size());
for (int i = 0; i < schema.length; i++) {
SingleElementSymbol symbol = symbols.get(i);
this.schema[i] = symbol.getType();
- this.convertToDesiredRuntimeType.add(i);
+ this.convertToDesiredRuntimeType[i] = true;
this.convertToRuntimeType.add(i);
}
@@ -469,20 +469,23 @@
}
}
//TODO: add a proper intermediate schema
- for (int i = convertToDesiredRuntimeType.size() - 1; i >= 0; i--) {
- int index = convertToDesiredRuntimeType.get(i);
- Object value = row.get(index);
- if (value != null) {
- Object result;
- try {
- result = DataTypeManager.transformValue(value, value.getClass(), this.schema[index]);
- } catch (TransformationException e) {
- throw new ConnectorException(e);
+ for (int i = 0; i < row.size(); i++) {
+ if (convertToDesiredRuntimeType[i]) {
+ Object value = row.get(i);
+ if (value != null) {
+ Object result;
+ try {
+ result = DataTypeManager.transformValue(value, value.getClass(), this.schema[i]);
+ } catch (TransformationException e) {
+ throw new ConnectorException(e);
+ }
+ if (value == result) {
+ convertToDesiredRuntimeType[i] = false;
+ }
+ row.set(i, result);
}
- if (value == result) {
- convertToDesiredRuntimeType.remove(i);
- }
- row.set(index, result);
+ } else {
+ row.set(i, DataTypeManager.getCanonicalValue(row.get(i)));
}
}
}
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java 2010-01-22 22:49:05 UTC (rev 1776)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java 2010-01-25 03:26:22 UTC (rev 1777)
@@ -225,4 +225,17 @@
return this.isTransactional;
}
+ @Override
+ public int available() {
+ if (index < currentBatchCount) {
+ return currentBatchCount - index;
+ }
+ synchronized (this) {
+ if (nextBatch != null) {
+ return nextBatch.length;
+ }
+ }
+ return 0;
+ }
+
}
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/PreparedStatementRequest.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/PreparedStatementRequest.java 2010-01-22 22:49:05 UTC (rev 1776)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/PreparedStatementRequest.java 2010-01-25 03:26:22 UTC (rev 1777)
@@ -100,15 +100,15 @@
return;
}
List values = requestMsg.getParameterValues();
- List spParams = proc.getParameters();
+ List<SPParameter> spParams = proc.getParameters();
proc.clearParameters();
int inParameterCount = values.size();
if (this.requestMsg.isPreparedBatchUpdate() && values.size() > 0) {
inParameterCount = ((List)values.get(0)).size();
}
int index = 1;
- for (Iterator params = spParams.iterator(); params.hasNext();) {
- SPParameter param = (SPParameter) params.next();
+ for (Iterator<SPParameter> params = spParams.iterator(); params.hasNext();) {
+ SPParameter param = params.next();
if (param.getParameterType() == SPParameter.RETURN_VALUE) {
continue;
}
@@ -147,13 +147,13 @@
if (!this.addedLimit) { //TODO: this is a little problematic
prepPlan.setCommand(this.userCommand);
// Defect 13751: Clone the plan in its current state (i.e. before processing) so that it can be used for later queries
- prepPlan.setPlan((ProcessorPlan)processPlan.clone());
+ prepPlan.setPlan(processPlan.clone());
prepPlan.setAnalysisRecord(analysisRecord);
this.prepPlanCache.putPreparedPlan(id, this.context.isSessionFunctionEvaluated(), prepPlan);
}
} else {
LogManager.logTrace(LogConstants.CTX_DQP, new Object[] { "Query exist in cache: ", sqlQuery }); //$NON-NLS-1$
- processPlan = (ProcessorPlan)cachedPlan.clone();
+ processPlan = cachedPlan.clone();
//already in cache. obtain the values from cache
analysisRecord = prepPlan.getAnalysisRecord();
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java 2010-01-22 22:49:05 UTC (rev 1776)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java 2010-01-25 03:26:22 UTC (rev 1777)
@@ -22,6 +22,7 @@
package org.teiid.dqp.internal.process;
+import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -107,6 +108,7 @@
private AnalysisRecord analysisRecord;
private TransactionContext transactionContext;
protected TupleBuffer resultsBuffer;
+ private TupleBatch savedBatch;
private Collection schemas; // These are schemas associated with XML results
private boolean returnsUpdateCount;
@@ -346,6 +348,9 @@
}
});
resultsBuffer = collector.getTupleBuffer();
+ if (requestMsg.getCursorType() == ResultSet.TYPE_FORWARD_ONLY) {
+ resultsBuffer.setForwardOnly(true);
+ }
analysisRecord = request.analysisRecord;
schemas = request.schemas;
transactionContext = request.transactionContext;
@@ -383,11 +388,18 @@
LogManager.logDetail(LogConstants.CTX_DQP, "[RequestWorkItem.sendResultsIfNeeded] requestID: " + requestID + " resultsID: " + this.resultsBuffer + " done: " + doneProducingBatches ); //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$
}
- if (batch == null || batch.getBeginRow() > this.begin) {
- batch = resultsBuffer.getBatch(begin);
+ if (batch == null || !(batch.getBeginRow() <= this.begin && batch.getEndRow() >= this.begin)) {
+ if (savedBatch != null && savedBatch.getBeginRow() <= this.begin && savedBatch.getEndRow() >= this.begin) {
+ batch = savedBatch;
+ } else {
+ batch = resultsBuffer.getBatch(begin);
+ }
//TODO: support fetching more than 1 batch
int count = this.end - this.begin + 1;
if (batch.getRowCount() > count) {
+ if (requestMsg.getCursorType() == ResultSet.TYPE_FORWARD_ONLY) {
+ savedBatch = batch;
+ }
int beginRow = Math.min(this.begin, batch.getEndRow() - count + 1);
int endRow = Math.min(beginRow + count - 1, batch.getEndRow());
int firstOffset = beginRow - batch.getBeginRow();
Modified: trunk/engine/src/test/java/com/metamatrix/query/processor/FakeTupleSource.java
===================================================================
--- trunk/engine/src/test/java/com/metamatrix/query/processor/FakeTupleSource.java 2010-01-22 22:49:05 UTC (rev 1776)
+++ trunk/engine/src/test/java/com/metamatrix/query/processor/FakeTupleSource.java 2010-01-25 03:26:22 UTC (rev 1777)
@@ -120,4 +120,9 @@
this.exceptionOnClose = exceptionOnClose;
}
+ @Override
+ public int available() {
+ return 0;
+ }
+
}
Modified: trunk/engine/src/test/java/com/metamatrix/query/processor/TestVirtualDepJoin.java
===================================================================
--- trunk/engine/src/test/java/com/metamatrix/query/processor/TestVirtualDepJoin.java 2010-01-22 22:49:05 UTC (rev 1776)
+++ trunk/engine/src/test/java/com/metamatrix/query/processor/TestVirtualDepJoin.java 2010-01-25 03:26:22 UTC (rev 1777)
@@ -492,8 +492,8 @@
private BufferManager createCustomBufferMgr(int batchSize) throws MetaMatrixComponentException {
BufferManagerImpl bufferMgr = new BufferManagerImpl();
Properties props = new Properties();
- props.setProperty(BufferManager.PROCESSOR_BATCH_SIZE, String.valueOf(batchSize));
- props.setProperty(BufferManager.CONNECTOR_BATCH_SIZE, String.valueOf(batchSize));
+ bufferMgr.setConnectorBatchSize(batchSize);
+ bufferMgr.setProcessorBatchSize(batchSize);
bufferMgr.initialize(props);
// Add unmanaged memory storage manager
Modified: trunk/engine/src/test/java/com/metamatrix/query/processor/relational/NodeTestUtil.java
===================================================================
--- trunk/engine/src/test/java/com/metamatrix/query/processor/relational/NodeTestUtil.java 2010-01-22 22:49:05 UTC (rev 1776)
+++ trunk/engine/src/test/java/com/metamatrix/query/processor/relational/NodeTestUtil.java 2010-01-25 03:26:22 UTC (rev 1777)
@@ -37,24 +37,23 @@
public class NodeTestUtil {
static BufferManager getTestBufferManager(long bytesAvailable, int procBatchSize, int connectorBatchSize) {
-
+ BufferManagerImpl bufferManager = new BufferManagerImpl();
+ bufferManager.setProcessorBatchSize(procBatchSize);
+ bufferManager.setConnectorBatchSize(connectorBatchSize);
// Get the properties for BufferManager
Properties bmProps = new Properties();
- bmProps.setProperty(BufferManager.PROCESSOR_BATCH_SIZE, "" + procBatchSize); //$NON-NLS-1$
- bmProps.setProperty(BufferManager.CONNECTOR_BATCH_SIZE, "" + connectorBatchSize); //$NON-NLS-1$
- return createBufferManager(bmProps);
+ return createBufferManager(bufferManager, bmProps);
}
static BufferManager getTestBufferManager(long bytesAvailable, int procBatchSize) {
-
+ BufferManagerImpl bufferManager = new BufferManagerImpl();
+ bufferManager.setProcessorBatchSize(procBatchSize);
// Get the properties for BufferManager
Properties bmProps = new Properties();
- bmProps.setProperty(BufferManager.PROCESSOR_BATCH_SIZE, "" + procBatchSize); //$NON-NLS-1$
- return createBufferManager(bmProps);
+ return createBufferManager(bufferManager, bmProps);
}
- static BufferManager createBufferManager(Properties bmProps) {
- BufferManagerImpl bufferManager = new BufferManagerImpl();
+ static BufferManager createBufferManager(BufferManagerImpl bufferManager, Properties bmProps) {
try {
bufferManager.initialize(bmProps);
} catch (MetaMatrixComponentException e) {
Modified: trunk/engine/src/test/java/com/metamatrix/query/processor/relational/TestBatchedUpdateNode.java
===================================================================
--- trunk/engine/src/test/java/com/metamatrix/query/processor/relational/TestBatchedUpdateNode.java 2010-01-22 22:49:05 UTC (rev 1776)
+++ trunk/engine/src/test/java/com/metamatrix/query/processor/relational/TestBatchedUpdateNode.java 2010-01-25 03:26:22 UTC (rev 1777)
@@ -242,6 +242,11 @@
}
return null;
}
+
+ @Override
+ public int available() {
+ return 0;
+ }
}
}
Modified: trunk/engine/src/test/java/com/metamatrix/query/processor/relational/TestProjectIntoNode.java
===================================================================
--- trunk/engine/src/test/java/com/metamatrix/query/processor/relational/TestProjectIntoNode.java 2010-01-22 22:49:05 UTC (rev 1776)
+++ trunk/engine/src/test/java/com/metamatrix/query/processor/relational/TestProjectIntoNode.java 2010-01-25 03:26:22 UTC (rev 1777)
@@ -217,5 +217,9 @@
? null
: Arrays.asList(new Object[] {new Integer(currentRow), Integer.toString(currentRow)});
}
+ @Override
+ public int available() {
+ return 0;
+ }
}
}
Modified: trunk/runtime/src/main/java/com/metamatrix/dqp/embedded/services/EmbeddedBufferService.java
===================================================================
--- trunk/runtime/src/main/java/com/metamatrix/dqp/embedded/services/EmbeddedBufferService.java 2010-01-22 22:49:05 UTC (rev 1776)
+++ trunk/runtime/src/main/java/com/metamatrix/dqp/embedded/services/EmbeddedBufferService.java 2010-01-25 03:26:22 UTC (rev 1777)
@@ -34,7 +34,6 @@
import com.metamatrix.common.application.exception.ApplicationInitializationException;
import com.metamatrix.common.application.exception.ApplicationLifecycleException;
import com.metamatrix.common.buffer.BufferManager;
-import com.metamatrix.common.buffer.StorageManager;
import com.metamatrix.common.buffer.impl.BufferManagerImpl;
import com.metamatrix.common.buffer.impl.FileStorageManager;
import com.metamatrix.common.buffer.impl.MemoryStorageManager;
@@ -92,22 +91,21 @@
String connectorBatchSize = configurationSvc.getConnectorBatchSize();
// Set up buffer configuration properties
- Properties bufferProps = new Properties();
- bufferProps.setProperty(BufferManager.BUFFER_STORAGE_DIRECTORY, bufferDir.getCanonicalPath());
- bufferProps.setProperty(BufferManager.PROCESSOR_BATCH_SIZE, processorBatchSize);
- bufferProps.setProperty(BufferManager.CONNECTOR_BATCH_SIZE, connectorBatchSize);
+ Properties bufferProps = configurationSvc.getSystemProperties();
// Construct and initialize the buffer manager
this.bufferMgr = new BufferManagerImpl();
+ this.bufferMgr.setConnectorBatchSize(Integer.valueOf(connectorBatchSize));
+ this.bufferMgr.setProcessorBatchSize(Integer.valueOf(processorBatchSize));
+
this.bufferMgr.initialize(bufferProps);
// If necessary, add disk storage manager
if(useDisk) {
// Get the properties for FileStorageManager and create.
- Properties fsmProps = new Properties();
- fsmProps.setProperty(BufferManager.BUFFER_STORAGE_DIRECTORY, bufferDir.getCanonicalPath());
- StorageManager fsm = new FileStorageManager();
- fsm.initialize(fsmProps);
+ FileStorageManager fsm = new FileStorageManager();
+ fsm.setStorageDirectory(bufferDir.getCanonicalPath());
+ fsm.initialize(bufferProps);
this.bufferMgr.setStorageManager(fsm);
// start the file storage manager in clean state
Modified: trunk/runtime/src/test/java/org/teiid/transport/TestSocketRemoting.java
===================================================================
--- trunk/runtime/src/test/java/org/teiid/transport/TestSocketRemoting.java 2010-01-22 22:49:05 UTC (rev 1776)
+++ trunk/runtime/src/test/java/org/teiid/transport/TestSocketRemoting.java 2010-01-25 03:26:22 UTC (rev 1777)
@@ -200,7 +200,6 @@
dqp.begin();
fail("exception expected"); //$NON-NLS-1$
} catch (XATransactionException e) {
- e.printStackTrace();
assertEquals("Component not found: com.metamatrix.dqp.client.ClientSideDQP", e.getMessage()); //$NON-NLS-1$
}
}
14 years, 11 months
teiid SVN: r1776 - in branches/JCA: client/src/main/java/com/metamatrix/common/comm/platform/socket/client and 15 other directories.
by teiid-commits@lists.jboss.org
Author: rareddy
Date: 2010-01-22 17:49:05 -0500 (Fri, 22 Jan 2010)
New Revision: 1776
Added:
branches/JCA/client/src/main/java/com/metamatrix/dqp/client/DQPManagement.java
branches/JCA/client/src/main/java/org/teiid/transport/
branches/JCA/client/src/main/java/org/teiid/transport/LocalServerConnection.java
Removed:
branches/JCA/runtime/src/main/java/org/teiid/transport/LocalServerConnection.java
Modified:
branches/JCA/client-jdbc/src/main/java/com/metamatrix/jdbc/MMConnection.java
branches/JCA/client-jdbc/src/main/java/org/teiid/jdbc/EmbeddedProfile.java
branches/JCA/client-jdbc/src/test/java/com/metamatrix/jdbc/util/TestMMJDBCURL.java
branches/JCA/client/src/main/java/com/metamatrix/common/comm/api/ServerConnection.java
branches/JCA/client/src/main/java/com/metamatrix/common/comm/api/ServerConnectionFactory.java
branches/JCA/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/SocketServerConnection.java
branches/JCA/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/SocketServerConnectionFactory.java
branches/JCA/client/src/main/java/com/metamatrix/platform/security/api/SessionToken.java
branches/JCA/client/src/main/resources/com/metamatrix/common/comm/i18n.properties
branches/JCA/client/src/main/resources/com/metamatrix/common/comm/platform/i18n.properties
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/DQPManagementView.java
branches/JCA/jboss-integration/src/main/java/org/teiid/adminapi/jboss/Admin.java
branches/JCA/jboss-integration/src/main/java/org/teiid/jboss/deployers/RuntimeEngineDeployer.java
branches/JCA/runtime/src/main/java/org/teiid/TeiidConnectionFactory.java
branches/JCA/runtime/src/main/java/org/teiid/TeiidManagedConnection.java
branches/JCA/runtime/src/main/java/org/teiid/WrappedConnection.java
branches/JCA/runtime/src/main/java/org/teiid/transport/LogonImpl.java
branches/JCA/runtime/src/main/java/org/teiid/transport/ServerWorkItem.java
branches/JCA/runtime/src/main/java/org/teiid/transport/SocketClientInstance.java
branches/JCA/runtime/src/main/java/org/teiid/transport/SocketListener.java
branches/JCA/runtime/src/main/java/org/teiid/transport/SocketTransport.java
branches/JCA/runtime/src/test/java/org/teiid/transport/TestCommSockets.java
branches/JCA/runtime/src/test/java/org/teiid/transport/TestSocketRemoting.java
Log:
TEIID-833: modified the socket layer and local layer to use same connection into runtime. Removed the thread nature of the socket worker item, such that netty threads directly submit work on to process workers
Modified: branches/JCA/client/src/main/java/com/metamatrix/common/comm/api/ServerConnection.java
===================================================================
--- branches/JCA/client/src/main/java/com/metamatrix/common/comm/api/ServerConnection.java 2010-01-22 17:27:53 UTC (rev 1775)
+++ branches/JCA/client/src/main/java/com/metamatrix/common/comm/api/ServerConnection.java 2010-01-22 22:49:05 UTC (rev 1776)
@@ -33,8 +33,6 @@
void close();
- void reallyClose();
-
boolean isOpen();
LogonResult getLogonResult();
Modified: branches/JCA/client/src/main/java/com/metamatrix/common/comm/api/ServerConnectionFactory.java
===================================================================
--- branches/JCA/client/src/main/java/com/metamatrix/common/comm/api/ServerConnectionFactory.java 2010-01-22 17:27:53 UTC (rev 1775)
+++ branches/JCA/client/src/main/java/com/metamatrix/common/comm/api/ServerConnectionFactory.java 2010-01-22 22:49:05 UTC (rev 1776)
@@ -42,5 +42,6 @@
<T> T getService(Class<T> clazz);
- <T> void setService(Class<T> type, T instance);
+ <T> void registerClientService(Class<T> type, T instance, String loggingContext);
+ <T> String getLoggingContextForService(Class<T> type);
}
Modified: branches/JCA/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/SocketServerConnection.java
===================================================================
--- branches/JCA/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/SocketServerConnection.java 2010-01-22 17:27:53 UTC (rev 1775)
+++ branches/JCA/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/SocketServerConnection.java 2010-01-22 22:49:05 UTC (rev 1776)
@@ -190,7 +190,7 @@
if (e.getCause() instanceof CommunicationException) {
throw (CommunicationException)e.getCause();
}
- throw new CommunicationException(e, CommPlatformPlugin.Util.getString("PlatformServerConnectionFactory.Unable_to_find_a_component_used_in_logging_on_to_MetaMatrix")); //$NON-NLS-1$
+ throw new CommunicationException(e, CommPlatformPlugin.Util.getString("PlatformServerConnectionFactory.Unable_to_find_a_component_used_in_logging_on_to")); //$NON-NLS-1$
}
}
@@ -316,10 +316,4 @@
public void setFailOver(boolean failOver) {
this.failOver = failOver;
}
-
- @Override
- public void reallyClose() {
- close();
- }
-
}
\ No newline at end of file
Modified: branches/JCA/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/SocketServerConnectionFactory.java
===================================================================
--- branches/JCA/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/SocketServerConnectionFactory.java 2010-01-22 17:27:53 UTC (rev 1775)
+++ branches/JCA/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/SocketServerConnectionFactory.java 2010-01-22 22:49:05 UTC (rev 1776)
@@ -291,8 +291,11 @@
public <T> T getService(Class<T> clazz) {
return null;
}
-
@Override
- public <T> void setService(Class<T> type, T instance) {
+ public <T> void registerClientService(Class<T> type, T instance, String loggingContext){
}
+ @Override
+ public <T> String getLoggingContextForService(Class<T> type) {
+ return null;
+ }
}
Added: branches/JCA/client/src/main/java/com/metamatrix/dqp/client/DQPManagement.java
===================================================================
--- branches/JCA/client/src/main/java/com/metamatrix/dqp/client/DQPManagement.java (rev 0)
+++ branches/JCA/client/src/main/java/com/metamatrix/dqp/client/DQPManagement.java 2010-01-22 22:49:05 UTC (rev 1776)
@@ -0,0 +1,45 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+package com.metamatrix.dqp.client;
+
+import java.util.Collection;
+import java.util.List;
+
+import org.teiid.adminapi.AdminException;
+import org.teiid.adminapi.impl.RequestMetadata;
+import org.teiid.adminapi.impl.SessionMetadata;
+import org.teiid.adminapi.impl.WorkerPoolStatisticsMetadata;
+
+/** Management operations over requests, sessions, caches, transactions and work-manager statistics. NOTE(review): per-method semantics come from the implementation, which is not visible here. */
+public interface DQPManagement {
+ List<RequestMetadata> getRequestsForSession(long sessionId) ;
+ List<RequestMetadata> getRequests();
+ WorkerPoolStatisticsMetadata getWorkManagerStatistics(String identifier);
+ void terminateSession(long terminateeId);
+ boolean cancelRequest(long sessionId, long requestId) throws AdminException;
+ Collection<String> getCacheTypes();
+ void clearCache(String cacheType);
+ Collection<SessionMetadata> getActiveSessions() throws AdminException;
+ int getActiveSessionsCount() throws AdminException;
+ Collection<org.teiid.adminapi.Transaction> getTransactions();
+ void terminateTransaction(String xid) throws AdminException ;
+}
Modified: branches/JCA/client/src/main/java/com/metamatrix/platform/security/api/SessionToken.java
===================================================================
--- branches/JCA/client/src/main/java/com/metamatrix/platform/security/api/SessionToken.java 2010-01-22 17:27:53 UTC (rev 1775)
+++ branches/JCA/client/src/main/java/com/metamatrix/platform/security/api/SessionToken.java 2010-01-22 22:49:05 UTC (rev 1776)
@@ -33,10 +33,23 @@
* transit if sent to the client. Also it should only be sent to the client
* who creates the session.
*/
-public class SessionToken implements Serializable,
- Cloneable {
+public class SessionToken implements Serializable, Cloneable {
public final static long serialVersionUID = -2853708320435636107L;
+ private static ThreadLocal<SessionToken> CONTEXTS = new ThreadLocal<SessionToken>() {
+ protected SessionToken initialValue() {
+ return null;
+ }
+ };
+
+ public static SessionToken getSession() {
+ return CONTEXTS.get();
+ }
+
+ public static void setSession(SessionToken context) {
+ CONTEXTS.set(context);
+ }
+
/** The session ID */
private long sessionID;
private String userName;
Copied: branches/JCA/client/src/main/java/org/teiid/transport/LocalServerConnection.java (from rev 1743, branches/JCA/runtime/src/main/java/org/teiid/transport/LocalServerConnection.java)
===================================================================
--- branches/JCA/client/src/main/java/org/teiid/transport/LocalServerConnection.java (rev 0)
+++ branches/JCA/client/src/main/java/org/teiid/transport/LocalServerConnection.java 2010-01-22 22:49:05 UTC (rev 1776)
@@ -0,0 +1,152 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.transport;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+
+import com.metamatrix.admin.api.exception.security.InvalidSessionException;
+import com.metamatrix.api.exception.MetaMatrixComponentException;
+import com.metamatrix.api.exception.security.LogonException;
+import com.metamatrix.client.ExceptionUtil;
+import com.metamatrix.common.comm.CommonCommPlugin;
+import com.metamatrix.common.comm.api.ServerConnection;
+import com.metamatrix.common.comm.api.ServerConnectionFactory;
+import com.metamatrix.common.comm.exception.CommunicationException;
+import com.metamatrix.common.comm.exception.ConnectionException;
+import com.metamatrix.common.comm.platform.CommPlatformPlugin;
+import com.metamatrix.platform.security.api.ILogon;
+import com.metamatrix.platform.security.api.LogonResult;
+import com.metamatrix.platform.security.api.SessionToken;
+
+public class LocalServerConnection implements ServerConnection {
+    private static final String TEIID_RUNTIME = "java:teiid/runtime-engine"; // JNDI name handed to lookup()
+
+    private final LogonResult result;
+    private boolean shutdown;
+
+    public LocalServerConnection(Properties connectionProperties) throws CommunicationException, ConnectionException{
+        this.result = authenticate(connectionProperties);
+    }
+
+    /** Flags connProps as a local connection and logs on through the ILogon service proxy. A LogonException becomes a
+     *  ConnectionException; a MetaMatrixComponentException becomes (or is unwrapped to) a CommunicationException. */
+    public synchronized LogonResult authenticate(Properties connProps) throws ConnectionException, CommunicationException {
+        try {
+            connProps.setProperty("localConnection", "true");
+            return this.getService(ILogon.class).logon(connProps);
+        } catch (LogonException e) {
+            // Propagate the original message as it contains the message we want
+            // to give to the user
+            throw new ConnectionException(e, e.getMessage());
+        } catch (MetaMatrixComponentException e) {
+            if (e.getCause() instanceof CommunicationException) {
+                throw (CommunicationException)e.getCause();
+            }
+            throw new CommunicationException(e, CommPlatformPlugin.Util.getString("PlatformServerConnectionFactory.Unable_to_find_a_component_used_in_logging_on_to")); //$NON-NLS-1$
+        }
+    }
+
+    /** Returns a proxy for iface. Each call fails if this connection is shut down, otherwise looks up the runtime in JNDI,
+     *  sets the thread's session token (not for ILogon), delegates to the real service and finally clears the token. */
+    @SuppressWarnings("unchecked")
+    public <T> T getService(final Class<T> iface) {
+        return (T) Proxy.newProxyInstance(this.getClass().getClassLoader(), new Class[] {iface}, new InvocationHandler() {
+            public Object invoke(Object arg0, Method arg1, Object[] arg2) throws Throwable {
+                if (!isOpen()) {
+                    throw ExceptionUtil.convertException(arg1, new MetaMatrixComponentException(CommonCommPlugin.Util.getString("LocalTransportHandler.Transport_shutdown"))); //$NON-NLS-1$
+                }
+                try {
+                    ServerConnectionFactory scf = lookup(TEIID_RUNTIME);
+                    T service = scf.getService(iface);
+                    if (!(iface.equals(ILogon.class))) {
+                        SessionToken.setSession(result.getSessionToken());
+                    }
+                    return arg1.invoke(service, arg2);
+                } catch (java.lang.reflect.InvocationTargetException e) {
+                    // rethrow the service's own exception; the reflection wrapper would reach callers as UndeclaredThrowableException
+                    throw e.getTargetException();
+                } catch(NamingException e){
+                    throw ExceptionUtil.convertException(arg1, new MetaMatrixComponentException(CommonCommPlugin.Util.getString("LocalTransportHandler.Transport_shutdown"))); //$NON-NLS-1$
+                } finally {
+                    SessionToken.setSession(null);
+                }
+            }
+        });
+    }
+
+    public boolean isOpen() {
+        return !shutdown;
+    }
+
+    public void close() {
+        shutdown(true);
+    }
+
+    private void shutdown(boolean logoff) { // no-op once the connection is already marked shut down
+        if (shutdown) {
+            return;
+        }
+        if (logoff) {
+            try {
+                //make a best effort to send the logoff
+                Future<?> writeFuture = this.getService(ILogon.class).logoff();
+                if (writeFuture != null) {
+                    writeFuture.get(5000, TimeUnit.MILLISECONDS);
+                }
+            } catch (InvalidSessionException e) {
+                //ignore
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt(); // logoff is abandoned, but the interrupt stays visible to the caller
+            } catch (ExecutionException e) {
+                //ignore
+            } catch (TimeoutException e) {
+                //ignore
+            }
+        }
+        this.shutdown = true;
+    }
+
+    public LogonResult getLogonResult() {
+        return result;
+    }
+
+    @Override
+    public boolean isSameInstance(ServerConnection conn) throws CommunicationException {
+        return (conn instanceof LocalServerConnection);
+    }
+
+    public static <T> T lookup(String jndiName) throws NamingException {
+        InitialContext ic = new InitialContext();
+        return (T)ic.lookup(jndiName);
+    }
+}
Property changes on: branches/JCA/client/src/main/java/org/teiid/transport/LocalServerConnection.java
___________________________________________________________________
Name: svn:mime-type
+ text/plain
Modified: branches/JCA/client/src/main/resources/com/metamatrix/common/comm/i18n.properties
===================================================================
--- branches/JCA/client/src/main/resources/com/metamatrix/common/comm/i18n.properties 2010-01-22 17:27:53 UTC (rev 1775)
+++ branches/JCA/client/src/main/resources/com/metamatrix/common/comm/i18n.properties 2010-01-22 22:49:05 UTC (rev 1776)
@@ -27,4 +27,6 @@
TrustedSessionToken.token_null = The trusted token for a session token may not be null.
StreamImpl.Unable_to_read_data_from_stream=Unable to read data from the stream: {0}
-RequestMessage.invalid_txnAutoWrap=''{0}'' is an invalid transaction autowrap mode.
\ No newline at end of file
+RequestMessage.invalid_txnAutoWrap=''{0}'' is an invalid transaction autowrap mode.
+
+LocalTransportHandler.Transport_shutdown=Transport has been shut down.
Modified: branches/JCA/client/src/main/resources/com/metamatrix/common/comm/platform/i18n.properties
===================================================================
--- branches/JCA/client/src/main/resources/com/metamatrix/common/comm/platform/i18n.properties 2010-01-22 17:27:53 UTC (rev 1775)
+++ branches/JCA/client/src/main/resources/com/metamatrix/common/comm/platform/i18n.properties 2010-01-22 22:49:05 UTC (rev 1776)
@@ -33,8 +33,8 @@
PlatformServerConnectionFactory.Missing_required_property=Missing required property:
PlatformServerConnectionFactory.Error_encrypting_user_password=Error encrypting user password
PlatformServerConnectionFactory.Error_communicating_with_app_server=Error communicating with app server
-PlatformServerConnectionFactory.Error_logging_on_to_MetaMatrix=Error logging on to MetaMatrix: {0}
-PlatformServerConnectionFactory.Unable_to_find_a_component_used_in_logging_on_to_MetaMatrix=Unable to find a component used in logging on to MetaMatrix
+PlatformServerConnectionFactory.Error_logging_on_to_MetaMatrix=Error logging on to Teiid: {0}
+PlatformServerConnectionFactory.Unable_to_find_a_component_used_in_logging_on_to=Unable to find a component used to authenticate to Teiid
PlatformServerConnectionFactory.Unable_to_get_a_PlatformServerConnection=Unable to get a PlatformServerConnection
PlatformServerConnectionFactory.Error_comunicating_with_LogonAPI=Error communicating with LogonAPI
PlatformServerConnectionFactory.JNDI_library_mismatch_plugin=Client library may not match server vendor or version for server {1}. Client code loaded from plugin: {2}, initial context factory: {0}.
Modified: branches/JCA/client-jdbc/src/main/java/com/metamatrix/jdbc/MMConnection.java
===================================================================
--- branches/JCA/client-jdbc/src/main/java/com/metamatrix/jdbc/MMConnection.java 2010-01-22 17:27:53 UTC (rev 1775)
+++ branches/JCA/client-jdbc/src/main/java/com/metamatrix/jdbc/MMConnection.java 2010-01-22 22:49:05 UTC (rev 1776)
@@ -57,8 +57,6 @@
import javax.transaction.xa.Xid;
-import org.teiid.adminapi.Admin;
-
import com.metamatrix.common.api.MMURL;
import com.metamatrix.common.comm.api.ServerConnection;
import com.metamatrix.common.comm.exception.CommunicationException;
@@ -279,7 +277,7 @@
} catch (SQLException se) {
firstException = se;
} finally {
- this.serverConn.reallyClose();
+ this.serverConn.close();
if ( firstException != null )
throw (SQLException)firstException;
}
Modified: branches/JCA/client-jdbc/src/main/java/org/teiid/jdbc/EmbeddedProfile.java
===================================================================
--- branches/JCA/client-jdbc/src/main/java/org/teiid/jdbc/EmbeddedProfile.java 2010-01-22 17:27:53 UTC (rev 1775)
+++ branches/JCA/client-jdbc/src/main/java/org/teiid/jdbc/EmbeddedProfile.java 2010-01-22 22:49:05 UTC (rev 1776)
@@ -32,14 +32,12 @@
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-import javax.naming.InitialContext;
-import javax.naming.NamingException;
+import org.teiid.transport.LocalServerConnection;
-import com.metamatrix.common.comm.api.ServerConnection;
-import com.metamatrix.common.comm.api.ServerConnectionFactory;
import com.metamatrix.common.comm.exception.CommunicationException;
import com.metamatrix.common.comm.exception.ConnectionException;
import com.metamatrix.common.util.PropertiesUtils;
+import com.metamatrix.core.MetaMatrixRuntimeException;
import com.metamatrix.jdbc.BaseDataSource;
import com.metamatrix.jdbc.JDBCPlugin;
import com.metamatrix.jdbc.MMConnection;
@@ -47,7 +45,7 @@
final class EmbeddedProfile {
- private static final String TEIID_RUNTIME = "java:teiid/runtime-engine";
+
private static final String BUNDLE_NAME = "com.metamatrix.jdbc.basic_i18n"; //$NON-NLS-1$
/**
@@ -90,13 +88,8 @@
// and make sure we have all the properties we need.
validateProperties(info);
try {
- InitialContext ic = new InitialContext();
- ServerConnectionFactory scf = (ServerConnectionFactory)ic.lookup(TEIID_RUNTIME);
- ServerConnection conn = scf.getConnection(info);
- // this close has no effect; it only closes the managed connection of server connection
- conn.close();
- return new MMConnection(conn, info, url);
- } catch (NamingException e) {
+ return new MMConnection(new LocalServerConnection(info), info, url);
+ } catch (MetaMatrixRuntimeException e) {
throw new SQLException(e);
} catch (ConnectionException e) {
throw new SQLException(e);
Modified: branches/JCA/client-jdbc/src/test/java/com/metamatrix/jdbc/util/TestMMJDBCURL.java
===================================================================
--- branches/JCA/client-jdbc/src/test/java/com/metamatrix/jdbc/util/TestMMJDBCURL.java 2010-01-22 17:27:53 UTC (rev 1775)
+++ branches/JCA/client-jdbc/src/test/java/com/metamatrix/jdbc/util/TestMMJDBCURL.java 2010-01-22 22:49:05 UTC (rev 1776)
@@ -244,8 +244,8 @@
}
try {
+ // in embedded situation there is no connection url
new MMJDBCURL("myVDB", " ", null); //$NON-NLS-1$ //$NON-NLS-2$
- fail("Should have failed."); //$NON-NLS-1$
} catch (Exception e) {
}
Modified: branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/DQPManagementView.java
===================================================================
--- branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/DQPManagementView.java 2010-01-22 17:27:53 UTC (rev 1775)
+++ branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/DQPManagementView.java 2010-01-22 22:49:05 UTC (rev 1776)
@@ -32,6 +32,7 @@
import org.jboss.managed.api.annotation.ManagementProperties;
import org.jboss.managed.api.annotation.ManagementProperty;
import org.jboss.managed.api.annotation.ViewUse;
+import org.teiid.adminapi.AdminComponentException;
import org.teiid.adminapi.AdminException;
import org.teiid.adminapi.impl.RequestMetadata;
import org.teiid.adminapi.impl.SessionMetadata;
@@ -41,6 +42,7 @@
import com.metamatrix.api.exception.MetaMatrixComponentException;
import com.metamatrix.api.exception.security.SessionServiceException;
+import com.metamatrix.dqp.client.DQPManagement;
/**
* Since DQPCore can not be made into a management bean itself as it has life cycle dependencies associated with
@@ -49,7 +51,7 @@
* what ever objects it has access to.
*/
@ManagementObject(isRuntime=true, componentType=@ManagementComponent(type="teiid",subtype="dqp"), properties=ManagementProperties.EXPLICIT)
-public class DQPManagementView {
+public class DQPManagementView implements DQPManagement {
private DQPCore dqp;
private ConnectorManagerRepository connectorManagerRepository;
@@ -66,16 +68,19 @@
this.connectorManagerRepository = repo;
}
+ @Override
@ManagementOperation(description="Requests for perticular session", impact=Impact.ReadOnly,params={@ManagementParameter(name="sessionId",description="The session Identifier")})
public List<RequestMetadata> getRequestsForSession(long sessionId) {
return this.dqp.getRequestsForSession(sessionId);
}
+ @Override
@ManagementOperation(description="Active requests", impact=Impact.ReadOnly)
public List<RequestMetadata> getRequests() {
return this.dqp.getRequests();
}
+ @Override
@ManagementOperation(description="Get Runtime workmanager statistics", impact=Impact.ReadOnly,params={@ManagementParameter(name="identifier",description="Use \"runtime\" for engine, or connector name for connector")})
public WorkerPoolStatisticsMetadata getWorkManagerStatistics(String identifier) {
if ("runtime".equalsIgnoreCase(identifier)) {
@@ -88,41 +93,61 @@
return null;
}
+ @Override
@ManagementOperation(description="Terminate a Session",params={@ManagementParameter(name="terminateeId",description="The session to be terminated")})
public void terminateSession(long terminateeId) {
this.dqp.terminateSession(terminateeId);
}
+ @Override
@ManagementOperation(description="Cancel a Request",params={@ManagementParameter(name="sessionId",description="The session Identifier"), @ManagementParameter(name="requestId",description="The request Identifier")})
- public boolean cancelRequest(long sessionId, long requestId) throws MetaMatrixComponentException {
- return this.dqp.cancelRequest(sessionId, requestId);
+ public boolean cancelRequest(long sessionId, long requestId) throws AdminException {
+ try {
+ return this.dqp.cancelRequest(sessionId, requestId);
+ } catch (MetaMatrixComponentException e) {
+ throw new AdminComponentException(e);
+ }
}
+ @Override
@ManagementOperation(description="Get Cache types in the system", impact=Impact.ReadOnly)
public Collection<String> getCacheTypes(){
return this.dqp.getCacheTypes();
}
+ @Override
@ManagementOperation(description="Clear the caches in the system", impact=Impact.ReadOnly)
public void clearCache(String cacheType) {
this.dqp.clearCache(cacheType);
}
+ @Override
@ManagementOperation(description="Active sessions", impact=Impact.ReadOnly)
- public Collection<SessionMetadata> getActiveSessions() throws SessionServiceException {
- return this.dqp.getActiveSessions();
+ public Collection<SessionMetadata> getActiveSessions() throws AdminException {
+ try {
+ return this.dqp.getActiveSessions();
+ } catch (SessionServiceException e) {
+ throw new AdminComponentException(e);
+ }
}
+ @Override
@ManagementProperty(description="Active session count", use={ViewUse.STATISTIC}, readOnly=true)
- public int getActiveSessionsCount() throws SessionServiceException{
- return this.dqp.getActiveSessionsCount();
+ public int getActiveSessionsCount() throws AdminException{
+ try {
+ return this.dqp.getActiveSessionsCount();
+ } catch (SessionServiceException e) {
+ throw new AdminComponentException(e);
+ }
}
+ @Override
@ManagementOperation(description="Active Transactions", impact=Impact.ReadOnly)
public Collection<org.teiid.adminapi.Transaction> getTransactions() {
return this.dqp.getTransactions();
}
+ @Override
@ManagementOperation(description="Clear the caches in the system", impact=Impact.ReadOnly)
public void terminateTransaction(String xid) throws AdminException {
this.dqp.terminateTransaction(xid);
Modified: branches/JCA/jboss-integration/src/main/java/org/teiid/adminapi/jboss/Admin.java
===================================================================
--- branches/JCA/jboss-integration/src/main/java/org/teiid/adminapi/jboss/Admin.java 2010-01-22 17:27:53 UTC (rev 1775)
+++ branches/JCA/jboss-integration/src/main/java/org/teiid/adminapi/jboss/Admin.java 2010-01-22 22:49:05 UTC (rev 1776)
@@ -120,6 +120,23 @@
return this.deploymentMgr;
}
+// private DQPManagement getDQPManagement() throws Exception {
+// final ManagedComponent mc = getView().getComponent(DQPManagementView.class.getName(), DQPTYPE);
+//
+// return (DQPManagement)Proxy.newProxyInstance(this.getClass().getClassLoader(), new Class[] {DQPManagement.class}, new InvocationHandler() {
+// @Override
+// public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
+//
+// MetaValue value = ManagedUtil.executeOperation(mc, method.getName());
+// Class returnType = method.getReturnType();
+// if (returnType.equals(Void.class)) {
+// return value;
+// }
+// return null;
+// }
+// });
+// }
+
@Override
public Collection<ConnectorBinding> getConnectorBindings() throws AdminException {
ArrayList<ConnectorBinding> bindings = new ArrayList<ConnectorBinding>();
Modified: branches/JCA/jboss-integration/src/main/java/org/teiid/jboss/deployers/RuntimeEngineDeployer.java
===================================================================
--- branches/JCA/jboss-integration/src/main/java/org/teiid/jboss/deployers/RuntimeEngineDeployer.java 2010-01-22 17:27:53 UTC (rev 1775)
+++ branches/JCA/jboss-integration/src/main/java/org/teiid/jboss/deployers/RuntimeEngineDeployer.java 2010-01-22 22:49:05 UTC (rev 1776)
@@ -46,8 +46,6 @@
import org.teiid.transport.SocketConfiguration;
import org.teiid.transport.SocketTransport;
-import com.metamatrix.common.comm.ClientServiceRegistry;
-import com.metamatrix.common.comm.ClientServiceRegistryImpl;
import com.metamatrix.common.comm.api.ServerConnectionFactory;
import com.metamatrix.common.log.LogManager;
import com.metamatrix.core.log.MessageLevel;
@@ -60,11 +58,11 @@
import com.metamatrix.platform.security.api.service.SessionService;
public class RuntimeEngineDeployer extends AbstractSimpleRealDeployer<ManagedConnectionFactoryDeploymentGroup> {
+ private static final String TEIID_RUNTIME = "java:teiid/runtime-engine";
protected Logger log = Logger.getLogger(getClass());
private ContainerHelper containerHelper;
private SocketTransport socketTransport;
private SocketConfiguration socketConfiguration;
- private ClientServiceRegistry clientServiceRegistry;
public RuntimeEngineDeployer() {
super(ManagedConnectionFactoryDeploymentGroup.class);
@@ -79,8 +77,7 @@
String connectorDefinition = data.getConnectionDefinition();
if (connectorDefinition.equals("com.metamatrix.common.comm.api.ServerConnectionFactory")) {
- ServerConnectionFactory scf = (ServerConnectionFactory)ContainerUtil.lookup("java:teiid/runtime-engine");
- startEngine(scf);
+ startEngine();
log.info("Teiid Engine Started = " + new Date(System.currentTimeMillis()).toString()); //$NON-NLS-1$
}
@@ -110,14 +107,14 @@
this.socketConfiguration = socketConfig;
}
- private void startEngine(ServerConnectionFactory scf) {
- DQPConfiguration config = scf.getService(DQPConfiguration.class);
- ClientServiceRegistry services = createClientServices(config, scf.getService(WorkManager.class), scf.getService(XATerminator.class));
- scf.setService(ClientServiceRegistry.class, services);
-
- this.clientServiceRegistry = services;
-
+ private void startEngine() {
+ ServerConnectionFactory scf = ContainerUtil.lookup(TEIID_RUNTIME);
+
+ // create the necessary services
+ createClientServices(scf);
+
// Start the socket transport
+ DQPConfiguration config = scf.getService(DQPConfiguration.class);
if (config.getBindAddress() != null) {
this.socketConfiguration.setBindAddress(config.getBindAddress());
}
@@ -126,17 +123,17 @@
}
this.socketTransport = new SocketTransport(this.socketConfiguration);
- this.socketTransport.setClientServiceRegistry(services);
this.socketTransport.setWorkManager(scf.getService(WorkManager.class));
this.socketTransport.start();
}
private void stopEngine() {
+ ServerConnectionFactory scf = ContainerUtil.lookup(TEIID_RUNTIME);
// Stop DQP
- DQPCore dqp = (DQPCore)this.clientServiceRegistry.getClientService(ClientSideDQP.class);
- dqp.stop();
+ ClientSideDQP dqp = scf.getService(ClientSideDQP.class);
+ ((DQPCore)dqp).stop();
// Stop socket transport
if (this.socketTransport != null) {
@@ -145,26 +142,21 @@
}
}
- private ClientServiceRegistry createClientServices(DQPConfiguration config, WorkManager workMgr, XATerminator terminator) {
-
+ private void createClientServices(ServerConnectionFactory scf) {
DQPCore dqp = new DQPCore();
- dqp.setTransactionService(getTransactionService("localhost", terminator));
- dqp.setWorkManager(workMgr);
+ dqp.setTransactionService(getTransactionService("localhost", scf.getService(XATerminator.class)));
+ dqp.setWorkManager(scf.getService(WorkManager.class));
dqp.setAuthorizationService(this.containerHelper.getService(AuthorizationService.class));
dqp.setBufferService(this.containerHelper.getService(BufferService.class));
dqp.setSessionService(this.containerHelper.getService(SessionService.class));
dqp.setConnectorManagerRepository(this.containerHelper.getService(ConnectorManagerRepository.class));
- dqp.start(config);
+ dqp.start(scf.getService(DQPConfiguration.class));
DQPManagementView holder = this.containerHelper.getService(DQPManagementView.class);
holder.setDQP(dqp);
- ClientServiceRegistry services = new ClientServiceRegistryImpl();
- services.registerClientService(ILogon.class, new LogonImpl(dqp.getSessionService(), "teiid-cluster"), com.metamatrix.common.util.LogConstants.CTX_SERVER);
-
- services.registerClientService(ClientSideDQP.class, dqp, LogConstants.CTX_QUERY_SERVICE);
-
- return services;
+ scf.registerClientService(ILogon.class, new LogonImpl(dqp.getSessionService(), "teiid-cluster"), com.metamatrix.common.util.LogConstants.CTX_SERVER);
+ scf.registerClientService(ClientSideDQP.class, dqp, LogConstants.CTX_QUERY_SERVICE);
}
private TransactionService getTransactionService(String processName, XATerminator terminator) {
Modified: branches/JCA/runtime/src/main/java/org/teiid/TeiidConnectionFactory.java
===================================================================
--- branches/JCA/runtime/src/main/java/org/teiid/TeiidConnectionFactory.java 2010-01-22 17:27:53 UTC (rev 1775)
+++ branches/JCA/runtime/src/main/java/org/teiid/TeiidConnectionFactory.java 2010-01-22 22:49:05 UTC (rev 1776)
@@ -22,7 +22,12 @@
package org.teiid;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
import javax.resource.ResourceException;
import javax.resource.spi.ConnectionManager;
@@ -30,8 +35,9 @@
import javax.resource.spi.work.WorkManager;
import org.teiid.dqp.internal.process.DQPConfiguration;
+import org.teiid.dqp.internal.process.DQPWorkContext;
-import com.metamatrix.common.comm.ClientServiceRegistry;
+import com.metamatrix.client.ExceptionUtil;
import com.metamatrix.common.comm.api.ServerConnection;
import com.metamatrix.common.comm.api.ServerConnectionFactory;
import com.metamatrix.common.comm.exception.CommunicationException;
@@ -39,6 +45,8 @@
import com.metamatrix.common.log.LogManager;
import com.metamatrix.jdbc.LogConfigurationProvider;
import com.metamatrix.jdbc.LogListernerProvider;
+import com.metamatrix.platform.security.api.ILogon;
+import com.metamatrix.platform.security.api.SessionToken;
/**
@@ -51,7 +59,8 @@
private TeiidResourceAdapter ra;
private TeiidManagedConnectionFactory mcf;
private ConnectionManager cxManager;
- private ClientServiceRegistry clientServices = null;
+ private ConcurrentHashMap<Class, Object> clientServices = new ConcurrentHashMap<Class, Object>();
+ private ConcurrentHashMap<Class, String> loggingContext = new ConcurrentHashMap<Class, String>();
public TeiidConnectionFactory(TeiidResourceAdapter ra, TeiidManagedConnectionFactory mcf, ConnectionManager cxmanager) {
@@ -66,7 +75,8 @@
@Override
public ServerConnection getConnection(Properties connectionProperties) throws CommunicationException, ConnectionException {
try {
- return (ServerConnection)cxManager.allocateConnection(this.mcf, new ConnectionInfo(connectionProperties, this.clientServices));
+ // this code will not be invoked as teiid does not use managed connection.
+ return (ServerConnection)cxManager.allocateConnection(this.mcf, new ConnectionInfo(connectionProperties, null));
} catch (ResourceException e) {
throw new ConnectionException(e);
}
@@ -85,15 +95,51 @@
return type.cast(mcf);
}
- throw new IllegalArgumentException(type + " Sevice is not available");
+ // see if there are any client services.
+ Object service = this.clientServices.get(type);
+ if (service != null) {
+ return type.cast(proxyService(type, type.cast(service)));
+ }
+
+ return null;
}
- public <T> void setService(Class<T> type, T instance) {
- if (type.equals(ClientServiceRegistry.class)) {
- this.clientServices = (ClientServiceRegistry)instance;
- }
+ @Override
+ public <T> void registerClientService(Class<T> type, T instance, String loggingContext) {
+ this.clientServices.put(type, instance);
+ this.loggingContext.put(type, loggingContext);
}
+ @Override
+ public <T> String getLoggingContextForService(Class<T> type) {
+ return this.loggingContext.get(type);
+ }
+
+ private <T> T proxyService(final Class<T> iface, final T instance) {
+
+ return (T) Proxy.newProxyInstance(this.getClass().getClassLoader(), new Class[] {iface}, new InvocationHandler() {
+
+ public Object invoke(Object arg0, Method arg1, Object[] arg2) throws Throwable {
+
+ Throwable exception = null;
+ ClassLoader current = Thread.currentThread().getContextClassLoader();
+ try {
+ if (!(iface.equals(ILogon.class))) {
+ ((ILogon)clientServices.get(ILogon.class)).assertIdentity(SessionToken.getSession());
+ }
+ return arg1.invoke(instance, arg2);
+ } catch (InvocationTargetException e) {
+ exception = e.getTargetException();
+ } catch(Throwable t){
+ exception = t;
+ } finally {
+ Thread.currentThread().setContextClassLoader(current);
+ DQPWorkContext.releaseWorkContext();
+ }
+ throw ExceptionUtil.convertException(arg1, exception);
+ }
+ });
+ }
// public MMProcess getProcess() {
//
Modified: branches/JCA/runtime/src/main/java/org/teiid/TeiidManagedConnection.java
===================================================================
--- branches/JCA/runtime/src/main/java/org/teiid/TeiidManagedConnection.java 2010-01-22 17:27:53 UTC (rev 1775)
+++ branches/JCA/runtime/src/main/java/org/teiid/TeiidManagedConnection.java 2010-01-22 22:49:05 UTC (rev 1776)
@@ -76,18 +76,10 @@
@Override
public Object getConnection(Subject arg0, ConnectionRequestInfo arg1) throws ResourceException {
ConnectionInfo ci = (ConnectionInfo)arg1;
- try {
- this.conn = new WrappedConnection(new LocalServerConnection(ci.properties, ci.clientServices));
- this.conn.setManagedConnection(this);
- return this.conn;
- } catch (CommunicationException e) {
- throw new ResourceException(e);
- } catch (ConnectionException e) {
- if (e.getCause() instanceof ResourceException) {
- throw (ResourceException)e.getCause();
- }
- throw new ResourceException(e);
- }
+ // This code will not be invoked as managed connection is not used anywhere.
+ this.conn = new WrappedConnection(null);
+ this.conn.setManagedConnection(this);
+ return this.conn;
}
@Override
Modified: branches/JCA/runtime/src/main/java/org/teiid/WrappedConnection.java
===================================================================
--- branches/JCA/runtime/src/main/java/org/teiid/WrappedConnection.java 2010-01-22 17:27:53 UTC (rev 1775)
+++ branches/JCA/runtime/src/main/java/org/teiid/WrappedConnection.java 2010-01-22 22:49:05 UTC (rev 1776)
@@ -69,13 +69,4 @@
void setManagedConnection(TeiidManagedConnection teiidManagedConnection) {
this.mc = teiidManagedConnection;
}
-
- @Override
- public void reallyClose() {
- close();
- if (this.delegate != null) {
- delegate.close();
- delegate = null;
- }
- }
}
Deleted: branches/JCA/runtime/src/main/java/org/teiid/transport/LocalServerConnection.java
===================================================================
--- branches/JCA/runtime/src/main/java/org/teiid/transport/LocalServerConnection.java 2010-01-22 17:27:53 UTC (rev 1775)
+++ branches/JCA/runtime/src/main/java/org/teiid/transport/LocalServerConnection.java 2010-01-22 22:49:05 UTC (rev 1776)
@@ -1,161 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * See the COPYRIGHT.txt file distributed with this work for information
- * regarding copyright ownership. Some portions may be licensed
- * to Red Hat, Inc. under one or more contributor license agreements.
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
- * 02110-1301 USA.
- */
-
-package org.teiid.transport;
-
-import java.lang.reflect.InvocationHandler;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.lang.reflect.Proxy;
-import java.util.Properties;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import org.teiid.dqp.internal.process.DQPWorkContext;
-
-import com.metamatrix.admin.api.exception.security.InvalidSessionException;
-import com.metamatrix.api.exception.MetaMatrixComponentException;
-import com.metamatrix.api.exception.security.LogonException;
-import com.metamatrix.client.ExceptionUtil;
-import com.metamatrix.common.comm.ClientServiceRegistry;
-import com.metamatrix.common.comm.api.ServerConnection;
-import com.metamatrix.common.comm.exception.CommunicationException;
-import com.metamatrix.common.comm.exception.ConnectionException;
-import com.metamatrix.common.comm.platform.CommPlatformPlugin;
-import com.metamatrix.jdbc.JDBCPlugin;
-import com.metamatrix.platform.security.api.ILogon;
-import com.metamatrix.platform.security.api.LogonResult;
-
-public class LocalServerConnection implements ServerConnection {
-
- private final LogonResult result;
- private boolean shutdown;
- private DQPWorkContext workContext;
- private ClientServiceRegistry clientServices;
- private ILogon logon;
-
- public LocalServerConnection(Properties connectionProperties, ClientServiceRegistry clientServices) throws CommunicationException, ConnectionException{
- this.clientServices = clientServices;
- this.workContext = new DQPWorkContext();
- DQPWorkContext.setWorkContext(this.workContext);
- this.logon = this.getService(ILogon.class);
- this.result = authenticate(connectionProperties);
- }
-
- public synchronized LogonResult authenticate(Properties connProps) throws ConnectionException, CommunicationException {
- try {
- connProps.setProperty("localConnection", "true");
- LogonResult logonResult = this.logon.logon(connProps);
- return logonResult;
- } catch (LogonException e) {
- // Propagate the original message as it contains the message we want
- // to give to the user
- throw new ConnectionException(e, e.getMessage());
- } catch (MetaMatrixComponentException e) {
- if (e.getCause() instanceof CommunicationException) {
- throw (CommunicationException)e.getCause();
- }
- throw new CommunicationException(e, CommPlatformPlugin.Util.getString("PlatformServerConnectionFactory.Unable_to_find_a_component_used_in_logging_on_to_MetaMatrix")); //$NON-NLS-1$
- }
- }
-
-
- @SuppressWarnings("unchecked")
- public <T> T getService(final Class<T> iface) {
-
- return (T) Proxy.newProxyInstance(this.getClass().getClassLoader(), new Class[] {iface}, new InvocationHandler() {
-
- public Object invoke(Object arg0, Method arg1, Object[] arg2) throws Throwable {
- if (!isOpen()) {
- throw ExceptionUtil.convertException(arg1, new MetaMatrixComponentException(JDBCPlugin.Util.getString("LocalTransportHandler.session_inactive"))); //$NON-NLS-1$
- }
- Throwable exception = null;
- ClassLoader current = Thread.currentThread().getContextClassLoader();
- try {
- DQPWorkContext.setWorkContext(workContext);
- if (!(iface.equals(ILogon.class))) {
- logon.assertIdentity(result.getSessionToken());
- }
- return arg1.invoke(clientServices.getClientService(iface), arg2);
- } catch (InvocationTargetException e) {
- exception = e.getTargetException();
- } catch(Throwable t){
- exception = t;
- } finally {
- Thread.currentThread().setContextClassLoader(current);
- DQPWorkContext.releaseWorkContext();
- }
- throw ExceptionUtil.convertException(arg1, exception);
- }
- });
- }
-
- public boolean isOpen() {
- return !shutdown;
- }
-
- public void close() {
- // no-op managed connection close
- }
-
- @Override
- public void reallyClose() {
- shutdown(true);
- }
-
- private void shutdown(boolean logoff) {
- if (shutdown) {
- return;
- }
-
- if (logoff) {
- try {
- //make a best effort to send the logoff
- Future<?> writeFuture = this.logon.logoff();
- if (writeFuture != null) {
- writeFuture.get(5000, TimeUnit.MILLISECONDS);
- }
- } catch (InvalidSessionException e) {
- //ignore
- } catch (InterruptedException e) {
- //ignore
- } catch (ExecutionException e) {
- //ignore
- } catch (TimeoutException e) {
- //ignore
- }
- }
- this.workContext = null;
- this.shutdown = true;
- }
-
- public LogonResult getLogonResult() {
- return result;
- }
-
- @Override
- public boolean isSameInstance(ServerConnection conn) throws CommunicationException {
- return (conn instanceof LocalServerConnection);
- }
-}
Modified: branches/JCA/runtime/src/main/java/org/teiid/transport/LogonImpl.java
===================================================================
--- branches/JCA/runtime/src/main/java/org/teiid/transport/LogonImpl.java 2010-01-22 17:27:53 UTC (rev 1775)
+++ branches/JCA/runtime/src/main/java/org/teiid/transport/LogonImpl.java 2010-01-22 22:49:05 UTC (rev 1776)
@@ -89,6 +89,9 @@
long sessionID = s.getSessionId();
DQPWorkContext workContext = DQPWorkContext.getWorkContext();
+ if (workContext == null) {
+ workContext = new DQPWorkContext();
+ }
workContext.setSessionToken(s.getAttachment(SessionToken.class));
workContext.setAppName(s.getApplicationName());
@@ -105,6 +108,7 @@
workContext.setVdbVersion(vdb.getVersion());
workContext.setVdb(vdb);
}
+ DQPWorkContext.setWorkContext(workContext);
return sessionID;
}
Modified: branches/JCA/runtime/src/main/java/org/teiid/transport/ServerWorkItem.java
===================================================================
--- branches/JCA/runtime/src/main/java/org/teiid/transport/ServerWorkItem.java 2010-01-22 17:27:53 UTC (rev 1775)
+++ branches/JCA/runtime/src/main/java/org/teiid/transport/ServerWorkItem.java 2010-01-22 22:49:05 UTC (rev 1776)
@@ -38,8 +38,8 @@
import com.metamatrix.api.exception.ComponentNotFoundException;
import com.metamatrix.api.exception.ExceptionHolder;
import com.metamatrix.api.exception.MetaMatrixProcessingException;
-import com.metamatrix.common.comm.ClientServiceRegistry;
import com.metamatrix.common.comm.api.Message;
+import com.metamatrix.common.comm.api.ServerConnectionFactory;
import com.metamatrix.common.comm.platform.socket.client.ServiceInvocationStruct;
import com.metamatrix.common.log.LogManager;
import com.metamatrix.common.util.LogConstants;
@@ -49,30 +49,29 @@
import com.metamatrix.dqp.client.ResultsFuture;
import com.metamatrix.dqp.embedded.DQPEmbeddedPlugin;
import com.metamatrix.platform.security.api.ILogon;
+import com.metamatrix.platform.security.api.SessionToken;
-public class ServerWorkItem implements Runnable {
+public class ServerWorkItem {
+
private final ClientInstance socketClientInstance;
private final Serializable messageKey;
private final Message message;
- private final ClientServiceRegistry server;
+ private ServerConnectionFactory scf;
- public ServerWorkItem(ClientInstance socketClientInstance,
- Serializable messageKey, Message message,
- ClientServiceRegistry server) {
+ public ServerWorkItem(ClientInstance socketClientInstance, Serializable messageKey, Message message, ServerConnectionFactory server) {
this.socketClientInstance = socketClientInstance;
this.messageKey = messageKey;
this.message = message;
- this.server = server;
+ this.scf = server;
}
/**
* main entry point for remote method calls. encryption/decryption is
* handled here so that it won't be done by the io thread
*/
- public void run() {
- DQPWorkContext.setWorkContext(this.socketClientInstance.getWorkContext());
+ public void process() {
Message result = null;
- String service = null;
+ DQPWorkContext.setWorkContext(this.socketClientInstance.getWorkContext());
final boolean encrypt = message.getContents() instanceof SealedObject;
try {
message.setContents(this.socketClientInstance.getCryptor().unsealObject(message.getContents()));
@@ -81,16 +80,14 @@
throw new AssertionError("unknown message contents"); //$NON-NLS-1$
}
final ServiceInvocationStruct serviceStruct = (ServiceInvocationStruct)message.getContents();
- Object instance = server.getClientService(serviceStruct.targetClass);
+ final Class type = Class.forName(serviceStruct.targetClass);
+ Object instance = this.scf.getService(type);
if (instance == null) {
throw new ComponentNotFoundException(DQPEmbeddedPlugin.Util.getString("ServerWorkItem.Component_Not_Found", serviceStruct.targetClass)); //$NON-NLS-1$
}
if (!(instance instanceof ILogon)) {
- DQPWorkContext workContext = this.socketClientInstance.getWorkContext();
- ILogon logonModule = server.getClientService(ILogon.class);
- logonModule.assertIdentity(workContext.getSessionToken());
+ SessionToken.setSession(this.socketClientInstance.getWorkContext().getSessionToken());
}
- service = serviceStruct.targetClass;
ReflectionHelper helper = new ReflectionHelper(instance.getClass());
Method m = helper.findBestMethodOnTarget(serviceStruct.methodName, serviceStruct.args);
Object methodResult;
@@ -109,9 +106,9 @@
try {
asynchResult.setContents(completedFuture.get());
} catch (InterruptedException e) {
- asynchResult.setContents(processException(e, serviceStruct.targetClass));
+ asynchResult.setContents(processException(e, scf.getLoggingContextForService(type)));
} catch (ExecutionException e) {
- asynchResult.setContents(processException(e.getCause(), serviceStruct.targetClass));
+ asynchResult.setContents(processException(e.getCause(), scf.getLoggingContextForService(type)));
}
sendResult(asynchResult, encrypt);
}
@@ -124,11 +121,12 @@
}
} catch (Throwable t) {
Message holder = new Message();
- holder.setContents(processException(t, service));
+ holder.setContents(processException(t, null));
result = holder;
} finally {
DQPWorkContext.releaseWorkContext();
}
+
if (result != null) {
sendResult(result, encrypt);
}
@@ -145,11 +143,7 @@
socketClientInstance.send(result, messageKey);
}
- private Serializable processException(Throwable e, String service) {
- String context = null;
- if (service != null) {
- context = this.server.getLoggingContextForService(service);
- }
+ private Serializable processException(Throwable e, String context) {
if (context == null) {
context = LogConstants.CTX_SERVER;
}
@@ -176,5 +170,4 @@
LogManager.logDetail(context, e, "Processing exception for session", this.socketClientInstance.getWorkContext().getConnectionID()); //$NON-NLS-1$
LogManager.logWarning(context, DQPEmbeddedPlugin.Util.getString("ServerWorkItem.processing_error", e.getMessage(), this.socketClientInstance.getWorkContext().getConnectionID(), e.getClass().getName(), elem)); //$NON-NLS-1$
}
-
}
\ No newline at end of file
Modified: branches/JCA/runtime/src/main/java/org/teiid/transport/SocketClientInstance.java
===================================================================
--- branches/JCA/runtime/src/main/java/org/teiid/transport/SocketClientInstance.java 2010-01-22 17:27:53 UTC (rev 1775)
+++ branches/JCA/runtime/src/main/java/org/teiid/transport/SocketClientInstance.java 2010-01-22 22:49:05 UTC (rev 1776)
@@ -28,14 +28,13 @@
import org.teiid.dqp.internal.process.DQPWorkContext;
-import com.metamatrix.common.comm.ClientServiceRegistry;
import com.metamatrix.common.comm.api.Message;
+import com.metamatrix.common.comm.api.ServerConnectionFactory;
import com.metamatrix.common.comm.exception.CommunicationException;
import com.metamatrix.common.comm.platform.CommPlatformPlugin;
import com.metamatrix.common.comm.platform.socket.Handshake;
import com.metamatrix.common.comm.platform.socket.ObjectChannel;
import com.metamatrix.common.log.LogManager;
-import com.metamatrix.common.queue.WorkerPool;
import com.metamatrix.common.util.LogConstants;
import com.metamatrix.common.util.crypto.CryptoException;
import com.metamatrix.common.util.crypto.Cryptor;
@@ -54,16 +53,14 @@
public class SocketClientInstance implements ChannelListener, ClientInstance {
private final ObjectChannel objectSocket;
- private final WorkerPool workerPool;
- private final ClientServiceRegistry server;
private Cryptor cryptor;
+ private ServerConnectionFactory server;
private boolean usingEncryption;
private DhKeyGenerator keyGen;
private DQPWorkContext workContext = new DQPWorkContext();
- public SocketClientInstance(ObjectChannel objectSocket, WorkerPool workerPool, ClientServiceRegistry server, boolean isClientEncryptionEnabled) {
+ public SocketClientInstance(ObjectChannel objectSocket, ServerConnectionFactory server, boolean isClientEncryptionEnabled) {
this.objectSocket = objectSocket;
- this.workerPool = workerPool;
this.server = server;
this.usingEncryption = isClientEncryptionEnabled;
SocketAddress address = this.objectSocket.getRemoteAddress();
@@ -142,7 +139,8 @@
if (LogManager.isMessageToBeRecorded(LogConstants.CTX_SERVER, MessageLevel.DETAIL)) {
LogManager.logDetail(LogConstants.CTX_SERVER, "processing message:" + packet); //$NON-NLS-1$
}
- workerPool.execute(new ServerWorkItem(this, packet.getMessageKey(), packet, this.server));
+ ServerWorkItem work = new ServerWorkItem(this, packet.getMessageKey(), packet, this.server);
+ work.process();
}
public void shutdown() throws CommunicationException {
Modified: branches/JCA/runtime/src/main/java/org/teiid/transport/SocketListener.java
===================================================================
--- branches/JCA/runtime/src/main/java/org/teiid/transport/SocketListener.java 2010-01-22 17:27:53 UTC (rev 1775)
+++ branches/JCA/runtime/src/main/java/org/teiid/transport/SocketListener.java 2010-01-22 22:49:05 UTC (rev 1776)
@@ -25,6 +25,8 @@
import java.net.InetSocketAddress;
import java.util.concurrent.Executor;
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
import javax.net.ssl.SSLEngine;
import javax.resource.spi.work.Work;
import javax.resource.spi.work.WorkException;
@@ -38,7 +40,7 @@
import org.teiid.transport.ChannelListener.ChannelListenerFactory;
import com.metamatrix.common.CommonPlugin;
-import com.metamatrix.common.comm.ClientServiceRegistry;
+import com.metamatrix.common.comm.api.ServerConnectionFactory;
import com.metamatrix.common.comm.platform.socket.ObjectChannel;
import com.metamatrix.common.log.LogManager;
import com.metamatrix.common.queue.WorkerPool;
@@ -51,8 +53,8 @@
* Server-side class to listen for new connection requests and create a SocketClientConnection for each connection request.
*/
public class SocketListener implements ChannelListenerFactory {
- private ClientServiceRegistry server;
- private SSLAwareChannelHandler channelHandler;
+ private static final String TEIID_RUNTIME = "java:teiid/runtime-engine";
+ private SSLAwareChannelHandler channelHandler;
private Channel serverChanel;
private boolean isClientEncryptionEnabled;
private WorkerPool workerPool;
@@ -68,14 +70,13 @@
* @param workerPool
* @param engine null if SSL is disabled
*/
- public SocketListener(int port, String bindAddress, ClientServiceRegistry server, int inputBufferSize,
+ public SocketListener(int port, String bindAddress, int inputBufferSize,
int outputBufferSize, int maxWorkers, SSLEngine engine, boolean isClientEncryptionEnabled, WorkManager workManager) {
this.isClientEncryptionEnabled = isClientEncryptionEnabled;
if (port < 0 || port > 0xFFFF) {
throw new IllegalArgumentException("port out of range:" + port); //$NON-NLS-1$
}
- this.server = server;
this.executor = new WorkManagerExecutor(workManager);
this.workerPool = WorkerPoolFactory.newWorkerPool("SocketWorker", maxWorkers, this.executor); //$NON-NLS-1$
if (LogManager.isMessageToBeRecorded(LogConstants.CTX_SERVER, MessageLevel.DETAIL)) {
@@ -125,7 +126,7 @@
}
public ChannelListener createChannelListener(ObjectChannel channel) {
- return new SocketClientInstance(channel, this.workerPool, this.server, this.isClientEncryptionEnabled);
+ return new SocketClientInstance(channel, getServer(), this.isClientEncryptionEnabled);
}
static class WorkManagerExecutor implements Executor{
@@ -153,4 +154,13 @@
}
}
}
+
+ protected ServerConnectionFactory getServer() {
+ try {
+ InitialContext ic = new InitialContext();
+ return (ServerConnectionFactory)ic.lookup(TEIID_RUNTIME);
+ } catch (NamingException e) {
+ return null;
+ }
+ }
}
\ No newline at end of file
Modified: branches/JCA/runtime/src/main/java/org/teiid/transport/SocketTransport.java
===================================================================
--- branches/JCA/runtime/src/main/java/org/teiid/transport/SocketTransport.java 2010-01-22 17:27:53 UTC (rev 1775)
+++ branches/JCA/runtime/src/main/java/org/teiid/transport/SocketTransport.java 2010-01-22 22:49:05 UTC (rev 1776)
@@ -29,7 +29,6 @@
import org.teiid.adminapi.impl.WorkerPoolStatisticsMetadata;
-import com.metamatrix.common.comm.ClientServiceRegistry;
import com.metamatrix.common.log.LogManager;
import com.metamatrix.common.util.LogConstants;
import com.metamatrix.core.MetaMatrixRuntimeException;
@@ -42,7 +41,6 @@
public class SocketTransport {
private SocketListener listener;
- private ClientServiceRegistry clientServices;
private WorkManager workManager;
private SocketConfiguration config;
@@ -56,7 +54,7 @@
try {
if (this.config.isEnabled()) {
LogManager.logDetail(LogConstants.CTX_SERVER, DQPEmbeddedPlugin.Util.getString("SocketTransport.1", new Object[] {bindAddress, String.valueOf(this.config.getPortNumber())})); //$NON-NLS-1$
- this.listener = new SocketListener(this.config.getPortNumber(), bindAddress, this.clientServices, this.config.getInputBufferSize(), this.config.getOutputBufferSize(), this.config.getMaxSocketThreads(), this.config.getSSLConfiguration().getServerSSLEngine(), this.config.getSSLConfiguration().isClientEncryptionEnabled(), this.workManager);
+ this.listener = new SocketListener(this.config.getPortNumber(), bindAddress, this.config.getInputBufferSize(), this.config.getOutputBufferSize(), this.config.getMaxSocketThreads(), this.config.getSSLConfiguration().getServerSSLEngine(), this.config.getSSLConfiguration().isClientEncryptionEnabled(), this.workManager);
}
else {
LogManager.logDetail(LogConstants.CTX_SERVER, DQPEmbeddedPlugin.Util.getString("SocketTransport.3")); //$NON-NLS-1$
@@ -87,12 +85,7 @@
return this.listener.getStats();
}
- public void setClientServiceRegistry(ClientServiceRegistry services) {
- this.clientServices = services;
- }
-
public void setWorkManager(WorkManager mgr) {
this.workManager = mgr;
}
-
}
Modified: branches/JCA/runtime/src/test/java/org/teiid/transport/TestCommSockets.java
===================================================================
--- branches/JCA/runtime/src/test/java/org/teiid/transport/TestCommSockets.java 2010-01-22 17:27:53 UTC (rev 1775)
+++ branches/JCA/runtime/src/test/java/org/teiid/transport/TestCommSockets.java 2010-01-22 22:49:05 UTC (rev 1776)
@@ -35,6 +35,7 @@
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.Mockito;
import org.teiid.dqp.internal.datamgr.impl.FakeWorkManager;
import com.metamatrix.api.exception.ComponentNotFoundException;
@@ -42,6 +43,7 @@
import com.metamatrix.common.api.MMURL;
import com.metamatrix.common.comm.ClientServiceRegistry;
import com.metamatrix.common.comm.ClientServiceRegistryImpl;
+import com.metamatrix.common.comm.api.ServerConnectionFactory;
import com.metamatrix.common.comm.exception.CommunicationException;
import com.metamatrix.common.comm.exception.ConnectionException;
import com.metamatrix.common.comm.platform.socket.SocketUtil;
@@ -73,8 +75,7 @@
ClientServiceRegistry csr = new ClientServiceRegistryImpl();
SessionService sessionService = mock(SessionService.class);
csr.registerClientService(ILogon.class, new LogonImpl(sessionService, "fakeCluster"), "foo"); //$NON-NLS-1$ //$NON-NLS-2$
- listener = new SocketListener(addr.getPort(), addr.getAddress().getHostAddress(),
- csr, 1024, 1024, 1, null, true, new FakeWorkManager());
+ listener = new SocketListener(addr.getPort(), addr.getAddress().getHostAddress(),1024, 1024, 1, null, true, new FakeWorkManager());
try {
Properties p = new Properties();
@@ -141,17 +142,20 @@
SSLEngine serverSSL, boolean isClientEncryptionEnabled, Properties socketConfig) throws CommunicationException,
ConnectionException {
if (listener == null) {
- SessionService sessionService = mock(SessionService.class);
- ClientServiceRegistry csr = new ClientServiceRegistryImpl();
- csr.registerClientService(ILogon.class, new LogonImpl(sessionService, "fakeCluster") { //$NON-NLS-1$
- @Override
- public LogonResult logon(Properties connProps)
- throws LogonException, ComponentNotFoundException {
- return new LogonResult();
+ listener = new SocketListener(addr.getPort(), addr.getAddress().getHostAddress(), 1024, 1024, 1, serverSSL, isClientEncryptionEnabled, new FakeWorkManager()) {
+ protected ServerConnectionFactory getServer() {
+ ServerConnectionFactory server = Mockito.mock(ServerConnectionFactory.class);
+ Mockito.stub(server.getService(ILogon.class)).toReturn(new LogonImpl(mock(SessionService.class), "fakeCluster") { //$NON-NLS-1$
+ @Override
+ public LogonResult logon(Properties connProps)
+ throws LogonException, ComponentNotFoundException {
+ return new LogonResult();
+ }
+ });
+ return server;
}
- }, "foo"); //$NON-NLS-1$
- listener = new SocketListener(addr.getPort(), addr.getAddress().getHostAddress(),
- csr, 1024, 1024, 1, serverSSL, isClientEncryptionEnabled, new FakeWorkManager());
+ };
+
SocketListenerStats stats = listener.getStats();
assertEquals(0, stats.maxSockets);
assertEquals(0, stats.objectsRead);
Modified: branches/JCA/runtime/src/test/java/org/teiid/transport/TestSocketRemoting.java
===================================================================
--- branches/JCA/runtime/src/test/java/org/teiid/transport/TestSocketRemoting.java 2010-01-22 17:27:53 UTC (rev 1775)
+++ branches/JCA/runtime/src/test/java/org/teiid/transport/TestSocketRemoting.java 2010-01-22 22:49:05 UTC (rev 1776)
@@ -22,6 +22,11 @@
package org.teiid.transport;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
import java.io.IOException;
import java.io.Serializable;
import java.util.Properties;
@@ -29,8 +34,13 @@
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
-import junit.framework.TestCase;
+import javax.resource.spi.ConnectionManager;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.teiid.TeiidConnectionFactory;
+import org.teiid.TeiidManagedConnectionFactory;
+import org.teiid.TeiidResourceAdapter;
import org.teiid.dqp.internal.process.DQPWorkContext;
import com.metamatrix.admin.api.exception.security.InvalidSessionException;
@@ -39,10 +49,9 @@
import com.metamatrix.api.exception.security.LogonException;
import com.metamatrix.common.api.HostInfo;
import com.metamatrix.common.api.MMURL;
-import com.metamatrix.common.comm.ClientServiceRegistry;
-import com.metamatrix.common.comm.ClientServiceRegistryImpl;
import com.metamatrix.common.comm.api.Message;
import com.metamatrix.common.comm.api.ResultsReceiver;
+import com.metamatrix.common.comm.api.ServerConnectionFactory;
import com.metamatrix.common.comm.exception.CommunicationException;
import com.metamatrix.common.comm.exception.ConnectionException;
import com.metamatrix.common.comm.platform.socket.client.SocketServerConnection;
@@ -59,7 +68,7 @@
import com.metamatrix.platform.security.api.LogonResult;
import com.metamatrix.platform.security.api.SessionToken;
-public class TestSocketRemoting extends TestCase {
+public class TestSocketRemoting {
public interface FakeService {
@@ -85,12 +94,12 @@
private static class FakeClientServerInstance extends SocketServerInstanceImpl implements ClientInstance {
- ClientServiceRegistry clientServiceRegistry;
+ ServerConnectionFactory server;
private ResultsReceiver<Object> listener;
- public FakeClientServerInstance(ClientServiceRegistry clientServiceRegistry) {
+ public FakeClientServerInstance(ServerConnectionFactory server) {
super();
- this.clientServiceRegistry = clientServiceRegistry;
+ this.server = server;
}
public HostInfo getHostInfo() {
@@ -105,9 +114,9 @@
public void send(Message message, ResultsReceiver<Object> listener,
Serializable messageKey) throws CommunicationException,
InterruptedException {
- ServerWorkItem workItem = new ServerWorkItem(this, messageKey, message, clientServiceRegistry);
+ ServerWorkItem workItem = new ServerWorkItem(this, messageKey, message, server);
this.listener = listener;
- workItem.run();
+ workItem.process();
}
public void shutdown() {
@@ -131,18 +140,20 @@
/**
* No server was supplied, will throw an NPE under the covers
*/
+ @Test
public void testUnckedException() throws Exception {
FakeClientServerInstance serverInstance = new FakeClientServerInstance(null);
try {
createFakeConnection(serverInstance);
fail("expected exception"); //$NON-NLS-1$
} catch (CommunicationException e) {
- assertEquals("Unable to find a component used in logging on to MetaMatrix", e.getMessage()); //$NON-NLS-1$
+ assertEquals("Unable to find a component used in logging on to Teiid", e.getMessage()); //$NON-NLS-1$
}
}
+ @Test
public void testMethodInvocation() throws Exception {
- ClientServiceRegistry csr = new ClientServiceRegistryImpl();
+ TeiidConnectionFactory csr = new TeiidConnectionFactory(new TeiidResourceAdapter(), new TeiidManagedConnectionFactory(), Mockito.mock(ConnectionManager.class));
csr.registerClientService(ILogon.class, new ILogon() {
public ResultsFuture<?> logoff()
14 years, 11 months
teiid SVN: r1775 - trunk/client-jdbc/src/test/java/com/metamatrix/jdbc.
by teiid-commits@lists.jboss.org
Author: shawkins
Date: 2010-01-22 12:27:53 -0500 (Fri, 22 Jan 2010)
New Revision: 1775
Modified:
trunk/client-jdbc/src/test/java/com/metamatrix/jdbc/TestAllResultsImpl.java
trunk/client-jdbc/src/test/java/com/metamatrix/jdbc/TestMMResultSet.java
Log:
TEIID-916 fix for timeout during nextBatch.
Modified: trunk/client-jdbc/src/test/java/com/metamatrix/jdbc/TestAllResultsImpl.java
===================================================================
--- trunk/client-jdbc/src/test/java/com/metamatrix/jdbc/TestAllResultsImpl.java 2010-01-22 17:24:27 UTC (rev 1774)
+++ trunk/client-jdbc/src/test/java/com/metamatrix/jdbc/TestAllResultsImpl.java 2010-01-22 17:27:53 UTC (rev 1775)
@@ -35,7 +35,11 @@
import java.util.List;
import java.util.TimeZone;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.mockito.Matchers;
+
import junit.framework.TestCase;
import com.metamatrix.api.exception.MetaMatrixProcessingException;
@@ -772,20 +776,20 @@
static MMResultSet helpTestBatching(MMStatement statement, int fetchSize, int batchLength,
int totalLength) throws InterruptedException, ExecutionException,
- MetaMatrixProcessingException, SQLException {
+ MetaMatrixProcessingException, SQLException, TimeoutException {
ClientSideDQP dqp = mock(ClientSideDQP.class);
stub(statement.getDQP()).toReturn(dqp);
stub(statement.getFetchSize()).toReturn(fetchSize);
for (int i = batchLength; i < totalLength; i += batchLength) {
//forward requests
ResultsFuture<ResultsMessage> nextBatch = mock(ResultsFuture.class);
- stub(nextBatch.get()).toReturn(exampleResultsMsg4(i + 1, Math.min(batchLength, totalLength - i), fetchSize, i + batchLength >= totalLength));
+ stub(nextBatch.get(Matchers.anyLong(), (TimeUnit)Matchers.anyObject())).toReturn(exampleResultsMsg4(i + 1, Math.min(batchLength, totalLength - i), fetchSize, i + batchLength >= totalLength));
stub(dqp.processCursorRequest(REQUEST_ID, i + 1, fetchSize)).toReturn(nextBatch);
if (i + batchLength < totalLength) {
//backward requests
ResultsFuture<ResultsMessage> previousBatch = mock(ResultsFuture.class);
- stub(previousBatch.get()).toReturn(exampleResultsMsg4(i - batchLength + 1, i, fetchSize, false));
+ stub(previousBatch.get(Matchers.anyLong(), (TimeUnit)Matchers.anyObject())).toReturn(exampleResultsMsg4(i - batchLength + 1, i, fetchSize, false));
stub(dqp.processCursorRequest(REQUEST_ID, i, fetchSize)).toReturn(previousBatch);
}
}
Modified: trunk/client-jdbc/src/test/java/com/metamatrix/jdbc/TestMMResultSet.java
===================================================================
--- trunk/client-jdbc/src/test/java/com/metamatrix/jdbc/TestMMResultSet.java 2010-01-22 17:24:27 UTC (rev 1774)
+++ trunk/client-jdbc/src/test/java/com/metamatrix/jdbc/TestMMResultSet.java 2010-01-22 17:27:53 UTC (rev 1775)
@@ -33,6 +33,7 @@
import java.util.List;
import java.util.TimeZone;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
import junit.framework.TestCase;
@@ -693,6 +694,8 @@
throw new SQLException(e.getMessage());
} catch (ExecutionException e) {
throw new SQLException(e.getMessage());
+ } catch (TimeoutException e) {
+ throw new SQLException(e.getMessage());
}
}
14 years, 11 months
teiid SVN: r1774 - in trunk: connector-api/src/main/java/org/teiid/connector/visitor/util and 6 other directories.
by teiid-commits@lists.jboss.org
Author: jdoyle
Date: 2010-01-22 12:24:27 -0500 (Fri, 22 Jan 2010)
New Revision: 1774
Added:
trunk/connectors/connector-salesforce/src/main/java/com/metamatrix/connector/salesforce/MetadataProcessor.java
trunk/connectors/connector-salesforce/src/main/java/com/metamatrix/connector/salesforce/NameUtil.java
trunk/connectors/connector-salesforce/src/main/java/com/metamatrix/connector/salesforce/Relationship.java
trunk/connectors/connector-salesforce/src/main/java/com/metamatrix/connector/salesforce/RelationshipImpl.java
Modified:
trunk/build/kit-runtime/deploy/log4j.xml
trunk/connector-api/src/main/java/org/teiid/connector/visitor/util/SQLReservedWords.java
trunk/connector-api/src/main/java/org/teiid/connector/visitor/util/SQLStringVisitor.java
trunk/connectors/connector-jdbc/src/main/java/org/teiid/connector/jdbc/translator/SQLConversionVisitor.java
trunk/connectors/connector-salesforce/src/main/java/com/metamatrix/connector/salesforce/Connector.java
trunk/connectors/connector-salesforce/src/main/java/com/metamatrix/connector/salesforce/ConnectorState.java
trunk/connectors/connector-salesforce/src/main/java/com/metamatrix/connector/salesforce/connection/SalesforceConnection.java
trunk/connectors/connector-salesforce/src/main/java/com/metamatrix/connector/salesforce/connection/impl/ConnectionImpl.java
trunk/connectors/connector-salesforce/src/main/java/com/metamatrix/connector/salesforce/execution/visitors/InsertVisitor.java
trunk/connectors/connector-salesforce/src/main/java/com/metamatrix/connector/salesforce/execution/visitors/SelectVisitor.java
trunk/connectors/sandbox/connector-object/src/main/java/com/metamatrix/connector/object/util/ObjectExecutionHelper.java
Log:
TEIID-844
Implement metadata creation in the SalesForce connector.
Added isReservedWord to SQLReservedWords
Added logging filter for Axis
Modified: trunk/build/kit-runtime/deploy/log4j.xml
===================================================================
--- trunk/build/kit-runtime/deploy/log4j.xml 2010-01-22 15:35:49 UTC (rev 1773)
+++ trunk/build/kit-runtime/deploy/log4j.xml 2010-01-22 17:24:27 UTC (rev 1774)
@@ -81,6 +81,10 @@
<logger name="org.teiid">
<level value="WARN" />
</logger>
+
+ <logger name="org.apache.axis">
+ <level value="WARN" />
+ </logger>
<!-- un-comment to enable COMMAND log
<logger name="org.teiid.COMMAND_LOG" additivity="false">
Modified: trunk/connector-api/src/main/java/org/teiid/connector/visitor/util/SQLReservedWords.java
===================================================================
--- trunk/connector-api/src/main/java/org/teiid/connector/visitor/util/SQLReservedWords.java 2010-01-22 15:35:49 UTC (rev 1773)
+++ trunk/connector-api/src/main/java/org/teiid/connector/visitor/util/SQLReservedWords.java 2010-01-22 17:24:27 UTC (rev 1774)
@@ -22,8 +22,11 @@
package org.teiid.connector.visitor.util;
-public interface SQLReservedWords {
+import java.util.HashSet;
+import java.util.Set;
+public class SQLReservedWords {
+
public static final String ANY = "ANY"; //$NON-NLS-1$
public static final String ALL = "ALL"; //$NON-NLS-1$
public static final String ALL_COLS = "*"; //$NON-NLS-1$
@@ -145,4 +148,44 @@
public static final String LPAREN = "("; //$NON-NLS-1$
public static final String RPAREN = ")"; //$NON-NLS-1$
+
+ public static final String[] ALL_WORDS = new String[] {ALL, ALL_COLS, AND, ANY, AS, ASC, AVG, BEGIN, BETWEEN, BIGINTEGER,
+ BIGDECIMAL, BREAK, BY, BYTE, CASE, CAST, CHAR, CONVERT, CONTINUE, COUNT, CREATE, CRITERIA, CROSS, DATE, DEBUG, DECLARE,
+ DELETE, DESC, DISTINCT, DOUBLE, ELSE, END, ERROR, ESCAPE, EXCEPT, EXEC, EXECUTE, EXISTS, FALSE, FLOAT, FOR, FROM, FULL,
+ GROUP, HAS, HAVING, IF, IN, INNER, INSERT, INTEGER, INTERSECT, INTO, IS, JOIN, LEFT, LIKE, LONG, LOOP, MAKEDEP, MIN, MAX,
+ NOT, NULL, OBJECT, ON, OR, ORDER, OPTION, OUTER, PROCEDURE, RIGHT, SELECT, SET, SHORT, SHOWPLAN, SOME,
+ SQL_TSI_FRAC_SECOND, SQL_TSI_SECOND, SQL_TSI_MINUTE, SQL_TSI_HOUR, SQL_TSI_DAY, SQL_TSI_WEEK, SQL_TSI_MONTH,
+ SQL_TSI_QUARTER, SQL_TSI_YEAR, STRING, SUM, THEN, TIME, TIMESTAMP, TIMESTAMPADD, TIMESTAMPDIFF, TRANSLATE, TRUE, UNION,
+ UNKNOWN, UPDATE, USING, VALUES, VIRTUAL, WHEN, WITH, WHERE, WHILE,};
+
+ /**
+ * Set of CAPITALIZED reserved words for checking whether a string is a reserved word.
+ */
+ private static final Set RESERVED_WORDS = new HashSet();
+
+ // Initialize RESERVED_WORDS set
+ static {
+ // Iterate through the reserved words and capitalize all of them
+ for (int i = 0; i != SQLReservedWords.ALL_WORDS.length; ++i) {
+ String reservedWord = SQLReservedWords.ALL_WORDS[i];
+ SQLReservedWords.RESERVED_WORDS.add(reservedWord.toUpperCase());
+ }
+ }
+
+ /** Can't construct */
+ private SQLReservedWords() {
+ }
+
+ /**
+ * Check whether a string is a reserved word.
+ *
+ * @param str String to check
+ * @return True if reserved word, false if not or null
+ */
+ public static final boolean isReservedWord( String str ) {
+ if (str == null) {
+ return false;
+ }
+ return RESERVED_WORDS.contains(str.toUpperCase());
+ }
}
Modified: trunk/connector-api/src/main/java/org/teiid/connector/visitor/util/SQLStringVisitor.java
===================================================================
--- trunk/connector-api/src/main/java/org/teiid/connector/visitor/util/SQLStringVisitor.java 2010-01-22 15:35:49 UTC (rev 1773)
+++ trunk/connector-api/src/main/java/org/teiid/connector/visitor/util/SQLStringVisitor.java 2010-01-22 17:24:27 UTC (rev 1774)
@@ -82,7 +82,7 @@
* Creates a SQL string for a LanguageObject subtree. Instances of this class
* are not reusable, and are not thread-safe.
*/
-public class SQLStringVisitor extends AbstractLanguageVisitor implements SQLReservedWords {
+public class SQLStringVisitor extends AbstractLanguageVisitor {
private Set<String> infixFunctions = new HashSet<String>(Arrays.asList("%", "+", "-", "*", "+", "/", "||", //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$ //$NON-NLS-4$ //$NON-NLS-5$ //$NON-NLS-6$ //$NON-NLS-7$
"&", "|", "^", "#")); //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$ //$NON-NLS-4$
@@ -130,8 +130,8 @@
if (items != null && items.size() != 0) {
append(items.get(0));
for (int i = 1; i < items.size(); i++) {
- buffer.append(COMMA)
- .append(SPACE);
+ buffer.append(SQLReservedWords.COMMA)
+ .append(SQLReservedWords.SPACE);
append(items.get(i));
}
}
@@ -146,8 +146,8 @@
if (items != null && items.length != 0) {
append(items[0]);
for (int i = 1; i < items.length; i++) {
- buffer.append(COMMA)
- .append(SPACE);
+ buffer.append(SQLReservedWords.COMMA)
+ .append(SQLReservedWords.SPACE);
append(items[i]);
}
}
@@ -171,19 +171,19 @@
*/
public void visit(IAggregate obj) {
buffer.append(obj.getName())
- .append(LPAREN);
+ .append(SQLReservedWords.LPAREN);
if ( obj.isDistinct()) {
- buffer.append(DISTINCT)
- .append(SPACE);
+ buffer.append(SQLReservedWords.DISTINCT)
+ .append(SQLReservedWords.SPACE);
}
if (obj.getExpression() == null) {
- buffer.append(ALL_COLS);
+ buffer.append(SQLReservedWords.ALL_COLS);
} else {
append(obj.getExpression());
}
- buffer.append(RPAREN);
+ buffer.append(SQLReservedWords.RPAREN);
}
/**
@@ -191,18 +191,18 @@
*/
public void visit(ICompareCriteria obj) {
append(obj.getLeftExpression());
- buffer.append(SPACE);
+ buffer.append(SQLReservedWords.SPACE);
switch(obj.getOperator()) {
- case EQ: buffer.append(EQ); break;
- case GE: buffer.append(GE); break;
- case GT: buffer.append(GT); break;
- case LE: buffer.append(LE); break;
- case LT: buffer.append(LT); break;
- case NE: buffer.append(NE); break;
+ case EQ: buffer.append(SQLReservedWords.EQ); break;
+ case GE: buffer.append(SQLReservedWords.GE); break;
+ case GT: buffer.append(SQLReservedWords.GT); break;
+ case LE: buffer.append(SQLReservedWords.LE); break;
+ case LT: buffer.append(SQLReservedWords.LT); break;
+ case NE: buffer.append(SQLReservedWords.NE); break;
default: buffer.append(UNDEFINED);
}
- buffer.append(SPACE);
+ buffer.append(SQLReservedWords.SPACE);
append(obj.getRightExpression());
}
@@ -212,8 +212,8 @@
public void visit(ICompoundCriteria obj) {
String opString = null;
switch(obj.getOperator()) {
- case AND: opString = AND; break;
- case OR: opString = OR; break;
+ case AND: opString = SQLReservedWords.AND; break;
+ case OR: opString = SQLReservedWords.OR; break;
default: opString = UNDEFINED;
}
@@ -224,16 +224,16 @@
// Special case - should really never happen, but we are tolerant
append((ILanguageObject)criteria.get(0));
} else {
- buffer.append(LPAREN);
+ buffer.append(SQLReservedWords.LPAREN);
append((ILanguageObject)criteria.get(0));
- buffer.append(RPAREN);
+ buffer.append(SQLReservedWords.RPAREN);
for (int i = 1; i < criteria.size(); i++) {
- buffer.append(SPACE)
+ buffer.append(SQLReservedWords.SPACE)
.append(opString)
- .append(SPACE)
- .append(LPAREN);
+ .append(SQLReservedWords.SPACE)
+ .append(SQLReservedWords.LPAREN);
append((ILanguageObject)criteria.get(i));
- buffer.append(RPAREN);
+ buffer.append(SQLReservedWords.RPAREN);
}
}
@@ -243,16 +243,16 @@
* @see com.metamatrix.data.visitor.LanguageObjectVisitor#visit(org.teiid.connector.language.IDelete)
*/
public void visit(IDelete obj) {
- buffer.append(DELETE)
- .append(SPACE);
+ buffer.append(SQLReservedWords.DELETE)
+ .append(SQLReservedWords.SPACE);
buffer.append(getSourceComment(obj));
- buffer.append(FROM)
- .append(SPACE);
+ buffer.append(SQLReservedWords.FROM)
+ .append(SQLReservedWords.SPACE);
append(obj.getGroup());
if (obj.getCriteria() != null) {
- buffer.append(SPACE)
- .append(WHERE)
- .append(SPACE);
+ buffer.append(SQLReservedWords.SPACE)
+ .append(SQLReservedWords.WHERE)
+ .append(SQLReservedWords.SPACE);
append(obj.getCriteria());
}
}
@@ -319,7 +319,7 @@
// If not, do normal logic: [group + "."] + element
if(groupName != null) {
elementName.append(groupName);
- elementName.append(DOT);
+ elementName.append(SQLReservedWords.DOT);
}
elementName.append(elemShortName);
return elementName.toString();
@@ -342,8 +342,8 @@
* @see com.metamatrix.data.visitor.LanguageObjectVisitor#visit(com.metamatrix.data.language.IExecute)
*/
public void visit(IProcedure obj) {
- buffer.append(EXEC)
- .append(SPACE);
+ buffer.append(SQLReservedWords.EXEC)
+ .append(SQLReservedWords.SPACE);
if(obj.getMetadataObject() != null) {
buffer.append(getName(obj.getMetadataObject()));
@@ -351,7 +351,7 @@
buffer.append(obj.getProcedureName());
}
- buffer.append(LPAREN);
+ buffer.append(SQLReservedWords.LPAREN);
final List params = obj.getParameters();
if (params != null && params.size() != 0) {
IParameter param = null;
@@ -359,8 +359,8 @@
param = (IParameter)params.get(i);
if (param.getDirection() == Direction.IN || param.getDirection() == Direction.INOUT) {
if (i != 0) {
- buffer.append(COMMA)
- .append(SPACE);
+ buffer.append(SQLReservedWords.COMMA)
+ .append(SQLReservedWords.SPACE);
}
if (param.getValue() != null) {
buffer.append(param.getValue().toString());
@@ -370,26 +370,26 @@
}
}
}
- buffer.append(RPAREN);
+ buffer.append(SQLReservedWords.RPAREN);
}
/*
* @see com.metamatrix.data.visitor.LanguageObjectVisitor#visit(com.metamatrix.data.language.IExistsCriteria)
*/
public void visit(IExistsCriteria obj) {
- buffer.append(EXISTS)
- .append(SPACE)
- .append(LPAREN);
+ buffer.append(SQLReservedWords.EXISTS)
+ .append(SQLReservedWords.SPACE)
+ .append(SQLReservedWords.LPAREN);
append(obj.getQuery());
- buffer.append(RPAREN);
+ buffer.append(SQLReservedWords.RPAREN);
}
/**
* @see com.metamatrix.data.visitor.LanguageObjectVisitor#visit(org.teiid.connector.language.IFrom)
*/
public void visit(IFrom obj) {
- buffer.append(FROM)
- .append(SPACE);
+ buffer.append(SQLReservedWords.FROM)
+ .append(SQLReservedWords.SPACE);
append(obj.getItems());
}
@@ -404,61 +404,61 @@
String name = obj.getName();
List<IExpression> args = obj.getParameters();
- if(name.equalsIgnoreCase(CONVERT) || name.equalsIgnoreCase(CAST)) {
+ if(name.equalsIgnoreCase(SQLReservedWords.CONVERT) || name.equalsIgnoreCase(SQLReservedWords.CAST)) {
Object typeValue = ((ILiteral)args.get(1)).getValue();
buffer.append(name);
- buffer.append(LPAREN);
+ buffer.append(SQLReservedWords.LPAREN);
append(args.get(0));
- if(name.equalsIgnoreCase(CONVERT)) {
- buffer.append(COMMA);
- buffer.append(SPACE);
+ if(name.equalsIgnoreCase(SQLReservedWords.CONVERT)) {
+ buffer.append(SQLReservedWords.COMMA);
+ buffer.append(SQLReservedWords.SPACE);
} else {
- buffer.append(SPACE);
- buffer.append(AS);
- buffer.append(SPACE);
+ buffer.append(SQLReservedWords.SPACE);
+ buffer.append(SQLReservedWords.AS);
+ buffer.append(SQLReservedWords.SPACE);
}
buffer.append(typeValue);
- buffer.append(RPAREN);
+ buffer.append(SQLReservedWords.RPAREN);
} else if(isInfixFunction(name)) {
- buffer.append(LPAREN);
+ buffer.append(SQLReservedWords.LPAREN);
if(args != null) {
for(int i=0; i<args.size(); i++) {
append(args.get(i));
if(i < (args.size()-1)) {
- buffer.append(SPACE);
+ buffer.append(SQLReservedWords.SPACE);
buffer.append(name);
- buffer.append(SPACE);
+ buffer.append(SQLReservedWords.SPACE);
}
}
}
- buffer.append(RPAREN);
+ buffer.append(SQLReservedWords.RPAREN);
- } else if(name.equalsIgnoreCase(TIMESTAMPADD) || name.equalsIgnoreCase(TIMESTAMPDIFF)) {
+ } else if(name.equalsIgnoreCase(SQLReservedWords.TIMESTAMPADD) || name.equalsIgnoreCase(SQLReservedWords.TIMESTAMPDIFF)) {
buffer.append(name);
- buffer.append(LPAREN);
+ buffer.append(SQLReservedWords.LPAREN);
if(args != null && args.size() > 0) {
buffer.append(((ILiteral)args.get(0)).getValue());
for(int i=1; i<args.size(); i++) {
- buffer.append(COMMA);
- buffer.append(SPACE);
+ buffer.append(SQLReservedWords.COMMA);
+ buffer.append(SQLReservedWords.SPACE);
append(args.get(i));
}
}
- buffer.append(RPAREN);
+ buffer.append(SQLReservedWords.RPAREN);
} else {
buffer.append(obj.getName())
- .append(LPAREN);
+ .append(SQLReservedWords.LPAREN);
append(obj.getParameters());
- buffer.append(RPAREN);
+ buffer.append(SQLReservedWords.RPAREN);
}
}
@@ -478,10 +478,10 @@
}
if (obj.getDefinition() != null) {
- buffer.append(SPACE);
+ buffer.append(SQLReservedWords.SPACE);
if (useAsInGroupAlias()){
- buffer.append(AS)
- .append(SPACE);
+ buffer.append(SQLReservedWords.AS)
+ .append(SQLReservedWords.SPACE);
}
buffer.append(obj.getContext());
}
@@ -502,10 +502,10 @@
* @see com.metamatrix.data.visitor.LanguageObjectVisitor#visit(org.teiid.connector.language.IGroupBy)
*/
public void visit(IGroupBy obj) {
- buffer.append(GROUP)
- .append(SPACE)
- .append(BY)
- .append(SPACE);
+ buffer.append(SQLReservedWords.GROUP)
+ .append(SQLReservedWords.SPACE)
+ .append(SQLReservedWords.BY)
+ .append(SQLReservedWords.SPACE);
append(obj.getElements());
}
@@ -515,29 +515,29 @@
public void visit(IInCriteria obj) {
append(obj.getLeftExpression());
if (obj.isNegated()) {
- buffer.append(SPACE)
- .append(NOT);
+ buffer.append(SQLReservedWords.SPACE)
+ .append(SQLReservedWords.NOT);
}
- buffer.append(SPACE)
- .append(IN)
- .append(SPACE)
- .append(LPAREN);
+ buffer.append(SQLReservedWords.SPACE)
+ .append(SQLReservedWords.IN)
+ .append(SQLReservedWords.SPACE)
+ .append(SQLReservedWords.LPAREN);
append(obj.getRightExpressions());
- buffer.append(RPAREN);
+ buffer.append(SQLReservedWords.RPAREN);
}
public void visit(IInlineView obj) {
- buffer.append(LPAREN);
+ buffer.append(SQLReservedWords.LPAREN);
if (obj.getOutput() != null) {
buffer.append(obj.getOutput());
} else {
append(obj.getQuery());
}
- buffer.append(RPAREN);
- buffer.append(SPACE);
+ buffer.append(SQLReservedWords.RPAREN);
+ buffer.append(SQLReservedWords.SPACE);
if(useAsInGroupAlias()) {
- buffer.append(AS);
- buffer.append(SPACE);
+ buffer.append(SQLReservedWords.AS);
+ buffer.append(SQLReservedWords.SPACE);
}
buffer.append(obj.getContext());
}
@@ -546,33 +546,33 @@
* @see com.metamatrix.data.visitor.LanguageObjectVisitor#visit(org.teiid.connector.language.IInsert)
*/
public void visit(IInsert obj) {
- buffer.append(INSERT).append(SPACE);
+ buffer.append(SQLReservedWords.INSERT).append(SQLReservedWords.SPACE);
buffer.append(getSourceComment(obj));
- buffer.append(INTO).append(SPACE);
+ buffer.append(SQLReservedWords.INTO).append(SQLReservedWords.SPACE);
append(obj.getGroup());
if (obj.getElements() != null && obj.getElements().size() != 0) {
- buffer.append(SPACE).append(LPAREN);
+ buffer.append(SQLReservedWords.SPACE).append(SQLReservedWords.LPAREN);
int elementCount = obj.getElements().size();
for (int i = 0; i < elementCount; i++) {
buffer.append(getElementName(obj.getElements().get(i), false));
if (i < elementCount - 1) {
- buffer.append(COMMA);
- buffer.append(SPACE);
+ buffer.append(SQLReservedWords.COMMA);
+ buffer.append(SQLReservedWords.SPACE);
}
}
- buffer.append(RPAREN);
+ buffer.append(SQLReservedWords.RPAREN);
}
- buffer.append(SPACE);
+ buffer.append(SQLReservedWords.SPACE);
append(obj.getValueSource());
}
@Override
public void visit(IInsertExpressionValueSource obj) {
- buffer.append(VALUES).append(SPACE).append(LPAREN);
+ buffer.append(SQLReservedWords.VALUES).append(SQLReservedWords.SPACE).append(SQLReservedWords.LPAREN);
append(obj.getValues());
- buffer.append(RPAREN);
+ buffer.append(SQLReservedWords.RPAREN);
}
/**
@@ -580,14 +580,14 @@
*/
public void visit(IIsNullCriteria obj) {
append(obj.getExpression());
- buffer.append(SPACE)
- .append(IS)
- .append(SPACE);
+ buffer.append(SQLReservedWords.SPACE)
+ .append(SQLReservedWords.IS)
+ .append(SQLReservedWords.SPACE);
if (obj.isNegated()) {
- buffer.append(NOT)
- .append(SPACE);
+ buffer.append(SQLReservedWords.NOT)
+ .append(SQLReservedWords.SPACE);
}
- buffer.append(NULL);
+ buffer.append(SQLReservedWords.NULL);
}
/**
@@ -596,56 +596,56 @@
public void visit(IJoin obj) {
IFromItem leftItem = obj.getLeftItem();
if(useParensForJoins() && leftItem instanceof IJoin) {
- buffer.append(LPAREN);
+ buffer.append(SQLReservedWords.LPAREN);
append(leftItem);
- buffer.append(RPAREN);
+ buffer.append(SQLReservedWords.RPAREN);
} else {
append(leftItem);
}
- buffer.append(SPACE);
+ buffer.append(SQLReservedWords.SPACE);
switch(obj.getJoinType()) {
case CROSS_JOIN:
- buffer.append(CROSS);
+ buffer.append(SQLReservedWords.CROSS);
break;
case FULL_OUTER_JOIN:
- buffer.append(FULL)
- .append(SPACE)
- .append(OUTER);
+ buffer.append(SQLReservedWords.FULL)
+ .append(SQLReservedWords.SPACE)
+ .append(SQLReservedWords.OUTER);
break;
case INNER_JOIN:
- buffer.append(INNER);
+ buffer.append(SQLReservedWords.INNER);
break;
case LEFT_OUTER_JOIN:
- buffer.append(LEFT)
- .append(SPACE)
- .append(OUTER);
+ buffer.append(SQLReservedWords.LEFT)
+ .append(SQLReservedWords.SPACE)
+ .append(SQLReservedWords.OUTER);
break;
case RIGHT_OUTER_JOIN:
- buffer.append(RIGHT)
- .append(SPACE)
- .append(OUTER);
+ buffer.append(SQLReservedWords.RIGHT)
+ .append(SQLReservedWords.SPACE)
+ .append(SQLReservedWords.OUTER);
break;
default: buffer.append(UNDEFINED);
}
- buffer.append(SPACE)
- .append(JOIN)
- .append(SPACE);
+ buffer.append(SQLReservedWords.SPACE)
+ .append(SQLReservedWords.JOIN)
+ .append(SQLReservedWords.SPACE);
IFromItem rightItem = obj.getRightItem();
if(rightItem instanceof IJoin && (useParensForJoins() || obj.getJoinType() == IJoin.JoinType.CROSS_JOIN)) {
- buffer.append(LPAREN);
+ buffer.append(SQLReservedWords.LPAREN);
append(rightItem);
- buffer.append(RPAREN);
+ buffer.append(SQLReservedWords.RPAREN);
} else {
append(rightItem);
}
final List criteria = obj.getCriteria();
if (criteria != null && criteria.size() != 0) {
- buffer.append(SPACE)
- .append(ON)
- .append(SPACE);
+ buffer.append(SQLReservedWords.SPACE)
+ .append(SQLReservedWords.ON)
+ .append(SQLReservedWords.SPACE);
Iterator critIter = criteria.iterator();
while(critIter.hasNext()) {
@@ -653,15 +653,15 @@
if(crit instanceof IPredicateCriteria) {
append(crit);
} else {
- buffer.append(LPAREN);
+ buffer.append(SQLReservedWords.LPAREN);
append(crit);
- buffer.append(RPAREN);
+ buffer.append(SQLReservedWords.RPAREN);
}
if(critIter.hasNext()) {
- buffer.append(SPACE)
- .append(AND)
- .append(SPACE);
+ buffer.append(SQLReservedWords.SPACE)
+ .append(SQLReservedWords.AND)
+ .append(SQLReservedWords.SPACE);
}
}
}
@@ -673,31 +673,31 @@
public void visit(ILikeCriteria obj) {
append(obj.getLeftExpression());
if (obj.isNegated()) {
- buffer.append(SPACE)
- .append(NOT);
+ buffer.append(SQLReservedWords.SPACE)
+ .append(SQLReservedWords.NOT);
}
- buffer.append(SPACE)
- .append(LIKE)
- .append(SPACE);
+ buffer.append(SQLReservedWords.SPACE)
+ .append(SQLReservedWords.LIKE)
+ .append(SQLReservedWords.SPACE);
append(obj.getRightExpression());
if (obj.getEscapeCharacter() != null) {
- buffer.append(SPACE)
- .append(ESCAPE)
- .append(SPACE)
- .append(QUOTE)
+ buffer.append(SQLReservedWords.SPACE)
+ .append(SQLReservedWords.ESCAPE)
+ .append(SQLReservedWords.SPACE)
+ .append(SQLReservedWords.QUOTE)
.append(obj.getEscapeCharacter().toString())
- .append(QUOTE);
+ .append(SQLReservedWords.QUOTE);
}
}
public void visit(ILimit obj) {
- buffer.append(LIMIT)
- .append(SPACE);
+ buffer.append(SQLReservedWords.LIMIT)
+ .append(SQLReservedWords.SPACE);
if (obj.getRowOffset() > 0) {
buffer.append(obj.getRowOffset())
- .append(COMMA)
- .append(SPACE);
+ .append(SQLReservedWords.COMMA)
+ .append(SQLReservedWords.SPACE);
}
buffer.append(obj.getRowLimit());
}
@@ -709,7 +709,7 @@
if (obj.isBindValue()) {
buffer.append("?"); //$NON-NLS-1$
} else if (obj.getValue() == null) {
- buffer.append(NULL);
+ buffer.append(SQLReservedWords.NULL);
} else {
Class type = obj.getType();
String val = obj.getValue().toString();
@@ -732,9 +732,9 @@
.append(val)
.append("'}"); //$NON-NLS-1$
} else {
- buffer.append(QUOTE)
- .append(escapeString(val, QUOTE))
- .append(QUOTE);
+ buffer.append(SQLReservedWords.QUOTE)
+ .append(escapeString(val, SQLReservedWords.QUOTE))
+ .append(SQLReservedWords.QUOTE);
}
}
}
@@ -743,21 +743,21 @@
* @see com.metamatrix.data.visitor.LanguageObjectVisitor#visit(org.teiid.connector.language.INotCriteria)
*/
public void visit(INotCriteria obj) {
- buffer.append(NOT)
- .append(SPACE)
- .append(LPAREN);
+ buffer.append(SQLReservedWords.NOT)
+ .append(SQLReservedWords.SPACE)
+ .append(SQLReservedWords.LPAREN);
append(obj.getCriteria());
- buffer.append(RPAREN);
+ buffer.append(SQLReservedWords.RPAREN);
}
/**
* @see com.metamatrix.data.visitor.LanguageObjectVisitor#visit(org.teiid.connector.language.IOrderBy)
*/
public void visit(IOrderBy obj) {
- buffer.append(ORDER)
- .append(SPACE)
- .append(BY)
- .append(SPACE);
+ buffer.append(SQLReservedWords.ORDER)
+ .append(SQLReservedWords.SPACE)
+ .append(SQLReservedWords.BY)
+ .append(SQLReservedWords.SPACE);
append(obj.getItems());
}
@@ -774,8 +774,8 @@
buffer.append(UNDEFINED);
}
if (obj.getDirection() == IOrderByItem.DESC) {
- buffer.append(SPACE)
- .append(DESC);
+ buffer.append(SQLReservedWords.SPACE)
+ .append(SQLReservedWords.DESC);
} // Don't print default "ASC"
}
@@ -786,7 +786,7 @@
if (obj.getValue() == null) {
buffer.append(UNDEFINED_PARAM);
} else if (obj.getValue() == null) {
- buffer.append(NULL);
+ buffer.append(SQLReservedWords.NULL);
} else {
buffer.append(obj.getValue().toString());
}
@@ -798,31 +798,31 @@
public void visit(IQuery obj) {
visitSelect(obj.getSelect(), obj);
if (obj.getFrom() != null) {
- buffer.append(SPACE);
+ buffer.append(SQLReservedWords.SPACE);
append(obj.getFrom());
}
if (obj.getWhere() != null) {
- buffer.append(SPACE)
- .append(WHERE)
- .append(SPACE);
+ buffer.append(SQLReservedWords.SPACE)
+ .append(SQLReservedWords.WHERE)
+ .append(SQLReservedWords.SPACE);
append(obj.getWhere());
}
if (obj.getGroupBy() != null) {
- buffer.append(SPACE);
+ buffer.append(SQLReservedWords.SPACE);
append(obj.getGroupBy());
}
if (obj.getHaving() != null) {
- buffer.append(SPACE)
- .append(HAVING)
- .append(SPACE);
+ buffer.append(SQLReservedWords.SPACE)
+ .append(SQLReservedWords.HAVING)
+ .append(SQLReservedWords.SPACE);
append(obj.getHaving());
}
if (obj.getOrderBy() != null) {
- buffer.append(SPACE);
+ buffer.append(SQLReservedWords.SPACE);
append(obj.getOrderBy());
}
if (obj.getLimit() != null) {
- buffer.append(SPACE);
+ buffer.append(SQLReservedWords.SPACE);
append(obj.getLimit());
}
}
@@ -831,26 +831,26 @@
* @see com.metamatrix.data.visitor.LanguageObjectVisitor#visit(org.teiid.connector.language.ISearchedCaseExpression)
*/
public void visit(ISearchedCaseExpression obj) {
- buffer.append(CASE);
+ buffer.append(SQLReservedWords.CASE);
final int whenCount = obj.getWhenCount();
for (int i = 0; i < whenCount; i++) {
- buffer.append(SPACE)
- .append(WHEN)
- .append(SPACE);
+ buffer.append(SQLReservedWords.SPACE)
+ .append(SQLReservedWords.WHEN)
+ .append(SQLReservedWords.SPACE);
append(obj.getWhenCriteria(i));
- buffer.append(SPACE)
- .append(THEN)
- .append(SPACE);
+ buffer.append(SQLReservedWords.SPACE)
+ .append(SQLReservedWords.THEN)
+ .append(SQLReservedWords.SPACE);
append(obj.getThenExpression(i));
}
if (obj.getElseExpression() != null) {
- buffer.append(SPACE)
- .append(ELSE)
- .append(SPACE);
+ buffer.append(SQLReservedWords.SPACE)
+ .append(SQLReservedWords.ELSE)
+ .append(SQLReservedWords.SPACE);
append(obj.getElseExpression());
}
- buffer.append(SPACE)
- .append(END);
+ buffer.append(SQLReservedWords.SPACE)
+ .append(SQLReservedWords.END);
}
/**
@@ -861,10 +861,10 @@
}
private void visitSelect(ISelect obj, ICommand command) {
- buffer.append(SELECT).append(SPACE);
+ buffer.append(SQLReservedWords.SELECT).append(SQLReservedWords.SPACE);
buffer.append(getSourceComment(command));
if (obj.isDistinct()) {
- buffer.append(DISTINCT).append(SPACE);
+ buffer.append(SQLReservedWords.DISTINCT).append(SQLReservedWords.SPACE);
}
append(obj.getSelectSymbols());
}
@@ -877,9 +877,9 @@
* @see com.metamatrix.data.visitor.LanguageObjectVisitor#visit(com.metamatrix.data.language.IScalarSubquery)
*/
public void visit(IScalarSubquery obj) {
- buffer.append(LPAREN);
+ buffer.append(SQLReservedWords.LPAREN);
append(obj.getQuery());
- buffer.append(RPAREN);
+ buffer.append(SQLReservedWords.RPAREN);
}
/**
@@ -888,9 +888,9 @@
public void visit(ISelectSymbol obj) {
append(obj.getExpression());
if (obj.hasAlias()) {
- buffer.append(SPACE)
- .append(AS)
- .append(SPACE)
+ buffer.append(SQLReservedWords.SPACE)
+ .append(SQLReservedWords.AS)
+ .append(SQLReservedWords.SPACE)
.append(obj.getOutputName());
}
}
@@ -900,27 +900,27 @@
*/
public void visit(ISubqueryCompareCriteria obj) {
append(obj.getLeftExpression());
- buffer.append(SPACE);
+ buffer.append(SQLReservedWords.SPACE);
switch(obj.getOperator()) {
- case EQ: buffer.append(EQ); break;
- case GE: buffer.append(GE); break;
- case GT: buffer.append(GT); break;
- case LE: buffer.append(LE); break;
- case LT: buffer.append(LT); break;
- case NE: buffer.append(NE); break;
+ case EQ: buffer.append(SQLReservedWords.EQ); break;
+ case GE: buffer.append(SQLReservedWords.GE); break;
+ case GT: buffer.append(SQLReservedWords.GT); break;
+ case LE: buffer.append(SQLReservedWords.LE); break;
+ case LT: buffer.append(SQLReservedWords.LT); break;
+ case NE: buffer.append(SQLReservedWords.NE); break;
default: buffer.append(UNDEFINED);
}
- buffer.append(SPACE);
+ buffer.append(SQLReservedWords.SPACE);
switch(obj.getQuantifier()) {
- case ALL: buffer.append(ALL); break;
- case SOME: buffer.append(SOME); break;
+ case ALL: buffer.append(SQLReservedWords.ALL); break;
+ case SOME: buffer.append(SQLReservedWords.SOME); break;
default: buffer.append(UNDEFINED);
}
- buffer.append(SPACE);
- buffer.append(LPAREN);
+ buffer.append(SQLReservedWords.SPACE);
+ buffer.append(SQLReservedWords.LPAREN);
append(obj.getQuery());
- buffer.append(RPAREN);
+ buffer.append(SQLReservedWords.RPAREN);
}
/*
@@ -929,33 +929,33 @@
public void visit(ISubqueryInCriteria obj) {
append(obj.getLeftExpression());
if (obj.isNegated()) {
- buffer.append(SPACE)
- .append(NOT);
+ buffer.append(SQLReservedWords.SPACE)
+ .append(SQLReservedWords.NOT);
}
- buffer.append(SPACE)
- .append(IN)
- .append(SPACE)
- .append(LPAREN);
+ buffer.append(SQLReservedWords.SPACE)
+ .append(SQLReservedWords.IN)
+ .append(SQLReservedWords.SPACE)
+ .append(SQLReservedWords.LPAREN);
append(obj.getQuery());
- buffer.append(RPAREN);
+ buffer.append(SQLReservedWords.RPAREN);
}
/**
* @see com.metamatrix.data.visitor.LanguageObjectVisitor#visit(org.teiid.connector.language.IUpdate)
*/
public void visit(IUpdate obj) {
- buffer.append(UPDATE)
- .append(SPACE);
+ buffer.append(SQLReservedWords.UPDATE)
+ .append(SQLReservedWords.SPACE);
buffer.append(getSourceComment(obj));
append(obj.getGroup());
- buffer.append(SPACE)
- .append(SET)
- .append(SPACE);
+ buffer.append(SQLReservedWords.SPACE)
+ .append(SQLReservedWords.SET)
+ .append(SQLReservedWords.SPACE);
append(obj.getChanges());
if (obj.getCriteria() != null) {
- buffer.append(SPACE)
- .append(WHERE)
- .append(SPACE);
+ buffer.append(SQLReservedWords.SPACE)
+ .append(SQLReservedWords.WHERE)
+ .append(SQLReservedWords.SPACE);
append(obj.getCriteria());
}
}
@@ -966,34 +966,34 @@
public void visit(ISetClause clause) {
buffer.append(getElementName(clause.getSymbol(), false));
- buffer.append(SPACE).append(EQ).append(SPACE);
+ buffer.append(SQLReservedWords.SPACE).append(SQLReservedWords.EQ).append(SQLReservedWords.SPACE);
append(clause.getValue());
}
public void visit(ISetQuery obj) {
appendSetQuery(obj.getLeftQuery());
- buffer.append(SPACE);
+ buffer.append(SQLReservedWords.SPACE);
appendSetOperation(obj.getOperation());
if(obj.isAll()) {
- buffer.append(SPACE);
- buffer.append(ALL);
+ buffer.append(SQLReservedWords.SPACE);
+ buffer.append(SQLReservedWords.ALL);
}
- buffer.append(SPACE);
+ buffer.append(SQLReservedWords.SPACE);
appendSetQuery(obj.getRightQuery());
IOrderBy orderBy = obj.getOrderBy();
if(orderBy != null) {
- buffer.append(SPACE);
+ buffer.append(SQLReservedWords.SPACE);
append(orderBy);
}
ILimit limit = obj.getLimit();
if(limit != null) {
- buffer.append(SPACE);
+ buffer.append(SQLReservedWords.SPACE);
append(limit);
}
}
@@ -1008,9 +1008,9 @@
protected void appendSetQuery(IQueryCommand obj) {
if(obj instanceof ISetQuery || useParensForSetQueries()) {
- buffer.append(LPAREN);
+ buffer.append(SQLReservedWords.LPAREN);
append(obj);
- buffer.append(RPAREN);
+ buffer.append(SQLReservedWords.RPAREN);
} else {
append(obj);
}
Modified: trunk/connectors/connector-jdbc/src/main/java/org/teiid/connector/jdbc/translator/SQLConversionVisitor.java
===================================================================
--- trunk/connectors/connector-jdbc/src/main/java/org/teiid/connector/jdbc/translator/SQLConversionVisitor.java 2010-01-22 15:35:49 UTC (rev 1773)
+++ trunk/connectors/connector-jdbc/src/main/java/org/teiid/connector/jdbc/translator/SQLConversionVisitor.java 2010-01-22 17:24:27 UTC (rev 1774)
@@ -53,6 +53,7 @@
import org.teiid.connector.language.ISetClause;
import org.teiid.connector.language.IParameter.Direction;
import org.teiid.connector.language.ISetQuery.Operation;
+import org.teiid.connector.visitor.util.SQLReservedWords;
import org.teiid.connector.visitor.util.SQLStringVisitor;
@@ -138,7 +139,7 @@
*/
private void translateSQLType(Class type, Object obj, StringBuilder valuesbuffer) {
if (obj == null) {
- valuesbuffer.append(NULL);
+ valuesbuffer.append(SQLReservedWords.NULL);
} else {
if(Number.class.isAssignableFrom(type)) {
boolean useFormatting = false;
@@ -173,9 +174,9 @@
} else {
// If obj is string, toSting() will not create a new String
// object, it returns it self, so new object creation.
- valuesbuffer.append(QUOTE)
- .append(escapeString(obj.toString(), QUOTE))
- .append(QUOTE);
+ valuesbuffer.append(SQLReservedWords.QUOTE)
+ .append(escapeString(obj.toString(), SQLReservedWords.QUOTE))
+ .append(SQLReservedWords.QUOTE);
}
}
}
Modified: trunk/connectors/connector-salesforce/src/main/java/com/metamatrix/connector/salesforce/Connector.java
===================================================================
--- trunk/connectors/connector-salesforce/src/main/java/com/metamatrix/connector/salesforce/Connector.java 2010-01-22 15:35:49 UTC (rev 1773)
+++ trunk/connectors/connector-salesforce/src/main/java/com/metamatrix/connector/salesforce/Connector.java 2010-01-22 17:24:27 UTC (rev 1774)
@@ -33,13 +33,17 @@
import org.teiid.connector.api.ConnectorLogger;
import org.teiid.connector.api.CredentialMap;
import org.teiid.connector.api.ExecutionContext;
+import org.teiid.connector.api.MetadataProvider;
import org.teiid.connector.api.ConnectorAnnotations.ConnectionPooling;
+import org.teiid.connector.metadata.runtime.MetadataFactory;
import com.metamatrix.connector.salesforce.connection.SalesforceConnection;
@ConnectionPooling
-public class Connector extends org.teiid.connector.basic.BasicConnector {
+public class Connector extends org.teiid.connector.basic.BasicConnector implements MetadataProvider {
+ private static Connector connector;
+
private ConnectorLogger logger;
private ConnectorEnvironment connectorEnv;
@@ -50,6 +54,10 @@
private URL url;
private SalesforceCapabilities salesforceCapabilites;
+ public Connector() {
+ connector = this;
+ }
+
// ///////////////////////////////////////////////////////////
// Connector implementation
// ///////////////////////////////////////////////////////////
@@ -157,4 +165,15 @@
public ConnectorCapabilities getCapabilities() {
return salesforceCapabilites;
}
+
+ @Override
+ public void getConnectorMetadata(MetadataFactory metadataFactory)
+ throws ConnectorException {
+ MetadataProcessor processor = new MetadataProcessor((SalesforceConnection)getConnection(null),metadataFactory, logger);
+ processor.processMetadata();
+ }
+
+ public static Connector getConnector() {
+ return connector;
+ }
}
Modified: trunk/connectors/connector-salesforce/src/main/java/com/metamatrix/connector/salesforce/ConnectorState.java
===================================================================
--- trunk/connectors/connector-salesforce/src/main/java/com/metamatrix/connector/salesforce/ConnectorState.java 2010-01-22 15:35:49 UTC (rev 1773)
+++ trunk/connectors/connector-salesforce/src/main/java/com/metamatrix/connector/salesforce/ConnectorState.java 2010-01-22 17:24:27 UTC (rev 1774)
@@ -34,12 +34,13 @@
public static final String USERNAME = "username";
public static final String PASSWORD = "password";
public static final String URL = "URL";
+ public static final String MODEL_AUDIT_FIELDS = "ModelAuditFields";
String username;
String password;
URL url;
+ boolean modelAuditFields;
-
public ConnectorState(Properties props, ConnectorLogger logger) throws ConnectorException {
if (logger == null) {
throw new ConnectorException("Internal Exception: logger is null");
@@ -65,6 +66,11 @@
}
setUrl(salesforceURL);
}
+
+ Boolean modelAudits = Boolean.valueOf(props.getProperty(MODEL_AUDIT_FIELDS));
+ if (modelAudits) {
+ setModelAuditFields(modelAudits);
+ }
}
private void setUrl(URL salesforceURL) {
@@ -90,5 +96,12 @@
public String getPassword() {
return password;
}
+
+ public void setModelAuditFields(boolean modelAuditFields) {
+ this.modelAuditFields = modelAuditFields;
+ }
+ public boolean isModelAuditFields() {
+ return modelAuditFields;
+ }
}
Added: trunk/connectors/connector-salesforce/src/main/java/com/metamatrix/connector/salesforce/MetadataProcessor.java
===================================================================
--- trunk/connectors/connector-salesforce/src/main/java/com/metamatrix/connector/salesforce/MetadataProcessor.java (rev 0)
+++ trunk/connectors/connector-salesforce/src/main/java/com/metamatrix/connector/salesforce/MetadataProcessor.java 2010-01-22 17:24:27 UTC (rev 1774)
@@ -0,0 +1,298 @@
+package com.metamatrix.connector.salesforce;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.teiid.connector.api.ConnectorException;
+import org.teiid.connector.api.ConnectorLogger;
+import org.teiid.connector.metadata.runtime.Column;
+import org.teiid.connector.metadata.runtime.KeyRecord;
+import org.teiid.connector.metadata.runtime.MetadataFactory;
+import org.teiid.connector.metadata.runtime.Table;
+import org.teiid.connector.metadata.runtime.BaseColumn.NullType;
+import org.teiid.connector.metadata.runtime.Column.SearchType;
+import org.teiid.connector.visitor.util.SQLReservedWords;
+
+import com.metamatrix.common.types.DataTypeManager;
+import com.metamatrix.connector.salesforce.connection.SalesforceConnection;
+import com.sforce.soap.partner.ChildRelationship;
+import com.sforce.soap.partner.DescribeGlobalResult;
+import com.sforce.soap.partner.DescribeGlobalSObjectResult;
+import com.sforce.soap.partner.DescribeSObjectResult;
+import com.sforce.soap.partner.Field;
+import com.sforce.soap.partner.FieldType;
+import com.sforce.soap.partner.PicklistEntry;
+
+public class MetadataProcessor {
+ private MetadataFactory metadataFactory;
+ private SalesforceConnection connection;
+ private ConnectorLogger logger;
+
+ private Map<String, Table> tableMap = new HashMap<String, Table>();
+ private List<Relationship> relationships = new ArrayList<Relationship>();
+ private boolean hasUpdateableColumn = false;
+ private List<Column> columns;
+
+ // Audit Fields
+ public static final String AUDIT_FIELD_CREATED_BY_ID = "CreatedById"; //$NON-NLS-1$
+ public static final String AUDIT_FIELD_CREATED_DATE = "CreatedDate"; //$NON-NLS-1$
+ public static final String AUDIT_FIELD_LAST_MODIFIED_BY_ID = "LastModifiedById"; //$NON-NLS-1$
+ public static final String AUDIT_FIELD_LAST_MODIFIED_DATE = "LastModifiedDate"; //$NON-NLS-1$
+ public static final String AUDIT_FIELD_SYSTEM_MOD_STAMP = "SystemModstamp"; //$NON-NLS-1$
+
+ // Model Extensions
+ static final String TABLE_SUPPORTS_CREATE = "Supports Create"; //$NON-NLS-1$
+ static final String TABLE_SUPPORTS_DELETE = "Supports Delete"; //$NON-NLS-1$
+ static final String TABLE_CUSTOM = "Custom"; //$NON-NLS-1$
+ static final String TABLE_SUPPORTS_LOOKUP = "Supports ID Lookup"; //$NON-NLS-1$
+ static final String TABLE_SUPPORTS_MERGE = "Supports Merge"; //$NON-NLS-1$
+ static final String TABLE_SUPPORTS_QUERY = "Supports Query"; //$NON-NLS-1$
+ static final String TABLE_SUPPORTS_REPLICATE = "Supports Replicate"; //$NON-NLS-1$
+ static final String TABLE_SUPPORTS_RETRIEVE = "Supports Retrieve"; //$NON-NLS-1$
+ static final String TABLE_SUPPORTS_SEARCH = "Supports Search"; //$NON-NLS-1$
+
+ static final String COLUMN_DEFAULTED = "Defaulted on Create"; //$NON-NLS-1$
+ static final String COLUMN_CUSTOM = "Custom"; //$NON-NLS-1$
+ static final String COLUMN_CALCULATED = "Calculated"; //$NON-NLS-1$
+ static final String COLUMN_PICKLIST_VALUES = "Picklist Values"; //$NON-NLS-1$
+
+    /**
+     * Creates a processor that reads object descriptions through the given
+     * connection and records them in the given metadata factory.
+     *
+     * @param connection      connection used to describe the Salesforce objects
+     * @param metadataFactory factory that receives the tables, columns and keys
+     * @param logger          connector logger retained by this processor
+     */
+    public MetadataProcessor(SalesforceConnection connection, MetadataFactory metadataFactory, ConnectorLogger logger) {
+        this.logger = logger;
+        this.metadataFactory = metadataFactory;
+        this.connection = connection;
+    }
+
+    /**
+     * Registers a table for every object reported by the connection and then
+     * adds the foreign keys for the relationships collected along the way.
+     *
+     * @throws ConnectorException propagated from building a table or from
+     *         adding the relationships
+     */
+    public void processMetadata() throws ConnectorException {
+        DescribeGlobalSObjectResult[] described = connection.getObjects().getSobjects();
+        for (DescribeGlobalSObjectResult sObject : described) {
+            addTable(sObject);
+        }
+        addRelationships();
+    }
+
+    /**
+     * Adds one foreign key to the metadata for every parent/child relationship
+     * collected by {@code getRelationships} while the tables were being built.
+     * <p>
+     * Relationships whose foreign key field is an audit field are skipped
+     * unless the connector state says audit fields are modeled.
+     *
+     * @throws ConnectorException declared for the metadata factory call
+     * @throws RuntimeException if the parent table, its primary key, the child
+     *         table or the foreign key column cannot be found
+     */
+    private void addRelationships() throws ConnectorException {
+        for (Relationship relationship : relationships) {
+            ConnectorState state = Connector.getConnector().getState();
+            if (!state.isModelAuditFields() && isAuditField(relationship.getForeignKeyField())) {
+                continue;
+            }
+
+            Table parent = tableMap.get(NameUtil.normalizeName(relationship.getParentTable()));
+            if (null == parent) {
+                // Fail with a descriptive message rather than a bare NullPointerException.
+                throw new RuntimeException("ERROR !!parent table not found!! " + relationship.getParentTable()); //$NON-NLS-1$
+            }
+            KeyRecord pk = parent.getPrimaryKey();
+            if (null == pk) {
+                throw new RuntimeException("ERROR !!primary key column not found!!"); //$NON-NLS-1$
+            }
+
+            Table child = tableMap.get(NameUtil.normalizeName(relationship.getChildTable()));
+            if (null == child) {
+                throw new RuntimeException("ERROR !!child table not found!! " + relationship.getChildTable()); //$NON-NLS-1$
+            }
+
+            // addColumns registers columns under their normalized names, so the
+            // foreign key field has to be normalized before it is compared.
+            String foreignKeyField = NameUtil.normalizeName(relationship.getForeignKeyField());
+            Column col = null;
+            columns = child.getColumns();
+            for (Column column : columns) {
+                if (column.getName().equals(foreignKeyField)) {
+                    col = column;
+                }
+            }
+            if (null == col) {
+                throw new RuntimeException(
+                        "ERROR !!foreign key column not found!! " + child.getName() + "." + relationship.getForeignKeyField()); //$NON-NLS-1$ //$NON-NLS-2$
+            }
+
+            String foreignKeyName = "FK_" + parent.getName() + "_" + col.getName();
+            ArrayList<String> foreignKeyColumns = new ArrayList<String>();
+            foreignKeyColumns.add(col.getName());
+            metadataFactory.addForiegnKey(foreignKeyName, foreignKeyColumns, parent, child);
+        }
+    }
+
+    /**
+     * Tells whether the given field name is one of the audit field names
+     * declared by the {@code AUDIT_FIELD_*} constants of this class.
+     *
+     * @param name field name to test; must not be {@code null}
+     * @return {@code true} if the name equals one of the audit field constants
+     */
+    public static boolean isAuditField(String name) {
+        return name.equals(AUDIT_FIELD_CREATED_BY_ID)
+                || name.equals(AUDIT_FIELD_CREATED_DATE)
+                || name.equals(AUDIT_FIELD_LAST_MODIFIED_BY_ID)
+                || name.equals(AUDIT_FIELD_LAST_MODIFIED_DATE)
+                || name.equals(AUDIT_FIELD_SYSTEM_MOD_STAMP);
+    }
+
+    /**
+     * Describes one Salesforce object and registers it as a table: records the
+     * table in the lookup map, collects its child relationships, copies the
+     * capability flags into table properties and adds its columns.
+     *
+     * @param object global description entry naming the object to describe
+     * @throws ConnectorException propagated from the connection or from adding
+     *         the table and its columns
+     */
+    private void addTable(DescribeGlobalSObjectResult object) throws ConnectorException {
+        DescribeSObjectResult described = connection.getObjectMetaData(object.getName());
+        String tableName = NameUtil.normalizeName(described.getName());
+        Table created = metadataFactory.addTable(tableName);
+
+        created.setNameInSource(described.getName());
+        tableMap.put(tableName, created);
+        getRelationships(described);
+
+        created.setProperty(TABLE_CUSTOM, String.valueOf(described.isCustom()));
+        created.setProperty(TABLE_SUPPORTS_CREATE, String.valueOf(described.isCreateable()));
+        created.setProperty(TABLE_SUPPORTS_DELETE, String.valueOf(described.isDeletable()));
+        created.setProperty(TABLE_SUPPORTS_MERGE, String.valueOf(described.isMergeable()));
+        created.setProperty(TABLE_SUPPORTS_QUERY, String.valueOf(described.isQueryable()));
+        created.setProperty(TABLE_SUPPORTS_REPLICATE, String.valueOf(described.isReplicateable()));
+        created.setProperty(TABLE_SUPPORTS_RETRIEVE, String.valueOf(described.isRetrieveable()));
+        created.setProperty(TABLE_SUPPORTS_SEARCH, String.valueOf(described.isSearchable()));
+
+        // Reset the flag before the columns of this table are added.
+        hasUpdateableColumn = false;
+        addColumns(described, created);
+
+        // Some SF objects return true for isUpdateable() but have no updateable columns.
+        if (hasUpdateableColumn && described.isUpdateable()) {
+            created.setSupportsUpdate(true);
+        }
+    }
+
+    /**
+     * Records one relationship entry for every child relationship reported by
+     * the described object, with that object as the parent table.
+     *
+     * @param objectMetadata description of the parent object
+     */
+    private void getRelationships(DescribeSObjectResult objectMetadata) {
+        ChildRelationship[] children = objectMetadata.getChildRelationships();
+        if (children == null) {
+            return;
+        }
+        for (ChildRelationship childRelation : children) {
+            Relationship link = new RelationshipImpl();
+            link.setParentTable(objectMetadata.getName());
+            link.setChildTable(childRelation.getChildSObject());
+            link.setForeignKeyField(childRelation.getField());
+            link.setCascadeDelete(childRelation.isCascadeDelete());
+            relationships.add(link);
+        }
+    }
+
+ private void addColumns(DescribeSObjectResult objectMetadata, Table table) throws ConnectorException { // adds one column to table per supported field of objectMetadata
+ Field[] fields = objectMetadata.getFields();
+ for (int i=0;i < fields.length;i++) {
+ Field field = fields[i];
+ String normalizedName = NameUtil.normalizeName(field.getName());
+ FieldType fieldType = field.getType();
+ if(!Connector.getConnector().getState().isModelAuditFields() && isAuditField(field.getName())) { // audit fields are skipped unless the connector state asks for them
+ continue;
+ }
+ String sfTypeName = fieldType.getValue();
+ Column column = null; // stays null when sfTypeName matches none of the branches below
+ if(sfTypeName.equals(FieldType._value1) || //string
+ sfTypeName.equals(FieldType._value4) || //"combobox"
+ sfTypeName.equals(FieldType._value5) || //"reference"
+ sfTypeName.equals(FieldType._value13) || //"phone"
+ sfTypeName.equals(FieldType._value14) || //"id"
+ sfTypeName.equals(FieldType._value18) || //"url"
+ sfTypeName.equals(FieldType._value19) || //"email"
+ sfTypeName.equals(FieldType._value20) || //"encryptedstring"
+ sfTypeName.equals(FieldType._value21)) { //"anytype"
+ column = metadataFactory.addColumn(normalizedName, DataTypeManager.DefaultDataTypes.STRING, table);
+ column.setNativeType(sfTypeName);
+ if(sfTypeName.equals(FieldType._value14)) { // "id" fields are non-null and get a single-column primary key
+ column.setNullType(NullType.No_Nulls);
+ ArrayList<String> columnNames = new ArrayList<String>();
+ columnNames.add(field.getName()); // NOTE(review): raw field name, while the column was added as normalizedName - confirm they always match for id fields
+ metadataFactory.addPrimaryKey(field.getName()+"_PK", columnNames, table);
+ }
+ }
+ else if(sfTypeName.equals(FieldType._value2)) { // "picklist"
+ column = metadataFactory.addColumn(normalizedName, DataTypeManager.DefaultDataTypes.STRING, table);
+ if(field.isRestrictedPicklist()) {
+ column.setNativeType("restrictedpicklist");
+ } else {
+ column.setNativeType(FieldType._value2);
+ }
+ // the comma-separated picklist values are stored as a column property
+ column.setProperty(COLUMN_PICKLIST_VALUES, getPicklistValues(field));
+ }
+ else if(sfTypeName.equals(FieldType._value3)) { //"multipicklist"
+ column = metadataFactory.addColumn(normalizedName, DataTypeManager.DefaultDataTypes.STRING, table);
+ if(field.isRestrictedPicklist()) {
+ column.setNativeType("restrictedmultiselectpicklist");
+ } else {
+ column.setNativeType(FieldType._value3);
+ }
+ column.setProperty(COLUMN_PICKLIST_VALUES, getPicklistValues(field));
+ }
+ else if(sfTypeName.equals(FieldType._value6)) { //"base64"
+ column = metadataFactory.addColumn(normalizedName, DataTypeManager.DefaultDataTypes.BLOB, table);
+ column.setNativeType(FieldType._value6);
+ }
+ else if(sfTypeName.equals(FieldType._value7)) { //"boolean"
+ column = metadataFactory.addColumn(normalizedName, DataTypeManager.DefaultDataTypes.BOOLEAN, table);
+ column.setNativeType(FieldType._value7);
+ }
+ else if(sfTypeName.equals(FieldType._value8)) { //"currency"
+ column = metadataFactory.addColumn(normalizedName, DataTypeManager.DefaultDataTypes.DOUBLE, table);
+ column.setNativeType(FieldType._value8);
+ column.setCurrency(true);
+ column.setScale(field.getScale());
+ column.setPrecision(field.getPrecision());
+ }
+ else if(sfTypeName.equals(FieldType._value9)) { //"textarea"
+ column = metadataFactory.addColumn(normalizedName, DataTypeManager.DefaultDataTypes.STRING, table);
+ column.setNativeType(FieldType._value9);
+ column.setSearchType(SearchType.Unsearchable);
+ }
+ else if(sfTypeName.equals(FieldType._value10)) { //"int"
+ column = metadataFactory.addColumn(normalizedName, DataTypeManager.DefaultDataTypes.INTEGER, table);
+ column.setNativeType(FieldType._value10);
+ column.setPrecision(field.getPrecision());
+ }
+ else if(sfTypeName.equals(FieldType._value11) || //"double"
+ sfTypeName.equals(FieldType._value12)) { //"percent"
+ column = metadataFactory.addColumn(normalizedName, DataTypeManager.DefaultDataTypes.DOUBLE, table);
+ column.setNativeType(sfTypeName);
+ column.setScale(field.getScale());
+ column.setPrecision(field.getPrecision());
+ }
+ else if(sfTypeName.equals(FieldType._value15)) { //"date"
+ column = metadataFactory.addColumn(normalizedName, DataTypeManager.DefaultDataTypes.DATE, table);
+ column.setNativeType(FieldType._value15);
+ }
+ else if(sfTypeName.equals(FieldType._value16)) { //"datetime"
+ column = metadataFactory.addColumn(normalizedName, DataTypeManager.DefaultDataTypes.TIMESTAMP, table);
+ column.setNativeType(FieldType._value16);
+ }
+ else if(sfTypeName.equals(FieldType._value17)) { //"time"
+ column = metadataFactory.addColumn(normalizedName, DataTypeManager.DefaultDataTypes.TIME, table);
+ column.setNativeType(FieldType._value17);
+ }
+ if(null == column) { // unrecognized type: log it and leave the field out of the table
+ logger.logError("Unknown type returned by SalesForce: " + sfTypeName);
+ continue;
+ } else {
+ column.setNameInSource(field.getName()); // the original, un-normalized field name
+ column.setLength(field.getLength());
+ if(field.isUpdateable()) {
+ column.setUpdatable(true);
+ hasUpdateableColumn = true; // flag declared outside this method; set once any field is updateable
+ }
+ column.setProperty(COLUMN_CALCULATED, String.valueOf(field.isCalculated()));
+ column.setProperty(COLUMN_CUSTOM, String.valueOf(field.isCustom()));
+ column.setProperty(COLUMN_DEFAULTED, String.valueOf(field.isDefaultedOnCreate()));
+ }
+
+ }
+ }
+
+ private String getPicklistValues(Field field) { // returns the field's picklist entry values joined with ','
+ StringBuffer picklistValues = new StringBuffer();
+ if(null != field.getPicklistValues() && 0 != field.getPicklistValues().length) { // a null or empty array yields ""
+ List<PicklistEntry> entries = Arrays.asList(field.getPicklistValues());
+ for (Iterator iterator = entries.iterator(); iterator.hasNext();) {
+ PicklistEntry entry = (PicklistEntry) iterator.next();
+ picklistValues.append(entry.getValue()); // appended as-is; a ',' inside a value is not escaped
+ if(iterator.hasNext()) { // separator only between entries, never trailing
+ picklistValues.append(',');
+ }
+ }
+ }
+ return picklistValues.toString();
+ }
+}
Property changes on: trunk/connectors/connector-salesforce/src/main/java/com/metamatrix/connector/salesforce/MetadataProcessor.java
___________________________________________________________________
Name: svn:mime-type
+ text/plain
Added: trunk/connectors/connector-salesforce/src/main/java/com/metamatrix/connector/salesforce/NameUtil.java
===================================================================
--- trunk/connectors/connector-salesforce/src/main/java/com/metamatrix/connector/salesforce/NameUtil.java (rev 0)
+++ trunk/connectors/connector-salesforce/src/main/java/com/metamatrix/connector/salesforce/NameUtil.java 2010-01-22 17:24:27 UTC (rev 1774)
@@ -0,0 +1,85 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ *
+ * See the LEGAL.txt file distributed with this work for information regarding copyright ownership and licensing.
+ *
+ * See the AUTHORS.txt file distributed with this work for a full listing of individual contributors.
+ */
+package com.metamatrix.connector.salesforce;
+
+import org.teiid.connector.visitor.util.SQLReservedWords;
+
+ public class NameUtil { // static helpers that rewrite a name into a normalized form
+ /** Trims nameIn, then applies the private helpers below in the order shown; throws NullPointerException if nameIn is null. */
+ public static String normalizeName( String nameIn ) {
+ String normal = nameIn.trim();
+ normal = removeDuplicate(normal); // looks for '(' and ')', so it runs before removeIllegalChars replaces them
+ normal = removeSpaces(normal);
+ normal = removeIllegalChars(normal);
+ normal = removeTrailingUnderscore(normal);
+ normal = removeLeadingUnderscore(normal);
+ normal = checkReservedWords(normal); // may append "_" again after the trailing one was removed
+ return normal;
+
+ }
+
+ /**
+ * @param normal the name to check against SQLReservedWords.isReservedWord
+ * @return the name with "_" appended when it is a reserved word, otherwise the name unchanged
+ */
+ private static String checkReservedWords( String normal ) {
+ if (SQLReservedWords.isReservedWord(normal)) {
+ normal = normal + "_"; //$NON-NLS-1$
+ }
+ return normal;
+ }
+ /** Removes one trailing underscore if present; any underscores before it are kept. */
+ private static String removeTrailingUnderscore( String normal ) {
+ if (normal.endsWith("_")) { //$NON-NLS-1$
+ return normal.substring(0, normal.lastIndexOf('_'));
+ }
+ return normal;
+ }
+ /** Replaces . ( ) / \ : ' - $ { } with '_', '%' with "percentage" and '#' with "number". */
+ private static String removeIllegalChars( String normal ) {
+ String edit = normal;
+ edit = edit.replace('.', '_');
+ edit = edit.replace('(', '_');
+ edit = edit.replace(')', '_');
+ edit = edit.replace('/', '_');
+ edit = edit.replace('\\', '_');
+ edit = edit.replace(':', '_');
+ edit = edit.replace('\'', '_');
+ edit = edit.replace('-', '_');
+ edit = edit.replace("%", "percentage");//$NON-NLS-1$ //$NON-NLS-2$
+ edit = edit.replace("#", "number");//$NON-NLS-1$ //$NON-NLS-2$
+ edit = edit.replace("$", "_");//$NON-NLS-1$ //$NON-NLS-2$
+ edit = edit.replace("{", "_");//$NON-NLS-1$ //$NON-NLS-2$
+ edit = edit.replace("}", "_");//$NON-NLS-1$ //$NON-NLS-2$
+ return edit;
+ }
+ /** Replaces every space character with '_'. */
+ private static String removeSpaces( String normal ) {
+ return normal.replace(' ', '_');
+ }
+ /** Collapses "X (X)" or "X (null)" to "X"; any other name is returned unchanged. */
+ private static String removeDuplicate( String normal ) {
+ if (normal.indexOf('(') < 0 || normal.indexOf(')') != normal.length() - 1) return normal; // needs a '(' and the first ')' as the last character
+ String firstPart = normal.substring(0, normal.indexOf('(')).trim(); // text before the first '('
+ String secondPart = normal.substring(normal.indexOf('(') + 1, normal.length() - 1).trim(); // text between the first '(' and the final ')'
+ if (firstPart.equals(secondPart) || secondPart.equals("null")) return firstPart; //$NON-NLS-1$
+ return normal;
+ }
+
+ /**
+ * @param normal the name to strip
+ * @return the name with every leading underscore removed (may be the empty string)
+ */
+ private static String removeLeadingUnderscore( String normal ) {
+ while (normal.indexOf('_') == 0) { // true while the first character is '_'
+ normal = normal.substring(1);
+ }
+ return normal;
+ }
+
+ }
\ No newline at end of file
Property changes on: trunk/connectors/connector-salesforce/src/main/java/com/metamatrix/connector/salesforce/NameUtil.java
___________________________________________________________________
Name: svn:mime-type
+ text/plain
Added: trunk/connectors/connector-salesforce/src/main/java/com/metamatrix/connector/salesforce/Relationship.java
===================================================================
--- trunk/connectors/connector-salesforce/src/main/java/com/metamatrix/connector/salesforce/Relationship.java (rev 0)
+++ trunk/connectors/connector-salesforce/src/main/java/com/metamatrix/connector/salesforce/Relationship.java 2010-01-22 17:24:27 UTC (rev 1774)
@@ -0,0 +1,21 @@
+package com.metamatrix.connector.salesforce;
+
+ public interface Relationship { // a parent table / child table link with its foreign-key field and cascade-delete flag
+ /** Sets the name of the parent table. */
+ void setParentTable(String name);
+ /** Sets the name of the child table. */
+ void setChildTable(String childSObject);
+ /** Sets the name of the foreign-key field. */
+ void setForeignKeyField(String field);
+ /** Sets the cascade-delete flag. */
+ void setCascadeDelete(boolean cascadeDelete);
+ /** Returns the cascade-delete flag. */
+ public boolean isCascadeDelete();
+ /** Returns the name of the child table. */
+ public String getChildTable();
+ /** Returns the name of the foreign-key field. */
+ public String getForeignKeyField();
+ /** Returns the name of the parent table. */
+ public String getParentTable();
+
+ }
Property changes on: trunk/connectors/connector-salesforce/src/main/java/com/metamatrix/connector/salesforce/Relationship.java
___________________________________________________________________
Name: svn:mime-type
+ text/plain
Added: trunk/connectors/connector-salesforce/src/main/java/com/metamatrix/connector/salesforce/RelationshipImpl.java
===================================================================
--- trunk/connectors/connector-salesforce/src/main/java/com/metamatrix/connector/salesforce/RelationshipImpl.java (rev 0)
+++ trunk/connectors/connector-salesforce/src/main/java/com/metamatrix/connector/salesforce/RelationshipImpl.java 2010-01-22 17:24:27 UTC (rev 1774)
@@ -0,0 +1,40 @@
+package com.metamatrix.connector.salesforce;
+
+ public class RelationshipImpl implements Relationship { // plain field-backed implementation of Relationship
+ boolean cascadeDelete; // package-private; false until setCascadeDelete is called
+ public String childTablename; // null until setChildTable is called
+ public String parentTableName; // null until setParentTable is called
+ public String foreignKeyField; // null until setForeignKeyField is called
+ /** Stores the cascade-delete flag. */
+ public void setCascadeDelete(boolean delete) {
+ cascadeDelete = delete;
+ }
+ /** Returns the stored cascade-delete flag. */
+ public boolean isCascadeDelete() {
+ return cascadeDelete;
+ }
+ /** Stores the child table name. */
+ public void setChildTable(String childTable) {
+ childTablename = childTable;
+ }
+ /** Returns the stored child table name. */
+ public String getChildTable() {
+ return childTablename;
+ }
+ /** Returns the stored foreign-key field name. */
+ public String getForeignKeyField() {
+ return foreignKeyField;
+ }
+ /** Stores the foreign-key field name. */
+ public void setForeignKeyField(String foreignKeyField) {
+ this.foreignKeyField = foreignKeyField;
+ }
+ /** Returns the stored parent table name. */
+ public String getParentTable() {
+ return parentTableName;
+ }
+ /** Stores the parent table name. */
+ public void setParentTable(String parentTableName) {
+ this.parentTableName = parentTableName;
+ }
+ }
Property changes on: trunk/connectors/connector-salesforce/src/main/java/com/metamatrix/connector/salesforce/RelationshipImpl.java
___________________________________________________________________
Name: svn:mime-type
+ text/plain
Modified: trunk/connectors/connector-salesforce/src/main/java/com/metamatrix/connector/salesforce/connection/SalesforceConnection.java
===================================================================
--- trunk/connectors/connector-salesforce/src/main/java/com/metamatrix/connector/salesforce/connection/SalesforceConnection.java 2010-01-22 15:35:49 UTC (rev 1773)
+++ trunk/connectors/connector-salesforce/src/main/java/com/metamatrix/connector/salesforce/connection/SalesforceConnection.java 2010-01-22 17:24:27 UTC (rev 1774)
@@ -47,6 +47,8 @@
import com.metamatrix.connector.salesforce.execution.QueryExecutionImpl;
import com.metamatrix.connector.salesforce.execution.UpdateExecutionImpl;
import com.metamatrix.connector.salesforce.execution.UpdatedResult;
+import com.sforce.soap.partner.DescribeGlobalResult;
+import com.sforce.soap.partner.DescribeSObjectResult;
import com.sforce.soap.partner.QueryResult;
import com.sforce.soap.partner.sobject.SObject;
@@ -176,4 +178,12 @@
objects, objects.length);
return result;
}
+
+ public DescribeGlobalResult getObjects() throws ConnectorException { // delegates to connection.getObjects()
+ return connection.getObjects();
+ }
+
+ public DescribeSObjectResult getObjectMetaData(String objectName) throws ConnectorException { // delegates to connection.getObjectMetaData(objectName)
+ return connection.getObjectMetaData(objectName);
+ }
}
Modified: trunk/connectors/connector-salesforce/src/main/java/com/metamatrix/connector/salesforce/connection/impl/ConnectionImpl.java
===================================================================
--- trunk/connectors/connector-salesforce/src/main/java/com/metamatrix/connector/salesforce/connection/impl/ConnectionImpl.java 2010-01-22 15:35:49 UTC (rev 1773)
+++ trunk/connectors/connector-salesforce/src/main/java/com/metamatrix/connector/salesforce/connection/impl/ConnectionImpl.java 2010-01-22 17:24:27 UTC (rev 1774)
@@ -48,6 +48,8 @@
import com.sforce.soap.partner.CallOptions;
import com.sforce.soap.partner.DeleteResult;
import com.sforce.soap.partner.DeletedRecord;
+import com.sforce.soap.partner.DescribeGlobalResult;
+import com.sforce.soap.partner.DescribeSObjectResult;
import com.sforce.soap.partner.GetDeletedResult;
import com.sforce.soap.partner.GetUpdatedResult;
import com.sforce.soap.partner.LoginResult;
@@ -340,4 +342,26 @@
throw new ConnectorException(e, e.getMessage());
}
}
+
+ /** Returns binding.describeGlobal(); a RemoteException is rethrown as a ConnectorException that keeps it as the cause. */
+ public DescribeGlobalResult getObjects() throws ConnectorException {
+ try {
+ return binding.describeGlobal();
+ } catch (RemoteException e) {
+ // e.getCause() may be null, so dereferencing it could throw NullPointerException; wrap e itself instead.
+ throw new ConnectorException(e, e.getMessage());
+ }
+ }
+
+ public DescribeSObjectResult getObjectMetaData(String objectName) throws ConnectorException { // returns binding.describeSObject(objectName)
+ try {
+ return binding.describeSObject(objectName);
+ } catch (InvalidSObjectFault e) {
+ throw new ConnectorException(e, e.getExceptionMessage()); // keep the fault as the cause instead of dropping it
+ } catch (UnexpectedErrorFault e) {
+ throw new ConnectorException(e, e.getMessage()); // keep the fault as the cause instead of dropping it
+ } catch (RemoteException e) {
+ throw new ConnectorException(e, e.getMessage());
+ }
+ }
}
Modified: trunk/connectors/connector-salesforce/src/main/java/com/metamatrix/connector/salesforce/execution/visitors/InsertVisitor.java
===================================================================
--- trunk/connectors/connector-salesforce/src/main/java/com/metamatrix/connector/salesforce/execution/visitors/InsertVisitor.java 2010-01-22 15:35:49 UTC (rev 1773)
+++ trunk/connectors/connector-salesforce/src/main/java/com/metamatrix/connector/salesforce/execution/visitors/InsertVisitor.java 2010-01-22 17:24:27 UTC (rev 1774)
@@ -64,7 +64,10 @@
String val;
if(value instanceof ILiteral) {
ILiteral literalValue = (ILiteral)value;
- val = this.stripQutes(literalValue.getValue().toString());
+ val = literalValue.getValue().toString();
+ if(null != val && !val.isEmpty()) {
+ val = this.stripQutes(val);
+ }
} else {
val = value.toString();
}
Modified: trunk/connectors/connector-salesforce/src/main/java/com/metamatrix/connector/salesforce/execution/visitors/SelectVisitor.java
===================================================================
--- trunk/connectors/connector-salesforce/src/main/java/com/metamatrix/connector/salesforce/execution/visitors/SelectVisitor.java 2010-01-22 15:35:49 UTC (rev 1773)
+++ trunk/connectors/connector-salesforce/src/main/java/com/metamatrix/connector/salesforce/execution/visitors/SelectVisitor.java 2010-01-22 17:24:27 UTC (rev 1774)
@@ -51,7 +51,7 @@
private int idIndex = -1; // index of the ID select symbol.
protected List<ISelectSymbol> selectSymbols;
protected StringBuffer limitClause = new StringBuffer();
- private Boolean supportsRetrieve;
+ private Boolean objectSupportsRetrieve;
public SelectVisitor(RuntimeMetadata metadata) {
super(metadata);
@@ -113,7 +113,7 @@
if(fromItem instanceof IGroup) {
table = ((IGroup)fromItem).getMetadataObject();
String supportsQuery = (String)table.getProperties().get(Constants.SUPPORTS_QUERY);
- supportsRetrieve = Boolean.valueOf((String)table.getProperties().get(Constants.SUPPORTS_RETRIEVE));
+ objectSupportsRetrieve = Boolean.valueOf((String)table.getProperties().get(Constants.SUPPORTS_RETRIEVE));
if (!Boolean.valueOf(supportsQuery)) {
throw new ConnectorException(table.getNameInSource() + " "
+ Messages.getString("CriteriaVisitor.query.not.supported"));
@@ -230,7 +230,7 @@
}
public boolean canRetrieve() {
- return supportsRetrieve && hasOnlyIDCriteria();
+ return objectSupportsRetrieve && hasOnlyIDCriteria();
}
}
Modified: trunk/connectors/sandbox/connector-object/src/main/java/com/metamatrix/connector/object/util/ObjectExecutionHelper.java
===================================================================
--- trunk/connectors/sandbox/connector-object/src/main/java/com/metamatrix/connector/object/util/ObjectExecutionHelper.java 2010-01-22 15:35:49 UTC (rev 1773)
+++ trunk/connectors/sandbox/connector-object/src/main/java/com/metamatrix/connector/object/util/ObjectExecutionHelper.java 2010-01-22 17:24:27 UTC (rev 1774)
@@ -47,7 +47,7 @@
/**
*/
-public class ObjectExecutionHelper implements SQLReservedWords {
+public class ObjectExecutionHelper {
private static final String ESCAPED_QUOTE = "''"; //$NON-NLS-1$
private static final TimeZone LOCAL_TIME_ZONE = TimeZone.getDefault();
@@ -58,7 +58,7 @@
* @return a SQL-safe string
*/
protected String escapeString(String str) {
- return StringUtil.replaceAll(str, QUOTE, ESCAPED_QUOTE);
+ return StringUtil.replaceAll(str, SQLReservedWords.QUOTE, ESCAPED_QUOTE);
}
/**
14 years, 11 months
teiid SVN: r1773 - trunk/client-jdbc/src/main/java/com/metamatrix/jdbc.
by teiid-commits@lists.jboss.org
Author: shawkins
Date: 2010-01-22 10:35:49 -0500 (Fri, 22 Jan 2010)
New Revision: 1773
Modified:
trunk/client-jdbc/src/main/java/com/metamatrix/jdbc/MMResultSet.java
Log:
TEIID-916 fix for timeout during nextBatch.
Modified: trunk/client-jdbc/src/main/java/com/metamatrix/jdbc/MMResultSet.java
===================================================================
--- trunk/client-jdbc/src/main/java/com/metamatrix/jdbc/MMResultSet.java 2010-01-22 14:34:36 UTC (rev 1772)
+++ trunk/client-jdbc/src/main/java/com/metamatrix/jdbc/MMResultSet.java 2010-01-22 15:35:49 UTC (rev 1773)
@@ -46,6 +46,8 @@
import java.util.Map;
import java.util.TimeZone;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.logging.Logger;
import com.metamatrix.api.exception.MetaMatrixComponentException;
@@ -365,7 +367,11 @@
checkClosed();
try {
ResultsFuture<ResultsMessage> results = statement.getDQP().processCursorRequest(requestID, beginRow, fetchSize);
- ResultsMessage currentResultMsg = results.get();
+ int timeoutSeconds = statement.getQueryTimeout();
+ if (timeoutSeconds == 0) {
+ timeoutSeconds = Integer.MAX_VALUE;
+ }
+ ResultsMessage currentResultMsg = results.get(timeoutSeconds, TimeUnit.SECONDS);
this.setResultsData(currentResultMsg);
this.updatedPlanDescription = currentResultMsg.getPlanDescription();
return getCurrentBatch(currentResultMsg);
@@ -375,6 +381,8 @@
throw MMSQLException.create(e);
} catch (ExecutionException e) {
throw MMSQLException.create(e);
+ } catch (TimeoutException e) {
+ throw MMSQLException.create(e);
}
}
14 years, 11 months