Author: shawkins
Date: 2010-02-23 11:07:13 -0500 (Tue, 23 Feb 2010)
New Revision: 1867
Added:
branches/JCA/client/src/main/java/com/metamatrix/dqp/client/ClientSideDQP.java
branches/JCA/client/src/main/java/org/teiid/transport/ClientServiceRegistry.java
branches/JCA/common-internal/src/main/java/org/teiid/SecurityHelper.java
branches/JCA/common-internal/src/test/java/com/metamatrix/common/queue/FakeWorkManager.java
branches/JCA/common-internal/src/test/java/com/metamatrix/common/queue/TestStatsCapturingWorkManager.java
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/ContainerServiceProvider.java
branches/JCA/jboss-integration/src/main/java/org/teiid/jboss/JBossSecurityHelper.java
branches/JCA/runtime/src/main/java/org/teiid/services/TeiidLoginContext.java
branches/JCA/runtime/src/main/java/org/teiid/transport/ClientServiceRegistryImpl.java
Removed:
branches/JCA/client/src/main/java/com/metamatrix/dqp/client/ClientSideDQP.java
branches/JCA/common-internal/src/main/java/com/metamatrix/common/queue/WorkerPool.java
branches/JCA/common-internal/src/main/java/com/metamatrix/common/queue/WorkerPoolFactory.java
branches/JCA/common-internal/src/main/java/org/teiid/ContainerHelper.java
branches/JCA/common-internal/src/main/java/org/teiid/ContainerUtil.java
branches/JCA/common-internal/src/test/java/com/metamatrix/common/queue/TestQueueWorkerPool.java
branches/JCA/engine/src/main/java/com/metamatrix/common/comm/
branches/JCA/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/FakeWorkManager.java
branches/JCA/jboss-integration/src/main/java/org/teiid/jboss/JBossContainerHelper.java
branches/JCA/runtime/src/main/java/org/teiid/services/MembershipServiceImpl.java
Modified:
branches/JCA/build/kit-jboss-container/deploy/teiid/teiid-engine.rar/META-INF/ra.xml
branches/JCA/build/kit-jboss-container/deploy/teiid/teiid-jboss-beans.xml
branches/JCA/build/kit-jboss-container/deployers/teiid.deployer/teiid-deployer-jboss-beans.xml
branches/JCA/client-jdbc/src/main/java/com/metamatrix/jdbc/MMCallableStatement.java
branches/JCA/client-jdbc/src/main/java/com/metamatrix/jdbc/MMConnection.java
branches/JCA/client-jdbc/src/main/java/com/metamatrix/jdbc/MMPreparedStatement.java
branches/JCA/client-jdbc/src/main/java/com/metamatrix/jdbc/MMResultSet.java
branches/JCA/client-jdbc/src/main/java/com/metamatrix/jdbc/MMStatement.java
branches/JCA/client-jdbc/src/test/java/com/metamatrix/jdbc/TestAllResultsImpl.java
branches/JCA/client-jdbc/src/test/java/com/metamatrix/jdbc/TestMMPreparedStatement.java
branches/JCA/client-jdbc/src/test/java/com/metamatrix/jdbc/TestMMResultSet.java
branches/JCA/client/src/main/java/com/metamatrix/common/batch/BatchSerializer.java
branches/JCA/client/src/main/java/com/metamatrix/common/comm/api/ServerConnection.java
branches/JCA/client/src/main/java/com/metamatrix/common/comm/api/ServerConnectionFactory.java
branches/JCA/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/SocketServerConnectionFactory.java
branches/JCA/client/src/main/java/com/metamatrix/dqp/message/RequestMessage.java
branches/JCA/client/src/main/java/com/metamatrix/dqp/message/ResultsMessage.java
branches/JCA/client/src/main/java/org/teiid/adminapi/Request.java
branches/JCA/client/src/main/java/org/teiid/adminapi/impl/RequestMetadata.java
branches/JCA/client/src/main/java/org/teiid/adminapi/impl/RequestMetadataMapper.java
branches/JCA/client/src/main/java/org/teiid/transport/LocalServerConnection.java
branches/JCA/client/src/test/java/com/metamatrix/dqp/message/TestRequestMessage.java
branches/JCA/common-internal/src/main/java/com/metamatrix/common/queue/StatsCapturingWorkManager.java
branches/JCA/common-internal/src/test/java/com/metamatrix/common/queue/FakeWorkItem.java
branches/JCA/engine/src/main/java/com/metamatrix/dqp/message/AtomicRequestMessage.java
branches/JCA/engine/src/main/java/com/metamatrix/platform/security/api/service/SessionService.java
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorManager.java
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWorkItem.java
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/AbstractWorkItem.java
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/DQPConfiguration.java
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/DataTierManagerImpl.java
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/PreparedStatementRequest.java
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/Request.java
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/transaction/TransactionServerImpl.java
branches/JCA/engine/src/test/java/com/metamatrix/dqp/message/TestAtomicRequestMessage.java
branches/JCA/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/TestConnectorManagerImpl.java
branches/JCA/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/TestConnectorStateManager.java
branches/JCA/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/TestConnectorWorkItem.java
branches/JCA/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java
branches/JCA/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCoreRequestHandling.java
branches/JCA/engine/src/test/java/org/teiid/dqp/internal/process/TestDataTierManager.java
branches/JCA/engine/src/test/java/org/teiid/dqp/internal/process/TestPreparedStatement.java
branches/JCA/engine/src/test/java/org/teiid/dqp/internal/process/TestRequest.java
branches/JCA/jboss-integration/src/main/java/org/teiid/jboss/deployers/ConnectorBindingDeployer.java
branches/JCA/jboss-integration/src/main/java/org/teiid/jboss/deployers/RuntimeEngineDeployer.java
branches/JCA/runtime/src/main/java/org/teiid/ConnectionInfo.java
branches/JCA/runtime/src/main/java/org/teiid/TeiidConnectionFactory.java
branches/JCA/runtime/src/main/java/org/teiid/TeiidManagedConnection.java
branches/JCA/runtime/src/main/java/org/teiid/services/SessionServiceImpl.java
branches/JCA/runtime/src/main/java/org/teiid/transport/LogonImpl.java
branches/JCA/runtime/src/main/java/org/teiid/transport/ServerWorkItem.java
branches/JCA/runtime/src/main/java/org/teiid/transport/SocketClientInstance.java
branches/JCA/runtime/src/main/java/org/teiid/transport/SocketListener.java
branches/JCA/runtime/src/main/java/org/teiid/transport/SocketTransport.java
branches/JCA/runtime/src/test/java/org/teiid/services/TestMembershipServiceImpl.java
branches/JCA/runtime/src/test/java/org/teiid/services/TestSessionServiceImpl.java
branches/JCA/runtime/src/test/java/org/teiid/transport/TestCommSockets.java
branches/JCA/runtime/src/test/java/org/teiid/transport/TestSocketRemoting.java
Log:
TEIID-833 refactoring of transaction processing methods so that there is no need for the
server work pool (there is still a separate pool for admin calls). Also removed most
lookups and minimized the info in request/response messages.
Modified:
branches/JCA/build/kit-jboss-container/deploy/teiid/teiid-engine.rar/META-INF/ra.xml
===================================================================
---
branches/JCA/build/kit-jboss-container/deploy/teiid/teiid-engine.rar/META-INF/ra.xml 2010-02-22
18:48:13 UTC (rev 1866)
+++
branches/JCA/build/kit-jboss-container/deploy/teiid/teiid-engine.rar/META-INF/ra.xml 2010-02-23
16:07:13 UTC (rev 1867)
@@ -59,11 +59,6 @@
<config-property-value>2000</config-property-value>
</config-property>
<config-property>
- <description>Plan debug messages allowed. see option
debug.</description>
-
<config-property-name>OptionDebugAllowed</config-property-name>
-
<config-property-type>java.lang.Boolean</config-property-type>
- </config-property>
- <config-property>
<description>Maximum allowed fetch size, set via JDBC. User
requested value ignored above this value. (default 20480)</description>
<config-property-name>MaxRowsFetchSize</config-property-name>
<config-property-type>java.lang.Integer</config-property-type>
Modified: branches/JCA/build/kit-jboss-container/deploy/teiid/teiid-jboss-beans.xml
===================================================================
--- branches/JCA/build/kit-jboss-container/deploy/teiid/teiid-jboss-beans.xml 2010-02-22
18:48:13 UTC (rev 1866)
+++ branches/JCA/build/kit-jboss-container/deploy/teiid/teiid-jboss-beans.xml 2010-02-23
16:07:13 UTC (rev 1867)
@@ -17,15 +17,7 @@
<interceptor-ref name="JndiAspect"/>
</bind>
- <bean name="ContainerHelper"
class="org.teiid.jboss.JBossContainerHelper">
-
<annotation>@org.jboss.aop.microcontainer.aspects.jndi.JndiBinding(name="teiid/container-helper")</annotation>
- <property name="VDBRepository"><inject
bean="VDBRepository"/></property>
- <property name="authorizationService"><inject
bean="AuthorizationService"/></property>
- <property name="sessionService"><inject
bean="SessionService"/></property>
- <property name="bufferService"><inject
bean="BufferService"/></property>
- <property name="connectorManagerRepository"><inject
bean="ConnectorManagerRepository"/></property>
- <property name="DQPManager"><inject
bean="DQPManager"/></property>
- </bean>
+ <bean name="SecurityHelper"
class="org.teiid.jboss.JBossSecurityHelper"/>
<!-- Teiid Services -->
<bean name="AuthorizationService"
class="org.teiid.services.AuthorizationServiceImpl">
@@ -35,6 +27,7 @@
<bean name="SessionService"
class="org.teiid.services.SessionServiceImpl">
<property name="VDBRepository"><inject
bean="VDBRepository"/></property>
+ <property name="securityHelper"><inject
bean="SecurityHelper"/></property>
<!-- Comma separated list of domains to be used -->
<property name="securityDomains">teiid-security</property>
<property
name="adminSecurityDomain">jmx-console</property>
@@ -47,12 +40,10 @@
<property name="cacheFactory"><inject
bean="TeiidCache"/></property>
<property name="useDisk">true</property>
<property
name="diskDirectory">${jboss.server.temp.dir}/teiid</property>
- <property name="bufferMemorySizeInMB">64</property>
- <!-- The max row count of a batch sent internally within the query processor.
Should be <= the connectorBatchSize. (default 256) -->
+ <!-- The max row count of a batch sent internally within the query processor.
Should be <= the connectorBatchSize. (default 512) -->
<property name="processorBatchSize">512</property>
- <!-- The max row count of a batch from a connector. Should be even multiple of
processorBatchSize. (default 512) -->
+ <!-- The max row count of a batch from a connector. Should be even multiple of
processorBatchSize. (default 1024) -->
<property name="connectorBatchSize">1024</property>
- <property name="maxProcessingBatches">8</property>
<!--
The number of batch columns to allow in memory (default 16384).
This value should be set lower or higher depending on the available memory to
Teiid in the VM.
@@ -60,20 +51,28 @@
-->
<property name="maxReserveBatchColumns">16384</property>
<!--
- The number of batch columns guarenteed to a processing operation. Set this
value lower if the workload typically
+ The number of batch columns guaranteed to a processing operation. Set this
value lower if the workload typically
processes larger numbers of concurrent queries with large intermediate
results from operations such as sorting,
grouping, etc. (default 124)
-->
<property
name="maxProcessingBatchesColumns">128</property>
- <!-- Max File size in GB -->
- <property name="maxFileSize">2</property>
+ <!-- Max File size in MB -->
+ <property name="maxFileSize">2024</property>
<property name="maxOpenFiles">256</property>
</bean>
<bean name="RuntimeEngineDeployer"
class="org.teiid.jboss.deployers.RuntimeEngineDeployer">
+
<annotation>@org.jboss.aop.microcontainer.aspects.jndi.JndiBinding(name="teiid/engine-deployer")</annotation>
+ <property name="securityHelper"><inject
bean="ContainerHelper"/></property>
<property name="containerHelper"><inject
bean="ContainerHelper"/></property>
<property name="jdbcSocketConfiguration"><inject
bean="JdbcSocketConfiguration"/></property>
<property name="adminSocketConfiguration"><inject
bean="AdminSocketConfiguration"/></property>
+ <property name="VDBRepository"><inject
bean="VDBRepository"/></property>
+ <property name="authorizationService"><inject
bean="AuthorizationService"/></property>
+ <property name="sessionService"><inject
bean="SessionService"/></property>
+ <property name="bufferService"><inject
bean="BufferService"/></property>
+ <property name="connectorManagerRepository"><inject
bean="ConnectorManagerRepository"/></property>
+ <property name="DQPManager"><inject
bean="DQPManager"/></property>
</bean>
<bean name="JdbcSocketConfiguration"
class="org.teiid.transport.SocketConfiguration">
Modified:
branches/JCA/build/kit-jboss-container/deployers/teiid.deployer/teiid-deployer-jboss-beans.xml
===================================================================
---
branches/JCA/build/kit-jboss-container/deployers/teiid.deployer/teiid-deployer-jboss-beans.xml 2010-02-22
18:48:13 UTC (rev 1866)
+++
branches/JCA/build/kit-jboss-container/deployers/teiid.deployer/teiid-deployer-jboss-beans.xml 2010-02-23
16:07:13 UTC (rev 1867)
@@ -50,6 +50,7 @@
</bean>
<bean name="ConnectorBindingDeployer"
class="org.teiid.jboss.deployers.ConnectorBindingDeployer">
+ <property name="securityHelper"><inject
bean="SecurityHelper"/></property>
<property name="connectorManagerRepository"><inject
bean="ConnectorManagerRepository"/></property>
<property name="managedObjectFactory"><inject
bean="ManagedObjectFactory"/></property>
</bean>
Modified:
branches/JCA/client/src/main/java/com/metamatrix/common/batch/BatchSerializer.java
===================================================================
---
branches/JCA/client/src/main/java/com/metamatrix/common/batch/BatchSerializer.java 2010-02-22
18:48:13 UTC (rev 1866)
+++
branches/JCA/client/src/main/java/com/metamatrix/common/batch/BatchSerializer.java 2010-02-23
16:07:13 UTC (rev 1867)
@@ -45,24 +45,21 @@
private BatchSerializer() {} // Uninstantiable
- private static final Map serializers = new HashMap(18, 1.0f);
+ private static ColumnSerializer defaultSerializer = new ColumnSerializer();
+
+ private static final Map<String, ColumnSerializer> serializers = new
HashMap<String, ColumnSerializer>();
static {
serializers.put(DataTypeManager.DefaultDataTypes.BIG_DECIMAL, new
BigDecimalColumnSerializer());
serializers.put(DataTypeManager.DefaultDataTypes.BIG_INTEGER, new
BigIntegerColumnSerializer());
- serializers.put(DataTypeManager.DefaultDataTypes.BLOB, new
ObjectColumnSerializer());
serializers.put(DataTypeManager.DefaultDataTypes.BOOLEAN, new
BooleanColumnSerializer());
serializers.put(DataTypeManager.DefaultDataTypes.BYTE, new
ByteColumnSerializer());
serializers.put(DataTypeManager.DefaultDataTypes.CHAR, new
CharColumnSerializer());
- serializers.put(DataTypeManager.DefaultDataTypes.CLOB, new
ObjectColumnSerializer());
- serializers.put(DataTypeManager.DefaultDataTypes.XML, new
ObjectColumnSerializer());
serializers.put(DataTypeManager.DefaultDataTypes.DATE, new
DateColumnSerializer());
serializers.put(DataTypeManager.DefaultDataTypes.DOUBLE, new
DoubleColumnSerializer());
serializers.put(DataTypeManager.DefaultDataTypes.FLOAT, new
FloatColumnSerializer());
serializers.put(DataTypeManager.DefaultDataTypes.INTEGER, new
IntColumnSerializer());
serializers.put(DataTypeManager.DefaultDataTypes.LONG, new
LongColumnSerializer());
- serializers.put(DataTypeManager.DefaultDataTypes.OBJECT, new
ObjectColumnSerializer());
serializers.put(DataTypeManager.DefaultDataTypes.SHORT, new
ShortColumnSerializer());
- serializers.put(DataTypeManager.DefaultDataTypes.STRING, new
StringColumnSerializer());
serializers.put(DataTypeManager.DefaultDataTypes.TIME, new
TimeColumnSerializer());
serializers.put(DataTypeManager.DefaultDataTypes.TIMESTAMP, new
TimestampColumnSerializer());
}
@@ -123,19 +120,10 @@
}
/**
- * An interface representing a stateless serializer of a batch column
- * @since 4.2
- */
- private static interface ColumnSerializer {
- void writeColumn(ObjectOutput out, int col, List[] results) throws IOException;
- void readColumn(ObjectInput in, int col, List[] batch, byte[] isNullNuffer)
throws IOException, ClassNotFoundException;
- }
-
- /**
* An abstract serializer for native types
* @since 4.2
*/
- private static abstract class AbstractNativeColumnSerializer implements
ColumnSerializer {
+ private static class ColumnSerializer {
public void writeColumn(ObjectOutput out, int col, List[] batch) throws
IOException {
writeIsNullData(out, col, batch);
Object obj = null;
@@ -156,11 +144,15 @@
}
}
- protected abstract void writeObject(ObjectOutput out, Object obj) throws
IOException;
- protected abstract Object readObject(ObjectInput in) throws IOException;
+ protected void writeObject(ObjectOutput out, Object obj) throws IOException {
+ out.writeObject(obj);
+ }
+ protected Object readObject(ObjectInput in) throws IOException,
ClassNotFoundException {
+ return in.readObject();
+ }
}
- private static class IntColumnSerializer extends AbstractNativeColumnSerializer {
+ private static class IntColumnSerializer extends ColumnSerializer {
protected void writeObject(ObjectOutput out, Object obj) throws IOException {
out.writeInt(((Integer)obj).intValue());
}
@@ -169,7 +161,7 @@
}
}
- private static class LongColumnSerializer extends AbstractNativeColumnSerializer {
+ private static class LongColumnSerializer extends ColumnSerializer {
protected void writeObject(ObjectOutput out, Object obj) throws IOException {
out.writeLong(((Long)obj).longValue());
}
@@ -178,7 +170,7 @@
}
}
- private static class FloatColumnSerializer extends AbstractNativeColumnSerializer {
+ private static class FloatColumnSerializer extends ColumnSerializer {
protected void writeObject(ObjectOutput out, Object obj) throws IOException {
out.writeFloat(((Float)obj).floatValue());
}
@@ -187,7 +179,7 @@
}
}
- private static class DoubleColumnSerializer extends AbstractNativeColumnSerializer {
+ private static class DoubleColumnSerializer extends ColumnSerializer {
protected void writeObject(ObjectOutput out, Object obj) throws IOException {
out.writeDouble(((Double)obj).doubleValue());
}
@@ -196,7 +188,7 @@
}
}
- private static class ShortColumnSerializer extends AbstractNativeColumnSerializer {
+ private static class ShortColumnSerializer extends ColumnSerializer {
protected void writeObject(ObjectOutput out, Object obj) throws IOException {
out.writeShort(((Short)obj).shortValue());
}
@@ -205,7 +197,7 @@
}
}
- private static class BooleanColumnSerializer implements ColumnSerializer {
+ private static class BooleanColumnSerializer extends ColumnSerializer {
/* This implementation compacts the isNull and boolean data for non-null values
into a byte[]
* by using a 8 bit mask that is bit-shifted to mask each value.
*/
@@ -270,7 +262,7 @@
}
}
- private static class ByteColumnSerializer extends AbstractNativeColumnSerializer {
+ private static class ByteColumnSerializer extends ColumnSerializer {
protected void writeObject(ObjectOutput out, Object obj) throws IOException {
out.writeByte(((Byte)obj).byteValue());
}
@@ -279,7 +271,7 @@
}
}
- private static class CharColumnSerializer extends AbstractNativeColumnSerializer {
+ private static class CharColumnSerializer extends ColumnSerializer {
protected void writeObject(ObjectOutput out, Object obj) throws IOException {
out.writeChar(((Character)obj).charValue());
}
@@ -288,61 +280,8 @@
}
}
- private static class StringColumnSerializer extends AbstractNativeColumnSerializer {
- /*
- * This implementation writes single-byte chars until it reaches a non-ascii char
in the string,
- * at which point it starts writing two-byte characters. This implementation
never writes more
- * than two bytes per char.
- */
+ private static class BigIntegerColumnSerializer extends ColumnSerializer {
protected void writeObject(ObjectOutput out, Object obj) throws IOException {
- String val = (String)obj;
- int length = val.length();
- out.writeInt(length);
- boolean writingShort = true;
- char c;
- for (int i = 0 ; i < length; i++) {
- if (writingShort) {
- /* charAt() simply gets the char out of the underlying array. The
assumption is that this would be quicker
- * calling getChars() which makes a copy of the underlying char[].
- */
- c = val.charAt(i);
- if (c < 0x80) {
- out.write(c);
- } else {
- out.write(0x80);
- writingShort = false;
- out.writeChar(c);
- }
- } else {
- out.writeChar(val.charAt(i));
- }
- }
- }
- protected Object readObject(ObjectInput in) throws IOException {
- int b;
- boolean readingShort;
- int length = in.readInt();
- char[] chars = new char[length];
- readingShort = true;
- for (int i = 0; i < length; i++) {
- if (readingShort) {
- b = in.read();
- if (b == 0x80) {
- readingShort = false;
- chars[i] = in.readChar();
- } else {
- chars[i] = ((char)b);
- }
- } else {
- chars[i] = in.readChar();
- }
- }
- return new String(chars);
- }
- }
-
- private static class BigIntegerColumnSerializer extends
AbstractNativeColumnSerializer {
- protected void writeObject(ObjectOutput out, Object obj) throws IOException {
BigInteger val = (BigInteger)obj;
byte[] bytes = val.toByteArray();
out.writeInt(bytes.length);
@@ -356,7 +295,7 @@
}
}
- private static class BigDecimalColumnSerializer extends
AbstractNativeColumnSerializer {
+ private static class BigDecimalColumnSerializer extends ColumnSerializer {
protected void writeObject(ObjectOutput out, Object obj) throws IOException {
BigDecimal val = (BigDecimal)obj;
out.writeInt(val.scale());
@@ -374,7 +313,7 @@
}
}
- private static class DateColumnSerializer extends AbstractNativeColumnSerializer {
+ private static class DateColumnSerializer extends ColumnSerializer {
protected void writeObject(ObjectOutput out, Object obj) throws IOException {
out.writeLong(((java.sql.Date)obj).getTime());
}
@@ -383,7 +322,7 @@
}
}
- private static class TimeColumnSerializer extends AbstractNativeColumnSerializer {
+ private static class TimeColumnSerializer extends ColumnSerializer {
protected void writeObject(ObjectOutput out, Object obj) throws IOException {
out.writeLong(((Time)obj).getTime());
}
@@ -392,7 +331,7 @@
}
}
- private static class TimestampColumnSerializer extends AbstractNativeColumnSerializer
{
+ private static class TimestampColumnSerializer extends ColumnSerializer {
protected void writeObject(ObjectOutput out, Object obj) throws IOException {
Timestamp ts = (Timestamp)obj;
out.writeLong(ts.getTime());
@@ -404,24 +343,12 @@
return ts;
}
}
-
- private static class ObjectColumnSerializer implements ColumnSerializer {
- public void writeColumn(ObjectOutput out, int col, List[] results) throws
IOException {
- for (int i = 0; i < results.length; i++) {
- out.writeObject(results[i].get(col));
- }
- }
- public void readColumn(ObjectInput in, int col, List[] batch, byte[] isNull)
throws IOException, ClassNotFoundException {
- for (int i = 0; i < batch.length; i++) {
- batch[i].set(col, in.readObject());
- }
- }
- }
-
private static ColumnSerializer getSerializer(String type) {
- ColumnSerializer cs = (ColumnSerializer)serializers.get((type == null) ?
DataTypeManager.DefaultDataTypes.OBJECT : type);
- assert cs != null;
+ ColumnSerializer cs = serializers.get((type == null) ?
DataTypeManager.DefaultDataTypes.OBJECT : type);
+ if (cs == null) {
+ return defaultSerializer;
+ }
return cs;
}
Modified:
branches/JCA/client/src/main/java/com/metamatrix/common/comm/api/ServerConnection.java
===================================================================
---
branches/JCA/client/src/main/java/com/metamatrix/common/comm/api/ServerConnection.java 2010-02-22
18:48:13 UTC (rev 1866)
+++
branches/JCA/client/src/main/java/com/metamatrix/common/comm/api/ServerConnection.java 2010-02-23
16:07:13 UTC (rev 1867)
@@ -28,6 +28,7 @@
public interface ServerConnection {
public static final int PING_INTERVAL = 120000;
+ public static final String LOCAL_CONNECTION = "localConnection";
<T> T getService(Class<T> iface);
Modified:
branches/JCA/client/src/main/java/com/metamatrix/common/comm/api/ServerConnectionFactory.java
===================================================================
---
branches/JCA/client/src/main/java/com/metamatrix/common/comm/api/ServerConnectionFactory.java 2010-02-22
18:48:13 UTC (rev 1866)
+++
branches/JCA/client/src/main/java/com/metamatrix/common/comm/api/ServerConnectionFactory.java 2010-02-23
16:07:13 UTC (rev 1867)
@@ -39,9 +39,4 @@
* problems with the connection properties (bad user name, bad password, bad host
name, etc)
*/
ServerConnection getConnection(Properties connectionProperties) throws
CommunicationException, ConnectionException;
-
- <T> T getService(Class<T> clazz);
-
- <T> void registerClientService(Class<T> type, T instance, String
loggingContext);
- <T> String getLoggingContextForService(Class<T> type);
}
Modified:
branches/JCA/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/SocketServerConnectionFactory.java
===================================================================
---
branches/JCA/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/SocketServerConnectionFactory.java 2010-02-22
18:48:13 UTC (rev 1866)
+++
branches/JCA/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/SocketServerConnectionFactory.java 2010-02-23
16:07:13 UTC (rev 1867)
@@ -287,15 +287,4 @@
this.maxCachedInstances = maxCachedInstances;
}
- @Override
- public <T> T getService(Class<T> clazz) {
- return null;
- }
- @Override
- public <T> void registerClientService(Class<T> type, T instance, String
loggingContext){
- }
- @Override
- public <T> String getLoggingContextForService(Class<T> type) {
- return null;
- }
}
Deleted: branches/JCA/client/src/main/java/com/metamatrix/dqp/client/ClientSideDQP.java
===================================================================
---
branches/JCA/client/src/main/java/com/metamatrix/dqp/client/ClientSideDQP.java 2010-02-22
18:48:13 UTC (rev 1866)
+++
branches/JCA/client/src/main/java/com/metamatrix/dqp/client/ClientSideDQP.java 2010-02-23
16:07:13 UTC (rev 1867)
@@ -1,85 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * See the COPYRIGHT.txt file distributed with this work for information
- * regarding copyright ownership. Some portions may be licensed
- * to Red Hat, Inc. under one or more contributor license agreements.
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
- * 02110-1301 USA.
- */
-
-package com.metamatrix.dqp.client;
-
-import java.util.List;
-
-import javax.transaction.xa.Xid;
-
-import com.metamatrix.api.exception.MetaMatrixComponentException;
-import com.metamatrix.api.exception.MetaMatrixProcessingException;
-import com.metamatrix.api.exception.query.QueryMetadataException;
-import com.metamatrix.common.lob.LobChunk;
-import com.metamatrix.common.xa.MMXid;
-import com.metamatrix.common.xa.XATransactionException;
-import com.metamatrix.dqp.message.RequestMessage;
-import com.metamatrix.dqp.message.ResultsMessage;
-
-public interface ClientSideDQP {
-
- ResultsFuture<ResultsMessage> executeRequest(long reqID, RequestMessage message)
throws MetaMatrixProcessingException, MetaMatrixComponentException;
-
- ResultsFuture<ResultsMessage> processCursorRequest(long reqID, int batchFirst, int
fetchSize) throws MetaMatrixProcessingException;
-
- ResultsFuture<?> closeRequest(long requestID) throws
MetaMatrixProcessingException, MetaMatrixComponentException;
-
- boolean cancelRequest(long requestID) throws MetaMatrixProcessingException,
MetaMatrixComponentException;
-
- ResultsFuture<?> closeLobChunkStream(int lobRequestId, long requestId, String
streamId) throws MetaMatrixProcessingException, MetaMatrixComponentException;
-
- ResultsFuture<LobChunk> requestNextLobChunk(int lobRequestId, long requestId,
String streamId) throws MetaMatrixProcessingException, MetaMatrixComponentException;
-
- List getXmlSchemas(String docName) throws MetaMatrixComponentException,
QueryMetadataException;
-
- MetadataResult getMetadata(long requestID) throws MetaMatrixComponentException,
MetaMatrixProcessingException;
-
- MetadataResult getMetadata(long requestID, String preparedSql, boolean
allowDoubleQuotedVariable) throws MetaMatrixComponentException,
MetaMatrixProcessingException;
-
- void terminateSession() throws MetaMatrixComponentException;
-
- // local transaction
- void begin() throws XATransactionException;
-
- void commit() throws XATransactionException;
-
- void rollback() throws XATransactionException;
-
- // XA
- int prepare(MMXid xid) throws XATransactionException;
-
- void commit(MMXid xid, boolean onePhase) throws XATransactionException;
-
- void rollback(MMXid xid) throws XATransactionException;
-
- Xid[] recover(int flag) throws XATransactionException;
-
- void forget(MMXid xid) throws XATransactionException;
-
- void start(MMXid xid,
- int flags,
- int timeout) throws XATransactionException;
-
- void end(MMXid xid,
- int flags) throws XATransactionException;
-
-}
Added: branches/JCA/client/src/main/java/com/metamatrix/dqp/client/ClientSideDQP.java
===================================================================
--- branches/JCA/client/src/main/java/com/metamatrix/dqp/client/ClientSideDQP.java
(rev 0)
+++
branches/JCA/client/src/main/java/com/metamatrix/dqp/client/ClientSideDQP.java 2010-02-23
16:07:13 UTC (rev 1867)
@@ -0,0 +1,85 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package com.metamatrix.dqp.client;
+
+import java.util.List;
+
+import javax.transaction.xa.Xid;
+
+import com.metamatrix.api.exception.MetaMatrixComponentException;
+import com.metamatrix.api.exception.MetaMatrixProcessingException;
+import com.metamatrix.api.exception.query.QueryMetadataException;
+import com.metamatrix.common.lob.LobChunk;
+import com.metamatrix.common.xa.MMXid;
+import com.metamatrix.common.xa.XATransactionException;
+import com.metamatrix.dqp.message.RequestMessage;
+import com.metamatrix.dqp.message.ResultsMessage;
+
+public interface ClientSideDQP {
+
+ ResultsFuture<ResultsMessage> executeRequest(long reqID, RequestMessage message)
throws MetaMatrixProcessingException, MetaMatrixComponentException;
+
+ ResultsFuture<ResultsMessage> processCursorRequest(long reqID, int batchFirst, int
fetchSize) throws MetaMatrixProcessingException;
+
+ ResultsFuture<?> closeRequest(long requestID) throws
MetaMatrixProcessingException, MetaMatrixComponentException;
+
+ boolean cancelRequest(long requestID) throws MetaMatrixProcessingException,
MetaMatrixComponentException;
+
+ ResultsFuture<?> closeLobChunkStream(int lobRequestId, long requestId, String
streamId) throws MetaMatrixProcessingException, MetaMatrixComponentException;
+
+ ResultsFuture<LobChunk> requestNextLobChunk(int lobRequestId, long requestId,
String streamId) throws MetaMatrixProcessingException, MetaMatrixComponentException;
+
+ List<String> getXmlSchemas(String docName) throws MetaMatrixComponentException,
QueryMetadataException;
+
+ MetadataResult getMetadata(long requestID) throws MetaMatrixComponentException,
MetaMatrixProcessingException;
+
+ MetadataResult getMetadata(long requestID, String preparedSql, boolean
allowDoubleQuotedVariable) throws MetaMatrixComponentException,
MetaMatrixProcessingException;
+
+ // local transaction
+
+ ResultsFuture<?> begin() throws XATransactionException;
+
+ ResultsFuture<?> commit() throws XATransactionException;
+
+ ResultsFuture<?> rollback() throws XATransactionException;
+
+ // XA
+
+ ResultsFuture<?> start(MMXid xid,
+ int flags,
+ int timeout) throws XATransactionException;
+
+ ResultsFuture<?> end(MMXid xid,
+ int flags) throws XATransactionException;
+
+ ResultsFuture<Integer> prepare(MMXid xid) throws XATransactionException;
+
+ ResultsFuture<?> commit(MMXid xid, boolean onePhase) throws
XATransactionException;
+
+ ResultsFuture<?> rollback(MMXid xid) throws XATransactionException;
+
+ ResultsFuture<?> forget(MMXid xid) throws XATransactionException;
+
+ ResultsFuture<Xid[]> recover(int flag) throws XATransactionException;
+
+}
Modified:
branches/JCA/client/src/main/java/com/metamatrix/dqp/message/RequestMessage.java
===================================================================
---
branches/JCA/client/src/main/java/com/metamatrix/dqp/message/RequestMessage.java 2010-02-22
18:48:13 UTC (rev 1866)
+++
branches/JCA/client/src/main/java/com/metamatrix/dqp/message/RequestMessage.java 2010-02-23
16:07:13 UTC (rev 1867)
@@ -22,58 +22,54 @@
package com.metamatrix.dqp.message;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Collections;
-import java.util.Date;
import java.util.List;
import com.metamatrix.api.exception.MetaMatrixProcessingException;
import com.metamatrix.common.comm.CommonCommPlugin;
+import com.metamatrix.core.util.ExternalizeUtil;
import com.metamatrix.jdbc.api.ExecutionProperties;
/**
* Request Message, used by MMXStatement for submitting queries.
*/
-public class RequestMessage implements Serializable {
+public class RequestMessage implements Externalizable {
- static final long serialVersionUID = 2258063872049251854L;
-
public static final int DEFAULT_FETCH_SIZE = 2048;
+
+ public enum StatementType {
+ PREPARED, CALLABLE, STATEMENT
+ }
+
+ public enum ResultsMode {
+ RESULTSET, UPDATECOUNT, EITHER
+ }
private String[] commands;
private boolean isBatchedUpdate;
private int fetchSize = DEFAULT_FETCH_SIZE;
private int cursorType;
private boolean partialResultsFlag;
- private boolean isPreparedStatement;
- private boolean isCallableStatement;
- private boolean isPreparedBatchUpdate;
- private List parameterValues;
+ private StatementType statementType = StatementType.STATEMENT;
+ private List<?> parameterValues;
private boolean validationMode;
private String txnAutoWrapMode;
private String XMLFormat;
private String styleSheet;
- private Boolean requireResultSet;
-
- /**The time when the command was created by the client.*/
- private Date submittedTimestamp;
-
- /**The time when command begins processing on the server.*/
- private Date processingTimestamp;
-
+ private ResultsMode resultsMode = ResultsMode.EITHER;
//whether to use ResultSet cache if there is one
private boolean useResultSetCache;
-
// Treat the double quoted strings as variables in the command
private boolean ansiQuotedIdentifiers = true;
-
- private boolean showPlan = false;
-
+ private boolean showPlan;
private int rowLimit;
-
private Serializable executionPayload;
-
private long executionId;
public RequestMessage() {
@@ -110,34 +106,24 @@
* @return True if this request includes a prepared statement.
*/
public boolean isPreparedStatement() {
- return isPreparedStatement;
+ return this.statementType == StatementType.PREPARED;
}
/**
* @return True if this request includes a callable statement.
*/
public boolean isCallableStatement() {
- return isCallableStatement;
+ return this.statementType == StatementType.CALLABLE;
}
-
- /**
- * @param isPreparedStatement
- */
- public void setPreparedStatement(boolean isPreparedStatement) {
- this.isPreparedStatement = isPreparedStatement;
- }
-
- /**
- * @param isCallableStatement
- */
- public void setCallableStatement(boolean isCallableStatement) {
- this.isCallableStatement = isCallableStatement;
- }
-
- /**
+
+ public void setStatementType(StatementType statementType) {
+ this.statementType = statementType;
+ }
+
+ /**
* @return A list of parameter values. May be null.
*/
- public List getParameterValues() {
+ public List<?> getParameterValues() {
if (parameterValues == null) {
return Collections.EMPTY_LIST;
}
@@ -147,7 +133,7 @@
/**
* @param values
*/
- public void setParameterValues(List values) {
+ public void setParameterValues(List<?> values) {
parameterValues = values;
}
@@ -238,55 +224,6 @@
this.styleSheet = styleSheet;
}
- /**
- * Get time that the time when the command was created by the client.
- * @return timestamp in millis
- */
- public Date getSubmittedTimestamp() {
- return submittedTimestamp;
- }
-
- /**
- * Set time that the time when the command was created by the client.
- * NOTE: By default, this gets set to the current time by the constructor.
- * @param submittedTimestamp Time submitted to server.
- */
- public void setSubmittedTimestamp(Date submittedTimestamp) {
- this.submittedTimestamp = submittedTimestamp;
- }
-
- /**
- * Start the clock on submission start - this should be called when the request is
originally created.
- */
- public void markSubmissionStart() {
- setSubmittedTimestamp(new Date());
- }
-
-
- /**
- * Get time that the request was assigned a unique ID by the server.
- * @return timestamp in millis
- */
- public Date getProcessingTimestamp() {
- return processingTimestamp;
- }
-
- /**
- * Set time that the request is submitted on the server.
- * @param processingTimestamp Time submitted to server.
- */
- public void setProcessingTimestamp(Date processingTimestamp) {
- this.processingTimestamp = processingTimestamp;
- }
-
- /**
- * Start the clock on processing times - this should be called when the query
- * hits the QueryService or SubscriptionService.
- */
- public void markProcessingStart() {
- setProcessingTimestamp(new Date());
- }
-
public boolean useResultSetCache() {
//not use caching when there is a txn
return useResultSetCache;
@@ -354,14 +291,6 @@
this.commands = batchedCommands;
}
- public boolean isPreparedBatchUpdate() {
- return isPreparedBatchUpdate;
- }
-
- public void setPreparedBatchUpdate(boolean isPreparedBatchUpdate) {
- this.isPreparedBatchUpdate = isPreparedBatchUpdate;
- }
-
public void setExecutionPayload(Serializable executionPayload) {
this.executionPayload = executionPayload;
}
@@ -377,7 +306,7 @@
public void setExecutionId(long executionId) {
this.executionId = executionId;
}
-
+
public void setBatchedUpdate(boolean isBatchedUpdate) {
this.isBatchedUpdate = isBatchedUpdate;
}
@@ -385,13 +314,58 @@
public boolean isBatchedUpdate() {
return isBatchedUpdate;
}
-
- public Boolean getRequireResultSet() {
- return requireResultSet;
+
+ public ResultsMode getResultsMode() {
+ return resultsMode;
}
- public void setRequireResultSet(Boolean requireResultSet) {
- this.requireResultSet = requireResultSet;
+ public void setResultsMode(ResultsMode resultsMode) {
+ this.resultsMode = resultsMode;
}
+ @Override
+ public void readExternal(ObjectInput in) throws IOException,
+ ClassNotFoundException {
+ this.commands = ExternalizeUtil.readStringArray(in);
+ this.isBatchedUpdate = in.readBoolean();
+ this.fetchSize = in.readInt();
+ this.cursorType = in.readInt();
+ this.partialResultsFlag = in.readBoolean();
+ this.statementType = StatementType.values()[in.readByte()];
+ this.parameterValues = ExternalizeUtil.readList(in);
+ this.validationMode = in.readBoolean();
+ this.txnAutoWrapMode = (String)in.readObject();
+ this.XMLFormat = (String)in.readObject();
+ this.styleSheet = (String)in.readObject();
+ this.resultsMode = ResultsMode.values()[in.readByte()];
+ this.useResultSetCache = in.readBoolean();
+ this.ansiQuotedIdentifiers = in.readBoolean();
+ this.showPlan = in.readBoolean();
+ this.rowLimit = in.readInt();
+ this.executionPayload = (Serializable)in.readObject();
+ this.executionId = in.readLong();
+ }
+
+ @Override
+ public void writeExternal(ObjectOutput out) throws IOException {
+ ExternalizeUtil.writeArray(out, commands);
+ out.writeBoolean(isBatchedUpdate);
+ out.writeInt(fetchSize);
+ out.writeInt(cursorType);
+ out.writeBoolean(partialResultsFlag);
+ out.writeByte(statementType.ordinal());
+ ExternalizeUtil.writeList(out, parameterValues);
+ out.writeBoolean(validationMode);
+ out.writeObject(txnAutoWrapMode);
+ out.writeObject(XMLFormat);
+ out.writeObject(styleSheet);
+ out.writeByte(resultsMode.ordinal());
+ out.writeBoolean(useResultSetCache);
+ out.writeBoolean(ansiQuotedIdentifiers);
+ out.writeBoolean(showPlan);
+ out.writeInt(rowLimit);
+ out.writeObject(executionPayload);
+ out.writeLong(executionId);
+ }
+
}
Modified:
branches/JCA/client/src/main/java/com/metamatrix/dqp/message/ResultsMessage.java
===================================================================
---
branches/JCA/client/src/main/java/com/metamatrix/dqp/message/ResultsMessage.java 2010-02-22
18:48:13 UTC (rev 1866)
+++
branches/JCA/client/src/main/java/com/metamatrix/dqp/message/ResultsMessage.java 2010-02-23
16:07:13 UTC (rev 1867)
@@ -28,7 +28,6 @@
import java.io.ObjectOutput;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Date;
import java.util.List;
import java.util.Map;
@@ -69,12 +68,6 @@
/** The parameters of a Stored Procedure */
private List parameters;
- /** This object represents the time when command is submitted to the server. */
- private Date processingTimestamp;
-
- /** This object represents the time when results are produced on the server. */
- private Date completedTimestamp;
-
/** OPTION DEBUG log if OPTION DEBUG was used */
private String debugLog;
@@ -100,10 +93,6 @@
* @since 4.2
*/
public ResultsMessage(RequestMessage requestMsg){
- if(requestMsg != null){
- this.processingTimestamp = requestMsg.getProcessingTimestamp();
- this.completedTimestamp = new Date();
- }
this.results = new ArrayList[0];
}
@@ -236,13 +225,6 @@
parameters = list;
}
- public Date getProcessingTimestamp() {
- return this.processingTimestamp;
- }
-
- public Date getCompletedTimestamp() {
- return this.completedTimestamp;
- }
/**
* @param strings
*/
@@ -284,8 +266,6 @@
//Parameters
parameters = ExternalizeUtil.readList(in);
- processingTimestamp = (Date)in.readObject();
- completedTimestamp = (Date)in.readObject();
debugLog = (String)in.readObject();
annotations = (Collection)in.readObject();
isUpdateResult = in.readBoolean();
@@ -320,8 +300,6 @@
// Parameters
ExternalizeUtil.writeList(out, parameters);
- out.writeObject(processingTimestamp);
- out.writeObject(completedTimestamp);
out.writeObject(debugLog);
out.writeObject(annotations);
out.writeBoolean(isUpdateResult);
Modified: branches/JCA/client/src/main/java/org/teiid/adminapi/Request.java
===================================================================
--- branches/JCA/client/src/main/java/org/teiid/adminapi/Request.java 2010-02-22 18:48:13
UTC (rev 1866)
+++ branches/JCA/client/src/main/java/org/teiid/adminapi/Request.java 2010-02-23 16:07:13
UTC (rev 1867)
@@ -35,11 +35,6 @@
*/
public interface Request extends AdminObject {
- /**
- * @return time when the request was created
- */
- public long getCreatedTime();
-
/**
* Get the ExecutionId for a Request
* @return ExecutionId
Modified: branches/JCA/client/src/main/java/org/teiid/adminapi/impl/RequestMetadata.java
===================================================================
---
branches/JCA/client/src/main/java/org/teiid/adminapi/impl/RequestMetadata.java 2010-02-22
18:48:13 UTC (rev 1866)
+++
branches/JCA/client/src/main/java/org/teiid/adminapi/impl/RequestMetadata.java 2010-02-23
16:07:13 UTC (rev 1867)
@@ -22,7 +22,6 @@
package org.teiid.adminapi.impl;
-import java.io.Serializable;
import java.util.Date;
import org.jboss.managed.api.annotation.ManagementProperty;
@@ -33,7 +32,7 @@
@MetaMapping(RequestMetadataMapper.class)
-public class RequestMetadata extends AdminObjectImpl implements Request, Serializable {
+public class RequestMetadata extends AdminObjectImpl implements Request {
private static final long serialVersionUID = -2779106368517784259L;
@@ -70,16 +69,6 @@
}
@Override
- @ManagementProperty(description="Time when request submitted",
readOnly=true)
- public long getCreatedTime() {
- return this.createdTime;
- }
-
- public void setCreatedTiime(long time) {
- this.createdTime = time;
- }
-
- @Override
@ManagementProperty(description="Processing time for the request",
readOnly=true)
public long getProcessingTime() {
return this.processTime;
Modified:
branches/JCA/client/src/main/java/org/teiid/adminapi/impl/RequestMetadataMapper.java
===================================================================
---
branches/JCA/client/src/main/java/org/teiid/adminapi/impl/RequestMetadataMapper.java 2010-02-22
18:48:13 UTC (rev 1866)
+++
branches/JCA/client/src/main/java/org/teiid/adminapi/impl/RequestMetadataMapper.java 2010-02-23
16:07:13 UTC (rev 1867)
@@ -71,7 +71,6 @@
request.set("executionId",
SimpleValueSupport.wrap(object.getExecutionId()));
request.set("sessionId", SimpleValueSupport.wrap(object.getSessionId()));
- request.set("createdTime",
SimpleValueSupport.wrap(object.getCreatedTime()));
request.set("processingTime",
SimpleValueSupport.wrap(object.getProcessingTime()));
request.set("command", SimpleValueSupport.wrap(object.getCommand()));
request.set("sourceRequest",
SimpleValueSupport.wrap(object.sourceRequest()));
@@ -94,7 +93,6 @@
RequestMetadata request = new RequestMetadata();
request.setExecutionId((Long)
metaValueFactory.unwrap(compositeValue.get("executionId")));
request.setSessionId((Long)
metaValueFactory.unwrap(compositeValue.get("sessionId")));
- request.setCreatedTiime((Long)
metaValueFactory.unwrap(compositeValue.get("createdTime")));
request.setProcessingTime((Long)
metaValueFactory.unwrap(compositeValue.get("processingTime")));
request.setCommand((String)
metaValueFactory.unwrap(compositeValue.get("command")));
request.setSourceRequest((Boolean)
metaValueFactory.unwrap(compositeValue.get("sourceRequest")));
Added: branches/JCA/client/src/main/java/org/teiid/transport/ClientServiceRegistry.java
===================================================================
--- branches/JCA/client/src/main/java/org/teiid/transport/ClientServiceRegistry.java
(rev 0)
+++
branches/JCA/client/src/main/java/org/teiid/transport/ClientServiceRegistry.java 2010-02-23
16:07:13 UTC (rev 1867)
@@ -0,0 +1,31 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.transport;
+
+import com.metamatrix.api.exception.ComponentNotFoundException;
+
+public interface ClientServiceRegistry {
+
+ <T> T getClientService(Class<T> iface) throws ComponentNotFoundException;
+
+}
Property changes on:
branches/JCA/client/src/main/java/org/teiid/transport/ClientServiceRegistry.java
___________________________________________________________________
Name: svn:mime-type
+ text/plain
Modified:
branches/JCA/client/src/main/java/org/teiid/transport/LocalServerConnection.java
===================================================================
---
branches/JCA/client/src/main/java/org/teiid/transport/LocalServerConnection.java 2010-02-22
18:48:13 UTC (rev 1866)
+++
branches/JCA/client/src/main/java/org/teiid/transport/LocalServerConnection.java 2010-02-23
16:07:13 UTC (rev 1867)
@@ -40,7 +40,6 @@
import com.metamatrix.client.ExceptionUtil;
import com.metamatrix.common.comm.CommonCommPlugin;
import com.metamatrix.common.comm.api.ServerConnection;
-import com.metamatrix.common.comm.api.ServerConnectionFactory;
import com.metamatrix.common.comm.exception.CommunicationException;
import com.metamatrix.common.comm.exception.ConnectionException;
import com.metamatrix.common.comm.platform.CommPlatformPlugin;
@@ -49,7 +48,7 @@
import com.metamatrix.platform.security.api.SessionToken;
public class LocalServerConnection implements ServerConnection {
- private static final String TEIID_RUNTIME = "java:teiid/runtime-engine";
+ private static final String TEIID_RUNTIME = "java:teiid/engine-deployer";
private final LogonResult result;
private boolean shutdown;
@@ -60,7 +59,7 @@
public synchronized LogonResult authenticate(Properties connProps) throws
ConnectionException, CommunicationException {
try {
- connProps.setProperty("localConnection", "true");
+ connProps.setProperty(ServerConnection.LOCAL_CONNECTION,
Boolean.TRUE.toString());
LogonResult logonResult = this.getService(ILogon.class).logon(connProps);
return logonResult;
} catch (LogonException e) {
@@ -75,18 +74,16 @@
}
}
-
- @SuppressWarnings("unchecked")
public <T> T getService(final Class<T> iface) {
- return (T) Proxy.newProxyInstance(this.getClass().getClassLoader(), new Class[]
{iface}, new InvocationHandler() {
+ return iface.cast(Proxy.newProxyInstance(this.getClass().getClassLoader(), new Class[]
{iface}, new InvocationHandler() {
public Object invoke(Object arg0, Method arg1, Object[] arg2) throws Throwable {
if (!isOpen()) {
throw ExceptionUtil.convertException(arg1, new
MetaMatrixComponentException(CommonCommPlugin.Util.getString("LocalTransportHandler.Transport_shutdown")));
//$NON-NLS-1$
}
try {
- ServerConnectionFactory scf = lookup(TEIID_RUNTIME);
- T service = scf.getService(iface);
+ ClientServiceRegistry scf = lookup(TEIID_RUNTIME);
+ T service = scf.getClientService(iface);
if (!(iface.equals(ILogon.class))) {
SessionToken.setSession(result.getSessionToken());
@@ -99,8 +96,7 @@
SessionToken.setSession(null);
}
}
- });
-
+ }));
}
public boolean isOpen() {
@@ -145,7 +141,7 @@
return (conn instanceof LocalServerConnection);
}
- public static <T> T lookup(String jndiName) throws NamingException {
+ protected <T> T lookup(String jndiName) throws NamingException {
InitialContext ic = new InitialContext();
return (T)ic.lookup(jndiName);
}
Modified:
branches/JCA/client/src/test/java/com/metamatrix/dqp/message/TestRequestMessage.java
===================================================================
---
branches/JCA/client/src/test/java/com/metamatrix/dqp/message/TestRequestMessage.java 2010-02-22
18:48:13 UTC (rev 1866)
+++
branches/JCA/client/src/test/java/com/metamatrix/dqp/message/TestRequestMessage.java 2010-02-23
16:07:13 UTC (rev 1867)
@@ -23,13 +23,13 @@
package com.metamatrix.dqp.message;
import java.util.ArrayList;
-import java.util.Date;
import java.util.List;
import junit.framework.TestCase;
import com.metamatrix.api.exception.MetaMatrixProcessingException;
import com.metamatrix.core.util.UnitTestUtil;
+import com.metamatrix.dqp.message.RequestMessage.StatementType;
import com.metamatrix.jdbc.api.ExecutionProperties;
public class TestRequestMessage extends TestCase {
@@ -44,7 +44,7 @@
public static RequestMessage example() {
RequestMessage message = new RequestMessage();
- message.setCallableStatement(true);
+ message.setStatementType(StatementType.CALLABLE);
message.setFetchSize(100);
List params = new ArrayList();
params.add(new Integer(100));
@@ -54,9 +54,6 @@
message.setParameterValues(params);
message.setPartialResults(true);
- message.setPreparedStatement(false);
- message.setSubmittedTimestamp(new Date(11111111L));
- message.setProcessingTimestamp(new Date(12345678L));
message.setStyleSheet("myStyleSheet"); //$NON-NLS-1$
message.setExecutionPayload("myExecutionPayload"); //$NON-NLS-1$
try {
@@ -85,8 +82,6 @@
assertEquals(new Integer(400), copy.getParameterValues().get(3));
assertFalse(copy.isPreparedStatement());
- assertEquals(new Date(11111111L), copy.getSubmittedTimestamp());
- assertEquals(new Date(12345678L), copy.getProcessingTimestamp());
assertEquals("myStyleSheet", copy.getStyleSheet()); //$NON-NLS-1$
assertEquals("myExecutionPayload", copy.getExecutionPayload());
//$NON-NLS-1$
assertEquals(ExecutionProperties.TXN_WRAP_ON, copy.getTxnAutoWrapMode());
//$NON-NLS-1$
Modified:
branches/JCA/client-jdbc/src/main/java/com/metamatrix/jdbc/MMCallableStatement.java
===================================================================
---
branches/JCA/client-jdbc/src/main/java/com/metamatrix/jdbc/MMCallableStatement.java 2010-02-22
18:48:13 UTC (rev 1866)
+++
branches/JCA/client-jdbc/src/main/java/com/metamatrix/jdbc/MMCallableStatement.java 2010-02-23
16:07:13 UTC (rev 1867)
@@ -51,6 +51,8 @@
import com.metamatrix.common.util.SqlUtil;
import com.metamatrix.common.util.TimestampWithTimezone;
import com.metamatrix.dqp.message.RequestMessage;
+import com.metamatrix.dqp.message.RequestMessage.ResultsMode;
+import com.metamatrix.dqp.message.RequestMessage.StatementType;
/**
* <p> This class inherits Statement methods, which deal with SQL statements in
@@ -91,10 +93,9 @@
@Override
protected RequestMessage createRequestMessage(String[] commands,
- boolean isBatchedCommand, Boolean requiresResultSet) {
- RequestMessage message = super.createRequestMessage(commands, isBatchedCommand,
requiresResultSet);
- message.setCallableStatement(true);
- message.setPreparedStatement(false);
+ boolean isBatchedCommand, ResultsMode resultsMode) {
+ RequestMessage message = super.createRequestMessage(commands, isBatchedCommand,
resultsMode);
+ message.setStatementType(StatementType.CALLABLE);
return message;
}
Modified: branches/JCA/client-jdbc/src/main/java/com/metamatrix/jdbc/MMConnection.java
===================================================================
---
branches/JCA/client-jdbc/src/main/java/com/metamatrix/jdbc/MMConnection.java 2010-02-22
18:48:13 UTC (rev 1866)
+++
branches/JCA/client-jdbc/src/main/java/com/metamatrix/jdbc/MMConnection.java 2010-02-23
16:07:13 UTC (rev 1867)
@@ -65,6 +65,7 @@
import com.metamatrix.common.xa.MMXid;
import com.metamatrix.common.xa.XATransactionException;
import com.metamatrix.dqp.client.ClientSideDQP;
+import com.metamatrix.dqp.client.ResultsFuture;
import com.metamatrix.jdbc.api.ExecutionProperties;
/**
@@ -347,8 +348,9 @@
private void directCommit() throws SQLException {
try {
- this.dqp.commit();
- } catch (XATransactionException e) {
+ ResultsFuture<?> future = this.dqp.commit();
+ future.get();
+ } catch (Exception e) {
throw MMSQLException.create(e);
}
logger.fine(JDBCPlugin.Util.getString("MMConnection.Commit_success"));
//$NON-NLS-1$
@@ -716,8 +718,9 @@
if (!autoCommitFlag) {
try {
try {
- this.dqp.rollback();
- } catch (XATransactionException e) {
+ ResultsFuture<?> future = this.dqp.rollback();
+ future.get();
+ } catch (Exception e) {
throw MMSQLException.create(e);
}
logger.fine(JDBCPlugin.Util.getString("MMConnection.Rollback_success"));
//$NON-NLS-1$
@@ -802,8 +805,9 @@
transactionXid = null;
this.autoCommitFlag = true;
try {
- this.dqp.commit(arg0, arg1);
- } catch (XATransactionException e) {
+ ResultsFuture<?> future = this.dqp.commit(arg0, arg1);
+ future.get();
+ } catch (Exception e) {
throw MMSQLException.create(e);
}
}
@@ -812,8 +816,9 @@
checkConnection();
this.autoCommitFlag = true;
try {
- this.dqp.end(arg0, arg1);
- } catch (XATransactionException e) {
+ ResultsFuture<?> future = this.dqp.end(arg0, arg1);
+ future.get();
+ } catch (Exception e) {
throw MMSQLException.create(e);
}
}
@@ -821,8 +826,9 @@
protected void forgetTransaction(MMXid arg0) throws SQLException {
checkConnection();
try {
- this.dqp.forget(arg0);
- } catch (XATransactionException e) {
+ ResultsFuture<?> future = this.dqp.forget(arg0);
+ future.get();
+ } catch (Exception e) {
throw MMSQLException.create(e);
}
}
@@ -831,8 +837,9 @@
checkConnection();
transactionXid = null;
try {
- return this.dqp.prepare(arg0);
- } catch (XATransactionException e) {
+ ResultsFuture<Integer> future = this.dqp.prepare(arg0);
+ return future.get();
+ } catch (Exception e) {
throw MMSQLException.create(e);
}
}
@@ -840,8 +847,9 @@
protected Xid[] recoverTransaction(int arg0) throws SQLException {
checkConnection();
try {
- return this.dqp.recover(arg0);
- } catch (XATransactionException e) {
+ ResultsFuture<Xid[]> future = this.dqp.recover(arg0);
+ return future.get();
+ } catch (Exception e) {
throw MMSQLException.create(e);
}
}
@@ -851,8 +859,9 @@
transactionXid = null;
this.autoCommitFlag = true;
try {
- this.dqp.rollback(arg0);
- } catch (XATransactionException e) {
+ ResultsFuture<?> future = this.dqp.rollback(arg0);
+ future.get();
+ } catch (Exception e) {
throw MMSQLException.create(e);
}
}
@@ -860,8 +869,9 @@
protected void startTransaction(MMXid arg0, int arg1, int timeout) throws
SQLException {
checkConnection();
try {
- this.dqp.start(arg0, arg1, timeout);
- } catch (XATransactionException e) {
+ ResultsFuture<?> future = this.dqp.start(arg0, arg1, timeout);
+ future.get();
+ } catch (Exception e) {
throw MMSQLException.create(e);
}
transactionXid = arg0;
Modified:
branches/JCA/client-jdbc/src/main/java/com/metamatrix/jdbc/MMPreparedStatement.java
===================================================================
---
branches/JCA/client-jdbc/src/main/java/com/metamatrix/jdbc/MMPreparedStatement.java 2010-02-22
18:48:13 UTC (rev 1866)
+++
branches/JCA/client-jdbc/src/main/java/com/metamatrix/jdbc/MMPreparedStatement.java 2010-02-23
16:07:13 UTC (rev 1867)
@@ -60,6 +60,8 @@
import com.metamatrix.core.util.ObjectConverterUtil;
import com.metamatrix.dqp.client.MetadataResult;
import com.metamatrix.dqp.message.RequestMessage;
+import com.metamatrix.dqp.message.RequestMessage.ResultsMode;
+import com.metamatrix.dqp.message.RequestMessage.StatementType;
import com.metamatrix.jdbc.api.ExecutionProperties;
/**
@@ -186,7 +188,7 @@
@Override
//## JDBC4.0-end ##
public boolean execute() throws SQLException {
- executeSql(new String[] {this.prepareSql}, false, null);
+ executeSql(new String[] {this.prepareSql}, false, ResultsMode.EITHER);
return hasResultSet();
}
@@ -196,7 +198,7 @@
return new int[0];
}
try{
- executeSql(new String[] {this.prepareSql}, true, false);
+ executeSql(new String[] {this.prepareSql}, true, ResultsMode.UPDATECOUNT);
}finally{
batchParameterList.clear();
}
@@ -207,7 +209,7 @@
@Override
//## JDBC4.0-end ##
public ResultSet executeQuery() throws SQLException {
- executeSql(new String[] {this.prepareSql}, false, true);
+ executeSql(new String[] {this.prepareSql}, false, ResultsMode.RESULTSET);
return resultSet;
}
@@ -215,17 +217,17 @@
@Override
//## JDBC4.0-end ##
public int executeUpdate() throws SQLException {
- executeSql(new String[] {this.prepareSql}, false, false);
+ executeSql(new String[] {this.prepareSql}, false, ResultsMode.UPDATECOUNT);
return this.updateCounts[0];
}
@Override
protected RequestMessage createRequestMessage(String[] commands,
- boolean isBatchedCommand, Boolean requiresResultSet) {
- RequestMessage message = super.createRequestMessage(commands, false,
requiresResultSet);
- message.setPreparedStatement(true);
+ boolean isBatchedCommand, ResultsMode resultsMode) {
+ RequestMessage message = super.createRequestMessage(commands, false, resultsMode);
+ message.setStatementType(StatementType.PREPARED);
message.setParameterValues(isBatchedCommand?getParameterValuesList():
getParameterValues());
- message.setPreparedBatchUpdate(isBatchedCommand);
+ message.setBatchedUpdate(isBatchedCommand);
return message;
}
Modified: branches/JCA/client-jdbc/src/main/java/com/metamatrix/jdbc/MMResultSet.java
===================================================================
--- branches/JCA/client-jdbc/src/main/java/com/metamatrix/jdbc/MMResultSet.java 2010-02-22
18:48:13 UTC (rev 1866)
+++ branches/JCA/client-jdbc/src/main/java/com/metamatrix/jdbc/MMResultSet.java 2010-02-23
16:07:13 UTC (rev 1867)
@@ -126,7 +126,7 @@
this.statement = statement;
this.parameters = parameters;
// server latency-related timestamp
- this.processingTimestamp = resultsMsg.getProcessingTimestamp();
+ this.processingTimestamp = statement.getProcessingTimestamp();
this.requestID = statement.getCurrentRequestID();
this.cursorType = statement.getResultSetType();
this.batchResults = new BatchResults(this, getCurrentBatch(resultsMsg),
this.cursorType == ResultSet.TYPE_FORWARD_ONLY ? 1 : BatchResults.DEFAULT_SAVED_BATCHES);
@@ -1116,7 +1116,7 @@
}
protected void setResultsData(ResultsMessage resultsMsg) {
- this.completedTimestamp = resultsMsg.getCompletedTimestamp();
+ this.completedTimestamp = new java.util.Date();
this.statement.accumulateWarnings(resultsMsg.getWarnings());
}
@@ -1186,9 +1186,9 @@
}
/**
- * Gets the time command execution is compleated on the server.
+ * Gets the time command execution is completed on the server.
*
- * @return Date object representing time the commond finished execution.
+ * @return Date object representing time the command finished execution.
*/
public java.util.Date getCompletedTimestamp() throws SQLException {
checkClosed();
Modified: branches/JCA/client-jdbc/src/main/java/com/metamatrix/jdbc/MMStatement.java
===================================================================
--- branches/JCA/client-jdbc/src/main/java/com/metamatrix/jdbc/MMStatement.java 2010-02-22
18:48:13 UTC (rev 1866)
+++ branches/JCA/client-jdbc/src/main/java/com/metamatrix/jdbc/MMStatement.java 2010-02-23
16:07:13 UTC (rev 1867)
@@ -33,6 +33,7 @@
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collection;
+import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -53,10 +54,12 @@
import com.metamatrix.api.exception.MetaMatrixProcessingException;
import com.metamatrix.common.comm.exception.CommunicationException;
import com.metamatrix.common.util.SqlUtil;
+import com.metamatrix.core.util.ObjectConverterUtil;
import com.metamatrix.dqp.client.ClientSideDQP;
import com.metamatrix.dqp.message.ParameterInfo;
import com.metamatrix.dqp.message.RequestMessage;
import com.metamatrix.dqp.message.ResultsMessage;
+import com.metamatrix.dqp.message.RequestMessage.ResultsMode;
import com.metamatrix.jdbc.api.Annotation;
import com.metamatrix.jdbc.api.ExecutionProperties;
import com.metamatrix.jdbc.api.PlanNode;
@@ -157,6 +160,8 @@
private Pattern setStatement =
Pattern.compile("\\s*set\\s*(\\w+)\\s*=\\s*(\\w*)", Pattern.CASE_INSENSITIVE);
//$NON-NLS-1$
+ private Date processingTimestamp;
+
/**
* Factory Constructor
* @param driverConnection
@@ -318,7 +323,7 @@
@Override
//## JDBC4.0-end ##
public boolean execute(String sql) throws SQLException {
- executeSql(new String[] {sql}, false, null);
+ executeSql(new String[] {sql}, false, ResultsMode.EITHER);
return hasResultSet();
}
@@ -330,7 +335,7 @@
return new int[0];
}
String[] commands = (String[])batchedUpdates.toArray(new
String[batchedUpdates.size()]);
- executeSql(commands, true, false);
+ executeSql(commands, true, ResultsMode.UPDATECOUNT);
return updateCounts;
}
@@ -338,7 +343,7 @@
@Override
//## JDBC4.0-end ##
public ResultSet executeQuery(String sql) throws SQLException {
- executeSql(new String[] {sql}, false, true);
+ executeSql(new String[] {sql}, false, ResultsMode.RESULTSET);
return resultSet;
}
@@ -347,7 +352,7 @@
//## JDBC4.0-end ##
public int executeUpdate(String sql) throws SQLException {
String[] commands = new String[] {sql};
- executeSql(commands, false, false);
+ executeSql(commands, false, ResultsMode.UPDATECOUNT);
return this.updateCounts[0];
}
@@ -402,13 +407,13 @@
resultSet.setMaxFieldSize(this.maxFieldSize);
}
- protected void executeSql(String[] commands, boolean isBatchedCommand, Boolean
requiresResultSet)
+ protected void executeSql(String[] commands, boolean isBatchedCommand, ResultsMode
resultsMode)
throws SQLException, MMSQLException {
checkStatement();
resetExecutionState();
//handle set statement
- if (commands.length == 1 && requiresResultSet != Boolean.TRUE) {
+ if (commands.length == 1 && resultsMode != ResultsMode.RESULTSET) {
Matcher match = setStatement.matcher(commands[0]);
if (match.matches()) {
String key = match.group(1);
@@ -419,7 +424,7 @@
}
}
- RequestMessage reqMessage = createRequestMessage(commands, isBatchedCommand,
requiresResultSet);
+ RequestMessage reqMessage = createRequestMessage(commands, isBatchedCommand,
resultsMode);
ResultsMessage resultsMsg = null;
try {
resultsMsg = sendRequestMessageAndWait(reqMessage);
@@ -466,11 +471,11 @@
}
protected RequestMessage createRequestMessage(String[] commands,
- boolean isBatchedCommand, Boolean requiresResultSet) {
+ boolean isBatchedCommand, ResultsMode resultsMode) {
RequestMessage reqMessage = new RequestMessage();
reqMessage.setCommands(commands);
reqMessage.setBatchedUpdate(isBatchedCommand);
- reqMessage.setRequireResultSet(requiresResultSet);
+ reqMessage.setResultsMode(resultsMode);
return reqMessage;
}
@@ -761,6 +766,15 @@
// Get result set cache mode
String rsCache =
getExecutionProperty(ExecutionProperties.RESULT_SET_CACHE_MODE);
res.setUseResultSetCache(Boolean.valueOf(rsCache).booleanValue());
+
+ res.setAnsiQuotedIdentifiers(Boolean.valueOf(
+ getExecutionProperty(ExecutionProperties.ANSI_QUOTED_IDENTIFIERS))
+ .booleanValue());
+ String sqlOptions = getExecutionProperty(ExecutionProperties.PROP_SQL_OPTIONS);
+ if (sqlOptions != null &&
+
sqlOptions.toUpperCase().indexOf(ExecutionProperties.SQL_OPTION_SHOWPLAN.toUpperCase())
>= 0) {
+ res.setShowPlan(true);
+ }
}
/**
@@ -821,34 +835,13 @@
/**
* Send out request message with necessary states.
- * @param transaction UsertTransaction
- * @param sql String of command or prepared string
- * @param listener Message Listener
- * @param timeout Maybe 0
- * @param isPreparedStatement flag indicating whether this statement is a
PreparedStatement
- * @param isCallableStatement flag indicating whether this statement is a
CallableStatement
- * @param params Parameters values of either PreparedStatement or CallableStatement
- * @param isBatchedCommand flag indicating whether the statements are being executed
as a batch
- * @throws SQLException
- * @throws TimeoutException
- * @throws InterruptedException
- * @throws CommunicationException
*/
protected ResultsMessage sendRequestMessageAndWait(RequestMessage reqMsg)
throws SQLException, InterruptedException, TimeoutException {
-
+ this.processingTimestamp = new Date();
this.currentRequestID = this.driverConnection.nextRequestID();
// Create a request message
- reqMsg.markSubmissionStart();
reqMsg.setExecutionPayload(this.payload);
- reqMsg.setAnsiQuotedIdentifiers(Boolean.valueOf(
- getExecutionProperty(ExecutionProperties.ANSI_QUOTED_IDENTIFIERS))
- .booleanValue());
- String sqlOptions = getExecutionProperty(ExecutionProperties.PROP_SQL_OPTIONS);
- if (sqlOptions != null &&
-
sqlOptions.toUpperCase().indexOf(ExecutionProperties.SQL_OPTION_SHOWPLAN.toUpperCase())
>= 0) {
- reqMsg.setShowPlan(true);
- }
reqMsg.setCursorType(this.resultSetType);
reqMsg.setFetchSize(this.fetchSize);
reqMsg.setStyleSheet(this.styleSheet);
@@ -905,23 +898,7 @@
* @throws IOException if unable to read the style sheet from the Reader object.
*/
public void attachStylesheet(Reader reader) throws IOException {
- BufferedReader bufferedReader = null;
- StringBuffer buffer = new StringBuffer();
- try {
- bufferedReader = new BufferedReader(reader);
- while(true) {
- String line = bufferedReader.readLine();
- if(line == null) {
- break;
- }
- buffer.append( line );
- }
- } finally {
- if(bufferedReader != null) {
- bufferedReader.close();
- }
- }
- this.styleSheet = buffer.toString();
+ this.styleSheet = ObjectConverterUtil.convertToString(reader);
}
/**
@@ -981,6 +958,10 @@
public Collection getAnnotations() {
return this.annotations;
}
+
+ public Date getProcessingTimestamp() {
+ return processingTimestamp;
+ }
public void setPartialResults(boolean isPartialResults){
if(isPartialResults){
Modified:
branches/JCA/client-jdbc/src/test/java/com/metamatrix/jdbc/TestAllResultsImpl.java
===================================================================
---
branches/JCA/client-jdbc/src/test/java/com/metamatrix/jdbc/TestAllResultsImpl.java 2010-02-22
18:48:13 UTC (rev 1866)
+++
branches/JCA/client-jdbc/src/test/java/com/metamatrix/jdbc/TestAllResultsImpl.java 2010-02-23
16:07:13 UTC (rev 1867)
@@ -32,7 +32,6 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
-import java.util.Date;
import java.util.List;
import java.util.TimeZone;
import java.util.concurrent.ExecutionException;
@@ -675,35 +674,6 @@
rs.close();
}
-
- /**
- * Case 4293 - timestamps for begin and end processing should both be set
- * server-side (from the same system clock)
- */
- @Test public void testProcessingTime() throws Exception {
- RequestMessage request = new RequestMessage();
- request.setProcessingTimestamp(new Date(12345678L));
- ResultsMessage resultsMsg = new ResultsMessage(request);
-
- // these two lines not important to the test
- resultsMsg.setColumnNames(new String[] { "IntNum" }); //$NON-NLS-1$
- resultsMsg.setDataTypes(new String[] { MMJDBCSQLTypeInfo.INTEGER });
-
- // expected results
- long expectedProcessingTime = resultsMsg.getCompletedTimestamp()
- .getTime()
- - resultsMsg.getProcessingTimestamp().getTime();
-
- // sleep for a couple milliseconds
- Thread.sleep(200);
-
- MMResultSet rs = new MMResultSet(resultsMsg, statement);
-
- long actualProcessingTime = rs.getProcessingTime();
-
- assertEquals(expectedProcessingTime, actualProcessingTime);
-
- }
/**
* 3 batches
@@ -850,7 +820,6 @@
private static ResultsMessage exampleResultsMsg4(int begin, int length, int fetchSize,
boolean lastBatch) {
RequestMessage request = new RequestMessage();
- request.setProcessingTimestamp(new Date(1L));
request.setExecutionId(REQUEST_ID);
ResultsMessage resultsMsg = new ResultsMessage(request);
List[] results = exampleResults1(length, begin);
@@ -879,7 +848,6 @@
@Test public void testDateType() throws SQLException {
RequestMessage request = new RequestMessage();
- request.setProcessingTimestamp(new Date(1L));
request.setExecutionId(REQUEST_ID);
ResultsMessage resultsMsg = new ResultsMessage(request);
resultsMsg.setResults(new List[] {Arrays.asList(new Timestamp(0))});
Modified:
branches/JCA/client-jdbc/src/test/java/com/metamatrix/jdbc/TestMMPreparedStatement.java
===================================================================
---
branches/JCA/client-jdbc/src/test/java/com/metamatrix/jdbc/TestMMPreparedStatement.java 2010-02-22
18:48:13 UTC (rev 1866)
+++
branches/JCA/client-jdbc/src/test/java/com/metamatrix/jdbc/TestMMPreparedStatement.java 2010-02-23
16:07:13 UTC (rev 1867)
@@ -41,6 +41,7 @@
import com.metamatrix.dqp.client.ResultsFuture;
import com.metamatrix.dqp.message.RequestMessage;
import com.metamatrix.dqp.message.ResultsMessage;
+import com.metamatrix.dqp.message.RequestMessage.ResultsMode;
import com.metamatrix.platform.security.api.LogonResult;
/**
@@ -113,8 +114,7 @@
// Now verify the statement's RequestMessage is what we expect
assertEquals("Command does not match", sqlCommand,
statement.requestMessage.getCommandString()); //$NON-NLS-1$
assertEquals("Parameter values do not match", expectedParameterValues,
statement.requestMessage.getParameterValues()); //$NON-NLS-1$
- assertTrue("RequestMessage.isPreparedBatchUpdate should be true",
statement.requestMessage.isPreparedBatchUpdate()); //$NON-NLS-1$
- assertFalse("RequestMessage.isBatchedUpdate should be false",
statement.requestMessage.isBatchedUpdate()); //$NON-NLS-1$
+ assertTrue("RequestMessage.isBatchedUpdate should be true",
statement.requestMessage.isBatchedUpdate()); //$NON-NLS-1$
assertFalse("RequestMessage.isCallableStatement should be false",
statement.requestMessage.isCallableStatement()); //$NON-NLS-1$
assertTrue("RequestMessage.isPreparedStatement should be true",
statement.requestMessage.isPreparedStatement()); //$NON-NLS-1$
}
@@ -323,9 +323,9 @@
public RequestMessage requestMessage;
@Override
protected RequestMessage createRequestMessage(String[] commands,
- boolean isBatchedCommand, Boolean requiresResultSet) {
+ boolean isBatchedCommand, ResultsMode resultsMode) {
this.requestMessage = super
- .createRequestMessage(commands, isBatchedCommand, requiresResultSet);
+ .createRequestMessage(commands, isBatchedCommand, resultsMode);
return this.requestMessage;
}
Modified: branches/JCA/client-jdbc/src/test/java/com/metamatrix/jdbc/TestMMResultSet.java
===================================================================
---
branches/JCA/client-jdbc/src/test/java/com/metamatrix/jdbc/TestMMResultSet.java 2010-02-22
18:48:13 UTC (rev 1866)
+++
branches/JCA/client-jdbc/src/test/java/com/metamatrix/jdbc/TestMMResultSet.java 2010-02-23
16:07:13 UTC (rev 1867)
@@ -673,7 +673,7 @@
/** test getProcessingTime() -- include test for getProcessingTimestamp() and
getCompletedTimestamp() */
@Test public void testGetProcessingTime() throws SQLException {
MMResultSet cs = helpExecuteQuery();
- assertTrue(cs.getProcessingTime() == cs.getCompletedTimestamp().getTime() - 1);
+ assertNotNull(cs.getCompletedTimestamp());
cs.close();
}
Modified:
branches/JCA/common-internal/src/main/java/com/metamatrix/common/queue/StatsCapturingWorkManager.java
===================================================================
---
branches/JCA/common-internal/src/main/java/com/metamatrix/common/queue/StatsCapturingWorkManager.java 2010-02-22
18:48:13 UTC (rev 1866)
+++
branches/JCA/common-internal/src/main/java/com/metamatrix/common/queue/StatsCapturingWorkManager.java 2010-02-23
16:07:13 UTC (rev 1867)
@@ -22,17 +22,21 @@
package com.metamatrix.common.queue;
-import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.LinkedList;
-import java.util.List;
+import java.util.Map;
import java.util.Queue;
import java.util.Set;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.resource.spi.work.ExecutionContext;
import javax.resource.spi.work.Work;
+import javax.resource.spi.work.WorkEvent;
import javax.resource.spi.work.WorkException;
import javax.resource.spi.work.WorkListener;
import javax.resource.spi.work.WorkManager;
@@ -44,8 +48,104 @@
import com.metamatrix.common.log.LogManager;
import com.metamatrix.common.util.LogConstants;
import com.metamatrix.core.log.MessageLevel;
+import com.metamatrix.core.util.NamedThreadFactory;
+/**
+ * StatsCapturingWorkManager acts as a wrapper to the passed in {@link WorkManager} to
+ * capture statistics and implement an unbounded queue of work.
+ */
public class StatsCapturingWorkManager {
+
+ private static class WorkContext {
+ ExecutionContext context;
+ long startTimeout;
+ long submitted = System.currentTimeMillis();
+
+ public WorkContext(ExecutionContext context, long startTimeout) {
+ this.context = context;
+ this.startTimeout = startTimeout;
+ }
+
+ long getStartTimeout() {
+ if (startTimeout == 0) {
+ return 0;
+ }
+ return Math.max(1, startTimeout + submitted - System.currentTimeMillis());
+ }
+
+ }
+
+ private final class WorkWrapper implements Work {
+ private final WorkManager delegate;
+ private final Work work;
+ private final WorkContext workContext;
+
+ private WorkWrapper(WorkManager delegate, Work work, WorkContext workContext) {
+ this.delegate = delegate;
+ this.work = work;
+ this.workContext = workContext;
+ }
+
+ @Override
+ public void run() {
+ Thread t = Thread.currentThread();
+ synchronized (poolLock) {
+ threads.add(t);
+ }
+ String name = t.getName();
+ t.setName(name + "_" + poolName + threadCounter.getAndIncrement());
//$NON-NLS-1$
+ if (LogManager.isMessageToBeRecorded(LogConstants.CTX_POOLING, MessageLevel.TRACE)) {
+ LogManager.logTrace(LogConstants.CTX_POOLING, "Beginning work with virtual
worker", t.getName()); //$NON-NLS-1$
+ }
+ boolean success = false;
+ try {
+ work.run();
+ success = true;
+ } finally {
+ synchronized (poolLock) {
+ WorkWrapper next = null;
+ if (success) {
+ completedCount++;
+ next = queue.poll();
+ }
+ threads.remove(t);
+ if (next == null) {
+ activeCount--;
+ if (activeCount == 0 && terminated) {
+ poolLock.notifyAll();
+ }
+ } else {
+ try {
+ if (next.workContext == null) {
+ delegate.scheduleWork(next);
+ } else {
+ delegate.scheduleWork(next, next.workContext.getStartTimeout(),
next.workContext.context, next.work instanceof
WorkListener?(WorkListener)next.work:null);
+ }
+ } catch (WorkException e) {
+ handleException(next.work, e);
+ }
+ }
+ }
+ t.setName(name);
+ }
+ }
+
+ @Override
+ public void release() {
+ this.work.release();
+ }
+ }
+
+ private static void handleException(Work work, WorkException e) {
+ if (work instanceof WorkListener) {
+ ((WorkListener)work).workRejected(new WorkEvent(work, WorkEvent.WORK_REJECTED, work,
new WorkRejectedException(e)));
+ } else if (LogManager.isMessageToBeRecorded(LogConstants.CTX_POOLING,
MessageLevel.DETAIL)) {
+ LogManager.logDetail(LogConstants.CTX_POOLING, e, "Exception adding work to the
WorkManager"); //$NON-NLS-1$
+ }
+ }
+
+ private static ScheduledThreadPoolExecutor stpe = new ScheduledThreadPoolExecutor(1, new
NamedThreadFactory("Scheduler")); //$NON-NLS-1$
+
private volatile int activeCount;
private volatile int highestActiveCount;
private volatile int highestQueueSize;
@@ -56,104 +156,79 @@
private AtomicInteger threadCounter = new AtomicInteger();
private String poolName;
private int maximumPoolSize;
- private Queue<Work> queue = new LinkedList<Work>();
+ private Queue<WorkWrapper> queue = new LinkedList<WorkWrapper>();
private Set<Thread> threads =
Collections.synchronizedSet(Collections.newSetFromMap(new IdentityHashMap<Thread,
Boolean>()));
-
+ private Map<Integer, ScheduledFuture<?>> futures = new HashMap<Integer,
ScheduledFuture<?>>();
+ private int idCounter;
+
public StatsCapturingWorkManager(String name, int maximumPoolSize) {
this.maximumPoolSize = maximumPoolSize;
this.poolName = name;
}
- public void doWork(WorkManager delegate, Work arg0) throws WorkException {
- delegate.doWork(new StatuscapableWork(arg0));
+ public void scheduleWork(final WorkManager delegate, final Work arg0) throws
WorkException {
+ scheduleWork(delegate, arg0, (WorkContext)null);
}
- public void doWork(WorkManager delegate, Work arg0, long arg1, ExecutionContext arg2,
WorkListener arg3) throws WorkException {
- delegate.doWork(new StatuscapableWork(arg0), arg1, arg2, arg3);
- }
-
- public void scheduleWork(WorkManager delegate, Work arg0) throws WorkException {
- delegate.scheduleWork(new StatuscapableWork(arg0));
- }
-
- public void scheduleWork(WorkManager delegate, Work arg0, long arg1, ExecutionContext
arg2,WorkListener arg3) throws WorkException {
- delegate.scheduleWork(new StatuscapableWork(arg0), arg1, arg2, arg3);
- }
-
- public long startWork(WorkManager delegate,Work arg0) throws WorkException {
- return delegate.startWork(new StatuscapableWork(arg0));
- }
-
- public long startWork(WorkManager delegate, Work arg0, long arg1, ExecutionContext arg2,
WorkListener arg3) throws WorkException {
- return delegate.startWork(new StatuscapableWork(arg0), arg1, arg2, arg3);
- }
-
-
- class StatuscapableWork implements Work{
-
- public StatuscapableWork(Work work) throws WorkRejectedException {
- boolean atMaxThreads = false;
- boolean newMaxQueueSize = false;
- synchronized (poolLock) {
- checkForTermination();
- submittedCount++;
- atMaxThreads = activeCount == maximumPoolSize;
- queue.add(work);
- if (atMaxThreads) {
- int queueSize = queue.size();
- if (queueSize > highestQueueSize) {
- atMaxThreads = true;
- highestQueueSize = queueSize;
- }
- } else {
- activeCount++;
- highestActiveCount = Math.max(activeCount, highestActiveCount);
+ private void scheduleWork(final WorkManager delegate, final Work work, WorkContext
workContext)
+ throws WorkRejectedException, WorkException {
+ boolean atMaxThreads = false;
+ boolean newMaxQueueSize = false;
+ synchronized (poolLock) {
+ checkForTermination();
+ submittedCount++;
+ atMaxThreads = activeCount == maximumPoolSize;
+ if (atMaxThreads) {
+ queue.add(new WorkWrapper(delegate, work, workContext));
+ int queueSize = queue.size();
+ if (queueSize > highestQueueSize) {
+ newMaxQueueSize = true;
+ highestQueueSize = queueSize;
}
+ } else {
+ activeCount++;
+ highestActiveCount = Math.max(activeCount, highestActiveCount);
}
- if (atMaxThreads) {
- if (newMaxQueueSize && maximumPoolSize > 1) {
- throw new WorkRejectedException(
CommonPlugin.Util.getString("WorkerPool.Max_thread", maximumPoolSize, poolName,
highestQueueSize)); //$NON-NLS-1$
- }
- }
}
-
- @Override
- public void release() {
+ if (atMaxThreads) {
+ if (newMaxQueueSize && maximumPoolSize > 1) {
+ LogManager.logWarning(LogConstants.CTX_POOLING,
CommonPlugin.Util.getString("WorkerPool.Max_thread", maximumPoolSize, poolName,
highestQueueSize)); //$NON-NLS-1$
+ }
+ return;
}
+ if (workContext == null) {
+ delegate.scheduleWork(new WorkWrapper(delegate, work, null));
+ } else {
+ delegate.scheduleWork(new WorkWrapper(delegate, work, null),
workContext.getStartTimeout(), workContext.context, work instanceof
WorkListener?(WorkListener)work:null);
+ }
+ }
- @Override
- public void run() {
- Thread t = Thread.currentThread();
- threads.add(t);
- String name = t.getName();
- t.setName(name + "_" + poolName + threadCounter.getAndIncrement());
//$NON-NLS-1$
- if (LogManager.isMessageToBeRecorded(LogConstants.CTX_POOLING, MessageLevel.TRACE)) {
- LogManager.logTrace(LogConstants.CTX_POOLING, "Beginning work with virtual
worker", t.getName()); //$NON-NLS-1$
- }
-
- Work r = queue.poll();
- if (r != null) {
- boolean success = false;
- try {
- r.run();
- success = true;
- } finally {
- synchronized (poolLock) {
- if (success) {
- completedCount++;
+ public void scheduleWork(final WorkManager delegate, final Work arg0, final
ExecutionContext arg2, long delay) throws WorkException {
+ if (delay < 1) {
+ scheduleWork(delegate, arg0, new WorkContext(arg2, WorkManager.INDEFINITE));
+ } else {
+ synchronized (futures) {
+ final int id = idCounter++;
+ ScheduledFuture<?> sf = stpe.schedule(new Runnable() {
+
+ @Override
+ public void run() {
+ try {
+ futures.remove(id);
+ scheduleWork(delegate, arg0, new WorkContext(arg2, WorkManager.INDEFINITE));
+ } catch (WorkException e) {
+ handleException(arg0, e);
}
- threads.remove(t);
- activeCount--;
}
- t.setName(name);
- }
- }
+ }, delay, TimeUnit.MILLISECONDS);
+ this.futures.put(id, sf);
+ }
}
}
-
+
private void checkForTermination() throws WorkRejectedException {
if (terminated) {
- throw new WorkRejectedException();
+ throw new WorkRejectedException("Queue has been terminated"); //$NON-NLS-1$
}
}
@@ -174,21 +249,40 @@
this.terminated = true;
}
- public List<Runnable> shutdownNow() {
+ public void shutdownNow() {
this.shutdown();
synchronized (poolLock) {
- synchronized (threads) {
- for (Thread t : threads) {
- t.interrupt();
- }
+ for (Thread t : threads) {
+ t.interrupt();
}
- List<Runnable> result = Collections.EMPTY_LIST;
- if (!queue.isEmpty()) {
- new ArrayList<Runnable>(queue);
- queue.clear();
+ queue.clear();
+ }
+ synchronized (futures) {
+ for (ScheduledFuture<?> future : futures.values()) {
+ future.cancel(true);
}
- return result;
+ futures.clear();
}
}
+
+ public boolean isTerminated() {
+ return terminated;
+ }
+
+ public boolean awaitTermination(long timeout, TimeUnit unit)
+ throws InterruptedException {
+ long timeoutMillis = unit.toMillis(timeout);
+ long finalMillis = System.currentTimeMillis() + timeoutMillis;
+ synchronized (poolLock) {
+ while (this.activeCount > 0 || !terminated) {
+ if (timeoutMillis < 1) {
+ return false;
+ }
+ poolLock.wait(timeoutMillis);
+ timeoutMillis = finalMillis - System.currentTimeMillis();
+ }
+ }
+ return true;
+ }
}
Deleted:
branches/JCA/common-internal/src/main/java/com/metamatrix/common/queue/WorkerPool.java
===================================================================
---
branches/JCA/common-internal/src/main/java/com/metamatrix/common/queue/WorkerPool.java 2010-02-22
18:48:13 UTC (rev 1866)
+++
branches/JCA/common-internal/src/main/java/com/metamatrix/common/queue/WorkerPool.java 2010-02-23
16:07:13 UTC (rev 1867)
@@ -1,56 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * See the COPYRIGHT.txt file distributed with this work for information
- * regarding copyright ownership. Some portions may be licensed
- * to Red Hat, Inc. under one or more contributor license agreements.
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
- * 02110-1301 USA.
- */
-
-package com.metamatrix.common.queue;
-
-import java.util.List;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-
-import org.teiid.adminapi.impl.WorkerPoolStatisticsMetadata;
-
-public interface WorkerPool extends Executor {
-
- void shutdown();
-
- List<Runnable> shutdownNow();
-
- boolean awaitTermination(long timeout, TimeUnit unit)
- throws InterruptedException;
-
- boolean isTerminated();
-
- WorkerPoolStatisticsMetadata getStats();
-
- boolean hasWork();
-
- ScheduledFuture<?> schedule(Runnable command,
- long delay,
- TimeUnit unit);
-
- ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
- long initialDelay,
- long period,
- TimeUnit unit);
-
-}
\ No newline at end of file
Deleted:
branches/JCA/common-internal/src/main/java/com/metamatrix/common/queue/WorkerPoolFactory.java
===================================================================
---
branches/JCA/common-internal/src/main/java/com/metamatrix/common/queue/WorkerPoolFactory.java 2010-02-22
18:48:13 UTC (rev 1866)
+++
branches/JCA/common-internal/src/main/java/com/metamatrix/common/queue/WorkerPoolFactory.java 2010-02-23
16:07:13 UTC (rev 1867)
@@ -1,373 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * See the COPYRIGHT.txt file distributed with this work for information
- * regarding copyright ownership. Some portions may be licensed
- * to Red Hat, Inc. under one or more contributor license agreements.
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
- * 02110-1301 USA.
- */
-
-package com.metamatrix.common.queue;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.IdentityHashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Queue;
-import java.util.Set;
-import java.util.concurrent.Delayed;
-import java.util.concurrent.Executor;
-import java.util.concurrent.FutureTask;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.teiid.adminapi.impl.WorkerPoolStatisticsMetadata;
-
-import com.metamatrix.common.CommonPlugin;
-import com.metamatrix.common.log.LogManager;
-import com.metamatrix.common.util.LogConstants;
-import com.metamatrix.core.log.MessageLevel;
-import com.metamatrix.core.util.NamedThreadFactory;
-
-/**
- * Creates named, queued, daemon Thread pools.
- * <br/>
- * The requirements are:
- * <ol>
- * <li>minimize thread creation</li>
- * <li>allow for proper timeout of idle threads</li>
- * <li>allow for queuing</li>
- * </ol>
- * <br/>
- * A non-fifo (lifo) {@link SynchronousQueue} based {@link ThreadPoolExecutor} satisfies
1 and 2, but not 3.
- * A bounded or unbound queue based {@link ThreadPoolExecutor} allows for 3, but will
tend to create
- * up to the maximum number of threads and makes no guarantee on thread scheduling.
- * <br/>
- * So the approach here is to use virtual thread pools off of single shared {@link
SynchronousQueue}
- * backed {@link ThreadPoolExecutor}.
- * <br/>
- * There is also only a single master scheduling thread with actual executions deferred
to the calling
- * WorkerPool.
- *
- * TODO: this probably needs to be re-thought, especially since the lifo ordering of a
{@link SynchronousQueue}
- * is not guaranteed behavior. also there's a race condition between previously
retiring threads and new work -
- * prior to being returned to the shared pool we can create extra threads if the shared
pool is exhausted.
- * TODO: bounded queuing - we never bothered bounding in the past with our worker pools,
but reasonable
- * defaults would be a good idea.
- */
-public class WorkerPoolFactory {
-
- private static ThreadPoolExecutor tpe = new ThreadPoolExecutor(0,
- Integer.MAX_VALUE, 2, TimeUnit.MINUTES,
- new SynchronousQueue<Runnable>(), new NamedThreadFactory("Worker")) {
//$NON-NLS-1$
- @Override
- protected void afterExecute(Runnable r, Throwable t) {
- if (t != null) {
- LogManager.logError(LogConstants.CTX_POOLING, t,
CommonPlugin.Util.getString("WorkerPool.uncaughtException")); //$NON-NLS-1$
- }
- }
- };
-
- private static ScheduledThreadPoolExecutor stpe = new ScheduledThreadPoolExecutor(1, new
NamedThreadFactory("Scheduler")); //$NON-NLS-1$
-
- static class StatsCapturingSharedThreadPoolExecutor implements WorkerPool {
-
- class ScheduledFutureTask extends FutureTask<Void> implements
ScheduledFuture<Void> {
- private ScheduledFuture<?> scheduledFuture;
- private boolean periodic;
- private volatile boolean running;
-
- public ScheduledFutureTask(Runnable runnable, boolean periodic) {
- super(runnable, null);
- this.periodic = periodic;
- }
-
- public void setScheduledFuture(ScheduledFuture<?> scheduledFuture) {
- scheduledTasks.add(this);
- this.scheduledFuture = scheduledFuture;
- }
-
- @Override
- public long getDelay(TimeUnit unit) {
- return this.scheduledFuture.getDelay(unit);
- }
-
- @Override
- public int compareTo(Delayed o) {
- return this.scheduledFuture.compareTo(o);
- }
-
- @Override
- public boolean cancel(boolean mayInterruptIfRunning) {
- this.scheduledFuture.cancel(false);
- scheduledTasks.remove(this);
- return super.cancel(mayInterruptIfRunning);
- }
-
- public Runnable getParent() {
- return new Runnable() {
- @Override
- public void run() {
- if (running || terminated) {
- return;
- }
- running = periodic;
- execute(ScheduledFutureTask.this);
- }
- };
- }
-
- @Override
- public void run() {
- if (periodic) {
- if (!this.runAndReset()) {
- this.scheduledFuture.cancel(false);
- scheduledTasks.remove(this);
- }
- running = false;
- } else {
- scheduledTasks.remove(this);
- super.run();
- }
- }
- }
-
- private volatile int activeCount;
- private volatile int highestActiveCount;
- private volatile int highestQueueSize;
- private volatile boolean terminated;
- private volatile int submittedCount;
- private volatile int completedCount;
- private Object poolLock = new Object();
- private AtomicInteger threadCounter = new AtomicInteger();
- private Set<Thread> threads =
Collections.synchronizedSet(Collections.newSetFromMap(new IdentityHashMap<Thread,
Boolean>()));
- private Set<ScheduledFutureTask> scheduledTasks =
Collections.synchronizedSet(Collections.newSetFromMap(new
IdentityHashMap<ScheduledFutureTask, Boolean>()));
-
- private String poolName;
- private int maximumPoolSize;
- private Queue<Runnable> queue = new LinkedList<Runnable>();
- private Executor thread;
-
- public StatsCapturingSharedThreadPoolExecutor(String name, int maximumPoolSize,
Executor thread) {
- this.maximumPoolSize = maximumPoolSize;
- this.poolName = name;
- this.thread = thread;
- }
-
- @Override
- public void execute(final Runnable command) {
- boolean atMaxThreads = false;
- boolean newMaxQueueSize = false;
- synchronized (poolLock) {
- checkForTermination();
- submittedCount++;
- atMaxThreads = activeCount == maximumPoolSize;
- if (atMaxThreads) {
- queue.add(command);
- int queueSize = queue.size();
- if (queueSize > highestQueueSize) {
- atMaxThreads = true;
- highestQueueSize = queueSize;
- }
- } else {
- activeCount++;
- highestActiveCount = Math.max(activeCount, highestActiveCount);
- }
- }
- if (atMaxThreads) {
- if (newMaxQueueSize && maximumPoolSize > 1) {
- LogManager.logWarning(LogConstants.CTX_POOLING,
CommonPlugin.Util.getString("WorkerPool.Max_thread", maximumPoolSize, poolName,
highestQueueSize)); //$NON-NLS-1$
- }
- return;
- }
- this.thread.execute(new Runnable() {
- @Override
- public void run() {
- Thread t = Thread.currentThread();
- threads.add(t);
- String name = t.getName();
- t.setName(name + "_" + poolName + threadCounter.getAndIncrement());
//$NON-NLS-1$
- if (LogManager.isMessageToBeRecorded(LogConstants.CTX_POOLING, MessageLevel.TRACE))
{
- LogManager.logTrace(LogConstants.CTX_POOLING, "Beginning work with virtual worker", t.getName()); //$NON-NLS-1$
- }
- Runnable r = command;
- while (r != null) {
- boolean success = false;
- try {
- r.run();
- success = true;
- } finally {
- synchronized (poolLock) {
- if (success) {
- completedCount++;
- r = queue.poll();
- }
- if (!success || r == null) {
- threads.remove(t);
- activeCount--;
- if (activeCount == 0 && terminated) {
- poolLock.notifyAll();
- }
- }
- }
- t.setName(name);
- }
- }
- };
- });
- }
-
- private void checkForTermination() {
- if (terminated) {
- throw new RejectedExecutionException();
- }
- }
-
- public int getActiveCount() {
- return activeCount;
- }
-
- public int getSubmittedCount() {
- return submittedCount;
- }
-
- public int getCompletedCount() {
- return completedCount;
- }
-
- public int getPoolSize() {
- return activeCount;
- }
-
- public boolean isTerminated() {
- return terminated;
- }
-
- public void shutdown() {
- this.terminated = true;
- synchronized (scheduledTasks) {
- for (ScheduledFuture<?> future : new
ArrayList<ScheduledFuture<?>>(scheduledTasks)) {
- future.cancel(false);
- }
- scheduledTasks.clear();
- }
- }
-
- public int getLargestPoolSize() {
- return this.highestActiveCount;
- }
-
- @Override
- public WorkerPoolStatisticsMetadata getStats() {
- WorkerPoolStatisticsMetadata stats = new WorkerPoolStatisticsMetadata();
- stats.setName(poolName);
- stats.setQueued(queue.size());
- stats.setHighestQueued(highestQueueSize);
- stats.setActiveThreads(getActiveCount());
- stats.setMaxThreads(this.maximumPoolSize);
- stats.setTotalSubmitted(getSubmittedCount());
- stats.setHighestActiveThreads(getLargestPoolSize());
- stats.setTotalCompleted(getCompletedCount());
- return stats;
- }
-
- @Override
- public boolean hasWork() {
- synchronized (poolLock) {
- return this.getSubmittedCount() - this.getCompletedCount() > 0 &&
!this.isTerminated();
- }
- }
-
- @Override
- public List<Runnable> shutdownNow() {
- this.shutdown();
- synchronized (poolLock) {
- synchronized (threads) {
- for (Thread t : threads) {
- t.interrupt();
- }
- }
- List<Runnable> result = new ArrayList<Runnable>(queue);
- queue.clear();
- return result;
- }
- }
-
- @Override
- public boolean awaitTermination(long timeout, TimeUnit unit)
- throws InterruptedException {
- long timeoutMillis = unit.toMillis(timeout);
- long finalMillis = System.currentTimeMillis() + timeoutMillis;
- synchronized (poolLock) {
- while (this.activeCount > 0 || !terminated) {
- if (timeoutMillis < 1) {
- return false;
- }
- poolLock.wait(timeoutMillis);
- timeoutMillis = finalMillis - System.currentTimeMillis();
- }
- }
- return true;
- }
-
- @Override
- public ScheduledFuture<?> schedule(final Runnable command, long delay,
- TimeUnit unit) {
- checkForTermination();
- ScheduledFutureTask sft = new ScheduledFutureTask(command, false);
- synchronized (scheduledTasks) {
- ScheduledFuture<?> future = stpe.schedule(sft.getParent(), delay, unit);
- sft.setScheduledFuture(future);
- return sft;
- }
- }
-
- @Override
- public ScheduledFuture<?> scheduleAtFixedRate(final Runnable command,
- long initialDelay, long period, TimeUnit unit) {
- checkForTermination();
- ScheduledFutureTask sft = new ScheduledFutureTask(command, true);
- synchronized (scheduledTasks) {
- ScheduledFuture<?> future = stpe.scheduleAtFixedRate(sft.getParent(),
initialDelay, period, unit);
- sft.setScheduledFuture(future);
- return sft;
- }
- }
- }
-
- /**
- * Creates a WorkerPool that prefers thread reuse over thread creation based upon the given parameters
- *
- * @param name
- * @param numThreads the maximum number of worker threads allowed
- * @return
- */
- public static WorkerPool newWorkerPool(String name, int numThreads) {
- return new StatsCapturingSharedThreadPoolExecutor(name, numThreads, tpe);
- }
-
-
- public static WorkerPool newWorkerPool(String name, int numThreads, final Executor
executor) {
- return new StatsCapturingSharedThreadPoolExecutor(name, numThreads, executor);
- }
-
-}
Deleted: branches/JCA/common-internal/src/main/java/org/teiid/ContainerHelper.java
===================================================================
--- branches/JCA/common-internal/src/main/java/org/teiid/ContainerHelper.java 2010-02-22
18:48:13 UTC (rev 1866)
+++ branches/JCA/common-internal/src/main/java/org/teiid/ContainerHelper.java 2010-02-23
16:07:13 UTC (rev 1867)
@@ -1,40 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * See the COPYRIGHT.txt file distributed with this work for information
- * regarding copyright ownership. Some portions may be licensed
- * to Red Hat, Inc. under one or more contributor license agreements.
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
- * 02110-1301 USA.
- */
-package org.teiid;
-
-import java.security.Principal;
-
-import javax.security.auth.Subject;
-
-
-public interface ContainerHelper {
-
- boolean assosiateSecurityContext(String securityDomain, Object context);
-
- void clearSecurityContext(String securityDomain);
-
- Object getSecurityContext(String securityDomain);
-
- Object createSecurityContext(String securityDomain, Principal p, Object credentials,
Subject subject);
-
- <T> T getService(Class<T> clazz);
-}
Deleted: branches/JCA/common-internal/src/main/java/org/teiid/ContainerUtil.java
===================================================================
--- branches/JCA/common-internal/src/main/java/org/teiid/ContainerUtil.java 2010-02-22
18:48:13 UTC (rev 1866)
+++ branches/JCA/common-internal/src/main/java/org/teiid/ContainerUtil.java 2010-02-23
16:07:13 UTC (rev 1867)
@@ -1,48 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * See the COPYRIGHT.txt file distributed with this work for information
- * regarding copyright ownership. Some portions may be licensed
- * to Red Hat, Inc. under one or more contributor license agreements.
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
- * 02110-1301 USA.
- */
-package org.teiid;
-
-import javax.naming.InitialContext;
-import javax.naming.NamingException;
-
-import com.metamatrix.core.MetaMatrixRuntimeException;
-
-public class ContainerUtil {
-
- public static <T> T lookup(String jndiName) {
- try {
- InitialContext ic = new InitialContext();
- return (T)ic.lookup(jndiName);
- } catch (NamingException e) {
- throw new MetaMatrixRuntimeException("Object with JNDI name "+jndiName+" not found");
- }
- }
-
- public static boolean exists(String jndiName) {
- try {
- lookup(jndiName);
- return true;
- }catch(MetaMatrixRuntimeException e) {
- return false;
- }
- }
-}
Added: branches/JCA/common-internal/src/main/java/org/teiid/SecurityHelper.java
===================================================================
--- branches/JCA/common-internal/src/main/java/org/teiid/SecurityHelper.java
(rev 0)
+++ branches/JCA/common-internal/src/main/java/org/teiid/SecurityHelper.java 2010-02-23
16:07:13 UTC (rev 1867)
@@ -0,0 +1,39 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid;
+
+import java.security.Principal;
+
+import javax.security.auth.Subject;
+
+public interface SecurityHelper {
+
+ boolean assosiateSecurityContext(String securityDomain, Object context);
+
+ void clearSecurityContext(String securityDomain);
+
+ Object getSecurityContext(String securityDomain);
+
+ Object createSecurityContext(String securityDomain, Principal p, Object credentials,
Subject subject);
+
+}
Property changes on:
branches/JCA/common-internal/src/main/java/org/teiid/SecurityHelper.java
___________________________________________________________________
Name: svn:mime-type
+ text/plain
Modified:
branches/JCA/common-internal/src/test/java/com/metamatrix/common/queue/FakeWorkItem.java
===================================================================
---
branches/JCA/common-internal/src/test/java/com/metamatrix/common/queue/FakeWorkItem.java 2010-02-22
18:48:13 UTC (rev 1866)
+++
branches/JCA/common-internal/src/test/java/com/metamatrix/common/queue/FakeWorkItem.java 2010-02-23
16:07:13 UTC (rev 1867)
@@ -24,9 +24,11 @@
import java.sql.Timestamp;
+import javax.resource.spi.work.Work;
+
/**
*/
-public class FakeWorkItem implements Runnable {
+public class FakeWorkItem implements Work {
private static boolean DEBUG = false;
@@ -68,4 +70,9 @@
}
}
+ @Override
+ public void release() {
+
+ }
+
}
Copied:
branches/JCA/common-internal/src/test/java/com/metamatrix/common/queue/FakeWorkManager.java
(from rev 1854,
branches/JCA/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/FakeWorkManager.java)
===================================================================
---
branches/JCA/common-internal/src/test/java/com/metamatrix/common/queue/FakeWorkManager.java
(rev 0)
+++
branches/JCA/common-internal/src/test/java/com/metamatrix/common/queue/FakeWorkManager.java 2010-02-23
16:07:13 UTC (rev 1867)
@@ -0,0 +1,73 @@
+package com.metamatrix.common.queue;
+
+import javax.resource.spi.work.ExecutionContext;
+import javax.resource.spi.work.Work;
+import javax.resource.spi.work.WorkEvent;
+import javax.resource.spi.work.WorkException;
+import javax.resource.spi.work.WorkListener;
+import javax.resource.spi.work.WorkManager;
+
+import org.mockito.Mockito;
+
+public class FakeWorkManager implements WorkManager {
+ private Thread t;
+
+ @Override
+ public void doWork(Work arg0) throws WorkException {
+ execute(arg0, null, true);
+ }
+
+ @Override
+ public void doWork(Work arg0, long arg1, ExecutionContext arg2, WorkListener arg3)
throws WorkException {
+ execute(arg0, arg3, true);
+ }
+
+ @Override
+ public void scheduleWork(Work arg0) throws WorkException {
+ execute(arg0, null, false);
+ }
+
+ @Override
+ public void scheduleWork(Work arg0, long arg1, ExecutionContext arg2, WorkListener arg3)
throws WorkException {
+ execute(arg0, arg3, false);
+ }
+
+ @Override
+ public long startWork(Work arg0) throws WorkException {
+ execute(arg0, null, false);
+ return 0;
+ }
+
+ @Override
+ public long startWork(Work arg0, long arg1, ExecutionContext arg2, WorkListener arg3)
throws WorkException {
+ execute(arg0, arg3, false);
+ return 0;
+ }
+
+ void execute(final Work arg0, final WorkListener arg3, boolean join) throws
WorkException {
+ if (arg3 != null) {
+ arg3.workAccepted(Mockito.mock(WorkEvent.class));
+ arg3.workStarted(Mockito.mock(WorkEvent.class));
+ }
+
+ t = new Thread(new Runnable() {
+
+ @Override
+ public void run() {
+ arg0.run();
+ if (arg3 != null) {
+ arg3.workCompleted(Mockito.mock(WorkEvent.class));
+ }
+ }
+ });
+ t.start();
+ if (join) {
+ try {
+ t.join();
+ } catch (InterruptedException e) {
+ throw new WorkException(e);
+ }
+ }
+ }
+
+}
Property changes on:
branches/JCA/common-internal/src/test/java/com/metamatrix/common/queue/FakeWorkManager.java
___________________________________________________________________
Name: svn:mime-type
+ text/plain
Deleted:
branches/JCA/common-internal/src/test/java/com/metamatrix/common/queue/TestQueueWorkerPool.java
===================================================================
---
branches/JCA/common-internal/src/test/java/com/metamatrix/common/queue/TestQueueWorkerPool.java 2010-02-22
18:48:13 UTC (rev 1866)
+++
branches/JCA/common-internal/src/test/java/com/metamatrix/common/queue/TestQueueWorkerPool.java 2010-02-23
16:07:13 UTC (rev 1867)
@@ -1,173 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * See the COPYRIGHT.txt file distributed with this work for information
- * regarding copyright ownership. Some portions may be licensed
- * to Red Hat, Inc. under one or more contributor license agreements.
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
- * 02110-1301 USA.
- */
-
-package com.metamatrix.common.queue;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.util.ArrayList;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.junit.Test;
-import org.teiid.adminapi.impl.WorkerPoolStatisticsMetadata;
-
-/**
- */
-public class TestQueueWorkerPool {
-
- @Test public void testQueuing() throws Exception {
- final long SINGLE_WAIT = 50;
- final int WORK_ITEMS = 10;
- final int MAX_THREADS = 5;
-
- final WorkerPool pool = WorkerPoolFactory.newWorkerPool("test",
MAX_THREADS); //$NON-NLS-1$
-
- for(int i=0; i<WORK_ITEMS; i++) {
- pool.execute(new FakeWorkItem(SINGLE_WAIT));
- }
-
- pool.shutdown();
- pool.awaitTermination(1000, TimeUnit.MILLISECONDS);
- assertTrue(pool.isTerminated());
- WorkerPoolStatisticsMetadata stats = pool.getStats();
- assertEquals(10, stats.getTotalCompleted());
- assertEquals("Expected threads to be maxed out", MAX_THREADS,
stats.getHighestActiveThreads()); //$NON-NLS-1$
- }
-
- @Test public void testThreadReuse() throws Exception {
- final long SINGLE_WAIT = 50;
- final long NUM_THREADS = 5;
-
- final WorkerPool pool = WorkerPoolFactory.newWorkerPool("test", 5);
//$NON-NLS-1$
-
- for(int i=0; i<NUM_THREADS; i++) {
- pool.execute(new FakeWorkItem(SINGLE_WAIT));
-
- try {
- Thread.sleep(SINGLE_WAIT*2);
- } catch(InterruptedException e) {
- }
- }
-
- pool.shutdown();
-
- WorkerPoolStatisticsMetadata stats = pool.getStats();
- assertEquals("Expected 1 thread for serial execution", 1,
stats.getHighestActiveThreads()); //$NON-NLS-1$
-
- pool.awaitTermination(1000, TimeUnit.MILLISECONDS);
- }
-
- @Test(expected=RejectedExecutionException.class) public void testShutdown() throws
Exception {
- final WorkerPool pool = WorkerPoolFactory.newWorkerPool("test", 5);
//$NON-NLS-1$
- pool.shutdown();
- pool.execute(new FakeWorkItem(1));
- }
-
- @Test public void testScheduleCancel() throws Exception {
- final WorkerPool pool = WorkerPoolFactory.newWorkerPool("test", 5);
//$NON-NLS-1$
- ScheduledFuture<?> future = pool.scheduleAtFixedRate(new Runnable() {
- @Override
- public void run() {
- }
- }, 0, 5, TimeUnit.MILLISECONDS);
- future.cancel(true);
- assertFalse(future.cancel(true));
- }
-
- @Test public void testSchedule() throws Exception {
- final WorkerPool pool = WorkerPoolFactory.newWorkerPool("test", 5);
//$NON-NLS-1$
- final ArrayList<String> result = new ArrayList<String>();
- ScheduledFuture<?> future = pool.schedule(new Runnable() {
- @Override
- public void run() {
- result.add("hello"); //$NON-NLS-1$
- }
- }, 5, TimeUnit.MILLISECONDS);
- future.cancel(true);
- Thread.sleep(10);
- assertEquals(0, result.size());
- future = pool.schedule(new Runnable() {
- @Override
- public void run() {
- result.add("hello"); //$NON-NLS-1$
- }
- }, 5, TimeUnit.MILLISECONDS);
- Thread.sleep(10);
- pool.shutdown();
- pool.awaitTermination(1000, TimeUnit.MILLISECONDS);
- assertEquals(1, result.size());
- }
-
- @Test(expected=ExecutionException.class) public void testScheduleException() throws
Exception {
- final WorkerPool pool = WorkerPoolFactory.newWorkerPool("test", 5);
//$NON-NLS-1$
- ScheduledFuture<?> future = pool.schedule(new Runnable() {
- @Override
- public void run() {
- throw new RuntimeException();
- }
- }, 0, TimeUnit.MILLISECONDS);
- future.get();
- }
-
- /**
- * Here each execution exceeds the period
- */
- @Test public void testScheduleRepeated() throws Exception {
- final WorkerPool pool = WorkerPoolFactory.newWorkerPool("test", 5);
//$NON-NLS-1$
- final ArrayList<String> result = new ArrayList<String>();
- ScheduledFuture<?> future = pool.scheduleAtFixedRate(new Runnable() {
- @Override
- public void run() {
- result.add("hello"); //$NON-NLS-1$
- try {
- Thread.sleep(75);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- }
- }, 0, 10, TimeUnit.MILLISECONDS);
- Thread.sleep(100);
- future.cancel(true);
- assertEquals(2, result.size());
- }
-
- @Test public void testFailingWork() throws Exception {
- final WorkerPool pool = WorkerPoolFactory.newWorkerPool("test", 5);
//$NON-NLS-1$
- final AtomicInteger count = new AtomicInteger();
- pool.execute(new Runnable() {
- @Override
- public void run() {
- count.getAndIncrement();
- throw new RuntimeException();
- }
- });
- Thread.sleep(100);
- assertEquals(1, count.get());
- }
-
-}
Copied:
branches/JCA/common-internal/src/test/java/com/metamatrix/common/queue/TestStatsCapturingWorkManager.java
(from rev 1835,
branches/JCA/common-internal/src/test/java/com/metamatrix/common/queue/TestQueueWorkerPool.java)
===================================================================
---
branches/JCA/common-internal/src/test/java/com/metamatrix/common/queue/TestStatsCapturingWorkManager.java
(rev 0)
+++
branches/JCA/common-internal/src/test/java/com/metamatrix/common/queue/TestStatsCapturingWorkManager.java 2010-02-23
16:07:13 UTC (rev 1867)
@@ -0,0 +1,176 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package com.metamatrix.common.queue;
+
+import static org.junit.Assert.*;
+
+import java.util.ArrayList;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.resource.spi.work.Work;
+import javax.resource.spi.work.WorkManager;
+import javax.resource.spi.work.WorkRejectedException;
+
+import org.junit.Test;
+import org.teiid.adminapi.impl.WorkerPoolStatisticsMetadata;
+
+/**
+ */
+public class TestStatsCapturingWorkManager {
+
+ private WorkManager manager = new FakeWorkManager();
+
+ @Test public void testQueuing() throws Exception {
+ final long SINGLE_WAIT = 50;
+ final int WORK_ITEMS = 10;
+ final int MAX_THREADS = 5;
+
+ final StatsCapturingWorkManager pool = new
StatsCapturingWorkManager("test", MAX_THREADS); //$NON-NLS-1$
+
+ for(int i=0; i<WORK_ITEMS; i++) {
+ pool.scheduleWork(manager, new FakeWorkItem(SINGLE_WAIT));
+ }
+
+ pool.shutdown();
+ pool.awaitTermination(1000, TimeUnit.MILLISECONDS);
+ assertTrue(pool.isTerminated());
+ WorkerPoolStatisticsMetadata stats = pool.getStats();
+ assertEquals(10, stats.getTotalCompleted());
+ assertEquals("Expected threads to be maxed out", MAX_THREADS,
stats.getHighestActiveThreads()); //$NON-NLS-1$
+ }
+
+ @Test public void testThreadReuse() throws Exception {
+ final long SINGLE_WAIT = 50;
+ final long NUM_THREADS = 5;
+
+ StatsCapturingWorkManager pool = new StatsCapturingWorkManager("test",
5); //$NON-NLS-1$
+
+ for(int i=0; i<NUM_THREADS; i++) {
+ pool.scheduleWork(manager, new FakeWorkItem(SINGLE_WAIT));
+
+ try {
+ Thread.sleep(SINGLE_WAIT*2);
+ } catch(InterruptedException e) {
+ }
+ }
+
+ pool.shutdown();
+
+ WorkerPoolStatisticsMetadata stats = pool.getStats();
+ assertEquals("Expected 1 thread for serial execution", 1,
stats.getHighestActiveThreads()); //$NON-NLS-1$
+
+ pool.awaitTermination(1000, TimeUnit.MILLISECONDS);
+ }
+
+ @Test(expected=WorkRejectedException.class) public void testShutdown() throws
Exception {
+ StatsCapturingWorkManager pool = new StatsCapturingWorkManager("test", 5);
//$NON-NLS-1$
+ pool.shutdown();
+ pool.scheduleWork(manager, new FakeWorkItem(1));
+ }
+
+ /*@Test public void testScheduleCancel() throws Exception {
+ StatsCapturingWorkManager pool = new StatsCapturingWorkManager("test", 5);
//$NON-NLS-1$
+ ScheduledFuture<?> future = pool.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ }
+ }, 0, 5, TimeUnit.MILLISECONDS);
+ future.cancel(true);
+ assertFalse(future.cancel(true));
+ }*/
+
+ @Test public void testSchedule() throws Exception {
+ StatsCapturingWorkManager pool = new StatsCapturingWorkManager("test", 5);
//$NON-NLS-1$
+ final ArrayList<String> result = new ArrayList<String>();
+ pool.scheduleWork(manager, new Work() {
+
+ @Override
+ public void run() {
+ result.add("hello"); //$NON-NLS-1$
+ }
+
+ @Override
+ public void release() {
+
+ }
+ }, null, 5);
+ Thread.sleep(10);
+ pool.shutdown();
+ pool.awaitTermination(1000, TimeUnit.MILLISECONDS);
+ assertEquals(1, result.size());
+ }
+
+ /*@Test(expected=ExecutionException.class) public void testScheduleException() throws
Exception {
+ StatsCapturingWorkManager pool = new StatsCapturingWorkManager("test", 5);
//$NON-NLS-1$
+ ScheduledFuture<?> future = pool.schedule(new Runnable() {
+ @Override
+ public void run() {
+ throw new RuntimeException();
+ }
+ }, 0, TimeUnit.MILLISECONDS);
+ future.get();
+ }*/
+
+ /**
+ * Here each execution exceeds the period
+ */
+ /*@Test public void testScheduleRepeated() throws Exception {
+ StatsCapturingWorkManager pool = new StatsCapturingWorkManager("test", 5);
//$NON-NLS-1$
+ final ArrayList<String> result = new ArrayList<String>();
+ ScheduledFuture<?> future = pool.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ result.add("hello"); //$NON-NLS-1$
+ try {
+ Thread.sleep(75);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }, 0, 10, TimeUnit.MILLISECONDS);
+ Thread.sleep(100);
+ future.cancel(true);
+ assertEquals(2, result.size());
+ }*/
+
+ @Test public void testFailingWork() throws Exception {
+ StatsCapturingWorkManager pool = new StatsCapturingWorkManager("test", 5);
//$NON-NLS-1$
+ final AtomicInteger count = new AtomicInteger();
+ pool.scheduleWork(manager, new Work() {
+ @Override
+ public void run() {
+ count.getAndIncrement();
+ throw new RuntimeException();
+ }
+
+ @Override
+ public void release() {
+
+ }
+ });
+ Thread.sleep(100);
+ assertEquals(1, count.get());
+ }
+
+}
Modified:
branches/JCA/engine/src/main/java/com/metamatrix/dqp/message/AtomicRequestMessage.java
===================================================================
---
branches/JCA/engine/src/main/java/com/metamatrix/dqp/message/AtomicRequestMessage.java 2010-02-22
18:48:13 UTC (rev 1866)
+++
branches/JCA/engine/src/main/java/com/metamatrix/dqp/message/AtomicRequestMessage.java 2010-02-23
16:07:13 UTC (rev 1867)
@@ -25,7 +25,6 @@
package com.metamatrix.dqp.message;
import java.io.Serializable;
-import java.util.Date;
import java.util.concurrent.atomic.AtomicInteger;
import org.teiid.dqp.internal.process.DQPWorkContext;
@@ -69,11 +68,8 @@
// results fetch size
private int fetchSize = BufferManager.DEFAULT_CONNECTOR_BATCH_SIZE;
- // The time when the command was created by the client
- private Date submittedTimestamp;
-
// The time when command begins processing on the server.
- private Date processingTimestamp;
+ private long processingTimestamp = System.currentTimeMillis();
// whether to use ResultSet cache if there is one
private boolean useResultSetCache;
@@ -148,54 +144,13 @@
}
/**
- * Get time that the time when the command was created by the client.
- * @return timestamp in millis
- */
- public Date getSubmittedTimestamp() {
- return submittedTimestamp;
- }
-
- /**
- * Set time that the time when the command was created by the client.
- * NOTE: By default, this gets set to the current time by the constructor.
- * @param submittedTimestamp Time submitted to server.
- */
- public void setSubmittedTimestamp(Date submittedTimestamp) {
- this.submittedTimestamp = submittedTimestamp;
- }
-
- /**
- * Start the clock on submission start - this should be called when the request is originally created.
- */
- public void markSubmissionStart() {
- setSubmittedTimestamp(new Date());
- }
-
-
- /**
* Get time that the request was assigned a unique ID by the server.
* @return timestamp in millis
*/
- public Date getProcessingTimestamp() {
+ public long getProcessingTimestamp() {
return processingTimestamp;
}
- /**
- * Set time that the request is submitted on the server.
- * @param processingTimestamp Time submitted to server.
- */
- public void setProcessingTimestamp(Date processingTimestamp) {
- this.processingTimestamp = processingTimestamp;
- }
-
- /**
- * Start the clock on processing times - this should be called when the query
- * hits the QueryService or SubscriptionService.
- */
- public void markProcessingStart() {
- setProcessingTimestamp(new Date());
- }
-
public boolean useResultSetCache() {
//not use caching when there is a txn
return useResultSetCache
Modified:
branches/JCA/engine/src/main/java/com/metamatrix/platform/security/api/service/SessionService.java
===================================================================
---
branches/JCA/engine/src/main/java/com/metamatrix/platform/security/api/service/SessionService.java 2010-02-22
18:48:13 UTC (rev 1866)
+++
branches/JCA/engine/src/main/java/com/metamatrix/platform/security/api/service/SessionService.java 2010-02-23
16:07:13 UTC (rev 1867)
@@ -28,6 +28,7 @@
import javax.security.auth.login.LoginException;
import org.teiid.adminapi.impl.SessionMetadata;
+import org.teiid.dqp.internal.process.DQPCore;
import com.metamatrix.admin.api.exception.security.InvalidSessionException;
import com.metamatrix.api.exception.security.AuthorizationException;
@@ -141,4 +142,7 @@
public void setLocalSession(long sessionID);
SessionMetadata getActiveSession(long sessionID);
+
+ void setDqp(DQPCore dqp);
+
}
Modified:
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorManager.java
===================================================================
---
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorManager.java 2010-02-22
18:48:13 UTC (rev 1866)
+++
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorManager.java 2010-02-23
16:07:13 UTC (rev 1867)
@@ -41,6 +41,7 @@
import org.jboss.managed.api.annotation.ManagementProperties;
import org.jboss.managed.api.annotation.ManagementProperty;
import org.jboss.managed.api.annotation.ViewUse;
+import org.teiid.SecurityHelper;
import org.teiid.adminapi.impl.WorkerPoolStatisticsMetadata;
import org.teiid.connector.api.Connection;
import org.teiid.connector.api.Connector;
@@ -84,6 +85,8 @@
private String connectorName;
private StatsCapturingWorkManager workManager;
+ private SecurityHelper securityHelper;
+
protected ConnectorWorkItemFactory workItemFactory;
private volatile ConnectorStatus state = ConnectorStatus.NOT_INITIALIZED;
@@ -97,10 +100,10 @@
private SourceCapabilities cachedCapabilities;
public ConnectorManager(String name) {
- this(name, DEFAULT_MAX_THREADS);
+ this(name, DEFAULT_MAX_THREADS, null);
}
- public ConnectorManager(String name, int maxThreads) {
+ public ConnectorManager(String name, int maxThreads, SecurityHelper securityHelper)
{
if (name == null) {
throw new IllegalArgumentException("Connector name can not be null");
}
@@ -109,8 +112,13 @@
}
this.connectorName = name;
this.workManager = new StatsCapturingWorkManager(this.connectorName, maxThreads);
+ this.securityHelper = securityHelper;
}
+ SecurityHelper getSecurityHelper() {
+ return securityHelper;
+ }
+
public String getName() {
return this.connectorName;
}
@@ -173,7 +181,6 @@
ConnectorWorkItem item = workItemFactory.createWorkItem(message, receiver,
workManager);
Assertion.isNull(requestStates.put(atomicRequestId, item), "State already
existed"); //$NON-NLS-1$
- message.markProcessingStart();
enqueueRequest(workManager, item);
}
@@ -181,7 +188,7 @@
try {
// if connector is immutable, then we do not want pass-on the transaction
context.
if (work.securityContext.isTransactional()) {
- this.workManager.scheduleWork(workManager, work, 0,
work.requestMsg.getTransactionContext(), work);
+ this.workManager.scheduleWork(workManager, work,
work.requestMsg.getTransactionContext(), 0);
}
else {
this.workManager.scheduleWork(workManager, work);
@@ -241,7 +248,7 @@
public void release() {
}
- }, delay, null, null);
+ }, null, delay);
} catch (WorkException e) {
throw new ConnectorException(e);
}
Modified:
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWorkItem.java
===================================================================
---
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWorkItem.java 2010-02-22
18:48:13 UTC (rev 1866)
+++
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWorkItem.java 2010-02-23
16:07:13 UTC (rev 1867)
@@ -29,8 +29,6 @@
import javax.resource.spi.work.WorkEvent;
-import org.teiid.ContainerHelper;
-import org.teiid.ContainerUtil;
import org.teiid.adminapi.impl.VDBMetaData;
import org.teiid.connector.api.Connection;
import org.teiid.connector.api.Connector;
@@ -542,7 +540,7 @@
try {
asynchCancel();
} catch (ConnectorException e) {
- LogManager.logError(LogConstants.CTX_CONNECTOR, event.getException(),
this.id.toString()); //$NON-NLS-1$
+ LogManager.logError(LogConstants.CTX_CONNECTOR, event.getException(),
this.id.toString());
}
}
@@ -550,8 +548,7 @@
protected boolean assosiateSecurityContext() {
DQPWorkContext context = requestMsg.getWorkContext();
if (context.getSubject() != null) {
- ContainerHelper helper =
ContainerUtil.lookup("teiid/container-helper");
- return helper.assosiateSecurityContext(context.getSecurityDomain(),
context.getSecurityContext());
+ return
manager.getSecurityHelper().assosiateSecurityContext(context.getSecurityDomain(),
context.getSecurityContext());
}
return false;
}
@@ -560,8 +557,7 @@
protected void clearSecurityContext() {
DQPWorkContext context = requestMsg.getWorkContext();
if (context.getSubject() != null) {
- ContainerHelper helper =
ContainerUtil.lookup("teiid/container-helper");
- helper.clearSecurityContext(context.getSecurityDomain());
+ manager.getSecurityHelper().clearSecurityContext(context.getSecurityDomain());
}
}
}
\ No newline at end of file
Modified:
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/AbstractWorkItem.java
===================================================================
---
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/AbstractWorkItem.java 2010-02-22
18:48:13 UTC (rev 1866)
+++
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/AbstractWorkItem.java 2010-02-23
16:07:13 UTC (rev 1867)
@@ -34,7 +34,7 @@
* Represents a task that performs work that may take more than one processing pass to
complete.
* During processing the WorkItem may receive events asynchronously through the moreWork
method.
*/
-public abstract class AbstractWorkItem implements Work, WorkListener{
+public abstract class AbstractWorkItem implements Work, WorkListener {
enum ThreadState {
MORE_WORK, WORKING, IDLE, DONE
Added:
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/ContainerServiceProvider.java
===================================================================
---
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/ContainerServiceProvider.java
(rev 0)
+++
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/ContainerServiceProvider.java 2010-02-23
16:07:13 UTC (rev 1867)
@@ -0,0 +1,36 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.dqp.internal.process;
+
+import javax.resource.spi.XATerminator;
+import javax.resource.spi.work.WorkManager;
+
+/**
+ * Groups three accessors: a JCA {@link XATerminator}, a JCA {@link WorkManager}
+ * and the {@link DQPConfiguration}.
+ * NOTE(review): presumably implemented by the hosting container / resource adapter
+ * and consumed by the engine - confirm against the implementing class.
+ */
+public interface ContainerServiceProvider {
+
+ /** @return the XA terminator made available by this provider */
+ XATerminator getXATerminator();
+
+ /** @return the work manager made available by this provider */
+ WorkManager getWorkManager();
+
+ /** @return the DQP configuration made available by this provider */
+ DQPConfiguration getDQPConfiguration();
+
+}
Property changes on:
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/ContainerServiceProvider.java
___________________________________________________________________
Name: svn:mime-type
+ text/plain
Modified:
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/DQPConfiguration.java
===================================================================
---
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/DQPConfiguration.java 2010-02-22
18:48:13 UTC (rev 1866)
+++
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/DQPConfiguration.java 2010-02-23
16:07:13 UTC (rev 1867)
@@ -39,7 +39,7 @@
private String processName = "localhost";
private int maxThreads = DEFAULT_MAX_PROCESS_WORKERS;
private int timeSliceInMilli = DEFAULT_PROCESSOR_TIMESLICE;
- private boolean optionDebugAllowed = true;
+ private boolean processDebugAllowed;
private int maxRowsFetchSize = DEFAULT_FETCH_SIZE;
private int lobChunkSizeInKB = 100;
private int preparedPlanCacheMaxCount = SessionAwareCache.DEFAULT_MAX_SIZE_TOTAL;
@@ -75,13 +75,13 @@
public void setTimeSliceInMilli(Integer timeSliceInMilli) {
this.timeSliceInMilli = timeSliceInMilli;
}
-
- public boolean isOptionDebugAllowed() {
- return optionDebugAllowed;
+
+ public boolean isProcessDebugAllowed() {
+ return processDebugAllowed;
}
-
- public void setOptionDebugAllowed(Boolean optionDebugAllowed) {
- this.optionDebugAllowed = optionDebugAllowed;
+
+ public void setProcessDebugAllowed(boolean processDebugAllowed) {
+ this.processDebugAllowed = processDebugAllowed;
}
public int getMaxRowsFetchSize() {
Modified: branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java
===================================================================
---
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java 2010-02-22
18:48:13 UTC (rev 1866)
+++
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java 2010-02-23
16:07:13 UTC (rev 1867)
@@ -29,13 +29,18 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
import javax.resource.spi.work.Work;
+import javax.resource.spi.work.WorkEvent;
import javax.resource.spi.work.WorkException;
+import javax.resource.spi.work.WorkListener;
import javax.resource.spi.work.WorkManager;
import javax.transaction.xa.Xid;
+import org.teiid.SecurityHelper;
import org.teiid.adminapi.Admin;
import org.teiid.adminapi.AdminException;
import org.teiid.adminapi.impl.RequestMetadata;
@@ -49,6 +54,7 @@
import com.metamatrix.api.exception.query.QueryMetadataException;
import com.metamatrix.api.exception.security.SessionServiceException;
import com.metamatrix.common.buffer.BufferManager;
+import com.metamatrix.common.comm.api.ResultsReceiver;
import com.metamatrix.common.lob.LobChunk;
import com.metamatrix.common.log.LogManager;
import com.metamatrix.common.queue.StatsCapturingWorkManager;
@@ -82,6 +88,51 @@
*/
public class DQPCore implements ClientSideDQP {
+ /**
+ * Unit of work that invokes a {@link Callable} and hands the outcome to a
+ * {@link ResultsReceiver}: the returned value on success, the thrown
+ * {@link Throwable} on failure. As a {@link WorkListener} it forwards the
+ * exception of a rejection event to the same receiver and ignores all other events.
+ */
+ private static final class FutureWork<T> implements Work, WorkListener {
+ private final ResultsReceiver<T> resultsSink;
+ private final Callable<T> task;
+
+ private FutureWork(ResultsReceiver<T> receiver, Callable<T> processor) {
+ this.task = processor;
+ this.resultsSink = receiver;
+ }
+
+ /** Runs the task; anything thrown while calling it or delivering its value goes to the receiver. */
+ @Override
+ public void run() {
+ try {
+ T outcome = task.call();
+ resultsSink.receiveResults(outcome);
+ } catch (Throwable failure) {
+ resultsSink.exceptionOccurred(failure);
+ }
+ }
+
+ /** Nothing to release. */
+ @Override
+ public void release() {
+ }
+
+ /** Acceptance needs no handling. */
+ @Override
+ public void workAccepted(WorkEvent event) {
+ }
+
+ /** Start needs no handling. */
+ @Override
+ public void workStarted(WorkEvent event) {
+ }
+
+ /** Completion needs no handling; run() has already notified the receiver. */
+ @Override
+ public void workCompleted(WorkEvent event) {
+ }
+
+ /** Passes the rejection's exception on to the receiver. */
+ @Override
+ public void workRejected(WorkEvent event) {
+ resultsSink.exceptionOccurred(event.getException());
+ }
+ }
+
+
static class ClientState {
List<RequestID> requests;
TempTableStoreImpl tempTableStoreImpl;
@@ -142,11 +193,17 @@
private Map<RequestID, RequestWorkItem> requests = new
ConcurrentHashMap<RequestID, RequestWorkItem>();
private Map<String, ClientState> clientState = Collections.synchronizedMap(new
HashMap<String, ClientState>());
private DQPContextCache contextCache;
+ private SecurityHelper securityHelper;
/**
* perform a full shutdown and wait for 10 seconds for all threads to finish
*/
public void stop() {
+ // Stop the process worker pool right away, then give it up to 10 seconds to terminate.
+ processWorkerPool.shutdownNow();
+ try {
+ processWorkerPool.awaitTermination(10, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ // Give up waiting, but restore the interrupt status so the caller can still observe it.
+ Thread.currentThread().interrupt();
+ }
// TODO: Should we be doing more cleanup here??
LogManager.logDetail(LogConstants.CTX_DQP, "Stopping the DQP");
//$NON-NLS-1$
}
@@ -192,8 +249,7 @@
req.setExecutionId(holder.requestID.getExecutionID());
req.setSessionId(Long.parseLong(holder.requestID.getConnectionID()));
req.setCommand(holder.requestMsg.getCommandString());
- req.setCreatedTiime(holder.requestMsg.getSubmittedTimestamp().getTime());
-
req.setProcessingTime(holder.requestMsg.getProcessingTimestamp().getTime());
+ req.setProcessingTime(holder.getProcessingTimestamp());
if (holder.getTransactionContext() != null &&
holder.getTransactionContext().getXid() != null) {
req.setTransactionId(holder.getTransactionContext().getXid().toString());
@@ -213,8 +269,7 @@
info.setExecutionId(arm.getRequestID().getExecutionID());
info.setSessionId(Long.parseLong(holder.requestID.getConnectionID()));
info.setCommand(arm.getCommand().toString());
- info.setCreatedTiime(arm.getSubmittedTimestamp().getTime());
- info.setProcessingTime(arm.getProcessingTimestamp().getTime());
+ info.setProcessingTime(arm.getProcessingTimestamp());
info.setSourceRequest(true);
info.setNodeId(arm.getAtomicRequestID().getNodeID());
@@ -229,7 +284,6 @@
public ResultsFuture<ResultsMessage> executeRequest(long reqID,RequestMessage
requestMsg) {
DQPWorkContext workContext = DQPWorkContext.getWorkContext();
RequestID requestID = workContext.getRequestID(reqID);
- requestMsg.markProcessingStart();
requestMsg.setFetchSize(Math.min(requestMsg.getFetchSize(), maxFetchSize));
Request request = null;
if ( requestMsg.isPreparedStatement() || requestMsg.isCallableStatement()) {
@@ -282,8 +336,8 @@
try {
this.processWorkerPool.scheduleWork(this.workManager, work);
} catch (WorkException e) {
- //TODO: how can be turn this into result?
- e.printStackTrace();
+ //TODO: cancel? close?
+ throw new MetaMatrixRuntimeException(e);
}
}
@@ -291,6 +345,14 @@
this.workManager = mgr;
}
+ SecurityHelper getSecurityHelper() {
+ return this.securityHelper;
+ }
+
+ public void setSecurityHelper(SecurityHelper securityHelper) {
+ this.securityHelper = securityHelper;
+ }
+
public ResultsFuture<?> closeLobChunkStream(int lobRequestId,
long requestId, String streamId)
throws MetaMatrixProcessingException {
@@ -353,17 +415,6 @@
return processWorkerPool.getStats();
}
- /**
- * Cancel and close all requests associated with the clientConnection/session. Also
runs a final cleanup any caches within
- * the session's scope.
- */
- @Override
- public void terminateSession() throws MetaMatrixComponentException {
- DQPWorkContext context = DQPWorkContext.getWorkContext();
- terminateSession(context.getSessionId());
- }
-
-
public void terminateSession(long terminateeId) {
DQPWorkContext context = DQPWorkContext.getWorkContext();
@@ -492,7 +543,7 @@
public Collection<SessionMetadata> getActiveSessions() throws
SessionServiceException {
if (this.sessionService == null) {
- return Collections.EMPTY_LIST;
+ return Collections.emptyList();
}
return this.sessionService.getActiveSessions();
}
@@ -506,7 +557,7 @@
public Collection<org.teiid.adminapi.Transaction> getTransactions() {
if (this.transactionService == null) {
- return Collections.EMPTY_LIST;
+ return Collections.emptyList();
}
return this.transactionService.getTransactions();
}
@@ -600,7 +651,7 @@
this.processorTimeslice = config.getTimeSliceInMilli();
this.maxFetchSize = config.getMaxRowsFetchSize();
- this.processorDebugAllowed = config.isOptionDebugAllowed();
+ this.processorDebugAllowed = config.isProcessDebugAllowed();
this.maxCodeTableRecords = config.getCodeTablesMaxRowsPerTable();
this.maxCodeTables = config.getCodeTablesMaxCount();
this.maxCodeRecords = config.getCodeTablesMaxRows();
@@ -651,7 +702,7 @@
this.transactionService = service;
}
- public List getXmlSchemas(String docName) throws MetaMatrixComponentException,
QueryMetadataException {
+ public List<String> getXmlSchemas(String docName) throws
MetaMatrixComponentException, QueryMetadataException {
DQPWorkContext workContext = DQPWorkContext.getWorkContext();
QueryMetadataInterface metadata =
workContext.getVDB().getAttachment(QueryMetadataInterface.class);
@@ -667,55 +718,108 @@
}
// local txn
- public void begin() throws XATransactionException {
+ /**
+ * Begins a local transaction for the current connection by delegating to the
+ * transaction service, synchronously on the calling thread.
+ *
+ * @return an already-completed future (never null), so callers can treat every
+ * transaction operation uniformly
+ * @throws XATransactionException declared for failures reported by the transaction service
+ */
+ public ResultsFuture<?> begin() throws XATransactionException {
String threadId = DQPWorkContext.getWorkContext().getConnectionID();
this.getTransactionService().begin(threadId);
+ // The work is already done; hand back a completed future rather than null.
+ ResultsFuture<Void> result = new ResultsFuture<Void>();
+ result.getResultsReceiver().receiveResults(null);
+ return result;
}
// local txn
- public void commit() throws XATransactionException {
- String threadId = DQPWorkContext.getWorkContext().getConnectionID();
- this.getTransactionService().commit(threadId);
+ public ResultsFuture<?> commit() throws XATransactionException {
+ final String threadId = DQPWorkContext.getWorkContext().getConnectionID();
+ Callable<Void> processor = new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ getTransactionService().commit(threadId);
+ return null;
+ }
+ };
+ return addWork(processor);
}
// local txn
- public void rollback() throws XATransactionException {
- String threadId = DQPWorkContext.getWorkContext().getConnectionID();
- this.getTransactionService().rollback(threadId);
+ public ResultsFuture<?> rollback() throws XATransactionException {
+ final String threadId = DQPWorkContext.getWorkContext().getConnectionID();
+ Callable<Void> processor = new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ getTransactionService().rollback(threadId);
+ return null;
+ }
+ };
+ return addWork(processor);
}
// global txn
- public void commit(MMXid xid, boolean onePhase) throws XATransactionException {
- String threadId = DQPWorkContext.getWorkContext().getConnectionID();
- this.getTransactionService().commit(threadId, xid, onePhase, false);
+ public ResultsFuture<?> commit(final MMXid xid, final boolean onePhase) throws
XATransactionException {
+ final String threadId = DQPWorkContext.getWorkContext().getConnectionID();
+ Callable<Void> processor = new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ getTransactionService().commit(threadId, xid, onePhase, false);
+ return null;
+ }
+ };
+ return addWork(processor);
}
// global txn
- public void end(MMXid xid, int flags) throws XATransactionException {
+ /**
+ * Delegates to the transaction service's end(...) for the current connection,
+ * synchronously on the calling thread.
+ *
+ * @param xid the global transaction branch identifier
+ * @param flags passed through unchanged to the transaction service
+ * @return an already-completed future (never null)
+ * @throws XATransactionException declared for failures reported by the transaction service
+ */
+ public ResultsFuture<?> end(MMXid xid, int flags) throws XATransactionException {
String threadId = DQPWorkContext.getWorkContext().getConnectionID();
this.getTransactionService().end(threadId, xid, flags, false);
+ // The work is already done; hand back a completed future rather than null.
+ ResultsFuture<Void> result = new ResultsFuture<Void>();
+ result.getResultsReceiver().receiveResults(null);
+ return result;
}
// global txn
- public void forget(MMXid xid) throws XATransactionException {
+ /**
+ * Delegates to the transaction service's forget(...) for the current connection,
+ * synchronously on the calling thread.
+ *
+ * @param xid the global transaction branch identifier
+ * @return an already-completed future (never null)
+ * @throws XATransactionException declared for failures reported by the transaction service
+ */
+ public ResultsFuture<?> forget(MMXid xid) throws XATransactionException {
String threadId = DQPWorkContext.getWorkContext().getConnectionID();
this.getTransactionService().forget(threadId, xid, false);
+ // The work is already done; hand back a completed future rather than null.
+ ResultsFuture<Void> result = new ResultsFuture<Void>();
+ result.getResultsReceiver().receiveResults(null);
+ return result;
}
+
// global txn
- public int prepare(MMXid xid) throws XATransactionException {
- return
this.getTransactionService().prepare(DQPWorkContext.getWorkContext().getConnectionID(),xid,
false);
+ /**
+ * Schedules the prepare of the given global transaction branch on the work manager.
+ *
+ * @param xid the global transaction branch identifier
+ * @return a future that receives the transaction service's prepare result, or its failure
+ * @throws XATransactionException declared by the interface; failures of the scheduled
+ * work are delivered through the returned future
+ */
+ public ResultsFuture<Integer> prepare(final MMXid xid) throws XATransactionException {
+ // Resolve the connection ID on the calling thread, exactly as commit() and rollback() do.
+ // NOTE(review): the work context is presumably bound to the calling thread, so it must
+ // not be looked up from inside the Callable, which runs on a work manager thread.
+ final String threadId = DQPWorkContext.getWorkContext().getConnectionID();
+ Callable<Integer> processor = new Callable<Integer>() {
+ @Override
+ public Integer call() throws Exception {
+ return getTransactionService().prepare(threadId, xid, false);
+ }
+ };
+ return addWork(processor);
}
+
+ /**
+ * Schedules the given callable on the work manager and exposes its outcome as a future.
+ *
+ * @param processor the task to run
+ * @return a future completed with the task's value, with the Throwable it raised, or
+ * with the exception of a rejection reported by the work manager
+ * @throws MetaMatrixRuntimeException if the work manager refuses the work outright
+ */
+ private <T> ResultsFuture<T> addWork(Callable<T> processor) {
+ ResultsFuture<T> result = new ResultsFuture<T>();
+ ResultsReceiver<T> receiver = result.getResultsReceiver();
+ FutureWork<T> work = new FutureWork<T>(receiver, processor);
+ try {
+ // Register the work as its own listener; otherwise FutureWork.workRejected is never
+ // invoked and a rejection after acceptance would leave the future uncompleted.
+ this.workManager.scheduleWork(work, WorkManager.INDEFINITE, null, work);
+ } catch (WorkException e) {
+ throw new MetaMatrixRuntimeException(e);
+ }
+ return result;
+ }
+
// global txn
- public Xid[] recover(int flag) throws XATransactionException {
- return this.getTransactionService().recover(flag, false);
+ public ResultsFuture<Xid[]> recover(int flag) throws XATransactionException {
+ ResultsFuture<Xid[]> result = new ResultsFuture<Xid[]>();
+ result.getResultsReceiver().receiveResults(this.getTransactionService().recover(flag,
false));
+ return result;
}
// global txn
- public void rollback(MMXid xid) throws XATransactionException {
- this.getTransactionService().rollback(DQPWorkContext.getWorkContext().getConnectionID(),xid,
false);
+ /**
+ * Schedules the rollback of the given global transaction branch on the work manager.
+ *
+ * @param xid the global transaction branch identifier
+ * @return a future completed when the transaction service's rollback returns or fails
+ * @throws XATransactionException declared by the interface; failures of the scheduled
+ * work are delivered through the returned future
+ */
+ public ResultsFuture<?> rollback(final MMXid xid) throws XATransactionException {
+ // Resolve the connection ID on the calling thread, exactly as commit() and rollback() do.
+ // NOTE(review): the work context is presumably bound to the calling thread, so it must
+ // not be looked up from inside the Callable, which runs on a work manager thread.
+ final String threadId = DQPWorkContext.getWorkContext().getConnectionID();
+ Callable<Void> processor = new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ getTransactionService().rollback(threadId, xid, false);
+ return null;
+ }
+ };
+ return addWork(processor);
}
// global txn
- public void start(MMXid xid, int flags, int timeout)
+ /**
+ * Delegates to the transaction service's start(...) for the current connection,
+ * synchronously on the calling thread.
+ *
+ * @param xid the global transaction branch identifier
+ * @param flags passed through unchanged to the transaction service
+ * @param timeout passed through unchanged to the transaction service
+ * @return an already-completed future (never null)
+ * @throws XATransactionException declared for failures reported by the transaction service
+ */
+ public ResultsFuture<?> start(MMXid xid, int flags, int timeout)
throws XATransactionException {
String threadId = DQPWorkContext.getWorkContext().getConnectionID();
this.getTransactionService().start(threadId, xid, flags, timeout, false);
+ // The work is already done; hand back a completed future rather than null.
+ ResultsFuture<Void> result = new ResultsFuture<Void>();
+ result.getResultsReceiver().receiveResults(null);
+ return result;
}
public MetadataResult getMetadata(long requestID)
@@ -739,6 +843,7 @@
public void setSessionService(SessionService service) {
this.sessionService = service;
+ // A missing session service is a supported state (getActiveSessions() checks for null),
+ // so only link the service back to this DQP when one is actually supplied.
+ if (service != null) {
+ service.setDqp(this);
+ }
}
public SessionService getSessionService() {
Modified:
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/DataTierManagerImpl.java
===================================================================
---
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/DataTierManagerImpl.java 2010-02-22
18:48:13 UTC (rev 1866)
+++
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/DataTierManagerImpl.java 2010-02-23
16:07:13 UTC (rev 1867)
@@ -332,7 +332,6 @@
RequestMessage request = workItem.requestMsg;
// build the atomic request based on original request + context info
AtomicRequestMessage aqr = new AtomicRequestMessage(request,
workItem.getDqpWorkContext(), nodeID);
- aqr.markSubmissionStart();
aqr.setCommand(command);
aqr.setModelName(modelName);
aqr.setUseResultSetCache(request.useResultSetCache());
Modified:
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/PreparedStatementRequest.java
===================================================================
---
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/PreparedStatementRequest.java 2010-02-22
18:48:13 UTC (rev 1866)
+++
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/PreparedStatementRequest.java 2010-02-23
16:07:13 UTC (rev 1867)
@@ -103,7 +103,7 @@
List<SPParameter> spParams = proc.getParameters();
proc.clearParameters();
int inParameterCount = values.size();
- if (this.requestMsg.isPreparedBatchUpdate() && values.size() > 0) {
+ if (this.requestMsg.isBatchedUpdate() && values.size() > 0) {
inParameterCount = ((List)values.get(0)).size();
}
int index = 1;
@@ -161,7 +161,7 @@
createCommandContext();
}
- if (requestMsg.isPreparedBatchUpdate()) {
+ if (requestMsg.isBatchedUpdate()) {
handlePreparedBatchUpdate();
} else {
List<Reference> params = prepPlan.getReferences();
@@ -186,7 +186,7 @@
*/
private void handlePreparedBatchUpdate() throws QueryMetadataException,
MetaMatrixComponentException, QueryResolverException, QueryPlannerException,
QueryValidatorException {
- List<List<?>> paramValues = requestMsg.getParameterValues();
+ List<List<?>> paramValues = (List<List<?>>)
requestMsg.getParameterValues();
if (paramValues.isEmpty()) {
throw new QueryValidatorException("No batch values sent for prepared batch
update"); //$NON-NLS-1$
}
Modified: branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/Request.java
===================================================================
---
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/Request.java 2010-02-22
18:48:13 UTC (rev 1866)
+++
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/Request.java 2010-02-23
16:07:13 UTC (rev 1867)
@@ -52,6 +52,7 @@
import com.metamatrix.dqp.DQPPlugin;
import com.metamatrix.dqp.message.RequestID;
import com.metamatrix.dqp.message.RequestMessage;
+import com.metamatrix.dqp.message.RequestMessage.ResultsMode;
import com.metamatrix.dqp.service.AuthorizationService;
import com.metamatrix.dqp.service.TransactionContext;
import com.metamatrix.dqp.service.TransactionService;
@@ -214,8 +215,9 @@
StoredProcedure proc = (StoredProcedure)userCommand;
returnsResultSet = proc.returnsResultSet();
}
- if (this.requestMsg.getRequireResultSet() != null &&
this.requestMsg.getRequireResultSet() != returnsResultSet) {
- throw new
QueryValidatorException(DQPPlugin.Util.getString(this.requestMsg.getRequireResultSet()?"Request.no_result_set":"Request.result_set"));
//$NON-NLS-1$ //$NON-NLS-2$
+ if ((this.requestMsg.getResultsMode() == ResultsMode.UPDATECOUNT &&
!returnsUpdateCount)
+ || (this.requestMsg.getResultsMode() == ResultsMode.RESULTSET &&
!returnsResultSet)) {
+ throw new
QueryValidatorException(DQPPlugin.Util.getString(this.requestMsg.getResultsMode()==ResultsMode.RESULTSET?"Request.no_result_set":"Request.result_set"));
//$NON-NLS-1$ //$NON-NLS-2$
}
// Create command context, used in rewriting, planning, and processing
@@ -288,7 +290,7 @@
private Command parseCommand() throws QueryParserException {
String[] commands = requestMsg.getCommands();
ParseInfo parseInfo = createParseInfo(this.requestMsg);
- if (!requestMsg.isBatchedUpdate()) {
+ if (requestMsg.isPreparedStatement() || requestMsg.isCallableStatement() ||
!requestMsg.isBatchedUpdate()) {
String commandStr = commands[0];
return QueryParser.getQueryParser().parseCommand(commandStr, parseInfo);
}
Modified:
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
===================================================================
---
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java 2010-02-22
18:48:13 UTC (rev 1866)
+++
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java 2010-02-23
16:07:13 UTC (rev 1867)
@@ -33,8 +33,6 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import org.teiid.ContainerHelper;
-import org.teiid.ContainerUtil;
import org.teiid.dqp.internal.process.SessionAwareCache.CacheID;
import com.metamatrix.api.exception.MetaMatrixComponentException;
@@ -122,6 +120,9 @@
private TupleBatch savedBatch;
private Map<Integer, LobWorkItem> lobStreams = Collections.synchronizedMap(new
HashMap<Integer, LobWorkItem>(4));
+ /**The time when command begins processing on the server.*/
+ private long processingTimestamp = System.currentTimeMillis();
+
public RequestWorkItem(DQPCore dqpCore, RequestMessage requestMsg, Request request,
ResultsReceiver<ResultsMessage> receiver, RequestID requestID, DQPWorkContext
workContext) {
this.requestMsg = requestMsg;
this.requestID = requestID;
@@ -691,11 +692,14 @@
return dqpWorkContext;
}
+ public long getProcessingTimestamp() {
+ return processingTimestamp;
+ }
+
@Override
protected boolean assosiateSecurityContext() {
if (dqpWorkContext.getSubject() != null) {
- ContainerHelper helper =
ContainerUtil.lookup("teiid/container-helper");
- return
helper.assosiateSecurityContext(dqpWorkContext.getSecurityDomain(),dqpWorkContext.getSecurityContext());
+ return
dqpCore.getSecurityHelper().assosiateSecurityContext(dqpWorkContext.getSecurityDomain(),dqpWorkContext.getSecurityContext());
}
return false;
}
@@ -703,8 +707,7 @@
@Override
protected void clearSecurityContext() {
if (dqpWorkContext.getSubject() != null) {
- ContainerHelper helper =
ContainerUtil.lookup("teiid/container-helper");
- helper.clearSecurityContext(dqpWorkContext.getSecurityDomain());
+
dqpCore.getSecurityHelper().clearSecurityContext(dqpWorkContext.getSecurityDomain());
}
}
}
\ No newline at end of file
Modified:
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/transaction/TransactionServerImpl.java
===================================================================
---
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/transaction/TransactionServerImpl.java 2010-02-22
18:48:13 UTC (rev 1866)
+++
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/transaction/TransactionServerImpl.java 2010-02-23
16:07:13 UTC (rev 1867)
@@ -194,12 +194,11 @@
return new Xid[0];
}
- return new Xid[0];
-// try {
-// return this.provider.getXATerminator().recover(flag);
-// } catch (XAException e) {
-// throw new XATransactionException(e);
-// }
+ try {
+ return this.provider.getXATerminator().recover(flag);
+ } catch (XAException e) {
+ throw new XATransactionException(e);
+ }
}
/**
Modified:
branches/JCA/engine/src/test/java/com/metamatrix/dqp/message/TestAtomicRequestMessage.java
===================================================================
---
branches/JCA/engine/src/test/java/com/metamatrix/dqp/message/TestAtomicRequestMessage.java 2010-02-22
18:48:13 UTC (rev 1866)
+++
branches/JCA/engine/src/test/java/com/metamatrix/dqp/message/TestAtomicRequestMessage.java 2010-02-23
16:07:13 UTC (rev 1867)
@@ -22,8 +22,6 @@
package com.metamatrix.dqp.message;
-import java.util.Date;
-
import junit.framework.TestCase;
import org.teiid.dqp.internal.datamgr.language.TestQueryImpl;
@@ -50,7 +48,6 @@
message.setCommand(TestQueryImpl.helpExample());
message.setFetchSize(100);
message.setPartialResults(true);
- message.setProcessingTimestamp(new Date(12345678L));
message.setRequestID(new RequestID(5000L));
//AtomicRequestMessage-specific stuff
@@ -59,12 +56,13 @@
}
public void testSerialize() throws Exception {
- AtomicRequestMessage copy = UnitTestUtil.helpSerialize(example());
+ AtomicRequestMessage example = example();
+ AtomicRequestMessage copy = UnitTestUtil.helpSerialize(example);
assertEquals(TestQueryImpl.helpExample(), copy.getCommand());
assertEquals(100, copy.getFetchSize());
- assertEquals(new Date(12345678L), copy.getProcessingTimestamp());
+ assertEquals(example.getProcessingTimestamp(), copy.getProcessingTimestamp());
assertEquals(new RequestID(5000L), copy.getRequestID());
assertEquals("2", copy.getWorkContext().getConnectionID());
//$NON-NLS-1$
//AtomicRequestMessage-specific stuff
Deleted:
branches/JCA/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/FakeWorkManager.java
===================================================================
---
branches/JCA/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/FakeWorkManager.java 2010-02-22
18:48:13 UTC (rev 1866)
+++
branches/JCA/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/FakeWorkManager.java 2010-02-23
16:07:13 UTC (rev 1867)
@@ -1,62 +0,0 @@
-package org.teiid.dqp.internal.datamgr.impl;
-
-import javax.resource.spi.work.ExecutionContext;
-import javax.resource.spi.work.Work;
-import javax.resource.spi.work.WorkEvent;
-import javax.resource.spi.work.WorkException;
-import javax.resource.spi.work.WorkListener;
-import javax.resource.spi.work.WorkManager;
-
-import org.mockito.Mockito;
-
-public class FakeWorkManager implements WorkManager {
- Thread t;
- @Override
- public void doWork(Work arg0) throws WorkException {
- execute(arg0);
- }
-
- @Override
- public void doWork(Work arg0, long arg1, ExecutionContext arg2, WorkListener arg3)
throws WorkException {
- execute(arg0);
- }
-
- @Override
- public void scheduleWork(Work arg0) throws WorkException {
- execute(arg0);
- }
-
- @Override
- public void scheduleWork(Work arg0, long arg1, ExecutionContext arg2, WorkListener arg3)
throws WorkException {
- execute(arg0);
- }
-
- @Override
- public long startWork(Work arg0) throws WorkException {
- execute(arg0);
- return 0;
- }
-
- @Override
- public long startWork(Work arg0, long arg1, ExecutionContext arg2, WorkListener arg3)
throws WorkException {
- execute(arg0);
- return 0;
- }
-
- void execute(Work arg0) {
- if (arg0 instanceof WorkListener) {
- WorkListener wl = (WorkListener)arg0;
- wl.workAccepted(Mockito.mock(WorkEvent.class));
- wl.workStarted(Mockito.mock(WorkEvent.class));
- }
-
- t = new Thread(arg0);
- t.start();
-
- if (arg0 instanceof WorkListener) {
- WorkListener wl = (WorkListener)arg0;
- wl.workCompleted(Mockito.mock(WorkEvent.class));
- }
- }
-
-}
Modified:
branches/JCA/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/TestConnectorManagerImpl.java
===================================================================
---
branches/JCA/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/TestConnectorManagerImpl.java 2010-02-22
18:48:13 UTC (rev 1866)
+++
branches/JCA/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/TestConnectorManagerImpl.java 2010-02-23
16:07:13 UTC (rev 1867)
@@ -39,6 +39,7 @@
import org.teiid.dqp.internal.datamgr.impl.TestConnectorWorkItem.QueueResultsReceiver;
import com.metamatrix.common.application.exception.ApplicationLifecycleException;
+import com.metamatrix.common.queue.FakeWorkManager;
import com.metamatrix.dqp.message.AtomicRequestMessage;
/**
Modified:
branches/JCA/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/TestConnectorStateManager.java
===================================================================
---
branches/JCA/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/TestConnectorStateManager.java 2010-02-22
18:48:13 UTC (rev 1866)
+++
branches/JCA/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/TestConnectorStateManager.java 2010-02-23
16:07:13 UTC (rev 1867)
@@ -37,6 +37,7 @@
import org.teiid.connector.api.ConnectorEnvironment;
import org.teiid.dqp.internal.datamgr.impl.TestConnectorWorkItem.QueueResultsReceiver;
+import com.metamatrix.common.queue.FakeWorkManager;
import com.metamatrix.dqp.client.ResultsFuture;
import com.metamatrix.dqp.message.AtomicRequestID;
import com.metamatrix.dqp.message.AtomicRequestMessage;
Modified:
branches/JCA/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/TestConnectorWorkItem.java
===================================================================
---
branches/JCA/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/TestConnectorWorkItem.java 2010-02-22
18:48:13 UTC (rev 1866)
+++
branches/JCA/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/TestConnectorWorkItem.java 2010-02-23
16:07:13 UTC (rev 1867)
@@ -49,6 +49,7 @@
import com.metamatrix.common.comm.api.ResultsReceiver;
import com.metamatrix.common.log.LogManager;
+import com.metamatrix.common.queue.FakeWorkManager;
import com.metamatrix.dqp.client.ResultsFuture;
import com.metamatrix.dqp.message.AtomicRequestMessage;
import com.metamatrix.dqp.message.AtomicResultsMessage;
@@ -144,7 +145,7 @@
// processing will close
assertFalse(state.isDoneProcessing());
- wm.doWork(state);
+ wm.doWork(state, 0, null, state);
AtomicResultsMessage arm = resultsFuture.get(1000, TimeUnit.MILLISECONDS);
@@ -273,9 +274,7 @@
state.requestClose();
assertFalse(resultsFuture.isDone());
- wm.doWork(state);
-
- wm.t.join();
+ wm.doWork(state, 0, null, state);
AtomicResultsMessage arm = resultsFuture.get(1000,
TimeUnit.MILLISECONDS);
@@ -292,18 +291,14 @@
FakeQueuingAsynchConnectorWorkItem state = new
FakeQueuingAsynchConnectorWorkItem(request, manager, resultsReceiver, wm);
- wm.doWork(state);
-
- wm.t.join();
+ wm.doWork(state, 0, null, state);
assertFalse(state.isDoneProcessing());
connector.setReturnsFinalBatch(true);
state.requestMore();
- wm.doWork(state);
+ wm.doWork(state, 0, null, state);
- wm.t.join();
-
assertTrue(state.isDoneProcessing());
assertEquals(3, resultsReceiver.results.size());
@@ -318,9 +313,7 @@
FakeWorkManager wm = new FakeWorkManager();
FakeQueuingAsynchConnectorWorkItem state = new
FakeQueuingAsynchConnectorWorkItem(request, manager, resultsReceiver, wm);
- wm.doWork(state);
-
- wm.t.join();
+ wm.doWork(state, 0, null, state);
assertFalse(state.isDoneProcessing());
Modified: branches/JCA/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java
===================================================================
--- branches/JCA/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java 2010-02-22 18:48:13 UTC (rev 1866)
+++ branches/JCA/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java 2010-02-23 16:07:13 UTC (rev 1867)
@@ -38,9 +38,9 @@
import org.teiid.adminapi.Visibility;
import org.teiid.dqp.internal.datamgr.impl.ConnectorManagerRepository;
import org.teiid.dqp.internal.datamgr.impl.FakeTransactionService;
-import org.teiid.dqp.internal.datamgr.impl.FakeWorkManager;
import com.metamatrix.api.exception.query.QueryResolverException;
+import com.metamatrix.common.queue.FakeWorkManager;
import com.metamatrix.dqp.message.RequestMessage;
import com.metamatrix.dqp.message.ResultsMessage;
import com.metamatrix.dqp.service.AutoGenDataService;
@@ -49,14 +49,13 @@
import com.metamatrix.platform.security.api.SessionToken;
import com.metamatrix.query.unittest.FakeMetadataFactory;
-
public class TestDQPCore {
private DQPCore core;
@Before public void setUp() throws Exception {
DQPWorkContext context =
FakeMetadataFactory.buildWorkContext(FakeMetadataFactory.exampleBQTCached(),
FakeMetadataFactory.exampleBQTVDB());
- context.getVDB().getModel("BQT3").setVisibility(Visibility.PRIVATE);
+ context.getVDB().getModel("BQT3").setVisibility(Visibility.PRIVATE);
//$NON-NLS-1$
ConnectorManagerRepository repo =
Mockito.mock(ConnectorManagerRepository.class);
Mockito.stub(repo.getConnectorManager(Mockito.anyString())).toReturn(new
AutoGenDataService());
@@ -77,7 +76,6 @@
public RequestMessage exampleRequestMessage(String sql) {
RequestMessage msg = new RequestMessage(sql);
- msg.setCallableStatement(false);
msg.setCursorType(ResultSet.TYPE_SCROLL_INSENSITIVE);
msg.setFetchSize(10);
msg.setPartialResults(false);
Modified: branches/JCA/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCoreRequestHandling.java
===================================================================
--- branches/JCA/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCoreRequestHandling.java 2010-02-22 18:48:13 UTC (rev 1866)
+++ branches/JCA/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCoreRequestHandling.java 2010-02-23 16:07:13 UTC (rev 1867)
@@ -86,8 +86,6 @@
private RequestID addRequest(DQPCore rm, String sessionId, int executionId) {
RequestMessage r0 = new RequestMessage("test command"); //$NON-NLS-1$
RequestID id = new RequestID(sessionId, executionId);
- r0.markSubmissionStart();
- r0.markProcessingStart();
addRequest(rm, r0, id, null, null);
return id;
}
Modified: branches/JCA/engine/src/test/java/org/teiid/dqp/internal/process/TestDataTierManager.java
===================================================================
--- branches/JCA/engine/src/test/java/org/teiid/dqp/internal/process/TestDataTierManager.java 2010-02-22 18:48:13 UTC (rev 1866)
+++ branches/JCA/engine/src/test/java/org/teiid/dqp/internal/process/TestDataTierManager.java 2010-02-23 16:07:13 UTC (rev 1867)
@@ -33,10 +33,10 @@
import org.teiid.dqp.internal.datamgr.impl.ConnectorManager;
import org.teiid.dqp.internal.datamgr.impl.ConnectorManagerRepository;
import org.teiid.dqp.internal.datamgr.impl.FakeTransactionService;
-import org.teiid.dqp.internal.datamgr.impl.FakeWorkManager;
import com.metamatrix.api.exception.MetaMatrixException;
import com.metamatrix.common.comm.api.ResultsReceiver;
+import com.metamatrix.common.queue.FakeWorkManager;
import com.metamatrix.dqp.message.AtomicRequestID;
import com.metamatrix.dqp.message.AtomicRequestMessage;
import com.metamatrix.dqp.message.AtomicResultsMessage;
Modified: branches/JCA/engine/src/test/java/org/teiid/dqp/internal/process/TestPreparedStatement.java
===================================================================
--- branches/JCA/engine/src/test/java/org/teiid/dqp/internal/process/TestPreparedStatement.java 2010-02-22 18:48:13 UTC (rev 1866)
+++ branches/JCA/engine/src/test/java/org/teiid/dqp/internal/process/TestPreparedStatement.java 2010-02-23 16:07:13 UTC (rev 1867)
@@ -42,6 +42,7 @@
import com.metamatrix.api.exception.query.QueryValidatorException;
import com.metamatrix.common.buffer.BufferManagerFactory;
import com.metamatrix.dqp.message.RequestMessage;
+import com.metamatrix.dqp.message.RequestMessage.StatementType;
import com.metamatrix.dqp.service.AutoGenDataService;
import com.metamatrix.platform.security.api.SessionToken;
import com.metamatrix.query.metadata.QueryMetadataInterface;
@@ -232,11 +233,14 @@
//Create Request
RequestMessage request = new RequestMessage(preparedSql);
- request.setPreparedStatement(true);
- request.setCallableStatement(callableStatement);
+ if (callableStatement) {
+ request.setStatementType(StatementType.CALLABLE);
+ } else {
+ request.setStatementType(StatementType.PREPARED);
+ }
request.setParameterValues(values);
if (values != null && values.size() > 0 && values.get(0) instanceof
List) {
- request.setPreparedBatchUpdate(true);
+ request.setBatchedUpdate(true);
}
if (limitResults) {
request.setRowLimit(1);
Modified: branches/JCA/engine/src/test/java/org/teiid/dqp/internal/process/TestRequest.java
===================================================================
--- branches/JCA/engine/src/test/java/org/teiid/dqp/internal/process/TestRequest.java 2010-02-22 18:48:13 UTC (rev 1866)
+++ branches/JCA/engine/src/test/java/org/teiid/dqp/internal/process/TestRequest.java 2010-02-23 16:07:13 UTC (rev 1867)
@@ -39,6 +39,7 @@
import com.metamatrix.api.exception.query.QueryValidatorException;
import com.metamatrix.common.buffer.BufferManager;
import com.metamatrix.dqp.message.RequestMessage;
+import com.metamatrix.dqp.message.RequestMessage.StatementType;
import com.metamatrix.dqp.service.AutoGenDataService;
import com.metamatrix.query.analysis.AnalysisRecord;
import com.metamatrix.query.metadata.QueryMetadataInterface;
@@ -190,7 +191,7 @@
RequestMessage message = new RequestMessage(QUERY);
DQPWorkContext workContext = FakeMetadataFactory.buildWorkContext(metadata,
FakeMetadataFactory.example1VDB());
- message.setPreparedStatement(true);
+ message.setStatementType(StatementType.PREPARED);
message.setParameterValues(new ArrayList());
helpProcessMessage(message, cache, workContext);
@@ -198,7 +199,7 @@
//Try again, now that plan is already cached.
//If this doesn't throw an exception, assume it was successful.
message = new RequestMessage(QUERY);
- message.setPreparedStatement(true);
+ message.setStatementType(StatementType.PREPARED);
message.setParameterValues(new ArrayList());
helpProcessMessage(message, cache, workContext);
Deleted: branches/JCA/jboss-integration/src/main/java/org/teiid/jboss/JBossContainerHelper.java
===================================================================
--- branches/JCA/jboss-integration/src/main/java/org/teiid/jboss/JBossContainerHelper.java 2010-02-22 18:48:13 UTC (rev 1866)
+++ branches/JCA/jboss-integration/src/main/java/org/teiid/jboss/JBossContainerHelper.java 2010-02-23 16:07:13 UTC (rev 1867)
@@ -1,126 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * See the COPYRIGHT.txt file distributed with this work for information
- * regarding copyright ownership. Some portions may be licensed
- * to Red Hat, Inc. under one or more contributor license agreements.
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
- * 02110-1301 USA.
- */
-package org.teiid.jboss;
-
-import java.io.Serializable;
-import java.security.Principal;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
-import javax.security.auth.Subject;
-
-import org.jboss.security.SecurityContext;
-import org.teiid.ContainerHelper;
-import org.teiid.deployers.VDBRepository;
-import org.teiid.dqp.internal.datamgr.impl.ConnectorManagerRepository;
-import org.teiid.dqp.internal.process.DQPManagementView;
-
-import com.metamatrix.dqp.service.AuthorizationService;
-import com.metamatrix.dqp.service.BufferService;
-import com.metamatrix.platform.security.api.service.SessionService;
-
-public class JBossContainerHelper implements ContainerHelper, Serializable{
- private transient Map<String, Object> attachments =
Collections.synchronizedMap(new HashMap<String, Object>());
-
- private static final long serialVersionUID = 1318670652523708608L;
-
- @Override
- public boolean assosiateSecurityContext(String securityDomain, Object newContext) {
- SecurityContext context = SecurityActions.getSecurityContext();
- if (context == null || (!context.getSecurityDomain().equals(securityDomain) &&
newContext != null)) {
- SecurityActions.setSecurityContext((SecurityContext)newContext);
- return true;
- }
- return false;
- }
-
- @Override
- public void clearSecurityContext(String securityDomain) {
- SecurityContext sc = SecurityActions.getSecurityContext();
- if (sc != null && sc.getSecurityDomain().equals(securityDomain)) {
- SecurityActions.clearSecurityContext();
- }
- }
-
- @Override
- public Object getSecurityContext(String securityDomain) {
- SecurityContext sc = SecurityActions.getSecurityContext();
- if (sc != null && sc.getSecurityDomain().equals(securityDomain)) {
- return sc;
- }
- return null;
- }
-
- @Override
- public Object createSecurityContext(String securityDomain, Principal p, Object
credentials, Subject subject) {
- SecurityActions.pushSecurityContext(p, credentials, subject, securityDomain);
- return getSecurityContext(securityDomain);
- }
-
- public <T> T getService(Class<T> type) {
- if (type == null) {
- throw new IllegalArgumentException("Null type");
- }
- Object result = this.attachments.get(type.getName());
- if (result == null) {
- return null;
- }
- return type.cast(result);
- }
-
- protected <T> T addAttchment(Class<T> type, T attachment) {
- if (type == null) {
- throw new IllegalArgumentException("Null type");
- }
- Object result = this.attachments.put(type.getName(), attachment);
- if (result == null) {
- return null;
- }
- return type.cast(result);
-
- }
-
- public void setVDBRepository(VDBRepository repo) {
- addAttchment(VDBRepository.class, repo);
- }
-
- public void setAuthorizationService(AuthorizationService service) {
- addAttchment(AuthorizationService.class, service);
- }
-
- public void setSessionService(SessionService service) {
- addAttchment(SessionService.class, service);
- }
-
- public void setBufferService(BufferService service) {
- addAttchment(BufferService.class, service);
- }
-
- public void setConnectorManagerRepository(ConnectorManagerRepository repo) {
- addAttchment(ConnectorManagerRepository.class, repo);
- }
-
- public void setDQPManager(DQPManagementView holder) {
- addAttchment(DQPManagementView.class, holder);
- }
-}
Added: branches/JCA/jboss-integration/src/main/java/org/teiid/jboss/JBossSecurityHelper.java
===================================================================
--- branches/JCA/jboss-integration/src/main/java/org/teiid/jboss/JBossSecurityHelper.java (rev 0)
+++ branches/JCA/jboss-integration/src/main/java/org/teiid/jboss/JBossSecurityHelper.java 2010-02-23 16:07:13 UTC (rev 1867)
@@ -0,0 +1,68 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.jboss;
+
+import java.io.Serializable;
+import java.security.Principal;
+
+import javax.security.auth.Subject;
+
+import org.jboss.security.SecurityContext;
+import org.teiid.SecurityHelper;
+
+public class JBossSecurityHelper implements SecurityHelper, Serializable {
+
+ @Override
+ public boolean assosiateSecurityContext(String securityDomain, Object newContext) {
+ SecurityContext context = SecurityActions.getSecurityContext();
+ if (context == null || (!context.getSecurityDomain().equals(securityDomain) &&
newContext != null)) {
+ SecurityActions.setSecurityContext((SecurityContext)newContext);
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public void clearSecurityContext(String securityDomain) {
+ SecurityContext sc = SecurityActions.getSecurityContext();
+ if (sc != null && sc.getSecurityDomain().equals(securityDomain)) {
+ SecurityActions.clearSecurityContext();
+ }
+ }
+
+ @Override
+ public Object getSecurityContext(String securityDomain) {
+ SecurityContext sc = SecurityActions.getSecurityContext();
+ if (sc != null && sc.getSecurityDomain().equals(securityDomain)) {
+ return sc;
+ }
+ return null;
+ }
+
+ @Override
+ public Object createSecurityContext(String securityDomain, Principal p, Object
credentials, Subject subject) {
+ SecurityActions.pushSecurityContext(p, credentials, subject, securityDomain);
+ return getSecurityContext(securityDomain);
+ }
+
+}
Property changes on: branches/JCA/jboss-integration/src/main/java/org/teiid/jboss/JBossSecurityHelper.java
___________________________________________________________________
Name: svn:mime-type
+ text/plain
Modified: branches/JCA/jboss-integration/src/main/java/org/teiid/jboss/deployers/ConnectorBindingDeployer.java
===================================================================
--- branches/JCA/jboss-integration/src/main/java/org/teiid/jboss/deployers/ConnectorBindingDeployer.java 2010-02-22 18:48:13 UTC (rev 1866)
+++ branches/JCA/jboss-integration/src/main/java/org/teiid/jboss/deployers/ConnectorBindingDeployer.java 2010-02-23 16:07:13 UTC (rev 1867)
@@ -34,6 +34,7 @@
import org.jboss.managed.api.factory.ManagedObjectFactory;
import org.jboss.resource.metadata.mcf.ManagedConnectionFactoryDeploymentGroup;
import org.jboss.resource.metadata.mcf.ManagedConnectionFactoryDeploymentMetaData;
+import org.teiid.SecurityHelper;
import org.teiid.connector.api.ConnectorException;
import org.teiid.dqp.internal.datamgr.impl.ConnectorManager;
import org.teiid.dqp.internal.datamgr.impl.ConnectorManagerRepository;
@@ -41,6 +42,7 @@
public class ConnectorBindingDeployer extends
AbstractSimpleRealDeployer<ManagedConnectionFactoryDeploymentGroup> implements
ManagedObjectCreator {
protected Logger log = Logger.getLogger(getClass());
private ManagedObjectFactory mof;
+ private SecurityHelper securityHelper;
private ConnectorManagerRepository connectorManagerRepository;
@@ -83,7 +85,7 @@
ConnectorManager createConnectorManger(String deployedConnectorName, int maxThreads)
{
- ConnectorManager mgr = new ConnectorManager(deployedConnectorName, maxThreads);
+ ConnectorManager mgr = new ConnectorManager(deployedConnectorName, maxThreads,
securityHelper);
return mgr;
}
@@ -130,4 +132,8 @@
public void setManagedObjectFactory(ManagedObjectFactory mof) {
this.mof = mof;
}
+
+ public void setSecurityHelper(SecurityHelper securityHelper) {
+ this.securityHelper = securityHelper;
+ }
}
Modified: branches/JCA/jboss-integration/src/main/java/org/teiid/jboss/deployers/RuntimeEngineDeployer.java
===================================================================
--- branches/JCA/jboss-integration/src/main/java/org/teiid/jboss/deployers/RuntimeEngineDeployer.java 2010-02-22 18:48:13 UTC (rev 1866)
+++ branches/JCA/jboss-integration/src/main/java/org/teiid/jboss/deployers/RuntimeEngineDeployer.java 2010-02-23 16:07:13 UTC (rev 1867)
@@ -21,11 +21,16 @@
*/
package org.teiid.jboss.deployers;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
import java.util.Date;
import java.util.List;
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
import javax.resource.spi.XATerminator;
-import javax.resource.spi.work.WorkManager;
import org.jboss.deployers.spi.DeploymentException;
import org.jboss.deployers.spi.deployer.helpers.AbstractSimpleRealDeployer;
@@ -33,25 +38,29 @@
import org.jboss.logging.Logger;
import org.jboss.resource.metadata.mcf.ManagedConnectionFactoryDeploymentGroup;
import org.jboss.resource.metadata.mcf.ManagedConnectionFactoryDeploymentMetaData;
-import org.teiid.ContainerHelper;
-import org.teiid.ContainerUtil;
+import org.teiid.SecurityHelper;
import org.teiid.adminapi.Admin;
import org.teiid.adminapi.AdminComponentException;
import org.teiid.adminapi.jboss.AdminProvider;
import org.teiid.dqp.internal.datamgr.impl.ConnectorManagerRepository;
+import org.teiid.dqp.internal.process.ContainerServiceProvider;
import org.teiid.dqp.internal.process.DQPConfiguration;
import org.teiid.dqp.internal.process.DQPCore;
import org.teiid.dqp.internal.process.DQPManagementView;
+import org.teiid.dqp.internal.process.DQPWorkContext;
import org.teiid.dqp.internal.transaction.ContainerTransactionProvider;
import org.teiid.dqp.internal.transaction.TransactionServerImpl;
import org.teiid.dqp.internal.transaction.XidFactory;
+import org.teiid.transport.ClientServiceRegistry;
+import org.teiid.transport.ClientServiceRegistryImpl;
import org.teiid.transport.LogonImpl;
import org.teiid.transport.SocketConfiguration;
import org.teiid.transport.SocketTransport;
+import com.metamatrix.api.exception.ComponentNotFoundException;
+import com.metamatrix.client.ExceptionUtil;
import com.metamatrix.common.comm.api.ServerConnectionFactory;
import com.metamatrix.common.log.LogManager;
-import com.metamatrix.core.MetaMatrixRuntimeException;
import com.metamatrix.core.log.MessageLevel;
import com.metamatrix.dqp.client.ClientSideDQP;
import com.metamatrix.dqp.service.AuthorizationService;
@@ -59,21 +68,39 @@
import com.metamatrix.dqp.service.TransactionService;
import com.metamatrix.dqp.util.LogConstants;
import com.metamatrix.platform.security.api.ILogon;
+import com.metamatrix.platform.security.api.SessionToken;
import com.metamatrix.platform.security.api.service.SessionService;
-public class RuntimeEngineDeployer extends
AbstractSimpleRealDeployer<ManagedConnectionFactoryDeploymentGroup> {
- private static final String TEIID_RUNTIME = "java:teiid/runtime-engine";
+public class RuntimeEngineDeployer extends
AbstractSimpleRealDeployer<ManagedConnectionFactoryDeploymentGroup> implements
ClientServiceRegistry {
+ private static final String TEIID_RUNTIME = "java:teiid/runtime-engine";
//$NON-NLS-1$
protected Logger log = Logger.getLogger(getClass());
- private ContainerHelper containerHelper;
+ //injected state
+ private AuthorizationService authorizationService;
+ private SecurityHelper securityHelper;
+ private SessionService sessionService;
+ private BufferService bufferService;
+ private ConnectorManagerRepository connectorManagerRepository;
+ private DQPManagementView dqpManagementView;
private SocketTransport jdbcSocketTransport;
private SocketConfiguration jdbcSocketConfiguration;
private SocketTransport adminSocketTransport;
private SocketConfiguration adminSocketConfiguration;
+ //locally constructed client services
+ private DQPCore dqp;
+ private ILogon logon;
+ private Admin admin;
+ private ClientServiceRegistryImpl csr = new ClientServiceRegistryImpl();
public RuntimeEngineDeployer() {
super(ManagedConnectionFactoryDeploymentGroup.class);
setRelativeOrder(3000);
}
+
+ @Override
+ public <T> T getClientService(Class<T> iface)
+ throws ComponentNotFoundException {
+ return this.csr.getClientService(iface);
+ }
@Override
public void deploy(DeploymentUnit unit, ManagedConnectionFactoryDeploymentGroup group)
throws DeploymentException {
@@ -81,13 +108,12 @@
for (ManagedConnectionFactoryDeploymentMetaData data : deployments) {
String connectorDefinition = data.getConnectionDefinition();
- if
(connectorDefinition.equals("com.metamatrix.common.comm.api.ServerConnectionFactory"))
{
+ if (connectorDefinition.equals(ServerConnectionFactory.class.getName())) {
startEngine();
log.info("Teiid Engine Started = " + new
Date(System.currentTimeMillis()).toString()); //$NON-NLS-1$
}
}
}
-
@Override
public void undeploy(DeploymentUnit unit, ManagedConnectionFactoryDeploymentGroup group)
{
@@ -96,15 +122,15 @@
for (ManagedConnectionFactoryDeploymentMetaData data : deployments) {
String connectorDefinition = data.getConnectionDefinition();
- if
(connectorDefinition.equals("com.metamatrix.common.comm.api.ServerConnectionFactory"))
{
+ if (connectorDefinition.equals(ServerConnectionFactory.class.getName())) {
stopEngine();
log.info("Teiid Engine Stopped = " + new
Date(System.currentTimeMillis()).toString()); //$NON-NLS-1$
}
}
}
- public void setContainerHelper(ContainerHelper helper) {
- this.containerHelper = helper;
+ public void setSecurityHelper(SecurityHelper securityHelper) {
+ this.securityHelper = securityHelper;
}
public void setJdbcSocketConfiguration(SocketConfiguration socketConfig) {
@@ -115,11 +141,32 @@
this.adminSocketConfiguration = socketConfig;
}
+ public void setAuthorizationService(AuthorizationService service) {
+ this.authorizationService = service;
+ }
+
+ public void setSessionService(SessionService service) {
+ this.sessionService = service;
+ }
+
+ public void setBufferService(BufferService service) {
+ this.bufferService = service;
+ }
+
+ public void setConnectorManagerRepository(ConnectorManagerRepository repo) {
+ this.connectorManagerRepository = repo;
+ }
+
+ public void setDQPManager(DQPManagementView holder) {
+ this.dqpManagementView = holder;
+ }
+
private void startEngine() throws DeploymentException {
- ServerConnectionFactory scf = null;
+ ContainerServiceProvider scf = null;
try {
- scf = ContainerUtil.lookup(TEIID_RUNTIME);
- } catch (MetaMatrixRuntimeException e) {
+ InitialContext ic = new InitialContext();
+ scf = (ContainerServiceProvider)ic.lookup(TEIID_RUNTIME);
+ } catch (NamingException e) {
throw new DeploymentException(e.getMessage());
}
@@ -127,33 +174,31 @@
createClientServices(scf);
// Start the socket transport
- DQPConfiguration config = scf.getService(DQPConfiguration.class);
+ DQPConfiguration config = scf.getDQPConfiguration();
if (config.getBindAddress() != null) {
this.jdbcSocketConfiguration.setBindAddress(config.getBindAddress());
}
if (config.getPortNumber() > 0) {
this.jdbcSocketConfiguration.setPortNumber(config.getPortNumber());
}
+
+ csr.registerClientService(ILogon.class, proxyService(ILogon.class, logon),
com.metamatrix.common.util.LogConstants.CTX_SERVER);
+ csr.registerClientService(ClientSideDQP.class, proxyService(ClientSideDQP.class,
dqp), LogConstants.CTX_QUERY_SERVICE);
+ csr.registerClientService(Admin.class, proxyService(Admin.class, admin),
LogConstants.CTX_ADMIN_API);
- this.jdbcSocketTransport = new SocketTransport(this.jdbcSocketConfiguration);
- this.jdbcSocketTransport.setWorkManager(scf.getService(WorkManager.class));
+ this.jdbcSocketTransport = new SocketTransport(this.jdbcSocketConfiguration, csr);
this.jdbcSocketTransport.start();
log.info("Teiid JDBC = " +
(this.jdbcSocketConfiguration.getSSLConfiguration().isSslEnabled()?"mms://":"mm://")+this.jdbcSocketConfiguration.getHostAddress().getHostName()+":"+this.jdbcSocketConfiguration.getPortNumber());
//$NON-NLS-1$
-
- this.adminSocketTransport = new SocketTransport(this.adminSocketConfiguration);
- this.adminSocketTransport.setWorkManager(scf.getService(WorkManager.class));
+
+ this.adminSocketTransport = new SocketTransport(this.adminSocketConfiguration,
csr);
this.adminSocketTransport.start();
log.info("Teiid Admin = " +
(this.adminSocketConfiguration.getSSLConfiguration().isSslEnabled()?"mms://":"mm://")+this.adminSocketConfiguration.getHostAddress().getHostName()+":"+this.adminSocketConfiguration.getPortNumber());
//$NON-NLS-1$
}
private void stopEngine() {
-
- try {
- ServerConnectionFactory scf = ContainerUtil.lookup(TEIID_RUNTIME);
- ClientSideDQP dqp = scf.getService(ClientSideDQP.class);
- ((DQPCore)dqp).stop();
- } catch(MetaMatrixRuntimeException e) {
- // this bean is already shutdown
+ if (dqp != null) {
+ dqp.stop();
+ dqp = null;
}
// Stop socket transport(s)
@@ -168,29 +213,25 @@
}
}
- private void createClientServices(ServerConnectionFactory scf) throws
DeploymentException {
- DQPCore dqp = new DQPCore();
- dqp.setTransactionService(getTransactionService("localhost",
scf.getService(XATerminator.class)));
- dqp.setWorkManager(scf.getService(WorkManager.class));
-
dqp.setAuthorizationService(this.containerHelper.getService(AuthorizationService.class));
- dqp.setBufferService(this.containerHelper.getService(BufferService.class));
- dqp.setSessionService(this.containerHelper.getService(SessionService.class));
-
dqp.setConnectorManagerRepository(this.containerHelper.getService(ConnectorManagerRepository.class));
- dqp.start(scf.getService(DQPConfiguration.class));
+ private void createClientServices(ContainerServiceProvider scf) throws
DeploymentException {
+ dqp = new DQPCore();
+ dqp.setTransactionService(getTransactionService("localhost",
scf.getXATerminator())); //$NON-NLS-1$
+ dqp.setWorkManager(scf.getWorkManager());
+ dqp.setAuthorizationService(authorizationService);
+ dqp.setBufferService(bufferService);
+ dqp.setSessionService(sessionService);
+ dqp.setConnectorManagerRepository(connectorManagerRepository);
+ dqp.setSecurityHelper(this.securityHelper);
+ dqp.start(scf.getDQPConfiguration());
- DQPManagementView holder = this.containerHelper.getService(DQPManagementView.class);
- holder.setDQP(dqp);
+ dqpManagementView.setDQP(dqp);
- scf.registerClientService(ILogon.class, new LogonImpl(dqp.getSessionService(),
"teiid-cluster"), com.metamatrix.common.util.LogConstants.CTX_SERVER);
- scf.registerClientService(ClientSideDQP.class, dqp,
LogConstants.CTX_QUERY_SERVICE);
-
+ this.logon = new LogonImpl(dqp.getSessionService(), "teiid-cluster");
//$NON-NLS-1$
try {
- scf.registerClientService(Admin.class, AdminProvider.getLocal(),
LogConstants.CTX_ADMIN_API);
- } catch (AdminComponentException e) {
- throw new DeploymentException(e.getCause());
- }
-
- scf.registerClientService(ContainerHelper.class, this.containerHelper,
LogConstants.CTX_DQP);
+ this.admin = AdminProvider.getLocal();
+ } catch (AdminComponentException e) {
+ throw new DeploymentException(e.getCause());
+ }
}
private TransactionService getTransactionService(String processName, XATerminator
terminator) {
@@ -201,4 +242,48 @@
return (TransactionService)LogManager.createLoggingProxy(LogConstants.CTX_TXN_LOG,
txnService, new Class[] {TransactionService.class}, MessageLevel.DETAIL);
}
+ private <T> T proxyService(final Class<T> iface, final T instance) {
+
+ return iface.cast(Proxy.newProxyInstance(this.getClass().getClassLoader(), new Class[]
{iface}, new InvocationHandler() {
+
+ public Object invoke(Object arg0, Method arg1, Object[] arg2) throws Throwable {
+
+ Throwable exception = null;
+ ClassLoader current = Thread.currentThread().getContextClassLoader();
+ try {
+ if (!(iface.equals(ILogon.class))) {
+ logon.assertIdentity(SessionToken.getSession());
+ assosiateSecurityContext();
+ }
+
+ return arg1.invoke(instance, arg2);
+ } catch (InvocationTargetException e) {
+ exception = e.getTargetException();
+ } catch(Throwable t){
+ exception = t;
+ } finally {
+ clearSecurityContext();
+ DQPWorkContext.releaseWorkContext();
+ Thread.currentThread().setContextClassLoader(current);
+ }
+ throw ExceptionUtil.convertException(arg1, exception);
+ }
+ }));
+ }
+
+ private boolean assosiateSecurityContext() {
+ DQPWorkContext context = DQPWorkContext.getWorkContext();
+ if (context.getSubject() != null) {
+ return securityHelper.assosiateSecurityContext(context.getSecurityDomain(),
context.getSecurityContext());
+ }
+ return false;
+ }
+
+ private void clearSecurityContext() {
+ DQPWorkContext context = DQPWorkContext.getWorkContext();
+ if (context.getSubject() != null) {
+ securityHelper.clearSecurityContext(context.getSecurityDomain());
+ }
+ }
+
}
Modified: branches/JCA/runtime/src/main/java/org/teiid/ConnectionInfo.java
===================================================================
--- branches/JCA/runtime/src/main/java/org/teiid/ConnectionInfo.java 2010-02-22 18:48:13 UTC (rev 1866)
+++ branches/JCA/runtime/src/main/java/org/teiid/ConnectionInfo.java 2010-02-23 16:07:13 UTC (rev 1867)
@@ -26,14 +26,10 @@
import javax.resource.spi.ConnectionRequestInfo;
-import com.metamatrix.common.comm.ClientServiceRegistry;
-
public class ConnectionInfo implements ConnectionRequestInfo {
Properties properties;
- ClientServiceRegistry clientServices;
- public ConnectionInfo(Properties properties,ClientServiceRegistry clientServices) {
+ public ConnectionInfo(Properties properties) {
this.properties = properties;
- this.clientServices = clientServices;
}
}
Modified: branches/JCA/runtime/src/main/java/org/teiid/TeiidConnectionFactory.java
===================================================================
--- branches/JCA/runtime/src/main/java/org/teiid/TeiidConnectionFactory.java 2010-02-22 18:48:13 UTC (rev 1866)
+++ branches/JCA/runtime/src/main/java/org/teiid/TeiidConnectionFactory.java 2010-02-23 16:07:13 UTC (rev 1867)
@@ -22,33 +22,23 @@
package org.teiid;
-import java.lang.reflect.InvocationHandler;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.lang.reflect.Proxy;
import java.util.Properties;
-import java.util.concurrent.ConcurrentHashMap;
import javax.resource.ResourceException;
import javax.resource.spi.ConnectionManager;
import javax.resource.spi.XATerminator;
import javax.resource.spi.work.WorkManager;
-import org.teiid.adminapi.Admin;
+import org.teiid.dqp.internal.process.ContainerServiceProvider;
import org.teiid.dqp.internal.process.DQPConfiguration;
-import org.teiid.dqp.internal.process.DQPWorkContext;
-import com.metamatrix.client.ExceptionUtil;
import com.metamatrix.common.comm.api.ServerConnection;
import com.metamatrix.common.comm.api.ServerConnectionFactory;
import com.metamatrix.common.comm.exception.CommunicationException;
import com.metamatrix.common.comm.exception.ConnectionException;
import com.metamatrix.common.log.LogManager;
-import com.metamatrix.dqp.embedded.DQPEmbeddedPlugin;
import com.metamatrix.jdbc.LogConfigurationProvider;
import com.metamatrix.jdbc.LogListernerProvider;
-import com.metamatrix.platform.security.api.ILogon;
-import com.metamatrix.platform.security.api.SessionToken;
/**
@@ -56,14 +46,11 @@
* This is also responsible for initializing the DQP if the DQP instance is not
* already alive.
*/
-public class TeiidConnectionFactory implements ServerConnectionFactory {
+public class TeiidConnectionFactory implements ServerConnectionFactory,
ContainerServiceProvider {
private TeiidResourceAdapter ra;
private TeiidManagedConnectionFactory mcf;
private ConnectionManager cxManager;
- private ConcurrentHashMap<Class, Object> clientServices = new
ConcurrentHashMap<Class, Object>();
- private ConcurrentHashMap<Class, String> loggingContext = new
ConcurrentHashMap<Class, String>();
-
public TeiidConnectionFactory(TeiidResourceAdapter ra, TeiidManagedConnectionFactory
mcf, ConnectionManager cxmanager) {
this.ra = ra;
@@ -78,99 +65,27 @@
public ServerConnection getConnection(Properties connectionProperties) throws
CommunicationException, ConnectionException {
try {
// this code will not be invoked as teiid does not use managed connection.
- return (ServerConnection)cxManager.allocateConnection(this.mcf, new
ConnectionInfo(connectionProperties, null));
+ return (ServerConnection)cxManager.allocateConnection(this.mcf, new
ConnectionInfo(connectionProperties));
} catch (ResourceException e) {
throw new ConnectionException(e);
}
}
- public <T> T getService(Class<T> type) {
- if (type.equals(WorkManager.class)) {
- return type.cast(this.ra.getWorkManager());
- }
-
- if (type.equals(XATerminator.class)) {
- return type.cast(this.ra.getXATerminator());
- }
-
- if (type.equals(DQPConfiguration.class)) {
- return type.cast(mcf);
- }
-
- // see if there are any client services.
- Object service = this.clientServices.get(type);
- if (service != null) {
- return type.cast(proxyService(type, type.cast(service)));
- }
-
- return null;
+ @Override
+ public WorkManager getWorkManager() {
+ return this.ra.getWorkManager();
}
@Override
- public <T> void registerClientService(Class<T> type, T instance, String
loggingContext) {
- this.clientServices.put(type, instance);
- this.loggingContext.put(type, loggingContext);
+ public XATerminator getXATerminator() {
+ return this.ra.getXATerminator();
}
-
- @Override
- public <T> String getLoggingContextForService(Class<T> type) {
- return this.loggingContext.get(type);
- }
- private <T> T proxyService(final Class<T> iface, final T instance) {
-
- return (T) Proxy.newProxyInstance(this.getClass().getClassLoader(), new Class[]
{iface}, new InvocationHandler() {
-
- public Object invoke(Object arg0, Method arg1, Object[] arg2) throws Throwable {
-
- Throwable exception = null;
- ClassLoader current = Thread.currentThread().getContextClassLoader();
- try {
- if (!(iface.equals(ILogon.class))) {
- ((ILogon)clientServices.get(ILogon.class)).assertIdentity(SessionToken.getSession());
- assosiateSecurityContext();
- }
-
- // if this is admin session, do not allow any interface other then
"Admin"
- if (DQPWorkContext.getWorkContext().isAdmin()) {
- if (!(iface.equals(Admin.class)) &&
!arg1.getName().equals("ping")) {
- DQPWorkContext context = DQPWorkContext.getWorkContext();
- throw new
IllegalAccessException(DQPEmbeddedPlugin.Util.getString("Illegal_access_on_admin",
context.getSubject(), context.getClientAddress()));
- }
- }
-
- return arg1.invoke(instance, arg2);
- } catch (InvocationTargetException e) {
- exception = e.getTargetException();
- } catch(Throwable t){
- exception = t;
- } finally {
- clearSecurityContext();
- DQPWorkContext.releaseWorkContext();
- Thread.currentThread().setContextClassLoader(current);
- }
- throw ExceptionUtil.convertException(arg1, exception);
- }
- });
- }
-
- private boolean assosiateSecurityContext() {
- DQPWorkContext context = DQPWorkContext.getWorkContext();
- if (context.getSubject() != null) {
- ContainerHelper helper =
(ContainerHelper)this.clientServices.get(ContainerHelper.class);
- return helper.assosiateSecurityContext(context.getSecurityDomain(),
context.getSecurityContext());
- }
- return false;
- }
+ @Override
+ public DQPConfiguration getDQPConfiguration() {
+ return mcf;
+ }
- private void clearSecurityContext() {
- DQPWorkContext context = DQPWorkContext.getWorkContext();
- if (context.getSubject() != null) {
- ContainerHelper helper =
(ContainerHelper)this.clientServices.get(ContainerHelper.class);
- helper.clearSecurityContext(context.getSecurityDomain());
- }
- }
-
// public MMProcess getProcess() {
//
// Properties props = this.bootProperties;
Modified: branches/JCA/runtime/src/main/java/org/teiid/TeiidManagedConnection.java
===================================================================
--- branches/JCA/runtime/src/main/java/org/teiid/TeiidManagedConnection.java 2010-02-22 18:48:13 UTC (rev 1866)
+++ branches/JCA/runtime/src/main/java/org/teiid/TeiidManagedConnection.java 2010-02-23 16:07:13 UTC (rev 1867)
@@ -37,11 +37,7 @@
import javax.transaction.xa.XAResource;
import org.teiid.connector.api.ConnectorException;
-import org.teiid.transport.LocalServerConnection;
-import com.metamatrix.common.comm.exception.CommunicationException;
-import com.metamatrix.common.comm.exception.ConnectionException;
-
public class TeiidManagedConnection implements ManagedConnection {
protected final Collection<ConnectionEventListener> listeners = new
ArrayList<ConnectionEventListener>();
private PrintWriter log;
Deleted: branches/JCA/runtime/src/main/java/org/teiid/services/MembershipServiceImpl.java
===================================================================
--- branches/JCA/runtime/src/main/java/org/teiid/services/MembershipServiceImpl.java 2010-02-22 18:48:13 UTC (rev 1866)
+++ branches/JCA/runtime/src/main/java/org/teiid/services/MembershipServiceImpl.java 2010-02-23 16:07:13 UTC (rev 1867)
@@ -1,222 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * See the COPYRIGHT.txt file distributed with this work for information
- * regarding copyright ownership. Some portions may be licensed
- * to Red Hat, Inc. under one or more contributor license agreements.
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
- * 02110-1301 USA.
- */
-
-package org.teiid.services;
-
-import java.io.IOException;
-import java.security.Principal;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.List;
-
-import javax.security.auth.Subject;
-import javax.security.auth.callback.Callback;
-import javax.security.auth.callback.CallbackHandler;
-import javax.security.auth.callback.NameCallback;
-import javax.security.auth.callback.PasswordCallback;
-import javax.security.auth.callback.UnsupportedCallbackException;
-import javax.security.auth.login.LoginContext;
-import javax.security.auth.login.LoginException;
-
-import org.teiid.ContainerHelper;
-import org.teiid.ContainerUtil;
-
-import com.metamatrix.common.log.LogManager;
-import com.metamatrix.common.util.LogConstants;
-import com.metamatrix.dqp.embedded.DQPEmbeddedPlugin;
-import com.metamatrix.platform.security.api.Credentials;
-
-/**
- * This class serves as the primary implementation of the
- * Membership Service. Based on the security domains specified this class delegates the
responsibility of
- * authenticating user to those security domains in the order they are defined.
- */
-public class MembershipServiceImpl {
- public static final String AT = "@"; //$NON-NLS-1$
- private LoginContext loginContext;
- private String userName;
- private String securitydomain;
- private Object credentials;
-
- public void authenticateUser(String username, Credentials credential, String
applicationName, List<String> domains) throws LoginException {
-
- LogManager.logTrace(LogConstants.CTX_MEMBERSHIP, new Object[]
{"authenticateUser", username, applicationName}); //$NON-NLS-1$
-
- final String baseUsername = getBaseUsername(username);
- final char[] password = credential.getCredentialsAsCharArray();
-
- // If username specifies a domain (user@domain) only that domain is authenticated
against.
- // If username specifies no domain, then all domains are tried in order.
- for (String domain:getDomainsForUser(domains, username)) {
-
- try {
- CallbackHandler handler = new CallbackHandler() {
- @Override
- public void handle(Callback[] callbacks) throws IOException,
UnsupportedCallbackException {
- for (int i = 0; i < callbacks.length; i++) {
- if (callbacks[i] instanceof NameCallback) {
- NameCallback nc = (NameCallback)callbacks[i];
- nc.setName(baseUsername);
- } else if (callbacks[i] instanceof PasswordCallback) {
- PasswordCallback pc = (PasswordCallback)callbacks[i];
- pc.setPassword(password);
- credentials = password;
- } else {
- throw new UnsupportedCallbackException(callbacks[i], "Unrecognized
Callback");
- }
- }
- }
- };
-
- // this is the configured login for teiid
- this.loginContext = createLoginContext(domain,handler);
- this.loginContext.login();
- this.userName = baseUsername+AT+domain;
- this.securitydomain = domain;
-
- return;
- } catch (LoginException e) {
- LogManager.logDetail(LogConstants.CTX_MEMBERSHIP,e.getMessage());
- }
- }
- throw new
LoginException(DQPEmbeddedPlugin.Util.getString("SessionServiceImpl.The_username_0_and/or_password_are_incorrect",
username ));
- }
-
- protected LoginContext createLoginContext(String domain, CallbackHandler handler)
throws LoginException {
- return new LoginContext(domain, handler);
- }
-
- public LoginContext getLoginContext() {
- return this.loginContext;
- }
-
- public String getUserName() {
- return this.userName;
- }
-
- public String getSecurityDomain() {
- return this.securitydomain;
- }
-
- public Object getSecurityContext() {
- Object sc = null;
- if (this.loginContext != null) {
- ContainerHelper helper =
ContainerUtil.lookup("teiid/container-helper");
- sc = helper.getSecurityContext(this.securitydomain);
- if ( sc == null){
- Subject subject = this.loginContext.getSubject();
- Principal principal = null;
- for(Principal p:subject.getPrincipals()) {
- if (this.userName.startsWith(p.getName())) {
- principal = p;
- break;
- }
- }
- return helper.createSecurityContext(this.securitydomain, principal,
credentials, subject);
- }
- }
- return sc;
- }
-
- static String getBaseUsername(String username) {
- if (username == null) {
- return username;
- }
-
- int index = getQualifierIndex(username);
-
- String result = username;
-
- if (index != -1) {
- result = username.substring(0, index);
- }
-
- //strip the escape character from the remaining ats
- return result.replaceAll("\\\\"+AT, AT); //$NON-NLS-1$
- }
-
- static String escapeName(String name) {
- if (name == null) {
- return name;
- }
-
- return name.replaceAll(AT, "\\\\"+AT); //$NON-NLS-1$
- }
-
- static String getDomainName(String username) {
- if (username == null) {
- return username;
- }
-
- int index = getQualifierIndex(username);
-
- if (index != -1) {
- return username.substring(index + 1);
- }
-
- return null;
- }
-
- static int getQualifierIndex(String username) {
- int index = username.length();
- while ((index = username.lastIndexOf(AT, --index)) != -1) {
- if (index > 0 && username.charAt(index - 1) != '\\') {
- return index;
- }
- }
-
- return -1;
- }
-
- private Collection<String> getDomainsForUser(List<String> domains, String
username) {
- // If username is null, return all domains
- if (username == null) {
- return domains;
- }
-
- String domain = getDomainName(username);
-
- if (domain == null) {
- return domains;
- }
-
- // ------------------------------------------
- // Handle usernames having @ sign
- // ------------------------------------------
- String domainHolder = null;
- for (String d:domains) {
- if(d.equalsIgnoreCase(domain)) {
- domainHolder = d;
- break;
- }
- }
-
- if (domainHolder == null) {
- return Collections.EMPTY_LIST;
- }
-
- LinkedList result = new LinkedList();
- result.add(domainHolder);
- return result;
- }
-}
Modified: branches/JCA/runtime/src/main/java/org/teiid/services/SessionServiceImpl.java
===================================================================
--- branches/JCA/runtime/src/main/java/org/teiid/services/SessionServiceImpl.java 2010-02-22 18:48:13 UTC (rev 1866)
+++ branches/JCA/runtime/src/main/java/org/teiid/services/SessionServiceImpl.java 2010-02-23 16:07:13 UTC (rev 1867)
@@ -22,11 +22,9 @@
package org.teiid.services;
-import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
-import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -39,9 +37,11 @@
import javax.security.auth.login.LoginContext;
import javax.security.auth.login.LoginException;
+import org.teiid.SecurityHelper;
import org.teiid.adminapi.impl.SessionMetadata;
import org.teiid.adminapi.impl.VDBMetaData;
import org.teiid.deployers.VDBRepository;
+import org.teiid.dqp.internal.process.DQPCore;
import com.metamatrix.admin.api.exception.security.InvalidSessionException;
import com.metamatrix.api.exception.security.SessionServiceException;
@@ -50,7 +50,6 @@
import com.metamatrix.common.log.LogManager;
import com.metamatrix.common.util.LogConstants;
import com.metamatrix.core.util.ArgCheck;
-import com.metamatrix.core.util.StringUtil;
import com.metamatrix.dqp.embedded.DQPEmbeddedPlugin;
import com.metamatrix.metadata.runtime.api.VirtualDatabaseException;
import com.metamatrix.platform.security.api.Credentials;
@@ -60,9 +59,7 @@
/**
* This class serves as the primary implementation of the Session Service.
*/
-public class SessionServiceImpl implements SessionService, Serializable {
- private static final long serialVersionUID = 3366022966048148299L;
-
+public class SessionServiceImpl implements SessionService {
public static final String SECURITY_DOMAINS = "securitydomains";
//$NON-NLS-1$
/*
@@ -70,10 +67,16 @@
*/
private long sessionMaxLimit = DEFAULT_MAX_SESSIONS;
private long sessionExpirationTimeLimit = DEFAULT_SESSION_EXPIRATION;
+ /*
+ * Injected state
+ */
private VDBRepository vdbRepository;
+ private SecurityHelper securityHelper;
+ private DQPCore dqp;
+
private Map<Long, SessionMetadata> sessionCache = new
ConcurrentHashMap<Long, SessionMetadata>();
- private transient Timer sessionMonitor = new Timer("SessionMonitor",
true);
+ private Timer sessionMonitor = new Timer("SessionMonitor", true);
//$NON-NLS-1$
private AtomicLong idSequence = new AtomicLong();
private LinkedList<String> securityDomains = new LinkedList<String>();
private LinkedList<String> adminSecurityDomains = new
LinkedList<String>();
@@ -107,6 +110,13 @@
if (info == null) {
throw new
InvalidSessionException(DQPEmbeddedPlugin.Util.getString("SessionServiceImpl.invalid_session",
sessionID)); //$NON-NLS-1$
}
+ if (info.getVDBName() != null) {
+ try {
+ dqp.terminateSession(info.getSessionId());
+ } catch (Exception e) {
+ LogManager.logWarning(LogConstants.CTX_SESSION,e,"Exception
terminitating session"); //$NON-NLS-1$
+ }
+ }
// try to log out of the context.
try {
@@ -128,7 +138,7 @@
Properties productInfo = new Properties();
LoginContext loginContext = null;
- String securityDomain = "none";
+ String securityDomain = "none"; //$NON-NLS-1$
Object securityContext = null;
List<String> domains = this.securityDomains;
if (adminConnection) {
@@ -138,11 +148,11 @@
if (!domains.isEmpty()) {
// Authenticate user...
// if not authenticated, this method throws exception
- MembershipServiceImpl membership = authenticate(userName, credentials,
applicationName, domains);
+ TeiidLoginContext membership = authenticate(userName, credentials,
applicationName, domains);
loginContext = membership.getLoginContext();
userName = membership.getUserName();
securityDomain = membership.getSecurityDomain();
- securityContext = membership.getSecurityContext();
+ securityContext = membership.getSecurityContext(securityHelper);
}
// Validate VDB and version if logging on to server product...
@@ -197,9 +207,9 @@
return newSession;
}
- protected MembershipServiceImpl authenticate(String userName, Credentials credentials,
String applicationName, List<String> domains)
+ protected TeiidLoginContext authenticate(String userName, Credentials credentials,
String applicationName, List<String> domains)
throws LoginException {
- MembershipServiceImpl membership = new MembershipServiceImpl();
+ TeiidLoginContext membership = new TeiidLoginContext();
membership.authenticateUser(userName, credentials, applicationName, domains);
return membership;
}
@@ -281,10 +291,8 @@
if (domainNameOrder != null && domainNameOrder.trim().length()>0) {
LogManager.logDetail(LogConstants.CTX_MEMBERSHIP, "Security Enabled:
true"); //$NON-NLS-1$
- List domainNames = StringUtil.split(domainNameOrder, ",");
//$NON-NLS-1$
- Iterator domainNameItr = domainNames.iterator();
- while ( domainNameItr.hasNext() ) {
- String domainName = ((String) domainNameItr.next()).trim();
+ String[] domainNames = domainNameOrder.split(","); //$NON-NLS-1$
+ for (String domainName : domainNames) {
this.securityDomains.addLast(domainName);
}
}
@@ -320,4 +328,12 @@
public void setVDBRepository(VDBRepository repo) {
this.vdbRepository = repo;
}
+
+ public void setSecurityHelper(SecurityHelper securityHelper) {
+ this.securityHelper = securityHelper;
+ }
+
+ public void setDqp(DQPCore dqp) {
+ this.dqp = dqp;
+ }
}
Copied: branches/JCA/runtime/src/main/java/org/teiid/services/TeiidLoginContext.java (from rev 1854, branches/JCA/runtime/src/main/java/org/teiid/services/MembershipServiceImpl.java)
===================================================================
--- branches/JCA/runtime/src/main/java/org/teiid/services/TeiidLoginContext.java (rev 0)
+++ branches/JCA/runtime/src/main/java/org/teiid/services/TeiidLoginContext.java 2010-02-23 16:07:13 UTC (rev 1867)
@@ -0,0 +1,220 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.services;
+
+import java.io.IOException;
+import java.security.Principal;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+
+import javax.security.auth.Subject;
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.auth.login.LoginContext;
+import javax.security.auth.login.LoginException;
+
+import org.teiid.SecurityHelper;
+
+import com.metamatrix.common.log.LogManager;
+import com.metamatrix.common.util.LogConstants;
+import com.metamatrix.dqp.embedded.DQPEmbeddedPlugin;
+import com.metamatrix.platform.security.api.Credentials;
+
+/**
+ * This class serves as the primary implementation of the
+ * Membership Service. Based on the security domains specified this class delegates the
responsibility of
+ * authenticating user to those security domains in the order they are defined.
+ */
+public class TeiidLoginContext {
+ public static final String AT = "@"; //$NON-NLS-1$
+ private LoginContext loginContext;
+ private String userName;
+ private String securitydomain;
+ private Object credentials;
+
+ public void authenticateUser(String username, Credentials credential, String
applicationName, List<String> domains) throws LoginException {
+
+ LogManager.logTrace(LogConstants.CTX_MEMBERSHIP, new Object[]
{"authenticateUser", username, applicationName}); //$NON-NLS-1$
+
+ final String baseUsername = getBaseUsername(username);
+ final char[] password = credential.getCredentialsAsCharArray();
+
+ // If username specifies a domain (user@domain) only that domain is authenticated
against.
+ // If username specifies no domain, then all domains are tried in order.
+ for (String domain:getDomainsForUser(domains, username)) {
+
+ try {
+ CallbackHandler handler = new CallbackHandler() {
+ @Override
+ public void handle(Callback[] callbacks) throws IOException,
UnsupportedCallbackException {
+ for (int i = 0; i < callbacks.length; i++) {
+ if (callbacks[i] instanceof NameCallback) {
+ NameCallback nc = (NameCallback)callbacks[i];
+ nc.setName(baseUsername);
+ } else if (callbacks[i] instanceof PasswordCallback) {
+ PasswordCallback pc = (PasswordCallback)callbacks[i];
+ pc.setPassword(password);
+ credentials = password;
+ } else {
+ throw new UnsupportedCallbackException(callbacks[i], "Unrecognized
Callback"); //$NON-NLS-1$
+ }
+ }
+ }
+ };
+
+ // this is the configured login for teiid
+ this.loginContext = createLoginContext(domain,handler);
+ this.loginContext.login();
+ this.userName = baseUsername+AT+domain;
+ this.securitydomain = domain;
+
+ return;
+ } catch (LoginException e) {
+ LogManager.logDetail(LogConstants.CTX_MEMBERSHIP,e.getMessage());
+ }
+ }
+ throw new
LoginException(DQPEmbeddedPlugin.Util.getString("SessionServiceImpl.The_username_0_and/or_password_are_incorrect",
username )); //$NON-NLS-1$
+ }
+
+ protected LoginContext createLoginContext(String domain, CallbackHandler handler)
throws LoginException {
+ return new LoginContext(domain, handler);
+ }
+
+ public LoginContext getLoginContext() {
+ return this.loginContext;
+ }
+
+ public String getUserName() {
+ return this.userName;
+ }
+
+ public String getSecurityDomain() {
+ return this.securitydomain;
+ }
+
+ public Object getSecurityContext(SecurityHelper helper) {
+ Object sc = null;
+ if (this.loginContext != null) {
+ sc = helper.getSecurityContext(this.securitydomain);
+ if ( sc == null){
+ Subject subject = this.loginContext.getSubject();
+ Principal principal = null;
+ for(Principal p:subject.getPrincipals()) {
+ if (this.userName.startsWith(p.getName())) {
+ principal = p;
+ break;
+ }
+ }
+ return helper.createSecurityContext(this.securitydomain, principal,
credentials, subject);
+ }
+ }
+ return sc;
+ }
+
+ static String getBaseUsername(String username) {
+ if (username == null) {
+ return username;
+ }
+
+ int index = getQualifierIndex(username);
+
+ String result = username;
+
+ if (index != -1) {
+ result = username.substring(0, index);
+ }
+
+ //strip the escape character from the remaining ats
+ return result.replaceAll("\\\\"+AT, AT); //$NON-NLS-1$
+ }
+
+ static String escapeName(String name) {
+ if (name == null) {
+ return name;
+ }
+
+ return name.replaceAll(AT, "\\\\"+AT); //$NON-NLS-1$
+ }
+
+ static String getDomainName(String username) {
+ if (username == null) {
+ return username;
+ }
+
+ int index = getQualifierIndex(username);
+
+ if (index != -1) {
+ return username.substring(index + 1);
+ }
+
+ return null;
+ }
+
+ static int getQualifierIndex(String username) {
+ int index = username.length();
+ while ((index = username.lastIndexOf(AT, --index)) != -1) {
+ if (index > 0 && username.charAt(index - 1) != '\\') {
+ return index;
+ }
+ }
+
+ return -1;
+ }
+
+ private Collection<String> getDomainsForUser(List<String> domains, String
username) {
+ // If username is null, return all domains
+ if (username == null) {
+ return domains;
+ }
+
+ String domain = getDomainName(username);
+
+ if (domain == null) {
+ return domains;
+ }
+
+ // ------------------------------------------
+ // Handle usernames having @ sign
+ // ------------------------------------------
+ String domainHolder = null;
+ for (String d:domains) {
+ if(d.equalsIgnoreCase(domain)) {
+ domainHolder = d;
+ break;
+ }
+ }
+
+ if (domainHolder == null) {
+ return Collections.emptyList();
+ }
+
+ LinkedList<String> result = new LinkedList<String>();
+ result.add(domainHolder);
+ return result;
+ }
+}
Property changes on: branches/JCA/runtime/src/main/java/org/teiid/services/TeiidLoginContext.java
___________________________________________________________________
Name: svn:mime-type
+ text/plain
Added: branches/JCA/runtime/src/main/java/org/teiid/transport/ClientServiceRegistryImpl.java
===================================================================
--- branches/JCA/runtime/src/main/java/org/teiid/transport/ClientServiceRegistryImpl.java (rev 0)
+++ branches/JCA/runtime/src/main/java/org/teiid/transport/ClientServiceRegistryImpl.java 2010-02-23 16:07:13 UTC (rev 1867)
@@ -0,0 +1,75 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.transport;
+
+import java.util.HashMap;
+
+import com.metamatrix.api.exception.ComponentNotFoundException;
+import com.metamatrix.core.util.ReflectionHelper;
+import com.metamatrix.dqp.embedded.DQPEmbeddedPlugin;
+
+public class ClientServiceRegistryImpl {
+
+ public static class ClientService {
+ private Object instance;
+ private String loggingContext;
+ private ReflectionHelper reflectionHelper;
+
+ public ClientService(Object instance, String loggingContext,
+ ReflectionHelper reflectionHelper) {
+ this.instance = instance;
+ this.loggingContext = loggingContext;
+ this.reflectionHelper = reflectionHelper;
+ }
+
+ public Object getInstance() {
+ return instance;
+ }
+ public String getLoggingContext() {
+ return loggingContext;
+ }
+ public ReflectionHelper getReflectionHelper() {
+ return reflectionHelper;
+ }
+ }
+
+ private HashMap<String, ClientService> clientServices = new HashMap<String,
ClientService>();
+
+ public <T> T getClientService(Class<T> iface) throws
ComponentNotFoundException {
+ ClientService cs = getClientService(iface.getName());
+ return iface.cast(cs.getInstance());
+ }
+
+ public ClientService getClientService(String iface) throws ComponentNotFoundException {
+ ClientService cs = clientServices.get(iface);
+ if (cs == null) {
+ throw new
ComponentNotFoundException(DQPEmbeddedPlugin.Util.getString("ServerWorkItem.Component_Not_Found",
iface)); //$NON-NLS-1$
+ }
+ return cs;
+ }
+
+ public <T> void registerClientService(Class<T> iface, T instance, String
loggingContext) {
+ this.clientServices.put(iface.getName(), new ClientService(instance, loggingContext,
new ReflectionHelper(iface)));
+ }
+
+}
Property changes on: branches/JCA/runtime/src/main/java/org/teiid/transport/ClientServiceRegistryImpl.java
___________________________________________________________________
Name: svn:mime-type
+ text/plain
Modified: branches/JCA/runtime/src/main/java/org/teiid/transport/LogonImpl.java
===================================================================
--- branches/JCA/runtime/src/main/java/org/teiid/transport/LogonImpl.java 2010-02-22 18:48:13 UTC (rev 1866)
+++ branches/JCA/runtime/src/main/java/org/teiid/transport/LogonImpl.java 2010-02-23 16:07:13 UTC (rev 1867)
@@ -37,6 +37,7 @@
import com.metamatrix.api.exception.security.LogonException;
import com.metamatrix.api.exception.security.SessionServiceException;
import com.metamatrix.common.api.MMURL;
+import com.metamatrix.common.comm.api.ServerConnection;
import com.metamatrix.common.log.LogManager;
import com.metamatrix.common.util.LogConstants;
import com.metamatrix.core.CoreConstants;
@@ -70,13 +71,13 @@
credential = new Credentials(password.toCharArray());
}
- boolean adminConnection =
Boolean.parseBoolean(connProps.getProperty(MMURL.CONNECTION.ADMIN, "false"));
+ boolean adminConnection =
Boolean.parseBoolean(connProps.getProperty(MMURL.CONNECTION.ADMIN));
try {
- SessionMetadata sessionInfo = service.createSession(user,credential, applicationName,
connProps, adminConnection);;
+ SessionMetadata sessionInfo = service.createSession(user,credential, applicationName,
connProps, adminConnection);
long sessionID = updateDQPContext(sessionInfo, adminConnection);
LogManager.logDetail(LogConstants.CTX_SESSION, new Object[] {"Logon successful
for \"", user, "\" - created SessionID \"", "" +
sessionID, "\"" }); //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$
//$NON-NLS-4$
- if (connProps.getProperty("localConnection",
"false").equalsIgnoreCase("true")) {
+ if (Boolean.parseBoolean(connProps.getProperty(ServerConnection.LOCAL_CONNECTION))) {
service.setLocalSession(sessionID);
}
return new LogonResult(sessionInfo.getAttachment(SessionToken.class),
sessionInfo.getVDBName(), sessionInfo.getVDBVersion(), clusterName);
Modified: branches/JCA/runtime/src/main/java/org/teiid/transport/ServerWorkItem.java
===================================================================
--- branches/JCA/runtime/src/main/java/org/teiid/transport/ServerWorkItem.java 2010-02-22 18:48:13 UTC (rev 1866)
+++ branches/JCA/runtime/src/main/java/org/teiid/transport/ServerWorkItem.java 2010-02-23 16:07:13 UTC (rev 1867)
@@ -34,21 +34,18 @@
import org.teiid.adminapi.AdminProcessingException;
import org.teiid.dqp.internal.process.DQPWorkContext;
+import org.teiid.transport.ClientServiceRegistryImpl.ClientService;
-import com.metamatrix.api.exception.ComponentNotFoundException;
import com.metamatrix.api.exception.ExceptionHolder;
import com.metamatrix.api.exception.MetaMatrixProcessingException;
import com.metamatrix.common.comm.api.Message;
-import com.metamatrix.common.comm.api.ServerConnectionFactory;
import com.metamatrix.common.comm.platform.socket.client.ServiceInvocationStruct;
import com.metamatrix.common.log.LogManager;
import com.metamatrix.common.util.LogConstants;
import com.metamatrix.common.util.crypto.CryptoException;
import com.metamatrix.core.MetaMatrixRuntimeException;
-import com.metamatrix.core.util.ReflectionHelper;
import com.metamatrix.dqp.client.ResultsFuture;
import com.metamatrix.dqp.embedded.DQPEmbeddedPlugin;
-import com.metamatrix.platform.security.api.ILogon;
import com.metamatrix.platform.security.api.SessionToken;
public class ServerWorkItem {
@@ -56,43 +53,36 @@
private final ClientInstance socketClientInstance;
private final Serializable messageKey;
private final Message message;
- private ServerConnectionFactory scf;
+ private final ClientServiceRegistryImpl csr;
- public ServerWorkItem(ClientInstance socketClientInstance, Serializable messageKey,
Message message, ServerConnectionFactory server) {
+ public ServerWorkItem(ClientInstance socketClientInstance, Serializable messageKey,
Message message, ClientServiceRegistryImpl server) {
this.socketClientInstance = socketClientInstance;
this.messageKey = messageKey;
this.message = message;
- this.scf = server;
+ this.csr = server;
}
/**
- * main entry point for remote method calls. encryption/decryption is
- * handled here so that it won't be done by the io thread
+ * main entry point for remote method calls.
*/
public void process() {
Message result = null;
+ String loggingContext = null;
DQPWorkContext.setWorkContext(this.socketClientInstance.getWorkContext());
final boolean encrypt = message.getContents() instanceof SealedObject;
try {
message.setContents(this.socketClientInstance.getCryptor().unsealObject(message.getContents()));
-
if (!(message.getContents() instanceof ServiceInvocationStruct)) {
throw new AssertionError("unknown message contents"); //$NON-NLS-1$
}
final ServiceInvocationStruct serviceStruct =
(ServiceInvocationStruct)message.getContents();
- final Class type = Class.forName(serviceStruct.targetClass);
- Object instance = this.scf.getService(type);
- if (instance == null) {
- throw new
ComponentNotFoundException(DQPEmbeddedPlugin.Util.getString("ServerWorkItem.Component_Not_Found",
serviceStruct.targetClass)); //$NON-NLS-1$
- }
- if (!(instance instanceof ILogon)) {
- SessionToken.setSession(this.socketClientInstance.getWorkContext().getSessionToken());
- }
- ReflectionHelper helper = new ReflectionHelper(instance.getClass());
- Method m = helper.findBestMethodOnTarget(serviceStruct.methodName,
serviceStruct.args);
+ final ClientService clientService =
this.csr.getClientService(serviceStruct.targetClass);
+ loggingContext = clientService.getLoggingContext();
+ SessionToken.setSession(this.socketClientInstance.getWorkContext().getSessionToken());
+ Method m =
clientService.getReflectionHelper().findBestMethodOnTarget(serviceStruct.methodName,
serviceStruct.args);
Object methodResult;
try {
- methodResult = m.invoke(instance, serviceStruct.args);
+ methodResult = m.invoke(clientService.getInstance(), serviceStruct.args);
} catch (InvocationTargetException e) {
throw e.getCause();
}
@@ -106,9 +96,9 @@
try {
asynchResult.setContents(completedFuture.get());
} catch (InterruptedException e) {
- asynchResult.setContents(processException(e,
scf.getLoggingContextForService(type)));
+ asynchResult.setContents(processException(e,
clientService.getLoggingContext()));
} catch (ExecutionException e) {
- asynchResult.setContents(processException(e.getCause(),
scf.getLoggingContextForService(type)));
+ asynchResult.setContents(processException(e.getCause(),
clientService.getLoggingContext()));
}
sendResult(asynchResult, encrypt);
}
@@ -121,10 +111,11 @@
}
} catch (Throwable t) {
Message holder = new Message();
- holder.setContents(processException(t, null));
+ holder.setContents(processException(t, loggingContext));
result = holder;
} finally {
DQPWorkContext.releaseWorkContext();
+ SessionToken.setSession(null);
}
if (result != null) {
Modified:
branches/JCA/runtime/src/main/java/org/teiid/transport/SocketClientInstance.java
===================================================================
---
branches/JCA/runtime/src/main/java/org/teiid/transport/SocketClientInstance.java 2010-02-22
18:48:13 UTC (rev 1866)
+++
branches/JCA/runtime/src/main/java/org/teiid/transport/SocketClientInstance.java 2010-02-23
16:07:13 UTC (rev 1867)
@@ -29,7 +29,6 @@
import org.teiid.dqp.internal.process.DQPWorkContext;
import com.metamatrix.common.comm.api.Message;
-import com.metamatrix.common.comm.api.ServerConnectionFactory;
import com.metamatrix.common.comm.exception.CommunicationException;
import com.metamatrix.common.comm.platform.CommPlatformPlugin;
import com.metamatrix.common.comm.platform.socket.Handshake;
@@ -54,14 +53,14 @@
private final ObjectChannel objectSocket;
private Cryptor cryptor;
- private ServerConnectionFactory server;
+ private ClientServiceRegistryImpl csr;
private boolean usingEncryption;
private DhKeyGenerator keyGen;
private DQPWorkContext workContext = new DQPWorkContext();
- public SocketClientInstance(ObjectChannel objectSocket, ServerConnectionFactory
server, boolean isClientEncryptionEnabled) {
+ public SocketClientInstance(ObjectChannel objectSocket, ClientServiceRegistryImpl
csr, boolean isClientEncryptionEnabled) {
this.objectSocket = objectSocket;
- this.server = server;
+ this.csr = csr;
this.usingEncryption = isClientEncryptionEnabled;
SocketAddress address = this.objectSocket.getRemoteAddress();
if (address instanceof InetSocketAddress) {
@@ -139,7 +138,7 @@
if (LogManager.isMessageToBeRecorded(LogConstants.CTX_SERVER, MessageLevel.DETAIL)) {
LogManager.logDetail(LogConstants.CTX_SERVER, "processing message:" +
packet); //$NON-NLS-1$
}
- ServerWorkItem work = new ServerWorkItem(this, packet.getMessageKey(), packet,
this.server);
+ ServerWorkItem work = new ServerWorkItem(this, packet.getMessageKey(), packet,
this.csr);
work.process();
}
Modified: branches/JCA/runtime/src/main/java/org/teiid/transport/SocketListener.java
===================================================================
--- branches/JCA/runtime/src/main/java/org/teiid/transport/SocketListener.java 2010-02-22
18:48:13 UTC (rev 1866)
+++ branches/JCA/runtime/src/main/java/org/teiid/transport/SocketListener.java 2010-02-23
16:07:13 UTC (rev 1867)
@@ -23,67 +23,57 @@
package org.teiid.transport;
import java.net.InetSocketAddress;
-import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
-import javax.naming.InitialContext;
-import javax.naming.NamingException;
import javax.net.ssl.SSLEngine;
-import javax.resource.spi.work.Work;
-import javax.resource.spi.work.WorkException;
-import javax.resource.spi.work.WorkManager;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
-import org.teiid.adminapi.impl.WorkerPoolStatisticsMetadata;
import org.teiid.transport.ChannelListener.ChannelListenerFactory;
-import com.metamatrix.common.CommonPlugin;
-import com.metamatrix.common.comm.api.ServerConnectionFactory;
import com.metamatrix.common.comm.platform.socket.ObjectChannel;
import com.metamatrix.common.log.LogManager;
-import com.metamatrix.common.queue.WorkerPool;
-import com.metamatrix.common.queue.WorkerPoolFactory;
import com.metamatrix.common.util.ApplicationInfo;
import com.metamatrix.common.util.LogConstants;
import com.metamatrix.core.log.MessageLevel;
+import com.metamatrix.core.util.NamedThreadFactory;
/**
* Server-side class to listen for new connection requests and create a
SocketClientConnection for each connection request.
*/
public class SocketListener implements ChannelListenerFactory {
- private static final String TEIID_RUNTIME = "java:teiid/runtime-engine";
private SSLAwareChannelHandler channelHandler;
private Channel serverChanel;
private boolean isClientEncryptionEnabled;
- private WorkerPool workerPool;
- private Executor executor;
+ private ExecutorService nettyPool;
+ private ClientServiceRegistryImpl csr;
/**
*
* @param port
- * @param bindaddress
- * @param server
* @param inputBufferSize
* @param outputBufferSize
- * @param workerPool
* @param engine null if SSL is disabled
+ * @param bindaddress
+ * @param server
*/
public SocketListener(int port, String bindAddress, int inputBufferSize,
- int outputBufferSize, int maxWorkers, SSLEngine engine, boolean
isClientEncryptionEnabled, WorkManager workManager) {
+ int outputBufferSize, int maxWorkers, SSLEngine engine, boolean
isClientEncryptionEnabled, ClientServiceRegistryImpl csr) {
this.isClientEncryptionEnabled = isClientEncryptionEnabled;
+ this.csr = csr;
if (port < 0 || port > 0xFFFF) {
throw new IllegalArgumentException("port out of range:" + port);
//$NON-NLS-1$
}
- this.executor = new WorkManagerExecutor(workManager);
- this.workerPool = WorkerPoolFactory.newWorkerPool("SocketWorker",
maxWorkers, this.executor); //$NON-NLS-1$
+ this.nettyPool = Executors.newCachedThreadPool(new
NamedThreadFactory("NIO")); //$NON-NLS-1$
if (LogManager.isMessageToBeRecorded(LogConstants.CTX_SERVER,
MessageLevel.DETAIL)) {
LogManager.logDetail(LogConstants.CTX_SERVER, "server = " +
bindAddress + "binding to port:" + port); //$NON-NLS-1$ //$NON-NLS-2$
}
- ChannelFactory factory = new NioServerSocketChannelFactory(this.executor,
this.executor, Math.min(Runtime.getRuntime().availableProcessors(), maxWorkers));
+ ChannelFactory factory = new NioServerSocketChannelFactory(this.nettyPool,
this.nettyPool, Math.min(Runtime.getRuntime().availableProcessors(), maxWorkers));
ServerBootstrap bootstrap = new ServerBootstrap(factory);
this.channelHandler = new SSLAwareChannelHandler(this, engine,
Thread.currentThread().getContextClassLoader());
@@ -99,10 +89,6 @@
this.serverChanel = bootstrap.bind(new InetSocketAddress(bindAddress, port));
}
- public WorkerPoolStatisticsMetadata getProcessPoolStats() {
- return this.workerPool.getStats();
- }
-
public int getPort() {
return ((InetSocketAddress)this.serverChanel.getLocalAddress()).getPort();
}
@@ -113,7 +99,7 @@
public void stop() {
this.serverChanel.close();
- this.workerPool.shutdownNow();
+ this.nettyPool.shutdownNow();
}
public SocketListenerStats getStats() {
@@ -126,41 +112,7 @@
}
public ChannelListener createChannelListener(ObjectChannel channel) {
- return new SocketClientInstance(channel, getServer(), this.isClientEncryptionEnabled);
+ return new SocketClientInstance(channel, csr, this.isClientEncryptionEnabled);
}
-
- static class WorkManagerExecutor implements Executor{
- WorkManager workManager;
-
- public WorkManagerExecutor(WorkManager workManager){
- this.workManager = workManager;
- }
-
- @Override
- public void execute(final Runnable command) {
- try {
- workManager.scheduleWork(new Work() {
- @Override
- public void run() {
- command.run();
- }
-
- @Override
- public void release() {
- }
- });
- } catch (WorkException e) {
- LogManager.logError(LogConstants.CTX_POOLING, e,
CommonPlugin.Util.getString("WorkerPool.uncaughtException")); //$NON-NLS-1$
- }
- }
- }
- protected ServerConnectionFactory getServer() {
- try {
- InitialContext ic = new InitialContext();
- return (ServerConnectionFactory)ic.lookup(TEIID_RUNTIME);
- } catch (NamingException e) {
- return null;
- }
- }
}
\ No newline at end of file
Modified: branches/JCA/runtime/src/main/java/org/teiid/transport/SocketTransport.java
===================================================================
--- branches/JCA/runtime/src/main/java/org/teiid/transport/SocketTransport.java 2010-02-22
18:48:13 UTC (rev 1866)
+++ branches/JCA/runtime/src/main/java/org/teiid/transport/SocketTransport.java 2010-02-23
16:07:13 UTC (rev 1867)
@@ -25,10 +25,6 @@
import java.net.UnknownHostException;
import java.security.GeneralSecurityException;
-import javax.resource.spi.work.WorkManager;
-
-import org.teiid.adminapi.impl.WorkerPoolStatisticsMetadata;
-
import com.metamatrix.common.log.LogManager;
import com.metamatrix.common.util.LogConstants;
import com.metamatrix.core.MetaMatrixRuntimeException;
@@ -41,11 +37,12 @@
public class SocketTransport {
private SocketListener listener;
- private WorkManager workManager;
private SocketConfiguration config;
+ private ClientServiceRegistryImpl csr;
- public SocketTransport(SocketConfiguration config) {
+ public SocketTransport(SocketConfiguration config, ClientServiceRegistryImpl csr) {
this.config = config;
+ this.csr = csr;
}
public void start() {
@@ -54,7 +51,8 @@
try {
if (this.config.isEnabled()) {
LogManager.logDetail(LogConstants.CTX_SERVER,
DQPEmbeddedPlugin.Util.getString("SocketTransport.1", new Object[] {bindAddress,
String.valueOf(this.config.getPortNumber())})); //$NON-NLS-1$
- this.listener = new SocketListener(this.config.getPortNumber(), bindAddress,
this.config.getInputBufferSize(), this.config.getOutputBufferSize(),
this.config.getMaxSocketThreads(), this.config.getSSLConfiguration().getServerSSLEngine(),
this.config.getSSLConfiguration().isClientEncryptionEnabled(), this.workManager);
+ this.listener = new SocketListener(this.config.getPortNumber(), bindAddress,
this.config.getInputBufferSize(), this.config.getOutputBufferSize(),
this.config.getMaxSocketThreads(), this.config.getSSLConfiguration().getServerSSLEngine(),
this.config.getSSLConfiguration().isClientEncryptionEnabled(), csr);
+
}
else {
LogManager.logDetail(LogConstants.CTX_SERVER,
DQPEmbeddedPlugin.Util.getString("SocketTransport.3")); //$NON-NLS-1$
@@ -73,10 +71,6 @@
this.listener.stop();
}
- public WorkerPoolStatisticsMetadata getProcessPoolStats() {
- return listener.getProcessPoolStats();
- }
-
public int getPort() {
return this.listener.getPort();
}
@@ -85,7 +79,4 @@
return this.listener.getStats();
}
- public void setWorkManager(WorkManager mgr) {
- this.workManager = mgr;
- }
}
Modified:
branches/JCA/runtime/src/test/java/org/teiid/services/TestMembershipServiceImpl.java
===================================================================
---
branches/JCA/runtime/src/test/java/org/teiid/services/TestMembershipServiceImpl.java 2010-02-22
18:48:13 UTC (rev 1866)
+++
branches/JCA/runtime/src/test/java/org/teiid/services/TestMembershipServiceImpl.java 2010-02-23
16:07:13 UTC (rev 1867)
@@ -32,7 +32,7 @@
import junit.framework.TestCase;
import org.mockito.Mockito;
-import org.teiid.services.MembershipServiceImpl;
+import org.teiid.services.TeiidLoginContext;
import com.metamatrix.platform.security.api.Credentials;
@@ -41,17 +41,17 @@
public void testBaseUsername() throws Exception {
- assertEquals("foo@bar.com",
MembershipServiceImpl.getBaseUsername("foo\\@bar.com@foo")); //$NON-NLS-1$
//$NON-NLS-2$
+ assertEquals("foo@bar.com",
TeiidLoginContext.getBaseUsername("foo\\@bar.com@foo")); //$NON-NLS-1$
//$NON-NLS-2$
- assertEquals("foo",
MembershipServiceImpl.getDomainName("me\\@bar.com@foo")); //$NON-NLS-1$
//$NON-NLS-2$
+ assertEquals("foo",
TeiidLoginContext.getDomainName("me\\@bar.com@foo")); //$NON-NLS-1$
//$NON-NLS-2$
- assertEquals(null, MembershipServiceImpl.getDomainName("@"));
//$NON-NLS-1$
+ assertEquals(null, TeiidLoginContext.getDomainName("@"));
//$NON-NLS-1$
- assertEquals("@",
MembershipServiceImpl.getBaseUsername("@")); //$NON-NLS-1$ //$NON-NLS-2$
+ assertEquals("@", TeiidLoginContext.getBaseUsername("@"));
//$NON-NLS-1$ //$NON-NLS-2$
}
- private MembershipServiceImpl createMembershipService() throws Exception {
- MembershipServiceImpl membershipService = new MembershipServiceImpl() {
+ private TeiidLoginContext createMembershipService() throws Exception {
+ TeiidLoginContext membershipService = new TeiidLoginContext() {
public LoginContext createLoginContext(String domain, CallbackHandler handler) throws
LoginException {
LoginContext context = Mockito.mock(LoginContext.class);
return context;
@@ -62,7 +62,7 @@
public void testAuthenticate() throws Exception {
- MembershipServiceImpl ms = createMembershipService();
+ TeiidLoginContext ms = createMembershipService();
List<String> domains = new ArrayList<String>();
domains.add("testFile");
ms.authenticateUser("user1", new
Credentials("pass1".toCharArray()), null, domains); //$NON-NLS-1$ //$NON-NLS-2$
Modified:
branches/JCA/runtime/src/test/java/org/teiid/services/TestSessionServiceImpl.java
===================================================================
---
branches/JCA/runtime/src/test/java/org/teiid/services/TestSessionServiceImpl.java 2010-02-22
18:48:13 UTC (rev 1866)
+++
branches/JCA/runtime/src/test/java/org/teiid/services/TestSessionServiceImpl.java 2010-02-23
16:07:13 UTC (rev 1867)
@@ -13,7 +13,7 @@
import org.junit.Test;
import org.mockito.Mockito;
import org.teiid.adminapi.impl.SessionMetadata;
-import org.teiid.services.MembershipServiceImpl;
+import org.teiid.services.TeiidLoginContext;
import org.teiid.services.SessionServiceImpl;
import com.metamatrix.admin.api.exception.security.InvalidSessionException;
@@ -22,7 +22,7 @@
public class TestSessionServiceImpl {
public void validateSession(boolean securityEnabled) throws Exception {
- final MembershipServiceImpl impl = Mockito.mock(MembershipServiceImpl.class);
+ final TeiidLoginContext impl = Mockito.mock(TeiidLoginContext.class);
Mockito.stub(impl.getUserName()).toReturn("steve@somedomain");
Mockito.stub(impl.getLoginContext()).toReturn(Mockito.mock(LoginContext.class));
final ArrayList<String> domains = new ArrayList<String>();
@@ -30,7 +30,7 @@
SessionServiceImpl ssi = new SessionServiceImpl() {
@Override
- protected MembershipServiceImpl authenticate(String userName, Credentials credentials,
String applicationName, List<String> domains)
+ protected TeiidLoginContext authenticate(String userName, Credentials credentials,
String applicationName, List<String> domains)
throws LoginException {
impl.authenticateUser(userName, credentials, applicationName, domains);
return impl;
Modified: branches/JCA/runtime/src/test/java/org/teiid/transport/TestCommSockets.java
===================================================================
--- branches/JCA/runtime/src/test/java/org/teiid/transport/TestCommSockets.java 2010-02-22
18:48:13 UTC (rev 1866)
+++ branches/JCA/runtime/src/test/java/org/teiid/transport/TestCommSockets.java 2010-02-23
16:07:13 UTC (rev 1867)
@@ -22,10 +22,8 @@
*/
package org.teiid.transport;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.mock;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
import java.net.InetSocketAddress;
import java.util.Properties;
@@ -36,13 +34,10 @@
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
-import org.teiid.dqp.internal.datamgr.impl.FakeWorkManager;
import com.metamatrix.api.exception.ComponentNotFoundException;
import com.metamatrix.api.exception.security.LogonException;
import com.metamatrix.common.api.MMURL;
-import com.metamatrix.common.comm.ClientServiceRegistry;
-import com.metamatrix.common.comm.ClientServiceRegistryImpl;
import com.metamatrix.common.comm.api.ServerConnectionFactory;
import com.metamatrix.common.comm.exception.CommunicationException;
import com.metamatrix.common.comm.exception.ConnectionException;
@@ -72,10 +67,7 @@
}
@Test public void testFailedConnect() throws Exception {
- ClientServiceRegistry csr = new ClientServiceRegistryImpl();
- SessionService sessionService = mock(SessionService.class);
- csr.registerClientService(ILogon.class, new LogonImpl(sessionService,
"fakeCluster"), "foo"); //$NON-NLS-1$ //$NON-NLS-2$
- listener = new SocketListener(addr.getPort(), addr.getAddress().getHostAddress(),1024,
1024, 1, null, true, new FakeWorkManager());
+ listener = new SocketListener(addr.getPort(), addr.getAddress().getHostAddress(),1024,
1024, 1, null, true, null);
try {
Properties p = new Properties();
@@ -142,19 +134,16 @@
SSLEngine serverSSL, boolean isClientEncryptionEnabled, Properties socketConfig)
throws CommunicationException,
ConnectionException {
if (listener == null) {
- listener = new SocketListener(addr.getPort(), addr.getAddress().getHostAddress(),
1024, 1024, 1, serverSSL, isClientEncryptionEnabled, new FakeWorkManager()) {
- protected ServerConnectionFactory getServer() {
- ServerConnectionFactory server = Mockito.mock(ServerConnectionFactory.class);
- Mockito.stub(server.getService(ILogon.class)).toReturn(new
LogonImpl(mock(SessionService.class), "fakeCluster") { //$NON-NLS-1$
- @Override
- public LogonResult logon(Properties connProps)
- throws LogonException, ComponentNotFoundException {
- return new LogonResult();
- }
- });
- return server;
+ ClientServiceRegistryImpl server = new ClientServiceRegistryImpl();
+ server.registerClientService(ILogon.class, new LogonImpl(mock(SessionService.class),
"fakeCluster") { //$NON-NLS-1$
+ @Override
+ public LogonResult logon(Properties connProps)
+ throws LogonException, ComponentNotFoundException {
+ return new LogonResult();
}
- };
+
+ }, null);
+ listener = new SocketListener(addr.getPort(), addr.getAddress().getHostAddress(),
1024, 1024, 1, serverSSL, isClientEncryptionEnabled, server);
SocketListenerStats stats = listener.getStats();
assertEquals(0, stats.maxSockets);
Modified: branches/JCA/runtime/src/test/java/org/teiid/transport/TestSocketRemoting.java
===================================================================
---
branches/JCA/runtime/src/test/java/org/teiid/transport/TestSocketRemoting.java 2010-02-22
18:48:13 UTC (rev 1866)
+++
branches/JCA/runtime/src/test/java/org/teiid/transport/TestSocketRemoting.java 2010-02-23
16:07:13 UTC (rev 1867)
@@ -22,10 +22,7 @@
package org.teiid.transport;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.junit.Assert.*;
import java.io.IOException;
import java.io.Serializable;
@@ -34,13 +31,7 @@
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
-import javax.resource.spi.ConnectionManager;
-
import org.junit.Test;
-import org.mockito.Mockito;
-import org.teiid.TeiidConnectionFactory;
-import org.teiid.TeiidManagedConnectionFactory;
-import org.teiid.TeiidResourceAdapter;
import org.teiid.dqp.internal.process.DQPWorkContext;
import com.metamatrix.admin.api.exception.security.InvalidSessionException;
@@ -51,7 +42,6 @@
import com.metamatrix.common.api.MMURL;
import com.metamatrix.common.comm.api.Message;
import com.metamatrix.common.comm.api.ResultsReceiver;
-import com.metamatrix.common.comm.api.ServerConnectionFactory;
import com.metamatrix.common.comm.exception.CommunicationException;
import com.metamatrix.common.comm.exception.ConnectionException;
import com.metamatrix.common.comm.platform.socket.client.SocketServerConnection;
@@ -61,7 +51,6 @@
import com.metamatrix.common.comm.platform.socket.client.UrlServerDiscovery;
import com.metamatrix.common.util.crypto.Cryptor;
import com.metamatrix.common.util.crypto.NullCryptor;
-import com.metamatrix.common.xa.XATransactionException;
import com.metamatrix.dqp.client.ClientSideDQP;
import com.metamatrix.dqp.client.ResultsFuture;
import com.metamatrix.platform.security.api.ILogon;
@@ -94,10 +83,10 @@
private static class FakeClientServerInstance extends SocketServerInstanceImpl
implements ClientInstance {
- ServerConnectionFactory server;
+ ClientServiceRegistryImpl server;
private ResultsReceiver<Object> listener;
- public FakeClientServerInstance(ServerConnectionFactory server) {
+ public FakeClientServerInstance(ClientServiceRegistryImpl server) {
super();
this.server = server;
}
@@ -153,7 +142,7 @@
@Test
public void testMethodInvocation() throws Exception {
- TeiidConnectionFactory csr = new TeiidConnectionFactory(new TeiidResourceAdapter(), new
TeiidManagedConnectionFactory(), Mockito.mock(ConnectionManager.class));
+ ClientServiceRegistryImpl csr = new ClientServiceRegistryImpl();
csr.registerClientService(ILogon.class, new ILogon() {
public ResultsFuture<?> logoff()
@@ -205,10 +194,11 @@
}
ClientSideDQP dqp = connection.getService(ClientSideDQP.class);
try {
- dqp.begin();
+ ResultsFuture<?> future = dqp.begin();
+ future.get();
fail("exception expected"); //$NON-NLS-1$
- } catch (XATransactionException e) {
- assertEquals("Component not found: com.metamatrix.dqp.client.ClientSideDQP",
e.getMessage()); //$NON-NLS-1$
+ } catch (Exception e) {
+ assertTrue(e.getMessage().endsWith("Component not found:
com.metamatrix.dqp.client.ClientSideDQP")); //$NON-NLS-1$
}
}