Author: shawkins
Date: 2009-07-24 21:49:03 -0400 (Fri, 24 Jul 2009)
New Revision: 1186
Modified:
trunk/client/src/main/java/com/metamatrix/dqp/embedded/DQPEmbeddedProperties.java
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorManager.java
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWrapper.java
trunk/engine/src/main/java/org/teiid/dqp/internal/pooling/connector/ConnectionPool.java
trunk/engine/src/main/java/org/teiid/dqp/internal/pooling/connector/PooledConnector.java
trunk/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/TestConnectorManagerImpl.java
trunk/runtime/src/main/java/com/metamatrix/dqp/embedded/services/EmbeddedDataService.java
trunk/runtime/src/test/java/com/metamatrix/dqp/embedded/services/TestEmbeddedConfigurationService.java
Log:
TEIID-607 removing the connection test from the connector startup - the case that removed
it wasn't committed prior to branching. also updated the status logic so that the
connector itself is starting the polling task.
Modified:
trunk/client/src/main/java/com/metamatrix/dqp/embedded/DQPEmbeddedProperties.java
===================================================================
---
trunk/client/src/main/java/com/metamatrix/dqp/embedded/DQPEmbeddedProperties.java 2009-07-24
15:47:30 UTC (rev 1185)
+++
trunk/client/src/main/java/com/metamatrix/dqp/embedded/DQPEmbeddedProperties.java 2009-07-25
01:49:03 UTC (rev 1186)
@@ -32,7 +32,6 @@
public static final String VDB_DEFINITION = "vdb.definition";
//$NON-NLS-1$
public static final String USER_DEFINED_FUNCTIONS =
"dqp.userDefinedFunctionsFile"; //$NON-NLS-1$
public static final String COMMON_EXTENSION_CLASPATH =
"dqp.extension.CommonClasspath"; //$NON-NLS-1$
- public static final String DQP_KEYSTORE = "dqp.keystore"; //$NON-NLS-1$
public static final String DQP_WORKDIR = "dqp.workdir"; //$NON-NLS-1$
public static final String DQP_DEPLOYDIR = "dqp.deploydir"; //$NON-NLS-1$
public static final String DQP_LIBDIR = "dqp.lib"; //$NON-NLS-1$
Modified:
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorManager.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorManager.java 2009-07-24
15:47:30 UTC (rev 1185)
+++
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorManager.java 2009-07-25
01:49:03 UTC (rev 1186)
@@ -102,7 +102,6 @@
*/
public class ConnectorManager implements ApplicationService {
- private static final int TIME_BETWEEN_STATUS_CHECKS = 5000;
public static final int DEFAULT_MAX_THREADS = 20;
private static final String DEFAULT_MAX_RESULTSET_CACHE_SIZE = "20";
//$NON-NLS-1$
private static final String DEFAULT_MAX_RESULTSET_CACHE_AGE = "3600000";
//$NON-NLS-1$
@@ -119,14 +118,14 @@
private boolean synchWorkers;
private boolean isXa;
private boolean isImmutable;
+
+ private volatile ConnectorStatus state = ConnectorStatus.NOT_INITIALIZED;
//services acquired in start
private MetadataService metadataService;
private TransactionService transactionService;
private BufferService bufferService;
- private volatile Boolean started;
-
private ClassLoaderManager clManager;
// known requests
@@ -134,9 +133,6 @@
private Properties props;
private ClassLoader classloader;
- private ConnectorStatus previousStatus;
- private long lastStatusCheck = -1;
-
public void initialize(Properties props) {
this.props = props;
@@ -297,35 +293,29 @@
return requestStates.size();
}
- /**
- * @see com.metamatrix.dqp.internal.datamgr.ConnectorManager#isAlive()
- */
public ConnectorStatus getStatus() {
- if (this.connector == null) {
- return ConnectorStatus.NOT_INITIALIZED;
+ ConnectorWrapper connectorWrapper = this.connector;
+ ConnectorStatus result = this.state;
+ if (result != ConnectorStatus.OPEN) {
+ return result;
}
-
- // we want to avoid repeated calls to status, as they may be expensive to make.
- if (lastStatusCheck == -1 || (System.currentTimeMillis() - lastStatusCheck >=
TIME_BETWEEN_STATUS_CHECKS)) {
- ClassLoader contextloader = Thread.currentThread().getContextClassLoader();
- try {
- Thread.currentThread().setContextClassLoader(classloader);
- this.previousStatus = this.connector.getStatus();
- this.lastStatusCheck = System.currentTimeMillis();
- } finally {
- Thread.currentThread().setContextClassLoader(contextloader);
- }
- }
- return this.previousStatus;
+ ClassLoader contextloader = Thread.currentThread().getContextClassLoader();
+ try {
+ Thread.currentThread().setContextClassLoader(classloader);
+ return connectorWrapper.getStatus();
+ } finally {
+ Thread.currentThread().setContextClassLoader(contextloader);
+ }
}
/**
* initialize this <code>ConnectorManager</code>.
*/
public synchronized void start(ApplicationEnvironment env) throws
ApplicationLifecycleException {
- if (this.started != null) {
- throw new
ApplicationLifecycleException("ConnectorManager.cannot_restart"); //$NON-NLS-1$
+ if (this.state != ConnectorStatus.NOT_INITIALIZED) {
+ return;
}
+ this.state = ConnectorStatus.INIT_FAILED;
connectorName = props.getProperty(ConnectorPropertyNames.CONNECTOR_BINDING_NAME,
"Unknown_Binding_Name"); //$NON-NLS-1$
String connIDStr = props.getProperty(ConnectorPropertyNames.CONNECTOR_ID);
connectorID = new ConnectorID(connIDStr);
@@ -374,7 +364,7 @@
this.rsCache = createResultSetCache(rsCacheProps);
}
this.workItemFactory = new ConnectorWorkItemFactory(this, this.rsCache, synchWorkers);
- this.started = true;
+ this.state = ConnectorStatus.OPEN;
}
private String buildClasspath(Properties connectorProperties) {
@@ -534,26 +524,14 @@
}
/**
- * Queries the Connector Manager, if it already has been started.
- * @return
- * @since 4.3
- */
- public boolean started() {
- if (this.started != null) {
- return this.started;
- }
- return false;
- }
-
- /**
* Stop this connector.
*/
public void stop() throws ApplicationLifecycleException {
synchronized (this) {
- if (this.started == null || this.started == false) {
+ if (this.state == ConnectorStatus.CLOSED) {
return;
}
- this.started = false;
+ this.state= ConnectorStatus.CLOSED;
}
if (this.connectorWorkerPool != null) {
this.connectorWorkerPool.shutdownNow();
Modified:
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWrapper.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWrapper.java 2009-07-24
15:47:30 UTC (rev 1185)
+++
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWrapper.java 2009-07-25
01:49:03 UTC (rev 1186)
@@ -22,6 +22,8 @@
package org.teiid.dqp.internal.datamgr.impl;
+import java.util.concurrent.TimeUnit;
+
import org.teiid.connector.api.Connection;
import org.teiid.connector.api.Connector;
import org.teiid.connector.api.ConnectorCapabilities;
@@ -34,7 +36,9 @@
import org.teiid.connector.xa.api.TransactionContext;
import org.teiid.connector.xa.api.XAConnection;
import org.teiid.connector.xa.api.XAConnector;
+import org.teiid.dqp.internal.pooling.connector.ConnectionPool;
+import com.metamatrix.common.util.PropertiesUtils;
import com.metamatrix.dqp.service.ConnectorStatus;
/**
@@ -44,8 +48,8 @@
private Connector actualConnector;
private String name;
+ private volatile ConnectorStatus status = ConnectorStatus.UNABLE_TO_CHECK;
-
public ConnectorWrapper(Connector actualConnector){
this.actualConnector = actualConnector;
}
@@ -53,7 +57,20 @@
public void start(ConnectorEnvironment environment) throws ConnectorException {
name = environment.getConnectorName();
actualConnector.start(environment);
+ int interval = PropertiesUtils.getIntProperty(environment.getProperties(),
ConnectionPool.SOURCE_CONNECTION_TEST_INTERVAL,
ConnectionPool.DEFAULT_SOURCE_CONNECTION_TEST_INTERVAL);
+ if (interval > 0 && isConnectionTestable()) {
+ environment.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ updateStatus();
+ }
+ }, 0, interval, TimeUnit.SECONDS);
+ }
}
+
+ protected boolean isConnectionTestable() {
+ return supportsSingleIdentity();
+ }
public void stop() {
actualConnector.stop();
@@ -97,13 +114,21 @@
}
public final ConnectorStatus getStatus() {
+ return status;
+ }
+
+ protected void updateStatus() {
+ this.status = testConnection();
+ }
+
+ protected ConnectorStatus testConnection() {
if (supportsSingleIdentity()) {
Connection conn = null;
try {
conn = this.getConnection(null);
return conn.isAlive()?ConnectorStatus.OPEN:ConnectorStatus.DATA_SOURCE_UNAVAILABLE;
} catch (ConnectorException e) {
- return ConnectorStatus.INIT_FAILED;
+ return ConnectorStatus.DATA_SOURCE_UNAVAILABLE;
} finally {
if (conn != null) {
conn.close();
Modified:
trunk/engine/src/main/java/org/teiid/dqp/internal/pooling/connector/ConnectionPool.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/dqp/internal/pooling/connector/ConnectionPool.java 2009-07-24
15:47:30 UTC (rev 1185)
+++
trunk/engine/src/main/java/org/teiid/dqp/internal/pooling/connector/ConnectionPool.java 2009-07-25
01:49:03 UTC (rev 1186)
@@ -96,7 +96,7 @@
static final int DEFAULT_WAIT_FOR_SOURCE_TIME = 120000;
static final int DEFAULT_CLEANING_INTERVAL = 60;
static final boolean DEFAULT_ENABLE_SHRINKING = true;
- static final int DEFAULT_SOURCE_CONNECTION_TEST_INTERVAL = 600; //10 minutes
+ public static final int DEFAULT_SOURCE_CONNECTION_TEST_INTERVAL = 600; //10 minutes
private static class ConnectionsForId {
LinkedList<ConnectionWrapper> used = new
LinkedList<ConnectionWrapper>();
@@ -179,7 +179,7 @@
waitForSourceTime = PropertiesUtils.getIntProperty(poolProperties,
WAIT_FOR_SOURCE_TIME, DEFAULT_WAIT_FOR_SOURCE_TIME);
cleaningInterval = PropertiesUtils.getIntProperty(poolProperties,
CLEANING_INTERVAL, DEFAULT_CLEANING_INTERVAL) * 1000;
enableShrinking = PropertiesUtils.getBooleanProperty(poolProperties,
ENABLE_SHRINKING, DEFAULT_ENABLE_SHRINKING);
- testConnectInterval = PropertiesUtils.getIntProperty(poolProperties,
SOURCE_CONNECTION_TEST_INTERVAL, DEFAULT_SOURCE_CONNECTION_TEST_INTERVAL);
+ testConnectInterval = PropertiesUtils.getIntProperty(poolProperties,
SOURCE_CONNECTION_TEST_INTERVAL, DEFAULT_SOURCE_CONNECTION_TEST_INTERVAL) * 1000;
if (enableShrinking && !this.shuttingDownPool) {
env.scheduleAtFixedRate(new Runnable() {
Modified:
trunk/engine/src/main/java/org/teiid/dqp/internal/pooling/connector/PooledConnector.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/dqp/internal/pooling/connector/PooledConnector.java 2009-07-24
15:47:30 UTC (rev 1185)
+++
trunk/engine/src/main/java/org/teiid/dqp/internal/pooling/connector/PooledConnector.java 2009-07-25
01:49:03 UTC (rev 1186)
@@ -43,6 +43,7 @@
import org.teiid.dqp.internal.datamgr.impl.ConnectorWrapper;
import com.metamatrix.common.stats.ConnectionPoolStats;
+import com.metamatrix.dqp.service.ConnectorStatus;
/**
@@ -177,7 +178,7 @@
return conn;
}
- public Collection <ConnectionPoolStats>getConnectionPoolStats() {
+ public Collection<ConnectionPoolStats> getConnectionPoolStats() {
Collection<ConnectionPoolStats> pools = new
ArrayList<ConnectionPoolStats>(2);
setStats(pool, poolStats);
@@ -190,7 +191,21 @@
return pools;
}
+
+ @Override
+ protected boolean isConnectionTestable() {
+ return true;
+ }
+ @Override
+ protected ConnectorStatus testConnection() {
+ if (this.pool.getNumberOfConnectionsInUse() > 0) {
+ return ConnectorStatus.OPEN;
+ }
+ //TODO: call is alive on an unused connection
+ return super.testConnection();
+ }
+
private void setStats(ConnectionPool connpool, ConnectionPoolStats stats) {
stats.setConnectionsWaiting(connpool.getNumberOfConnectinsWaiting());
@@ -198,8 +213,6 @@
stats.setConnectionsDestroyed(connpool.getTotalDestroyedConnectionCount());
stats.setConnectionsInUse(connpool.getNumberOfConnectionsInUse());
stats.setTotalConnections(connpool.getTotalConnectionCount());
-
-
}
}
Modified:
trunk/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/TestConnectorManagerImpl.java
===================================================================
---
trunk/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/TestConnectorManagerImpl.java 2009-07-24
15:47:30 UTC (rev 1185)
+++
trunk/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/TestConnectorManagerImpl.java 2009-07-25
01:49:03 UTC (rev 1186)
@@ -35,7 +35,6 @@
import org.junit.Test;
import org.mockito.Mockito;
-import org.mockito.MockitoAnnotations.Mock;
import org.teiid.connector.api.Connection;
import org.teiid.connector.api.Connector;
import org.teiid.connector.api.ConnectorException;
@@ -44,6 +43,7 @@
import org.teiid.connector.api.ExecutionContext;
import org.teiid.dqp.internal.cache.ResultSetCache;
import org.teiid.dqp.internal.datamgr.impl.TestConnectorWorkItem.QueueResultsReceiver;
+import org.teiid.dqp.internal.pooling.connector.ConnectionPool;
import org.teiid.dqp.internal.pooling.connector.FakeSourceConnectionFactory;
import org.teiid.dqp.internal.process.DQPWorkContext;
@@ -167,6 +167,7 @@
Properties props = new Properties();
props.setProperty(ConnectorPropertyNames.CONNECTOR_CLASS,
FakeConnector.class.getName());
props.setProperty(ConnectorPropertyNames.USE_RESULTSET_CACHE,
Boolean.TRUE.toString());
+ props.setProperty(ConnectionPool.SOURCE_CONNECTION_TEST_INTERVAL,
String.valueOf(-1));
startConnectorManager(cm, props);
ConnectorWrapper wrapper = cm.getConnector();
FakeConnector fc = (FakeConnector)wrapper.getActualConnector();
@@ -225,6 +226,7 @@
final String connectorName = mockConnector.getClass().getName();
props.setProperty(ConnectorPropertyNames.CONNECTOR_CLASS, connectorName);
+ startConnectorManager(cm, props);
cm.setConnector(new ConnectorWrapper(mockConnector)); // to make them same
connector
// no identity can be defined
@@ -235,63 +237,51 @@
ConnectorManager cm = new ConnectorManager();
assertEquals(ConnectorStatus.NOT_INITIALIZED, cm.getStatus());
- Properties props = new Properties();
Connector mockConnector = Mockito.mock(Connector.class);
Connection mockConnection = Mockito.mock(Connection.class);
ConnectorIdentity mockIdentity = Mockito.mock(ConnectorIdentity.class);
Mockito.stub(mockConnector.getConnection((ExecutionContext)Mockito.anyObject())).toReturn(mockConnection);
Mockito.stub(mockConnector.createIdentity(null)).toReturn(mockIdentity);
-
- final String connectorName = mockConnector.getClass().getName();
- props.setProperty(ConnectorPropertyNames.CONNECTOR_CLASS, connectorName);
- startConnectorManager(cm, props);
- cm.setConnector(new ConnectorWrapper(mockConnector)); // to make them same
connector
-
- // check Open
Mockito.stub(mockConnection.isAlive()).toReturn(true);
- assertEquals(ConnectorStatus.OPEN, cm.getStatus());
+
+ ConnectorWrapper wrapper = new ConnectorWrapper(mockConnector);
+
+ wrapper.updateStatus();
+ assertEquals(ConnectorStatus.OPEN, wrapper.getStatus());
}
- @Test public void testConnectorStatus_dead() throws Exception {
+ @Test public void testConnectorStatus_unavailable() throws Exception {
ConnectorManager cm = new ConnectorManager();
assertEquals(ConnectorStatus.NOT_INITIALIZED, cm.getStatus());
- Properties props = new Properties();
Connector mockConnector = Mockito.mock(Connector.class);
Connection mockConnection = Mockito.mock(Connection.class);
ConnectorIdentity mockIdentity = Mockito.mock(ConnectorIdentity.class);
Mockito.stub(mockConnector.getConnection((ExecutionContext)Mockito.anyObject())).toReturn(mockConnection);
Mockito.stub(mockConnector.createIdentity(null)).toReturn(mockIdentity);
-
- final String connectorName = mockConnector.getClass().getName();
- props.setProperty(ConnectorPropertyNames.CONNECTOR_CLASS, connectorName);
- startConnectorManager(cm, props);
- cm.setConnector(new ConnectorWrapper(mockConnector)); // to make them same
connector
-
- // data source not available
Mockito.stub(mockConnection.isAlive()).toReturn(false);
- assertEquals(ConnectorStatus.DATA_SOURCE_UNAVAILABLE, cm.getStatus());
+
+ ConnectorWrapper wrapper = new ConnectorWrapper(mockConnector);
+
+ wrapper.updateStatus();
+ assertEquals(ConnectorStatus.DATA_SOURCE_UNAVAILABLE, wrapper.getStatus());
}
@Test public void testConnectorStatus_exception() throws Exception {
ConnectorManager cm = new ConnectorManager();
assertEquals(ConnectorStatus.NOT_INITIALIZED, cm.getStatus());
- Properties props = new Properties();
Connector mockConnector = Mockito.mock(Connector.class);
ConnectorIdentity mockIdentity = Mockito.mock(ConnectorIdentity.class);
Mockito.stub(mockConnector.getConnection((ExecutionContext)Mockito.anyObject())).toThrow(new
ConnectorException());
Mockito.stub(mockConnector.createIdentity(null)).toReturn(mockIdentity);
- final String connectorName = mockConnector.getClass().getName();
- props.setProperty(ConnectorPropertyNames.CONNECTOR_CLASS, connectorName);
- startConnectorManager(cm, props);
- cm.setConnector(new ConnectorWrapper(mockConnector)); // to make them same
connector
-
- // data source not available
- assertEquals(ConnectorStatus.INIT_FAILED, cm.getStatus());
+ ConnectorWrapper wrapper = new ConnectorWrapper(mockConnector);
+
+ wrapper.updateStatus();
+ assertEquals(ConnectorStatus.DATA_SOURCE_UNAVAILABLE, wrapper.getStatus());
}
}
\ No newline at end of file
Modified:
trunk/runtime/src/main/java/com/metamatrix/dqp/embedded/services/EmbeddedDataService.java
===================================================================
---
trunk/runtime/src/main/java/com/metamatrix/dqp/embedded/services/EmbeddedDataService.java 2009-07-24
15:47:30 UTC (rev 1185)
+++
trunk/runtime/src/main/java/com/metamatrix/dqp/embedded/services/EmbeddedDataService.java 2009-07-25
01:49:03 UTC (rev 1186)
@@ -263,7 +263,7 @@
ConnectorBinding binding = getConnectorBinding(deployedConnectorBindingName);
if (binding != null) {
ConnectorManager mgr = getConnectorManager(binding);
- if (mgr != null && !mgr.started()) {
+ if (mgr != null && mgr.getStatus() ==
ConnectorStatus.NOT_INITIALIZED) {
// Start the manager
mgr.start(env);
@@ -292,17 +292,15 @@
if (binding != null) {
ConnectorManager mgr = getConnectorManager(binding, false);
if (mgr != null ) {
- if (mgr.started()) {
- // Run the stop command no matter what state they are in, since the
Alive status is not
- // always reliable, it is only based on the Connector implementation.
This is fool proof.
- mgr.stop();
-
- // remove from the local configuration. We want to create a new
connector binding each time
- // we start, so that we can initialize with correct properties, in
case they chnaged.
- removeConnectorBinding(binding.getDeployedName());
-
- DQPEmbeddedPlugin.logInfo("DataService.Connector_Stopped",
new Object[] {binding.getDeployedName()}); //$NON-NLS-1$
- }
+ // Run the stop command no matter what state they are in, since the Alive
status is not
+ // always reliable, it is only based on the Connector implementation.
This is fool proof.
+ mgr.stop();
+
+ // remove from the local configuration. We want to create a new connector
binding each time
+ // we start, so that we can initialize with correct properties, in case
they chnaged.
+ removeConnectorBinding(binding.getDeployedName());
+
+ DQPEmbeddedPlugin.logInfo("DataService.Connector_Stopped", new
Object[] {binding.getDeployedName()}); //$NON-NLS-1$
}
}
else {
Modified:
trunk/runtime/src/test/java/com/metamatrix/dqp/embedded/services/TestEmbeddedConfigurationService.java
===================================================================
---
trunk/runtime/src/test/java/com/metamatrix/dqp/embedded/services/TestEmbeddedConfigurationService.java 2009-07-24
15:47:30 UTC (rev 1185)
+++
trunk/runtime/src/test/java/com/metamatrix/dqp/embedded/services/TestEmbeddedConfigurationService.java 2009-07-25
01:49:03 UTC (rev 1186)
@@ -45,8 +45,6 @@
import com.metamatrix.common.config.model.BasicConnectorBinding;
import com.metamatrix.common.config.model.BasicConnectorBindingType;
import com.metamatrix.common.config.model.BasicExtensionModule;
-import com.metamatrix.common.util.crypto.CryptoUtil;
-import com.metamatrix.common.util.crypto.NullCryptor;
import com.metamatrix.common.vdb.api.VDBArchive;
import com.metamatrix.common.vdb.api.VDBDefn;
import com.metamatrix.core.util.FileUtils;