Author: rhauch
Date: 2008-07-25 15:37:40 -0400 (Fri, 25 Jul 2008)
New Revision: 370
Added:
trunk/dna-spi/src/main/java/org/jboss/dna/spi/graph/connection/AbstractRepositorySource.java
trunk/dna-spi/src/main/java/org/jboss/dna/spi/graph/connection/ManagedRepositoryConnectionFactory.java
trunk/dna-spi/src/main/java/org/jboss/dna/spi/graph/connection/RepositoryConnectionPool.java
trunk/dna-spi/src/test/java/org/jboss/dna/spi/graph/connection/RepositoryConnectionPoolTest.java
Removed:
trunk/dna-spi/src/main/java/org/jboss/dna/spi/graph/connection/BasicRepositoryConnectionPool.java
trunk/dna-spi/src/main/java/org/jboss/dna/spi/graph/connection/RepositoryConnectionPool.java
trunk/dna-spi/src/test/java/org/jboss/dna/spi/graph/connection/BasicRepositoryConnectionPoolTest.java
Modified:
trunk/connectors/dna-connector-federation/src/main/java/org/jboss/dna/connector/federation/FederatedRepositorySource.java
trunk/connectors/dna-connector-federation/src/test/java/org/jboss/dna/connector/federation/FederatedRepositorySourceTest.java
trunk/connectors/dna-connector-federation/src/test/java/org/jboss/dna/connector/federation/FederatedRepositoryTest.java
trunk/connectors/dna-connector-inmemory/src/main/java/org/jboss/dna/connector/inmemory/InMemoryRepositorySource.java
trunk/connectors/dna-connector-jbosscache/src/main/java/org/jboss/dna/connector/jbosscache/JBossCacheSource.java
trunk/dna-repository/src/main/java/org/jboss/dna/repository/RepositorySourceManager.java
trunk/dna-spi/src/main/java/org/jboss/dna/spi/graph/connection/RepositorySource.java
trunk/dna-spi/src/test/java/org/jboss/dna/spi/graph/connection/SimpleRepositorySource.java
trunk/dna-spi/src/test/java/org/jboss/dna/spi/graph/connection/TimeDelayingRepositorySource.java
Log:
DNA-189 - Change RepositorySource to extend the ManagedRepositoryConnectionFactory
interface, so it has management methods
http://jira.jboss.com/jira/browse/DNA-189
Previously, the RepositorySource interface only had methods for getting/setting properties
and creating connections. Managing the set of connections was therefore not part of the
interface, so RepositorySource instances had to be wrapped by RepositoryConnectionPools,
which made managing the instances more difficult (and less consistent).
Change name of RepositoryConnectionPool interface to ManagedRepositoryConnectionFactory
Change RepositorySource to extend the ManagedRepositoryConnectionFactory interface (rather
than RepositoryConnectionFactory)
Change name of BasicRepositoryConnectionPool to RepositoryConnectionPool (since there is
no "pool" interface anymore)
Create an AbstractRepositorySource implementation that uses a pool to manage the
connections
Change the existing RepositorySource implementations to extend AbstractRepositorySource
(this is not required, but it does encapsulate the management of the connections)
Clean up RepositorySourceManager, which no longer needs to distinguish between
RepositorySource instances and pools (all RepositorySource instances are now manageable)
Modified:
trunk/connectors/dna-connector-federation/src/main/java/org/jboss/dna/connector/federation/FederatedRepositorySource.java
===================================================================
---
trunk/connectors/dna-connector-federation/src/main/java/org/jboss/dna/connector/federation/FederatedRepositorySource.java 2008-07-24
19:52:13 UTC (rev 369)
+++
trunk/connectors/dna-connector-federation/src/main/java/org/jboss/dna/connector/federation/FederatedRepositorySource.java 2008-07-25
19:37:40 UTC (rev 370)
@@ -66,6 +66,7 @@
import org.jboss.dna.spi.graph.commands.impl.BasicCompositeCommand;
import org.jboss.dna.spi.graph.commands.impl.BasicGetChildrenCommand;
import org.jboss.dna.spi.graph.commands.impl.BasicGetNodeCommand;
+import org.jboss.dna.spi.graph.connection.AbstractRepositorySource;
import org.jboss.dna.spi.graph.connection.ExecutionEnvironment;
import org.jboss.dna.spi.graph.connection.RepositoryConnection;
import org.jboss.dna.spi.graph.connection.RepositoryConnectionFactories;
@@ -76,13 +77,12 @@
* @author Randall Hauch
*/
@ThreadSafe
-public class FederatedRepositorySource implements RepositorySource {
+public class FederatedRepositorySource extends AbstractRepositorySource {
/**
*/
private static final long serialVersionUID = 7587346948013486977L;
- public static final int DEFAULT_RETRY_LIMIT = 0;
public static final String[] DEFAULT_CONFIGURATION_SOURCE_PROJECTION_RULES =
{"/dna:system => /"};
protected static final String REPOSITORY_NAME = "repositoryName";
@@ -111,7 +111,6 @@
private String executionContextFactoryJndiName;
private String securityDomain;
private String repositoryJndiName;
- private int retryLimit = DEFAULT_RETRY_LIMIT;
private transient FederatedRepository repository;
private transient Context jndiContext;
@@ -120,6 +119,7 @@
* repository name}.
*/
public FederatedRepositorySource() {
+ super();
}
/**
@@ -129,6 +129,7 @@
* @throws IllegalArgumentException if the federation service is null or the
repository name is null or blank
*/
public FederatedRepositorySource( String repositoryName ) {
+ super();
ArgCheck.isNotNull(repositoryName, "repositoryName");
this.repositoryName = repositoryName;
}
@@ -136,323 +137,6 @@
/**
* {@inheritDoc}
*/
- public int getRetryLimit() {
- return this.retryLimit;
- }
-
- /**
- * {@inheritDoc}
- */
- public void setRetryLimit( int limit ) {
- this.retryLimit = limit > 0 ? limit : 0;
- }
-
- /**
- * {@inheritDoc}
- */
- public synchronized RepositoryConnection getConnection() throws
RepositorySourceException {
- if (getName() == null) {
- throw new
RepositorySourceException(FederationI18n.propertyIsRequired.text("name"));
- }
- if (getExecutionContextFactoryJndiName() == null) {
- throw new
RepositorySourceException(FederationI18n.propertyIsRequired.text("execution context
factory JNDI name"));
- }
- if (getSecurityDomain() == null) {
- throw new
RepositorySourceException(FederationI18n.propertyIsRequired.text("security
domain"));
- }
- if (getConnectionFactoriesJndiName() == null) {
- throw new
RepositorySourceException(FederationI18n.propertyIsRequired.text("connection
factories JNDI name"));
- }
- // Find the repository ...
- FederatedRepository repository = getRepository();
- // Authenticate the user ...
- String username = this.username;
- Object credentials = this.password;
- RepositoryConnection connection = repository.createConnection(this, username,
credentials);
- if (connection == null) {
- I18n msg =
FederationI18n.unableToAuthenticateConnectionToFederatedRepository;
- throw new RepositorySourceException(msg.text(this.repositoryName,
username));
- }
- // Return the new connection ...
- return connection;
- }
-
- /**
- * Get the {@link FederatedRepository} instance that this source is using. This
method uses the following logic:
- * <ol>
- * <li>If a {@link FederatedRepository} already was obtained from a prior call,
the same instance is returned.</li>
- * <li>A {@link FederatedRepository} is created using a {@link
FederatedRepositoryConfig} is created from this instance's
- * properties and {@link ExecutionEnvironment} and {@link
RepositoryConnectionFactories} instances obtained from JNDI.</li>
- * <li></li>
- * <li></li>
- * </ol>
- *
- * @return the federated repository instance
- * @throws RepositorySourceException
- */
- protected synchronized FederatedRepository getRepository() throws
RepositorySourceException {
- if (repository == null) {
- String jndiName = this.getRepositoryJndiName();
- Context context = getContext();
- if (jndiName != null && jndiName.trim().length() != 0) {
- // Look for an existing repository in JNDI ...
- try {
- if (context == null) context = new InitialContext();
- repository = (FederatedRepository)context.lookup(jndiName);
- } catch (Throwable err) {
- I18n msg = FederationI18n.unableToFindFederatedRepositoryInJndi;
- throw new RepositorySourceException(msg.text(this.sourceName,
jndiName), err);
- }
- }
-
- if (repository == null) {
- // Find in JNDI the repository connection factories and the environment
...
- ExecutionEnvironment env = getExecutionEnvironment();
- RepositoryConnectionFactories factories =
getRepositoryConnectionFactories();
- // And create the configuration and the repository ...
- FederatedRepositoryConfig config = getRepositoryConfiguration(env,
factories);
- repository = new FederatedRepository(env, factories, config);
- }
- }
- return repository;
- }
-
- protected ExecutionEnvironment getExecutionEnvironment() {
- ExecutionContextFactory factory = null;
- Context context = getContext();
- String jndiName = getExecutionContextFactoryJndiName();
- if (jndiName != null && jndiName.trim().length() != 0) {
- try {
- if (context == null) context = new InitialContext();
- factory = (ExecutionContextFactory)context.lookup(jndiName);
- } catch (Throwable err) {
- I18n msg = FederationI18n.unableToFindExecutionContextFactoryInJndi;
- throw new RepositorySourceException(msg.text(this.sourceName, jndiName),
err);
- }
- }
- if (factory == null) {
- I18n msg = FederationI18n.unableToFindExecutionContextFactoryInJndi;
- throw new RepositorySourceException(msg.text(this.sourceName, jndiName));
- }
- String securityDomain = getSecurityDomain();
- CallbackHandler handler = createCallbackHandler();
- try {
- return factory.create(securityDomain, handler);
- } catch (LoginException e) {
- I18n msg = FederationI18n.unableToCreateExecutionContext;
- throw new RepositorySourceException(msg.text(this.sourceName, jndiName,
securityDomain), e);
- }
- }
-
- protected RepositoryConnectionFactories getRepositoryConnectionFactories() {
- RepositoryConnectionFactories factories = null;
- Context context = getContext();
- String jndiName = getConnectionFactoriesJndiName();
- if (jndiName != null && jndiName.trim().length() != 0) {
- try {
- if (context == null) context = new InitialContext();
- factories = (RepositoryConnectionFactories)context.lookup(jndiName);
- } catch (Throwable err) {
- I18n msg =
FederationI18n.unableToFindRepositoryConnectionFactoriesInJndi;
- throw new RepositorySourceException(msg.text(this.sourceName, jndiName),
err);
- }
- }
- if (factories == null) {
- I18n msg = FederationI18n.noRepositoryConnectionFactories;
- throw new RepositorySourceException(msg.text(this.repositoryName));
- }
- return factories;
- }
-
- protected CallbackHandler createCallbackHandler() {
- return new CallbackHandler() {
- public void handle( Callback[] callbacks ) {
- for (Callback callback : callbacks) {
- if (callback instanceof NameCallback) {
- NameCallback nameCallback = (NameCallback)callback;
-
nameCallback.setName(FederatedRepositorySource.this.getUsername());
- }
- if (callback instanceof PasswordCallback) {
- PasswordCallback passwordCallback = (PasswordCallback)callback;
-
passwordCallback.setPassword(FederatedRepositorySource.this.getPassword().toCharArray());
- }
- }
- }
- };
- }
-
- protected Context getContext() {
- return this.jndiContext;
- }
-
- protected synchronized void setContext( Context context ) {
- this.jndiContext = context;
- }
-
- /**
- * Create a {@link FederatedRepositoryConfig} instance from the current properties of
this instance. This method does
- * <i>not</i> modify the state of this instance.
- *
- * @param env the execution environment that should be used to read the
configuration; may not be null
- * @param factories the factories from which can be obtained the
RepositoryConnectionFactory instances for each name source;
- * may not be null
- * @return a configuration reflecting the current state of this instance
- */
- protected synchronized FederatedRepositoryConfig getRepositoryConfiguration(
ExecutionEnvironment env,
-
RepositoryConnectionFactories factories ) {
- Problems problems = new SimpleProblems();
- ValueFactories valueFactories = env.getValueFactories();
- PathFactory pathFactory = valueFactories.getPathFactory();
- NameFactory nameFactory = valueFactories.getNameFactory();
- ValueFactory<Long> longFactory = valueFactories.getLongFactory();
-
- // Create the configuration projection ...
- ProjectionParser projectionParser = ProjectionParser.getInstance();
- Projection.Rule[] rules = projectionParser.rulesFromStrings(env,
this.getConfigurationSourceProjectionRules());
- Projection configurationProjection = new
Projection(this.getConfigurationSourceName(), rules);
-
- // Create a federating command executor to execute the commands and merge the
results into a single set of
- // commands.
- final String configurationSourceName = configurationProjection.getSourceName();
- List<Projection> projections =
Collections.singletonList(configurationProjection);
- CommandExecutor executor = null;
- if (configurationProjection.getRules().size() == 1) {
- // There is just a single projection for the configuration repository, so
just use an executor that
- // translates the paths using the projection
- executor = new SingleProjectionCommandExecutor(env, configurationSourceName,
configurationProjection, factories);
- } else if (configurationProjection.getRules().size() == 0) {
- // There is no projection for the configuration repository, so just use a
no-op executor
- executor = new NoOpCommandExecutor(env, configurationSourceName);
- } else {
- // The configuration repository has more than one projection, so we need to
merge the results
- executor = new FederatingCommandExecutor(env, configurationSourceName, null,
projections, factories);
- }
- // Wrap the executor with a logging executor ...
- executor = new LoggingCommandExecutor(executor, Logger.getLogger(getClass()),
Logger.Level.INFO);
-
- // The configuration projection (via "executor") will convert this path
into a path that exists in the configuration
- // repository
- Path configNode = pathFactory.create("/dna:system/dna:federation");
-
- try {
- // Get the repository node ...
- BasicGetNodeCommand getRepository = new BasicGetNodeCommand(configNode);
- executor.execute(getRepository);
- if (getRepository.hasError()) {
- throw new
FederationException(FederationI18n.federatedRepositoryCannotBeFound.text(repositoryName));
- }
-
- // Add a command to get the projection defining the cache ...
- Path pathToCacheRegion = pathFactory.create(configNode,
nameFactory.create("dna:cache"));
- BasicGetNodeCommand getCacheRegion = new
BasicGetNodeCommand(pathToCacheRegion);
- executor.execute(getCacheRegion);
- Projection cacheProjection = createProjection(env,
- projectionParser,
- getCacheRegion.getPath(),
-
getCacheRegion.getProperties(),
- problems);
-
- if (getCacheRegion.hasError()) {
- I18n msg = FederationI18n.requiredNodeDoesNotExistRelativeToNode;
- throw new FederationException(msg.text("dna:cache",
configNode));
- }
-
- // Get the source projections for the repository ...
- Path projectionsNode = pathFactory.create(configNode,
nameFactory.create("dna:projections"));
- BasicGetChildrenCommand getProjections = new
BasicGetChildrenCommand(projectionsNode);
-
- executor.execute(getProjections);
- if (getProjections.hasError()) {
- I18n msg = FederationI18n.requiredNodeDoesNotExistRelativeToNode;
- throw new FederationException(msg.text("dna:projections",
configNode));
- }
-
- // Build the commands to get each of the projections (children of the
"dna:projections" node) ...
- List<Projection> sourceProjections = new
LinkedList<Projection>();
- if (getProjections.hasNoError() &&
!getProjections.getChildren().isEmpty()) {
- BasicCompositeCommand commands = new BasicCompositeCommand();
- for (Path.Segment child : getProjections.getChildren()) {
- final Path pathToSource = pathFactory.create(projectionsNode,
child);
- commands.add(new BasicGetNodeCommand(pathToSource));
- }
- // Now execute these commands ...
- executor.execute(commands);
-
- // Iterate over each region node obtained ...
- for (GraphCommand command : commands) {
- BasicGetNodeCommand getProjectionCommand =
(BasicGetNodeCommand)command;
- if (getProjectionCommand.hasNoError()) {
- Projection projection = createProjection(env,
- projectionParser,
-
getProjectionCommand.getPath(),
-
getProjectionCommand.getProperties(),
- problems);
- if (projection != null) sourceProjections.add(projection);
- }
- }
- }
-
- // Look for the default cache policy ...
- BasicCachePolicy cachePolicy = new BasicCachePolicy();
- Property timeToExpireProperty =
getRepository.getProperties().get(nameFactory.create(CACHE_POLICY_TIME_TO_EXPIRE_CONFIG_PROPERTY_NAME));
- Property timeToCacheProperty =
getRepository.getProperties().get(nameFactory.create(CACHE_POLICY_TIME_TO_CACHE_CONFIG_PROPERTY_NAME));
- if (timeToCacheProperty != null && !timeToCacheProperty.isEmpty()) {
-
cachePolicy.setTimeToCache(longFactory.create(timeToCacheProperty.getValues().next()));
- }
- if (timeToExpireProperty != null && !timeToExpireProperty.isEmpty())
{
-
cachePolicy.setTimeToExpire(longFactory.create(timeToExpireProperty.getValues().next()));
- }
- CachePolicy defaultCachePolicy = cachePolicy.isEmpty() ? null :
cachePolicy.getUnmodifiable();
- return new FederatedRepositoryConfig(repositoryName, cacheProjection,
sourceProjections, defaultCachePolicy);
- } catch (InvalidPathException err) {
- I18n msg = FederationI18n.federatedRepositoryCannotBeFound;
- throw new FederationException(msg.text(repositoryName));
- } catch (InterruptedException err) {
- I18n msg =
FederationI18n.interruptedWhileUsingFederationConfigurationRepository;
- throw new FederationException(msg.text(repositoryName));
- }
-
- }
-
- /**
- * Instantiate the {@link Projection} described by the supplied properties.
- *
- * @param env the execution environment that should be used to read the
configuration; may not be null
- * @param projectionParser the projection rule parser that should be used; may not be
null
- * @param path the path to the node where these properties were found; never null
- * @param properties the properties; never null
- * @param problems the problems container in which any problems should be reported;
never null
- * @return the region instance, or null if it could not be created
- */
- protected Projection createProjection( ExecutionEnvironment env,
- ProjectionParser projectionParser,
- Path path,
- Map<Name, Property> properties,
- Problems problems ) {
- ValueFactories valueFactories = env.getValueFactories();
- NameFactory nameFactory = valueFactories.getNameFactory();
- ValueFactory<String> stringFactory = valueFactories.getStringFactory();
-
- String sourceName = path.getLastSegment().getName().getLocalName();
-
- // Get the rules ...
- Projection.Rule[] projectionRules = null;
- Property projectionRulesProperty =
properties.get(nameFactory.create(PROJECTION_RULES_CONFIG_PROPERTY_NAME));
- if (projectionRulesProperty != null &&
!projectionRulesProperty.isEmpty()) {
- String[] projectionRuleStrs =
stringFactory.create(projectionRulesProperty.getValuesAsArray());
- if (projectionRuleStrs != null && projectionRuleStrs.length != 0) {
- projectionRules = projectionParser.rulesFromStrings(env,
projectionRuleStrs);
- }
- }
- if (problems.hasErrors()) return null;
-
- Projection region = new Projection(sourceName, projectionRules);
- return region;
- }
-
- /**
- * {@inheritDoc}
- */
public synchronized String getName() {
return sourceName;
}
@@ -476,7 +160,7 @@
* @see #setName(String)
*/
public synchronized void setName( String sourceName ) {
- if (this.sourceName == sourceName || this.sourceName != null &&
this.sourceName.equals(sourceName)) return;
+ if (this.sourceName == sourceName || this.sourceName != null &&
this.sourceName.equals(sourceName)) return; // unchanged
this.sourceName = sourceName;
changeRepositoryConfig();
}
@@ -513,7 +197,7 @@
* @see #setName(String)
*/
public void setRepositoryJndiName( String jndiName ) {
- if (this.repositoryJndiName == jndiName || this.repositoryJndiName != null
&& this.repositoryJndiName.equals(jndiName)) return;
+ if (this.repositoryJndiName == jndiName || this.repositoryJndiName != null
&& this.repositoryJndiName.equals(jndiName)) return; // unchanged
this.repositoryJndiName = jndiName;
changeRepositoryConfig();
}
@@ -554,7 +238,7 @@
*/
public void setConfigurationSourceName( String sourceName ) {
if (this.configurationSourceName == sourceName || this.configurationSourceName !=
null
- && this.configurationSourceName.equals(sourceName)) return;
+ && this.configurationSourceName.equals(sourceName)) return; //
unchanged
this.configurationSourceName = sourceName;
changeRepositoryConfig();
}
@@ -640,7 +324,7 @@
*/
public synchronized void setExecutionContextFactoryJndiName( String jndiName ) {
if (this.repositoryJndiName == jndiName || this.repositoryJndiName != null
&& this.repositoryJndiName.equals(jndiName)) return;
- this.executionContextFactoryJndiName = jndiName;
+ this.executionContextFactoryJndiName = jndiName; // unchanged
changeRepositoryConfig();
}
@@ -681,12 +365,15 @@
*/
public synchronized void setConnectionFactoriesJndiName( String jndiName ) {
if (this.connectionFactoriesJndiName == jndiName ||
this.connectionFactoriesJndiName != null
- && this.connectionFactoriesJndiName.equals(jndiName)) return;
+ && this.connectionFactoriesJndiName.equals(jndiName)) return; //
unchanged
this.connectionFactoriesJndiName = jndiName;
changeRepositoryConfig();
}
/**
+ * Get the name of the security domain that should be used by JAAS to identify the
application or security context. This
+ * should correspond to the JAAS login configuration located within the JAAS login
configuration file.
+ *
* @return securityDomain
*/
public String getSecurityDomain() {
@@ -694,10 +381,15 @@
}
/**
+ * Set the name of the security domain that should be used by JAAS to identify the
application or security context. This
+ * should correspond to the JAAS login configuration located within the JAAS login
configuration file.
+ *
* @param securityDomain Sets securityDomain to the specified value.
*/
public void setSecurityDomain( String securityDomain ) {
+ if (this.securityDomain != null &&
this.securityDomain.equals(securityDomain)) return; // unchanged
this.securityDomain = securityDomain;
+ changeRepositoryConfig();
}
/**
@@ -732,29 +424,12 @@
*/
public synchronized void setRepositoryName( String repositoryName ) {
ArgCheck.isNotEmpty(repositoryName, "repositoryName");
- if (this.repositoryName != null &&
this.repositoryName.equals(repositoryName)) return;
+ if (this.repositoryName != null &&
this.repositoryName.equals(repositoryName)) return; // unchanged
this.repositoryName = repositoryName;
changeRepositoryConfig();
}
/**
- * This method is called to signal that some aspect of the configuration has changed.
If a {@link #getRepository() repository}
- * instance has been created, it's configuration is
- * {@link #getRepositoryConfiguration(ExecutionEnvironment,
RepositoryConnectionFactories) rebuilt} and updated. Nothing is
- * done, however, if there is currently no {@link #getRepository() repository}.
- */
- protected synchronized void changeRepositoryConfig() {
- if (this.repository != null) {
- // Find in JNDI the repository connection factories and the environment ...
- ExecutionEnvironment env = getExecutionEnvironment();
- RepositoryConnectionFactories factories =
getRepositoryConnectionFactories();
- // Compute a new repository config and set it on the repository ...
- FederatedRepositoryConfig newConfig = getRepositoryConfiguration(env,
factories);
- this.repository.setConfiguration(newConfig);
- }
- }
-
- /**
* Get the username that should be used when authenticating and {@link
#getConnection() creating connections}.
* <p>
* This is an optional property, required only when authentication is to be used.
@@ -785,7 +460,9 @@
* @see #setName(String)
*/
public void setUsername( String username ) {
+ if (this.username != null && this.username.equals(username)) return; //
unchanged
this.username = username;
+ changeRepositoryConfig();
}
/**
@@ -818,12 +495,337 @@
* @see #setName(String)
*/
public void setPassword( String password ) {
+ if (this.password != null && this.password.equals(password)) return; //
unchanged
this.password = password;
+ changeRepositoryConfig();
}
/**
+ * This method is called to signal that some aspect of the configuration has changed.
If a {@link #getRepository() repository}
+ * instance has been created, it's configuration is
+ * {@link #getRepositoryConfiguration(ExecutionEnvironment,
RepositoryConnectionFactories) rebuilt} and updated. Nothing is
+ * done, however, if there is currently no {@link #getRepository() repository}.
+ */
+ protected synchronized void changeRepositoryConfig() {
+ if (this.repository != null) {
+ // Find in JNDI the repository connection factories and the environment ...
+ ExecutionEnvironment env = getExecutionEnvironment();
+ RepositoryConnectionFactories factories =
getRepositoryConnectionFactories();
+ // Compute a new repository config and set it on the repository ...
+ FederatedRepositoryConfig newConfig = getRepositoryConfiguration(env,
factories);
+ this.repository.setConfiguration(newConfig);
+ }
+ }
+
+ /**
* {@inheritDoc}
+ *
+ * @see
org.jboss.dna.spi.graph.connection.AbstractRepositorySource#createConnection()
*/
+ @Override
+ protected synchronized RepositoryConnection createConnection() throws
RepositorySourceException {
+ if (getName() == null) {
+ throw new
RepositorySourceException(FederationI18n.propertyIsRequired.text("name"));
+ }
+ if (getExecutionContextFactoryJndiName() == null) {
+ throw new
RepositorySourceException(FederationI18n.propertyIsRequired.text("execution context
factory JNDI name"));
+ }
+ if (getSecurityDomain() == null) {
+ throw new
RepositorySourceException(FederationI18n.propertyIsRequired.text("security
domain"));
+ }
+ if (getConnectionFactoriesJndiName() == null) {
+ throw new
RepositorySourceException(FederationI18n.propertyIsRequired.text("connection
factories JNDI name"));
+ }
+ // Find the repository ...
+ FederatedRepository repository = getRepository();
+ // Authenticate the user ...
+ String username = this.username;
+ Object credentials = this.password;
+ RepositoryConnection connection = repository.createConnection(this, username,
credentials);
+ if (connection == null) {
+ I18n msg =
FederationI18n.unableToAuthenticateConnectionToFederatedRepository;
+ throw new RepositorySourceException(msg.text(this.repositoryName,
username));
+ }
+ // Return the new connection ...
+ return connection;
+ }
+
+ /**
+ * Get the {@link FederatedRepository} instance that this source is using. This
method uses the following logic:
+ * <ol>
+ * <li>If a {@link FederatedRepository} already was obtained from a prior call,
the same instance is returned.</li>
+ * <li>A {@link FederatedRepository} is created using a {@link
FederatedRepositoryConfig} is created from this instance's
+ * properties and {@link ExecutionEnvironment} and {@link
RepositoryConnectionFactories} instances obtained from JNDI.</li>
+ * <li></li>
+ * <li></li>
+ * </ol>
+ *
+ * @return the federated repository instance
+ * @throws RepositorySourceException
+ */
+ protected synchronized FederatedRepository getRepository() throws
RepositorySourceException {
+ if (repository == null) {
+ String jndiName = this.getRepositoryJndiName();
+ Context context = getContext();
+ if (jndiName != null && jndiName.trim().length() != 0) {
+ // Look for an existing repository in JNDI ...
+ try {
+ if (context == null) context = new InitialContext();
+ repository = (FederatedRepository)context.lookup(jndiName);
+ } catch (Throwable err) {
+ I18n msg = FederationI18n.unableToFindFederatedRepositoryInJndi;
+ throw new RepositorySourceException(msg.text(this.sourceName,
jndiName), err);
+ }
+ }
+
+ if (repository == null) {
+ // Find in JNDI the repository connection factories and the environment
...
+ ExecutionEnvironment env = getExecutionEnvironment();
+ RepositoryConnectionFactories factories =
getRepositoryConnectionFactories();
+ // And create the configuration and the repository ...
+ FederatedRepositoryConfig config = getRepositoryConfiguration(env,
factories);
+ repository = new FederatedRepository(env, factories, config);
+ }
+ }
+ return repository;
+ }
+
+ protected ExecutionEnvironment getExecutionEnvironment() {
+ ExecutionContextFactory factory = null;
+ Context context = getContext();
+ String jndiName = getExecutionContextFactoryJndiName();
+ if (jndiName != null && jndiName.trim().length() != 0) {
+ try {
+ if (context == null) context = new InitialContext();
+ factory = (ExecutionContextFactory)context.lookup(jndiName);
+ } catch (Throwable err) {
+ I18n msg = FederationI18n.unableToFindExecutionContextFactoryInJndi;
+ throw new RepositorySourceException(msg.text(this.sourceName, jndiName),
err);
+ }
+ }
+ if (factory == null) {
+ I18n msg = FederationI18n.unableToFindExecutionContextFactoryInJndi;
+ throw new RepositorySourceException(msg.text(this.sourceName, jndiName));
+ }
+ String securityDomain = getSecurityDomain();
+ CallbackHandler handler = createCallbackHandler();
+ try {
+ return factory.create(securityDomain, handler);
+ } catch (LoginException e) {
+ I18n msg = FederationI18n.unableToCreateExecutionContext;
+ throw new RepositorySourceException(msg.text(this.sourceName, jndiName,
securityDomain), e);
+ }
+ }
+
+ protected RepositoryConnectionFactories getRepositoryConnectionFactories() {
+ RepositoryConnectionFactories factories = null;
+ Context context = getContext();
+ String jndiName = getConnectionFactoriesJndiName();
+ if (jndiName != null && jndiName.trim().length() != 0) {
+ try {
+ if (context == null) context = new InitialContext();
+ factories = (RepositoryConnectionFactories)context.lookup(jndiName);
+ } catch (Throwable err) {
+ I18n msg =
FederationI18n.unableToFindRepositoryConnectionFactoriesInJndi;
+ throw new RepositorySourceException(msg.text(this.sourceName, jndiName),
err);
+ }
+ }
+ if (factories == null) {
+ I18n msg = FederationI18n.noRepositoryConnectionFactories;
+ throw new RepositorySourceException(msg.text(this.repositoryName));
+ }
+ return factories;
+ }
+
+ protected CallbackHandler createCallbackHandler() {
+ return new CallbackHandler() {
+ public void handle( Callback[] callbacks ) {
+ for (Callback callback : callbacks) {
+ if (callback instanceof NameCallback) {
+ NameCallback nameCallback = (NameCallback)callback;
+
nameCallback.setName(FederatedRepositorySource.this.getUsername());
+ }
+ if (callback instanceof PasswordCallback) {
+ PasswordCallback passwordCallback = (PasswordCallback)callback;
+
passwordCallback.setPassword(FederatedRepositorySource.this.getPassword().toCharArray());
+ }
+ }
+ }
+ };
+ }
+
+ protected Context getContext() {
+ return this.jndiContext;
+ }
+
+ protected synchronized void setContext( Context context ) {
+ this.jndiContext = context;
+ }
+
+ /**
+ * Create a {@link FederatedRepositoryConfig} instance from the current properties of
this instance. This method does
+ * <i>not</i> modify the state of this instance.
+ *
+ * @param env the execution environment that should be used to read the
configuration; may not be null
+ * @param factories the factories from which can be obtained the
RepositoryConnectionFactory instances for each name source;
+ * may not be null
+ * @return a configuration reflecting the current state of this instance
+ */
+ protected synchronized FederatedRepositoryConfig getRepositoryConfiguration( ExecutionEnvironment env,
+                                                                              RepositoryConnectionFactories factories ) {
+     Problems problems = new SimpleProblems();
+     ValueFactories valueFactories = env.getValueFactories();
+     PathFactory pathFactory = valueFactories.getPathFactory();
+     NameFactory nameFactory = valueFactories.getNameFactory();
+     ValueFactory<Long> longFactory = valueFactories.getLongFactory();
+
+     // Create the configuration projection ...
+     ProjectionParser projectionParser = ProjectionParser.getInstance();
+     Projection.Rule[] rules = projectionParser.rulesFromStrings(env, this.getConfigurationSourceProjectionRules());
+     Projection configurationProjection = new Projection(this.getConfigurationSourceName(), rules);
+
+     // Create a federating command executor to execute the commands and merge the results into a single set of
+     // commands.
+     final String configurationSourceName = configurationProjection.getSourceName();
+     List<Projection> projections = Collections.singletonList(configurationProjection);
+     CommandExecutor executor = null;
+     if (configurationProjection.getRules().size() == 1) {
+         // There is just a single projection for the configuration repository, so just use an executor that
+         // translates the paths using the projection
+         executor = new SingleProjectionCommandExecutor(env, configurationSourceName, configurationProjection, factories);
+     } else if (configurationProjection.getRules().size() == 0) {
+         // There is no projection for the configuration repository, so just use a no-op executor
+         executor = new NoOpCommandExecutor(env, configurationSourceName);
+     } else {
+         // The configuration repository has more than one projection, so we need to merge the results
+         executor = new FederatingCommandExecutor(env, configurationSourceName, null, projections, factories);
+     }
+     // Wrap the executor with a logging executor ...
+     executor = new LoggingCommandExecutor(executor, Logger.getLogger(getClass()), Logger.Level.INFO);
+
+     // The configuration projection (via "executor") will convert this path into a path that exists in the
+     // configuration repository
+     Path configNode = pathFactory.create("/dna:system/dna:federation");
+
+     try {
+         // Get the repository node ...
+         BasicGetNodeCommand getRepository = new BasicGetNodeCommand(configNode);
+         executor.execute(getRepository);
+         if (getRepository.hasError()) {
+             throw new FederationException(FederationI18n.federatedRepositoryCannotBeFound.text(repositoryName));
+         }
+
+         // Get the node that defines the cache projection ...
+         Path pathToCacheRegion = pathFactory.create(configNode, nameFactory.create("dna:cache"));
+         BasicGetNodeCommand getCacheRegion = new BasicGetNodeCommand(pathToCacheRegion);
+         executor.execute(getCacheRegion);
+         // Check for a failure BEFORE using the command's results: the path and properties of a command that
+         // ended in error cannot be relied upon, so the missing node must be reported first.
+         if (getCacheRegion.hasError()) {
+             I18n msg = FederationI18n.requiredNodeDoesNotExistRelativeToNode;
+             throw new FederationException(msg.text("dna:cache", configNode));
+         }
+         Projection cacheProjection = createProjection(env,
+                                                       projectionParser,
+                                                       getCacheRegion.getPath(),
+                                                       getCacheRegion.getProperties(),
+                                                       problems);
+
+         // Get the source projections for the repository ...
+         Path projectionsNode = pathFactory.create(configNode, nameFactory.create("dna:projections"));
+         BasicGetChildrenCommand getProjections = new BasicGetChildrenCommand(projectionsNode);
+
+         executor.execute(getProjections);
+         if (getProjections.hasError()) {
+             I18n msg = FederationI18n.requiredNodeDoesNotExistRelativeToNode;
+             throw new FederationException(msg.text("dna:projections", configNode));
+         }
+
+         // Build the commands to get each of the projections (children of the "dna:projections" node) ...
+         List<Projection> sourceProjections = new LinkedList<Projection>();
+         if (getProjections.hasNoError() && !getProjections.getChildren().isEmpty()) {
+             BasicCompositeCommand commands = new BasicCompositeCommand();
+             for (Path.Segment child : getProjections.getChildren()) {
+                 final Path pathToSource = pathFactory.create(projectionsNode, child);
+                 commands.add(new BasicGetNodeCommand(pathToSource));
+             }
+             // Now execute these commands ...
+             executor.execute(commands);
+
+             // Iterate over each projection node obtained; nodes that could not be read are skipped ...
+             for (GraphCommand command : commands) {
+                 BasicGetNodeCommand getProjectionCommand = (BasicGetNodeCommand)command;
+                 if (getProjectionCommand.hasNoError()) {
+                     Projection projection = createProjection(env,
+                                                               projectionParser,
+                                                               getProjectionCommand.getPath(),
+                                                               getProjectionCommand.getProperties(),
+                                                               problems);
+                     if (projection != null) sourceProjections.add(projection);
+                 }
+             }
+         }
+
+         // Look for the default cache policy ...
+         BasicCachePolicy cachePolicy = new BasicCachePolicy();
+         Property timeToExpireProperty = getRepository.getProperties().get(nameFactory.create(CACHE_POLICY_TIME_TO_EXPIRE_CONFIG_PROPERTY_NAME));
+         Property timeToCacheProperty = getRepository.getProperties().get(nameFactory.create(CACHE_POLICY_TIME_TO_CACHE_CONFIG_PROPERTY_NAME));
+         if (timeToCacheProperty != null && !timeToCacheProperty.isEmpty()) {
+             cachePolicy.setTimeToCache(longFactory.create(timeToCacheProperty.getValues().next()));
+         }
+         if (timeToExpireProperty != null && !timeToExpireProperty.isEmpty()) {
+             cachePolicy.setTimeToExpire(longFactory.create(timeToExpireProperty.getValues().next()));
+         }
+         // A policy with no values set is represented as "no default policy" (null)
+         CachePolicy defaultCachePolicy = cachePolicy.isEmpty() ? null : cachePolicy.getUnmodifiable();
+         return new FederatedRepositoryConfig(repositoryName, cacheProjection, sourceProjections, defaultCachePolicy);
+     } catch (InvalidPathException err) {
+         I18n msg = FederationI18n.federatedRepositoryCannotBeFound;
+         throw new FederationException(msg.text(repositoryName));
+     } catch (InterruptedException err) {
+         // The checked exception is translated below, so restore the interrupt status to let code further up
+         // the call stack still observe that this thread was interrupted.
+         Thread.currentThread().interrupt();
+         I18n msg = FederationI18n.interruptedWhileUsingFederationConfigurationRepository;
+         throw new FederationException(msg.text(repositoryName));
+     }
+ }
+
+ /**
+ * Instantiate the {@link Projection} described by the supplied properties.
+ *
+ * @param env the execution environment that should be used to read the
configuration; may not be null
+ * @param projectionParser the projection rule parser that should be used; may not be
null
+ * @param path the path to the node where these properties were found; never null
+ * @param properties the properties; never null
+ * @param problems the problems container in which any problems should be reported;
never null
+ * @return the region instance, or null if it could not be created
+ */
+ protected Projection createProjection( ExecutionEnvironment env,
+                                        ProjectionParser projectionParser,
+                                        Path path,
+                                        Map<Name, Property> properties,
+                                        Problems problems ) {
+     ValueFactories factories = env.getValueFactories();
+     NameFactory names = factories.getNameFactory();
+     ValueFactory<String> strings = factories.getStringFactory();
+
+     // The projection's source name is the local name of the last segment of the node's path
+     String sourceName = path.getLastSegment().getName().getLocalName();
+
+     // Parse the projection rules, if the node has a non-empty rules property ...
+     Projection.Rule[] parsedRules = null;
+     Property rulesProperty = properties.get(names.create(PROJECTION_RULES_CONFIG_PROPERTY_NAME));
+     if (rulesProperty != null && !rulesProperty.isEmpty()) {
+         String[] ruleStrings = strings.create(rulesProperty.getValuesAsArray());
+         boolean hasRuleStrings = ruleStrings != null && ruleStrings.length > 0;
+         if (hasRuleStrings) {
+             parsedRules = projectionParser.rulesFromStrings(env, ruleStrings);
+         }
+     }
+
+     // No projection is produced when the problems container holds any error
+     return problems.hasErrors() ? null : new Projection(sourceName, parsedRules);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
public synchronized Reference getReference() {
String className = getClass().getName();
String factoryClassName = NamingContextObjectFactory.class.getName();
Modified:
trunk/connectors/dna-connector-federation/src/test/java/org/jboss/dna/connector/federation/FederatedRepositorySourceTest.java
===================================================================
---
trunk/connectors/dna-connector-federation/src/test/java/org/jboss/dna/connector/federation/FederatedRepositorySourceTest.java 2008-07-24
19:52:13 UTC (rev 369)
+++
trunk/connectors/dna-connector-federation/src/test/java/org/jboss/dna/connector/federation/FederatedRepositorySourceTest.java 2008-07-25
19:37:40 UTC (rev 370)
@@ -24,7 +24,6 @@
import static org.hamcrest.core.Is.is;
import static org.hamcrest.core.IsNull.notNullValue;
import static org.hamcrest.core.IsNull.nullValue;
-import static org.hamcrest.core.IsSame.sameInstance;
import static org.junit.Assert.assertThat;
import static org.mockito.Matchers.argThat;
import static org.mockito.Matchers.eq;
@@ -43,6 +42,7 @@
import org.jboss.dna.spi.ExecutionContextFactory;
import org.jboss.dna.spi.graph.connection.BasicExecutionEnvironment;
import org.jboss.dna.spi.graph.connection.ExecutionEnvironment;
+import org.jboss.dna.spi.graph.connection.RepositoryConnection;
import org.jboss.dna.spi.graph.connection.RepositoryConnectionFactories;
import org.jboss.dna.spi.graph.connection.RepositorySourceException;
import org.jboss.dna.spi.graph.connection.SimpleRepository;
@@ -72,7 +72,7 @@
private SimpleRepositorySource configRepositorySource;
private ExecutionEnvironment env;
@Mock
- private FederatedRepositoryConnection connection;
+ private RepositoryConnection connection;
@Mock
private Context jndiContext;
@Mock
@@ -143,8 +143,8 @@
@Test
public void shouldCreateConnectionsByAuthenticateUsingFederationRepository() throws
Exception {
- connection = (FederatedRepositoryConnection)source.getConnection();
- assertThat(connection, is(sameInstance(connection)));
+ connection = source.getConnection();
+ assertThat(connection, is(notNullValue()));
}
@Test( expected = RepositorySourceException.class )
Modified:
trunk/connectors/dna-connector-federation/src/test/java/org/jboss/dna/connector/federation/FederatedRepositoryTest.java
===================================================================
---
trunk/connectors/dna-connector-federation/src/test/java/org/jboss/dna/connector/federation/FederatedRepositoryTest.java 2008-07-24
19:52:13 UTC (rev 369)
+++
trunk/connectors/dna-connector-federation/src/test/java/org/jboss/dna/connector/federation/FederatedRepositoryTest.java 2008-07-25
19:37:40 UTC (rev 370)
@@ -52,7 +52,7 @@
@Mock
private RepositoryConnectionFactories connectionFactories;
- // private BasicRepositoryConnectionPool connectionPool;
+ // private RepositoryConnectionPool connectionPool;
@Before
public void beforeEach() {
Modified:
trunk/connectors/dna-connector-inmemory/src/main/java/org/jboss/dna/connector/inmemory/InMemoryRepositorySource.java
===================================================================
---
trunk/connectors/dna-connector-inmemory/src/main/java/org/jboss/dna/connector/inmemory/InMemoryRepositorySource.java 2008-07-24
19:52:13 UTC (rev 369)
+++
trunk/connectors/dna-connector-inmemory/src/main/java/org/jboss/dna/connector/inmemory/InMemoryRepositorySource.java 2008-07-25
19:37:40 UTC (rev 370)
@@ -27,7 +27,6 @@
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -41,13 +40,14 @@
import net.jcip.annotations.GuardedBy;
import org.jboss.dna.common.util.ArgCheck;
import org.jboss.dna.spi.cache.CachePolicy;
+import org.jboss.dna.spi.graph.connection.AbstractRepositorySource;
import org.jboss.dna.spi.graph.connection.RepositoryConnection;
-import org.jboss.dna.spi.graph.connection.RepositorySource;
+import org.jboss.dna.spi.graph.connection.RepositorySourceException;
/**
* @author Randall Hauch
*/
-public class InMemoryRepositorySource implements RepositorySource, ObjectFactory {
+public class InMemoryRepositorySource extends AbstractRepositorySource implements
ObjectFactory {
/**
* The initial version is 1
@@ -95,7 +95,6 @@
private String name;
@GuardedBy( "this" )
private String jndiName;
- private final AtomicInteger retryLimit = new AtomicInteger(0);
private UUID rootNodeUuid = UUID.randomUUID();
private CachePolicy defaultCachePolicy;
private String configurationName;
@@ -105,6 +104,7 @@
* Create a repository source instance.
*/
public InMemoryRepositorySource() {
+ super();
}
/**
@@ -225,22 +225,11 @@
/**
* {@inheritDoc}
+ *
+ * @see
org.jboss.dna.spi.graph.connection.AbstractRepositorySource#createConnection()
*/
- public int getRetryLimit() {
- return retryLimit.get();
- }
-
- /**
- * {@inheritDoc}
- */
- public void setRetryLimit( int limit ) {
- retryLimit.set(limit);
- }
-
- /**
- * {@inheritDoc}
- */
- public synchronized RepositoryConnection getConnection() {
+ @Override
+ protected synchronized RepositoryConnection createConnection() throws
RepositorySourceException {
if (this.repository == null) {
repository = new InMemoryRepository(configurationName, this.rootNodeUuid);
}
Modified:
trunk/connectors/dna-connector-jbosscache/src/main/java/org/jboss/dna/connector/jbosscache/JBossCacheSource.java
===================================================================
---
trunk/connectors/dna-connector-jbosscache/src/main/java/org/jboss/dna/connector/jbosscache/JBossCacheSource.java 2008-07-24
19:52:13 UTC (rev 369)
+++
trunk/connectors/dna-connector-jbosscache/src/main/java/org/jboss/dna/connector/jbosscache/JBossCacheSource.java 2008-07-25
19:37:40 UTC (rev 370)
@@ -27,7 +27,6 @@
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -47,14 +46,14 @@
import org.jboss.dna.spi.cache.CachePolicy;
import org.jboss.dna.spi.graph.Name;
import org.jboss.dna.spi.graph.NameFactory;
+import org.jboss.dna.spi.graph.connection.AbstractRepositorySource;
import org.jboss.dna.spi.graph.connection.RepositoryConnection;
-import org.jboss.dna.spi.graph.connection.RepositorySource;
/**
* @author Randall Hauch
*/
@ThreadSafe
-public class JBossCacheSource implements RepositorySource, ObjectFactory {
+public class JBossCacheSource extends AbstractRepositorySource implements ObjectFactory
{
/**
*/
@@ -100,7 +99,6 @@
private String name;
@GuardedBy( "this" )
private String jndiName;
- private final AtomicInteger retryLimit = new AtomicInteger(0);
private UUID rootNodeUuid = UUID.randomUUID();
private CachePolicy defaultCachePolicy;
private String cacheConfigurationName;
@@ -261,21 +259,8 @@
/**
* {@inheritDoc}
*/
- public int getRetryLimit() {
- return retryLimit.get();
- }
-
- /**
- * {@inheritDoc}
- */
- public void setRetryLimit( int limit ) {
- retryLimit.set(limit);
- }
-
- /**
- * {@inheritDoc}
- */
- public synchronized RepositoryConnection getConnection() {
+ @Override
+ protected synchronized RepositoryConnection createConnection() {
if (this.cache == null) {
CacheFactory<Name, Object> factory = new DefaultCacheFactory<Name,
Object>();
cache = factory.createCache(cacheConfigurationName);
Modified:
trunk/dna-repository/src/main/java/org/jboss/dna/repository/RepositorySourceManager.java
===================================================================
---
trunk/dna-repository/src/main/java/org/jboss/dna/repository/RepositorySourceManager.java 2008-07-24
19:52:13 UTC (rev 369)
+++
trunk/dna-repository/src/main/java/org/jboss/dna/repository/RepositorySourceManager.java 2008-07-25
19:37:40 UTC (rev 370)
@@ -32,7 +32,6 @@
import org.jboss.dna.spi.graph.connection.RepositoryConnection;
import org.jboss.dna.spi.graph.connection.RepositoryConnectionFactories;
import org.jboss.dna.spi.graph.connection.RepositoryConnectionFactory;
-import org.jboss.dna.spi.graph.connection.RepositoryConnectionPool;
import org.jboss.dna.spi.graph.connection.RepositorySource;
/**
@@ -139,10 +138,7 @@
try {
this.sourcesLock.readLock().lock();
for (RepositorySource source : this.sources) {
- if (source instanceof RepositoryConnectionPool) {
- RepositoryConnectionPool pool = (RepositoryConnectionPool)source;
- pool.shutdown();
- }
+ source.shutdown();
}
} finally {
this.sourcesLock.readLock().unlock();
@@ -164,12 +160,7 @@
try {
this.sourcesLock.readLock().lock();
for (RepositorySource source : this.sources) {
- if (source instanceof RepositoryConnectionPool) {
- RepositoryConnectionPool pool = (RepositoryConnectionPool)source;
- if (!pool.awaitTermination(timeout, unit)) {
- return false;
- }
- }
+ if (!source.awaitTermination(timeout, unit)) return false;
}
return true;
} finally {
@@ -191,12 +182,7 @@
try {
this.sourcesLock.readLock().lock();
for (RepositorySource source : this.sources) {
- if (source instanceof RepositoryConnectionPool) {
- RepositoryConnectionPool pool = (RepositoryConnectionPool)source;
- if (pool.isTerminating()) {
- return true;
- }
- }
+ if (source.isTerminating()) return true;
}
return false;
} finally {
@@ -214,12 +200,7 @@
try {
this.sourcesLock.readLock().lock();
for (RepositorySource source : this.sources) {
- if (source instanceof RepositoryConnectionPool) {
- RepositoryConnectionPool pool = (RepositoryConnectionPool)source;
- if (!pool.isTerminated()) {
- return false;
- }
- }
+ if (!source.isTerminated()) return false;
}
return true;
} finally {
@@ -309,12 +290,9 @@
this.sourcesLock.writeLock().lock();
for (RepositorySource existingSource : this.sources) {
if (existingSource.getName().equals(name)) {
- // Shut down the connection pool if it is one ...
- if (existingSource instanceof RepositoryConnectionPool) {
- RepositoryConnectionPool pool =
(RepositoryConnectionPool)existingSource;
- pool.shutdown();
- if (timeToAwait > 0l) pool.awaitTermination(timeToAwait,
unit);
- }
+ // Shut down the source ...
+ existingSource.shutdown();
+ if (timeToAwait > 0l) existingSource.awaitTermination(timeToAwait,
unit);
}
return existingSource;
}
Added:
trunk/dna-spi/src/main/java/org/jboss/dna/spi/graph/connection/AbstractRepositorySource.java
===================================================================
---
trunk/dna-spi/src/main/java/org/jboss/dna/spi/graph/connection/AbstractRepositorySource.java
(rev 0)
+++
trunk/dna-spi/src/main/java/org/jboss/dna/spi/graph/connection/AbstractRepositorySource.java 2008-07-25
19:37:40 UTC (rev 370)
@@ -0,0 +1,203 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package org.jboss.dna.spi.graph.connection;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import net.jcip.annotations.ThreadSafe;
+
+/**
+ * Base class for {@link RepositorySource} implementations that hands out {@link RepositoryConnection connections} through an
+ * internal {@link RepositoryConnectionPool pool}. The pool obtains its connections either from a factory supplied at
+ * construction time or, when none is supplied, from the {@link #createConnection()} method of this class.
+ *
+ * @author Randall Hauch
+ */
+@ThreadSafe
+public abstract class AbstractRepositorySource implements RepositorySource {
+
+    /**
+     * Serialization version identifier.
+     */
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * The default limit is {@value} for retrying {@link RepositoryConnection connection} calls to the underlying source.
+     */
+    public static final int DEFAULT_RETRY_LIMIT = 0;
+
+    private final AtomicInteger retryLimit = new AtomicInteger(DEFAULT_RETRY_LIMIT);
+    private final RepositoryConnectionPool connections;
+    private final RepositoryConnectionFactory connectionFactory;
+
+    /**
+     * Create a new repository source whose connections are produced by the {@link #createConnection()} method.
+     */
+    protected AbstractRepositorySource() {
+        this(null);
+    }
+
+    /**
+     * Create a new repository source whose connections are produced by the supplied factory. When the supplied factory is
+     * null, the {@link #createConnection()} method is used to produce the connections instead.
+     *
+     * @param factory the connection factory that creates the connections, or null if the {@link #createConnection()} method
+     *        should be used to create connections
+     */
+    protected AbstractRepositorySource( RepositoryConnectionFactory factory ) {
+        if (factory == null) {
+            // No external factory was given, so fall back to the one that delegates to createConnection()
+            factory = new ConnectionFactory();
+        }
+        connectionFactory = factory;
+        connections = new RepositoryConnectionPool(connectionFactory);
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * @see org.jboss.dna.spi.graph.connection.RepositorySource#getRetryLimit()
+     */
+    public int getRetryLimit() {
+        return retryLimit.get();
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * A negative limit is stored as 0.
+     * </p>
+     *
+     * @see org.jboss.dna.spi.graph.connection.RepositorySource#setRetryLimit(int)
+     */
+    public void setRetryLimit( int limit ) {
+        retryLimit.set(Math.max(0, limit));
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * @see org.jboss.dna.spi.graph.connection.ManagedRepositoryConnectionFactory#isRunning()
+     */
+    public boolean isRunning() {
+        return connections.isRunning();
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * @see org.jboss.dna.spi.graph.connection.ManagedRepositoryConnectionFactory#isShutdown()
+     */
+    public boolean isShutdown() {
+        return connections.isShutdown();
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * The connection is obtained from the internal pool.
+     * </p>
+     *
+     * @see org.jboss.dna.spi.graph.connection.RepositoryConnectionFactory#getConnection()
+     */
+    public RepositoryConnection getConnection() throws RepositorySourceException, InterruptedException {
+        return connections.getConnection();
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * @see org.jboss.dna.spi.graph.connection.ManagedRepositoryConnectionFactory#shutdown()
+     */
+    public void shutdown() {
+        connections.shutdown();
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * @see org.jboss.dna.spi.graph.connection.ManagedRepositoryConnectionFactory#shutdownNow()
+     */
+    public void shutdownNow() {
+        connections.shutdownNow();
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * @see org.jboss.dna.spi.graph.connection.ManagedRepositoryConnectionFactory#isTerminated()
+     */
+    public boolean isTerminated() {
+        return connections.isTerminated();
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * @see org.jboss.dna.spi.graph.connection.ManagedRepositoryConnectionFactory#isTerminating()
+     */
+    public boolean isTerminating() {
+        return connections.isTerminating();
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * @see org.jboss.dna.spi.graph.connection.ManagedRepositoryConnectionFactory#awaitTermination(long,
+     *      java.util.concurrent.TimeUnit)
+     */
+    public boolean awaitTermination( long timeout,
+                                     TimeUnit unit ) throws InterruptedException {
+        return connections.awaitTermination(timeout, unit);
+    }
+
+    /**
+     * Create a new {@link RepositoryConnection} instance. This method is called only when this instance was
+     * {@link #AbstractRepositorySource(RepositoryConnectionFactory) constructed} with a null
+     * {@link RepositoryConnectionFactory} reference. Subclasses can therefore simply override this method and are not required
+     * to implement a separate connection factory. This default implementation always throws an
+     * {@link UnsupportedOperationException}.
+     *
+     * @return the new connection
+     * @throws RepositorySourceException
+     * @throws InterruptedException
+     */
+    protected RepositoryConnection createConnection() throws RepositorySourceException, InterruptedException {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Factory used by the {@link AbstractRepositorySource} and its pool to create connections as needed, by delegating to the
+     * enclosing source's {@link AbstractRepositorySource#createConnection()} method.
+     *
+     * @author Randall Hauch
+     */
+    protected class ConnectionFactory implements RepositoryConnectionFactory {
+
+        /**
+         * {@inheritDoc}
+         *
+         * @see org.jboss.dna.spi.graph.connection.RepositoryConnectionFactory#getConnection()
+         */
+        public RepositoryConnection getConnection() throws RepositorySourceException, InterruptedException {
+            return createConnection();
+        }
+
+        /**
+         * {@inheritDoc}
+         *
+         * @see org.jboss.dna.spi.graph.connection.RepositoryConnectionFactory#getName()
+         */
+        public String getName() {
+            // Qualified explicitly so that the enclosing source's name is returned, not this factory's own getName()
+            return AbstractRepositorySource.this.getName();
+        }
+
+    }
+}
Property changes on:
trunk/dna-spi/src/main/java/org/jboss/dna/spi/graph/connection/AbstractRepositorySource.java
___________________________________________________________________
Name: svn:mime-type
+ text/plain
Deleted:
trunk/dna-spi/src/main/java/org/jboss/dna/spi/graph/connection/BasicRepositoryConnectionPool.java
===================================================================
---
trunk/dna-spi/src/main/java/org/jboss/dna/spi/graph/connection/BasicRepositoryConnectionPool.java 2008-07-24
19:52:13 UTC (rev 369)
+++
trunk/dna-spi/src/main/java/org/jboss/dna/spi/graph/connection/BasicRepositoryConnectionPool.java 2008-07-25
19:37:40 UTC (rev 370)
@@ -1,967 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * Copyright 2008, Red Hat Middleware LLC, and individual contributors
- * as indicated by the @author tags. See the copyright.txt file in the
- * distribution for a full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
- */
-package org.jboss.dna.spi.graph.connection;
-
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.Set;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
-import javax.transaction.xa.XAResource;
-import net.jcip.annotations.GuardedBy;
-import net.jcip.annotations.ThreadSafe;
-import org.jboss.dna.common.util.ArgCheck;
-import org.jboss.dna.common.util.Logger;
-import org.jboss.dna.spi.SpiI18n;
-import org.jboss.dna.spi.cache.CachePolicy;
-import org.jboss.dna.spi.graph.commands.GraphCommand;
-
-/**
- * @author Randall Hauch
- */
-@ThreadSafe
-public class BasicRepositoryConnectionPool implements RepositoryConnectionPool {
-
- /**
- * The core pool size for default-constructed pools is {@value} .
- */
- public static final int DEFAULT_CORE_POOL_SIZE = 1;
-
- /**
- * The maximum pool size for default-constructed pools is {@value} .
- */
- public static final int DEFAULT_MAXIMUM_POOL_SIZE = 10;
-
- /**
- * The keep-alive time for connections in default-constructed pools is {@value}
seconds.
- */
- public static final long DEFAULT_KEEP_ALIVE_TIME_IN_SECONDS = 30;
-
- /**
- * Permission for checking shutdown
- */
- private static final RuntimePermission shutdownPerm = new
RuntimePermission("modifyThread");
-
- /**
- * The factory that this pool uses to create new connections.
- */
- private final RepositoryConnectionFactory connectionFactory;
-
- /**
- * Lock held on updates to poolSize, corePoolSize, maximumPoolSize, and workers set.
- */
- private final ReentrantLock mainLock = new ReentrantLock();
-
- /**
- * Wait condition to support awaitTermination
- */
- private final Condition termination = mainLock.newCondition();
-
- /**
- * Set containing all connections that are available for use.
- */
- @GuardedBy( "mainLock" )
- private final BlockingQueue<ConnectionWrapper> availableConnections = new
LinkedBlockingQueue<ConnectionWrapper>();
-
- /**
- * The connections that are currently in use.
- */
- @GuardedBy( "mainLock" )
- private final Set<ConnectionWrapper> inUseConnections = new
HashSet<ConnectionWrapper>();
-
- /**
- * Timeout in nanoseconds for idle connections waiting to be used. Threads use this
timeout only when there are more than
- * corePoolSize present. Otherwise they wait forever to be used.
- */
- private volatile long keepAliveTime;
-
- /**
- * The target pool size, updated only while holding mainLock, but volatile to allow
concurrent readability even during
- * updates.
- */
- @GuardedBy( "mainLock" )
- private volatile int corePoolSize;
-
- /**
- * Maximum pool size, updated only while holding mainLock but volatile to allow
concurrent readability even during updates.
- */
- @GuardedBy( "mainLock" )
- private volatile int maximumPoolSize;
-
- /**
- * Current pool size, updated only while holding mainLock but volatile to allow
concurrent readability even during updates.
- */
- @GuardedBy( "mainLock" )
- private volatile int poolSize;
-
- /**
- * Lifecycle state, updated only while holding mainLock but volatile to allow
concurrent readability even during updates.
- */
- @GuardedBy( "mainLock" )
- private volatile int runState;
-
- // Special values for runState
- /** Normal, not-shutdown mode */
- static final int RUNNING = 0;
- /** Controlled shutdown mode */
- static final int SHUTDOWN = 1;
- /** Immediate shutdown mode */
- static final int STOP = 2;
- /** Final state */
- static final int TERMINATED = 3;
-
- /**
- * Flag specifying whether a connection should be validated before returning it from
the {@link #getConnection()} method.
- */
- private final AtomicBoolean validateConnectionBeforeUse = new AtomicBoolean(false);
-
- /**
- * The time in nanoseconds that ping should wait before timing out and failing.
- */
- private final AtomicLong pingTimeout = new AtomicLong(0);
-
- /**
- * The number of times an attempt to obtain a connection should fail with invalid
connections before throwing an exception.
- */
- private final AtomicInteger maxFailedAttemptsBeforeError = new AtomicInteger(10);
-
- private final AtomicLong totalConnectionsCreated = new AtomicLong(0);
-
- private final AtomicLong totalConnectionsUsed = new AtomicLong(0);
-
- private final Logger logger = Logger.getLogger(this.getClass());
-
- /**
- * Create the pool to use the supplied connection factory, which is typically a
{@link RepositorySource}. This constructor
- * uses the {@link #DEFAULT_CORE_POOL_SIZE default core pool size}, {@link
#DEFAULT_MAXIMUM_POOL_SIZE default maximum pool
- * size}, and {@link #DEFAULT_KEEP_ALIVE_TIME_IN_SECONDS default keep-alive time (in
seconds)}.
- *
- * @param connectionFactory the factory for connections
- * @throws IllegalArgumentException if the connection factory is null or any of the
supplied arguments are invalid
- */
- public BasicRepositoryConnectionPool( RepositoryConnectionFactory connectionFactory )
{
- this(connectionFactory, DEFAULT_CORE_POOL_SIZE, DEFAULT_MAXIMUM_POOL_SIZE,
DEFAULT_KEEP_ALIVE_TIME_IN_SECONDS,
- TimeUnit.SECONDS);
- }
-
- /**
- * Create the pool to use the supplied connection factory, which is typically a
{@link RepositorySource}.
- *
- * @param connectionFactory the factory for connections
- * @param corePoolSize the number of connections to keep in the pool, even if they
are idle.
- * @param maximumPoolSize the maximum number of connections to allow in the pool.
- * @param keepAliveTime when the number of connection is greater than the core, this
is the maximum time that excess idle
- * connections will be kept before terminating.
- * @param unit the time unit for the keepAliveTime argument.
- * @throws IllegalArgumentException if the connection factory is null or any of the
supplied arguments are invalid
- */
- public BasicRepositoryConnectionPool( RepositoryConnectionFactory connectionFactory,
- int corePoolSize,
- int maximumPoolSize,
- long keepAliveTime,
- TimeUnit unit ) {
- ArgCheck.isNonNegative(corePoolSize, "corePoolSize");
- ArgCheck.isPositive(maximumPoolSize, "maximumPoolSize");
- ArgCheck.isNonNegative(keepAliveTime, "keepAliveTime");
- ArgCheck.isNotNull(connectionFactory, "repository connection
factory");
- if (maximumPoolSize < corePoolSize) {
- throw new
IllegalArgumentException(SpiI18n.maximumPoolSizeMayNotBeSmallerThanCorePoolSize.text());
- }
- this.connectionFactory = connectionFactory;
- this.corePoolSize = corePoolSize;
- this.maximumPoolSize = maximumPoolSize;
- this.keepAliveTime = unit.toNanos(keepAliveTime);
- this.setPingTimeout(100, TimeUnit.MILLISECONDS);
- }
-
- /**
- * {@inheritDoc}
- *
- * @see org.jboss.dna.spi.graph.connection.RepositoryConnectionFactory#getName()
- */
- public String getName() {
- return this.connectionFactory.getName();
- }
-
- // -------------------------------------------------
- // Property settings ...
- // -------------------------------------------------
-
- /**
- * @return validateConnectionBeforeUse
- */
- public boolean getValidateConnectionBeforeUse() {
- return this.validateConnectionBeforeUse.get();
- }
-
- /**
- * @param validateConnectionBeforeUse Sets validateConnectionBeforeUse to the
specified value.
- */
- public void setValidateConnectionBeforeUse( boolean validateConnectionBeforeUse ) {
- this.validateConnectionBeforeUse.set(validateConnectionBeforeUse);
- }
-
- /**
- * @return pingTimeout
- */
- public long getPingTimeoutInNanos() {
- return this.pingTimeout.get();
- }
-
- /**
- * @param pingTimeout the time to wait for a ping to complete
- * @param unit the time unit of the time argument
- */
- public void setPingTimeout( long pingTimeout,
- TimeUnit unit ) {
- ArgCheck.isNonNegative(pingTimeout, "time");
- this.pingTimeout.set(unit.toNanos(pingTimeout));
- }
-
- /**
- * @return maxFailedAttemptsBeforeError
- */
- public int getMaxFailedAttemptsBeforeError() {
- return this.maxFailedAttemptsBeforeError.get();
- }
-
- /**
- * @param maxFailedAttemptsBeforeError Sets maxFailedAttemptsBeforeError to the
specified value.
- */
- public void setMaxFailedAttemptsBeforeError( int maxFailedAttemptsBeforeError ) {
- this.maxFailedAttemptsBeforeError.set(maxFailedAttemptsBeforeError);
- }
-
- /**
- * Sets the time limit for which connections may remain idle before being closed. If
there are more than the core number of
- * connections currently in the pool, after waiting this amount of time without being
used, excess connections will be closed.
- * This overrides any value set in the constructor.
- *
- * @param time the time to wait. A time value of zero will cause excess connections
to terminate immediately after being
- * returned.
- * @param unit the time unit of the time argument
- * @throws IllegalArgumentException if time less than zero
- * @see #getKeepAliveTime
- */
- public void setKeepAliveTime( long time,
- TimeUnit unit ) {
- ArgCheck.isNonNegative(time, "time");
- this.keepAliveTime = unit.toNanos(time);
- }
-
- /**
- * Returns the connection keep-alive time, which is the amount of time which
connections in excess of the core pool size may
- * remain idle before being closed.
- *
- * @param unit the desired time unit of the result
- * @return the time limit
- * @see #setKeepAliveTime
- */
- public long getKeepAliveTime( TimeUnit unit ) {
- assert unit != null;
- return unit.convert(keepAliveTime, TimeUnit.NANOSECONDS);
- }
-
- /**
- * @return maximumPoolSize
- */
- public int getMaximumPoolSize() {
- return this.maximumPoolSize;
- }
-
- /**
- * Sets the maximum allowed number of connections. This overrides any value set in
the constructor. If the new value is
- * smaller than the current value, excess existing but unused connections will be
closed.
- *
- * @param maximumPoolSize the new maximum
- * @throws IllegalArgumentException if maximumPoolSize is not positive or is less than the {@link
#getCorePoolSize() core pool size}
- * @see #getMaximumPoolSize
- */
- public void setMaximumPoolSize( int maximumPoolSize ) {
- ArgCheck.isPositive(maximumPoolSize, "maximum pool size");
- if (maximumPoolSize < corePoolSize) {
- throw new
IllegalArgumentException(SpiI18n.maximumPoolSizeMayNotBeSmallerThanCorePoolSize.text());
- }
- final ReentrantLock mainLock = this.mainLock;
- try {
- mainLock.lock();
- int extra = this.maximumPoolSize - maximumPoolSize;
- this.maximumPoolSize = maximumPoolSize;
- if (extra > 0 && poolSize > maximumPoolSize) {
- // Drain the extra connections from those available ...
- drainUnusedConnections(extra);
- }
- } finally {
- mainLock.unlock();
- }
- }
-
- /**
- * Returns the core number of connections.
- *
- * @return the core number of connections
- * @see #setCorePoolSize(int)
- */
- public int getCorePoolSize() {
- return this.corePoolSize;
- }
-
- /**
- * Sets the core number of connections. This overrides any value set in the
constructor. If the new value is smaller than the
- * current value, excess existing and unused connections will be closed. If larger,
new connections will, if needed, be
- * created.
- *
- * @param corePoolSize the new core size
- * @throws RepositorySourceException if there was an error obtaining the new
connection
- * @throws InterruptedException if the thread was interrupted during the operation
- * @throws IllegalArgumentException if <tt>corePoolSize</tt> is less than
zero or greater than the {@link #getMaximumPoolSize() maximum pool size}
- * @see #getCorePoolSize()
- */
- public void setCorePoolSize( int corePoolSize ) throws RepositorySourceException,
InterruptedException {
- ArgCheck.isNonNegative(corePoolSize, "core pool size");
- if (maximumPoolSize < corePoolSize) {
- throw new
IllegalArgumentException(SpiI18n.maximumPoolSizeMayNotBeSmallerThanCorePoolSize.text());
- }
- final ReentrantLock mainLock = this.mainLock;
- try {
- mainLock.lock();
- int extra = this.corePoolSize - corePoolSize;
- this.corePoolSize = corePoolSize;
- if (extra < 0) {
- // Add connections ...
- addConnectionsIfUnderCorePoolSize();
- } else if (extra > 0 && poolSize > corePoolSize) {
- // Drain the extra connections from those available ...
- drainUnusedConnections(extra);
- }
- } finally {
- mainLock.unlock();
- }
- }
-
- // -------------------------------------------------
- // Statistics ...
- // -------------------------------------------------
-
- /**
- * Returns the current number of connections in the pool, including those that are
checked out (in use) and those that are not
- * being used.
- *
- * @return the number of connections
- */
- public int getPoolSize() {
- return poolSize;
- }
-
- /**
- * Returns the approximate number of connections that are currently checked out from
the pool.
- *
- * @return the number of checked-out connections
- */
- public int getInUseCount() {
- final ReentrantLock mainLock = this.mainLock;
- try {
- mainLock.lock();
- return this.inUseConnections.size();
- } finally {
- mainLock.unlock();
- }
- }
-
- /**
- * Get the total number of connections that have been created by this pool.
- *
- * @return the total number of connections created by this pool
- */
- public long getTotalConnectionsCreated() {
- return this.totalConnectionsCreated.get();
- }
-
- /**
- * Get the total number of times connections have been {@link #getConnection()
obtained} from this pool.
- *
- * @return the total number
- */
- public long getTotalConnectionsUsed() {
- return this.totalConnectionsUsed.get();
- }
-
- // -------------------------------------------------
- // State management methods ...
- // -------------------------------------------------
-
- /**
- * Starts a core connection, causing it to idly wait for use. This overrides the
default policy of starting core connections
- * only when they are {@link #getConnection() needed}. This method will return
<tt>false</tt> if all core connections have
- * already been started.
- *
- * @return true if a connection was started
- * @throws RepositorySourceException if there was an error obtaining the new
connection
- * @throws InterruptedException if the thread was interrupted during the operation
- */
- public boolean prestartCoreConnection() throws RepositorySourceException,
InterruptedException {
- final ReentrantLock mainLock = this.mainLock;
- try {
- mainLock.lock();
- return addConnectionIfUnderCorePoolSize();
- } finally {
- mainLock.unlock();
- }
- }
-
- /**
- * Starts all core connections, causing them to idly wait for use. This overrides the
default policy of starting core
- * connections only when they are {@link #getConnection() needed}.
- *
- * @return the number of connections started.
- * @throws RepositorySourceException if there was an error obtaining the new
connection
- * @throws InterruptedException if the thread was interrupted during the operation
- */
- public int prestartAllCoreConnections() throws RepositorySourceException,
InterruptedException {
- final ReentrantLock mainLock = this.mainLock;
- try {
- mainLock.lock();
- return addConnectionsIfUnderCorePoolSize();
- } finally {
- mainLock.unlock();
- }
- }
-
- /**
- * {@inheritDoc}
- *
- * @see org.jboss.dna.spi.graph.connection.RepositoryConnectionPool#shutdown()
- */
- public void shutdown() {
- // Fail if caller doesn't have modifyThread permission. We
- // explicitly check permissions directly because we can't trust
- // implementations of SecurityManager to correctly override
- // the "check access" methods such that our documented
- // security policy is implemented.
- SecurityManager security = System.getSecurityManager();
- if (security != null)
java.security.AccessController.checkPermission(shutdownPerm);
-
- this.logger.debug("Shutting down repository connection pool for {0}",
getName());
- boolean fullyTerminated = false;
- final ReentrantLock mainLock = this.mainLock;
- try {
- mainLock.lock();
- int state = this.runState;
- if (state == RUNNING) {
- // don't override shutdownNow
- this.runState = SHUTDOWN;
- }
-
- // Kill the maintenance thread ...
-
- // Remove and close all available connections ...
- if (!this.availableConnections.isEmpty()) {
- // Drain the extra connections from those available ...
- drainUnusedConnections(this.availableConnections.size());
- }
-
- // If there are no connections being used, trigger full termination now ...
- if (this.inUseConnections.isEmpty()) {
- fullyTerminated = true;
- this.logger.trace("Signalling termination of repository connection
pool for {0}", getName());
- runState = TERMINATED;
- termination.signalAll();
- this.logger.debug("Terminated repository connection pool for
{0}", getName());
- }
- // Otherwise the last connection that is closed will transition the runState
to TERMINATED ...
- } finally {
- mainLock.unlock();
- }
- if (fullyTerminated) terminated();
- }
-
- /**
- * {@inheritDoc}
- *
- * @see org.jboss.dna.spi.graph.connection.RepositoryConnectionPool#shutdownNow()
- */
- public void shutdownNow() {
- // Almost the same code as shutdown()
- SecurityManager security = System.getSecurityManager();
- if (security != null)
java.security.AccessController.checkPermission(shutdownPerm);
-
- this.logger.debug("Shutting down (immediately) repository connection pool
for {0}", getName());
- boolean fullyTerminated = false;
- final ReentrantLock mainLock = this.mainLock;
- try {
- mainLock.lock();
- int state = this.runState;
- if (state != TERMINATED) {
- // don't override a pool that has already TERMINATED
- this.runState = STOP;
- }
-
- // Kill the maintenance thread ...
-
- // Remove and close all available connections ...
- if (!this.availableConnections.isEmpty()) {
- // Drain the extra connections from those available ...
- drainUnusedConnections(this.availableConnections.size());
- }
-
- // If there are connections being used, close them now ...
- if (!this.inUseConnections.isEmpty()) {
- for (ConnectionWrapper connectionInUse : this.inUseConnections) {
- try {
- this.logger.trace("Closing repository connection to
{0}", getName());
- connectionInUse.getOriginal().close();
- } catch (InterruptedException e) {
- // Ignore this ...
- }
- }
- this.poolSize -= this.inUseConnections.size();
- // The last connection that is closed will transition the runState to
TERMINATED ...
- } else {
- // There are no connections in use, so trigger full termination now ...
- fullyTerminated = true;
- this.logger.trace("Signalling termination of repository connection
pool for {0}", getName());
- runState = TERMINATED;
- termination.signalAll();
- this.logger.debug("Terminated repository connection pool for
{0}", getName());
- }
-
- } finally {
- mainLock.unlock();
- }
- if (fullyTerminated) terminated();
- }
-
- /**
- * {@inheritDoc}
- *
- * @see org.jboss.dna.spi.graph.connection.RepositoryConnectionPool#isRunning()
- */
- public boolean isRunning() {
- return runState == RUNNING;
- }
-
- /**
- * {@inheritDoc}
- *
- * @see org.jboss.dna.spi.graph.connection.RepositoryConnectionPool#isShutdown()
- */
- public boolean isShutdown() {
- return runState != RUNNING;
- }
-
- /**
- * {@inheritDoc}
- *
- * @see org.jboss.dna.spi.graph.connection.RepositoryConnectionPool#isTerminating()
- */
- public boolean isTerminating() {
- return runState == STOP;
- }
-
- /**
- * {@inheritDoc}
- *
- * @see org.jboss.dna.spi.graph.connection.RepositoryConnectionPool#isTerminated()
- */
- public boolean isTerminated() {
- return runState == TERMINATED;
- }
-
- /**
- * {@inheritDoc}
- *
- * @see
org.jboss.dna.spi.graph.connection.RepositoryConnectionPool#awaitTermination(long,
java.util.concurrent.TimeUnit)
- */
- public boolean awaitTermination( long timeout,
- TimeUnit unit ) throws InterruptedException {
- long nanos = unit.toNanos(timeout);
- final ReentrantLock mainLock = this.mainLock;
- try {
- mainLock.lock();
- for (;;) {
- // this.logger.debug("---> Run state = {}; condition = {}",
runState, termination);
- if (runState == TERMINATED) return true;
- if (nanos <= 0) return false;
- nanos = termination.awaitNanos(nanos);
- // this.logger.debug("---> Done waiting: run state = {};
condition = {}", runState, termination);
- }
- } finally {
- mainLock.unlock();
- }
- }
-
- /**
- * Method invoked when the pool has terminated. Default implementation does nothing.
Note: To properly nest multiple
- * overridings, subclasses should generally invoke
<tt>super.terminated</tt> within this method.
- */
- protected void terminated() {
- }
-
- /**
- * Invokes <tt>shutdown</tt> when this pool is no longer referenced.
- */
- @Override
- protected void finalize() {
- shutdown();
- }
-
- // -------------------------------------------------
- // Connection management methods ...
- // -------------------------------------------------
-
- /**
- * {@inheritDoc}
- */
- public RepositoryConnection getConnection() throws RepositorySourceException,
InterruptedException {
- int attemptsAllowed = this.maxFailedAttemptsBeforeError.get();
- ConnectionWrapper connection = null;
- // Do this until we get a good connection ...
- int attemptsRemaining = attemptsAllowed;
- while (connection == null && attemptsRemaining > 0) {
- --attemptsRemaining;
- ReentrantLock mainLock = this.mainLock;
- try {
- mainLock.lock();
- // If the pool is no longer running, refuse to hand out a connection
...
- if (this.runState != RUNNING) {
- throw new
IllegalStateException(SpiI18n.repositoryConnectionPoolIsNotRunning.text());
- }
- // If there are fewer total connections than the core size ...
- if (this.poolSize < this.corePoolSize) {
- // Immediately create a wrapped connection and return it ...
- connection = newWrappedConnection();
- }
- // Peek to see if there is a connection available ...
- else if (this.availableConnections.peek() != null) {
- // There is, so take it and return it ...
- connection = this.availableConnections.take();
- }
- // There is no connection available. If there are fewer total connections
than the maximum size ...
- else if (this.poolSize < this.maximumPoolSize) {
- // Immediately create a wrapped connection and return it ...
- connection = newWrappedConnection();
- }
- if (connection != null) {
- this.inUseConnections.add(connection);
- }
- } finally {
- mainLock.unlock();
- }
- if (connection == null) {
- // There are not enough connections, so wait in line for the next
available connection ...
- this.logger.trace("Waiting for a repository connection from pool
{0}", getName());
- connection = this.availableConnections.take();
- mainLock = this.mainLock;
- mainLock.lock();
- try {
- if (connection != null) {
- this.inUseConnections.add(connection);
- }
- } finally {
- mainLock.unlock();
- }
- this.logger.trace("Recieved a repository connection from pool
{0}", getName());
- }
- if (connection != null && this.validateConnectionBeforeUse.get()) {
- connection = validateConnection(connection);
- }
- }
- if (connection == null) {
- // We were unable to obtain a usable connection, so fail ...
- throw new
RepositorySourceException(SpiI18n.unableToObtainValidRepositoryAfterAttempts.text(attemptsAllowed));
- }
- this.totalConnectionsUsed.incrementAndGet();
- return connection;
- }
-
- /**
- * This method is automatically called by the {@link ConnectionWrapper} when it is
{@link ConnectionWrapper#close() closed}.
- *
- * @param wrapper the wrapper to the connection that is being returned to the pool
- */
- protected void returnConnection( ConnectionWrapper wrapper ) {
- assert wrapper != null;
- ConnectionWrapper wrapperToClose = null;
- final ReentrantLock mainLock = this.mainLock;
- try {
- mainLock.lock();
- // Remove the connection from the in-use set ...
- boolean removed = this.inUseConnections.remove(wrapper);
- assert removed;
-
- // If we're shutting down the pool, then just close the connection ...
- if (this.runState != RUNNING) {
- wrapperToClose = wrapper;
- }
- // If there are more connections than the maximum size...
- else if (this.poolSize > this.maximumPoolSize) {
- // Immediately close this connection ...
- wrapperToClose = wrapper;
- }
- // Attempt to make the connection available (this should generally work,
unless there is an upper limit
- // to the number of available connections) ...
- else if (!this.availableConnections.offer(new
ConnectionWrapper(wrapper.getOriginal()))) {
- // The pool of available connection is full, so release it ...
- wrapperToClose = wrapper;
- }
- } finally {
- mainLock.unlock();
- }
- // Close the connection if we're supposed to (do it outside of the main
lock)...
- if (wrapperToClose != null) {
- try {
- closeConnection(wrapper);
- } catch (InterruptedException e) {
- // catch this, as there's not much we can do and the caller
doesn't care or know how to handle it
- this.logger.trace(e, "Interrupted while closing a repository
connection");
- }
- }
- }
-
- /**
- * Validate the supplied connection, returning the connection if valid or null if the
connection is not valid.
- *
- * @param connection the connection to be validated; may not be null
- * @return the validated connection, or null if the connection did not validate and
was removed from the pool
- */
- protected ConnectionWrapper validateConnection( ConnectionWrapper connection ) {
- assert connection != null;
- ConnectionWrapper invalidConnection = null;
- try {
- if (!connection.ping(this.pingTimeout.get(), TimeUnit.NANOSECONDS)) {
- invalidConnection = connection;
- }
- } catch (InterruptedException e) {
- // catch this, as there's not much we can do and the caller doesn't
care or know how to handle it
- this.logger.trace(e, "Interrupted while pinging a repository
connection");
- invalidConnection = connection;
- } finally {
- if (invalidConnection != null) {
- connection = null;
- returnConnection(invalidConnection);
- }
- }
- return connection;
- }
-
- /**
- * Obtain a new connection wrapped in a {@link ConnectionWrapper}. This method does
not check whether creating the new
- * connection would violate the {@link #maximumPoolSize maximum pool size} nor does
it add the new connection to the
- * {@link #availableConnections available connections} (as the caller may want it
immediately), but it does increment the
- * {@link #poolSize pool size}.
- *
- * @return the connection wrapper with a new connection
- * @throws RepositorySourceException if there was an error obtaining the new
connection
- * @throws InterruptedException if the thread was interrupted during the operation
- */
- @GuardedBy( "mainLock" )
- protected ConnectionWrapper newWrappedConnection() throws RepositorySourceException,
InterruptedException {
- RepositoryConnection connection = this.connectionFactory.getConnection();
- ++this.poolSize;
- this.totalConnectionsCreated.incrementAndGet();
- return new ConnectionWrapper(connection);
- }
-
- /**
- * Close a connection that is in the pool but no longer in the {@link
#availableConnections available connections}. This
- * method does decrement the {@link #poolSize pool size}.
- *
- * @param wrapper the wrapper for the connection to be closed
- * @throws InterruptedException if the thread was interrupted during the operation
- */
- protected void closeConnection( ConnectionWrapper wrapper ) throws
InterruptedException {
- assert wrapper != null;
- RepositoryConnection original = wrapper.getOriginal();
- assert original != null;
- try {
- this.logger.debug("Closing repository connection to {0}",
getName());
- original.close();
- } finally {
- final ReentrantLock mainLock = this.mainLock;
- try {
- mainLock.lock();
- // No matter what reduce the pool size count
- --this.poolSize;
- // And if shutting down and this was the last connection being used...
- if (this.runState == SHUTDOWN && this.poolSize <= 0) {
- // then signal anybody that has called
"awaitTermination(...)"
- this.logger.trace("Signalling termination of repository
connection pool for {0}", getName());
- this.runState = TERMINATED;
- this.termination.signalAll();
- this.logger.trace("Terminated repository connection pool for
{0}", getName());
-
- // NOTE(review): unlike shutdown(), terminated() is never invoked on this path.
- }
- } finally {
- mainLock.unlock();
- }
- }
- }
-
- @GuardedBy( "mainLock" )
- protected int drainUnusedConnections( int count ) {
- if (count <= 0) return 0;
- this.logger.trace("Draining up to {0} unused repository connections to
{1}", count, getName());
- // Drain the extra connections from those available ...
- Collection<ConnectionWrapper> extraConnections = new
LinkedList<ConnectionWrapper>();
- this.availableConnections.drainTo(extraConnections, count);
- for (ConnectionWrapper connection : extraConnections) {
- try {
- this.logger.trace("Closing repository connection to {0}",
getName());
- connection.getOriginal().close();
- } catch (InterruptedException e) {
- // Ignore this ...
- }
- }
- int numClosed = extraConnections.size();
- this.poolSize -= numClosed;
- this.logger.trace("Drained {0} unused connections", numClosed);
- return numClosed;
- }
-
- @GuardedBy( "mainLock" )
- protected boolean addConnectionIfUnderCorePoolSize() throws
RepositorySourceException, InterruptedException {
- // Add connection ...
- if (this.poolSize < this.corePoolSize) {
- this.availableConnections.offer(newWrappedConnection());
- this.logger.trace("Added connection to {0} in undersized pool",
getName());
- return true;
- }
- return false;
- }
-
- @GuardedBy( "mainLock" )
- protected int addConnectionsIfUnderCorePoolSize() throws RepositorySourceException,
InterruptedException {
- // Add connections ...
- int n = 0;
- while (this.poolSize < this.corePoolSize) {
- this.availableConnections.offer(newWrappedConnection());
- ++n;
- }
- this.logger.trace("Added {0} connection(s) to {1} in undersized pool",
n, getName());
- return n;
- }
-
- protected class ConnectionWrapper implements RepositoryConnection {
-
- private final RepositoryConnection original;
- private final long timeCreated;
- private long lastUsed;
- private boolean closed = false;
-
- protected ConnectionWrapper( RepositoryConnection connection ) {
- assert connection != null;
- this.original = connection;
- this.timeCreated = System.currentTimeMillis();
- }
-
- /**
- * @return original
- */
- protected RepositoryConnection getOriginal() {
- return this.original;
- }
-
- /**
- * @return lastUsed
- */
- public long getTimeLastUsed() {
- return this.lastUsed;
- }
-
- /**
- * @return timeCreated
- */
- public long getTimeCreated() {
- return this.timeCreated;
- }
-
- /**
- * {@inheritDoc}
- */
- public String getSourceName() {
- return this.original.getSourceName();
- }
-
- /**
- * {@inheritDoc}
- */
- public XAResource getXAResource() {
- if (closed) throw new
IllegalStateException(SpiI18n.closedConnectionMayNotBeUsed.text());
- return this.original.getXAResource();
- }
-
- /**
- * {@inheritDoc}
- */
- public CachePolicy getDefaultCachePolicy() {
- if (closed) throw new
IllegalStateException(SpiI18n.closedConnectionMayNotBeUsed.text());
- return this.original.getDefaultCachePolicy();
- }
-
- /**
- * {@inheritDoc}
- */
- public void execute( ExecutionEnvironment env,
- GraphCommand... commands ) throws RepositorySourceException,
InterruptedException {
- if (closed) throw new
IllegalStateException(SpiI18n.closedConnectionMayNotBeUsed.text());
- this.original.execute(env, commands);
- }
-
- /**
- * {@inheritDoc}
- */
- public boolean ping( long time,
- TimeUnit unit ) throws InterruptedException {
- if (closed) throw new
IllegalStateException(SpiI18n.closedConnectionMayNotBeUsed.text());
- return this.original.ping(time, unit);
- }
-
- /**
- * {@inheritDoc}
- */
- public void close() throws InterruptedException {
- if (!closed) {
- this.lastUsed = System.currentTimeMillis();
- this.original.close();
- this.closed = true;
- returnConnection(this);
- }
- }
-
- /**
- * {@inheritDoc}
- */
- public void setListener( RepositorySourceListener listener ) {
- if (!closed) this.original.setListener(listener);
- }
-
- }
-
-}
Copied:
trunk/dna-spi/src/main/java/org/jboss/dna/spi/graph/connection/ManagedRepositoryConnectionFactory.java
(from rev 369,
trunk/dna-spi/src/main/java/org/jboss/dna/spi/graph/connection/RepositoryConnectionPool.java)
===================================================================
---
trunk/dna-spi/src/main/java/org/jboss/dna/spi/graph/connection/ManagedRepositoryConnectionFactory.java
(rev 0)
+++
trunk/dna-spi/src/main/java/org/jboss/dna/spi/graph/connection/ManagedRepositoryConnectionFactory.java 2008-07-25
19:37:40 UTC (rev 370)
@@ -0,0 +1,111 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package org.jboss.dna.spi.graph.connection;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author Randall Hauch
+ */
+public interface ManagedRepositoryConnectionFactory extends RepositoryConnectionFactory
{
+
+ /**
+ * Initiates an orderly shutdown in which connections that are currently in use are
allowed to be used and closed as normal,
+ * but no new connections will be created. Invocation has no additional effect if
already shut down.
+ * <p>
+ * Once the pool has been shutdown, it may not be used to {@link #getConnection() get
connections}.
+ * </p>
+ *
+ * @throws SecurityException if a security manager exists and shutting down this pool
may manipulate threads that the caller
+ * is not permitted to modify because it does not hold {@link
java.lang.RuntimePermission}<tt>("modifyThread")</tt>,
+ * or the security manager's <tt>checkAccess</tt> method
denies access.
+ * @see #shutdownNow()
+ */
+ void shutdown();
+
+ /**
+ * Attempts to close all connections, including those connections currently in use,
and prevent the use of other connections.
+ *
+ * @throws SecurityException if a security manager exists and shutting down this pool
may manipulate threads that the caller
+ * is not permitted to modify because it does not hold {@link
java.lang.RuntimePermission}<tt>("modifyThread")</tt>,
+ * or the security manager's <tt>checkAccess</tt> method
denies access.
+ * @see #shutdown()
+ */
+ void shutdownNow();
+
+ /**
+ * Return whether this connection pool is running and is able to {@link
#getConnection() provide connections}. Note that this
+ * method is effectively <code>!isShutdown()</code>.
+ *
+ * @return true if this pool is running, or false otherwise
+ * @see #isShutdown()
+ * @see #isTerminated()
+ * @see #isTerminating()
+ */
+ boolean isRunning();
+
+ /**
+ * Return whether this connection pool is in the process of shutting down or has
already been shut down. A result of
+ * <code>true</code> signals that the pool may no longer be used. Note
that this method is effectively
+ * <code>!isRunning()</code>.
+ *
+ * @return true if this pool has been shut down, or false otherwise
+ * @see #isRunning()
+ * @see #isTerminated()
+ * @see #isTerminating()
+ */
+ boolean isShutdown();
+
+ /**
+ * Returns true if this pool is in the process of terminating after {@link
#shutdown()} or {@link #shutdownNow()} has been
+ * called but has not completely terminated. This method may be useful for debugging.
A return of <tt>true</tt> reported a
+ * sufficient period after shutdown may indicate that connections still in use have
not been closed, preventing this
+ * pool from terminating.
+ *
+ * @return true if terminating but not yet terminated, or false otherwise
+ * @see #isTerminated()
+ */
+ boolean isTerminating();
+
+ /**
+ * Return true if this pool has completed its termination and no longer has any open
connections.
+ *
+ * @return true if terminated, or false otherwise
+ * @see #isTerminating()
+ */
+ boolean isTerminated();
+
+ /**
+ * Method that can be called after {@link #shutdown()} or {@link #shutdownNow()} to
wait until all connections in use at the
+ * time those methods were called have been closed normally. This method accepts a
maximum time duration, after which it will
+ * return even if all connections have not been closed.
+ *
+ * @param timeout the maximum time to wait for all connections to be closed and
returned to the pool
+ * @param unit the time unit for <code>timeout</code>
+ * @return true if the pool was terminated in the supplied time (or was already
terminated), or false if the timeout occurred
+ * before all the connections were closed
+ * @throws InterruptedException if the thread was interrupted
+ */
+ boolean awaitTermination( long timeout,
+ TimeUnit unit ) throws InterruptedException;
+
+}
Deleted:
trunk/dna-spi/src/main/java/org/jboss/dna/spi/graph/connection/RepositoryConnectionPool.java
===================================================================
---
trunk/dna-spi/src/main/java/org/jboss/dna/spi/graph/connection/RepositoryConnectionPool.java 2008-07-24
19:52:13 UTC (rev 369)
+++
trunk/dna-spi/src/main/java/org/jboss/dna/spi/graph/connection/RepositoryConnectionPool.java 2008-07-25
19:37:40 UTC (rev 370)
@@ -1,111 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * Copyright 2008, Red Hat Middleware LLC, and individual contributors
- * as indicated by the @author tags. See the copyright.txt file in the
- * distribution for a full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
- */
-package org.jboss.dna.spi.graph.connection;
-
-import java.util.concurrent.TimeUnit;
-
-/**
- * @author Randall Hauch
- */
-public interface RepositoryConnectionPool extends RepositoryConnectionFactory {
-
- /**
- * Initiates an orderly shutdown in which connections that are currently in use are
allowed to be used and closed as normal,
- * but no new connections will be created. Invocation has no additional effect if
already shut down.
- * <p>
- * Once the pool has been shutdown, it may not be used to {@link #getConnection() get
connections}.
- * </p>
- *
- * @throws SecurityException if a security manager exists and shutting down this pool
may manipulate threads that the caller
- * is not permitted to modify because it does not hold {@link
java.lang.RuntimePermission}<tt>("modifyThread")</tt>,
- * or the security manager's <tt>checkAccess</tt> method
denies access.
- * @see #shutdownNow()
- */
- void shutdown();
-
- /**
- * Attempts to close all connections, including those connections currently in use,
and prevent the use of other connections.
- *
- * @throws SecurityException if a security manager exists and shutting down this pool
may manipulate threads that the caller
- * is not permitted to modify because it does not hold {@link
java.lang.RuntimePermission}<tt>("modifyThread")</tt>,
- * or the security manager's <tt>checkAccess</tt> method
denies access.
- * @see #shutdown()
- */
- void shutdownNow();
-
- /**
- * Return whether this connection pool is running and is able to {@link
#getConnection() provide connections}. Note that this
- * method is effectively <code>!isShutdown()</code>.
- *
- * @return true if this pool is running, or false otherwise
- * @see #isShutdown()
- * @see #isTerminated()
- * @see #isTerminating()
- */
- boolean isRunning();
-
- /**
- * Return whether this connection pool is in the process of shutting down or has
already been shut down. A result of
- * <code>true</code> signals that the pool may no longer be used. Note
that this method is effectively
- * <code>!isRunning()</code>.
- *
- * @return true if this pool has been shut down, or false otherwise
- * @see #isShutdown()
- * @see #isTerminated()
- * @see #isTerminating()
- */
- boolean isShutdown();
-
- /**
- * Returns true if this pool is in the process of terminating after {@link
#shutdown()} or {@link #shutdownNow()} has been
- * called but has not completely terminated. This method may be useful for debugging.
A return of <tt>true</tt> reported a
- * sufficient period after shutdown may indicate that submitted tasks have ignored or
suppressed interruption, causing this
- * executor not to properly terminate.
- *
- * @return true if terminating but not yet terminated, or false otherwise
- * @see #isTerminated()
- */
- boolean isTerminating();
-
- /**
- * Return true if this pool has completed its termination and no longer has any open
connections.
- *
- * @return true if terminated, or false otherwise
- * @see #isTerminating()
- */
- boolean isTerminated();
-
- /**
- * Method that can be called after {@link #shutdown()} or {@link #shutdownNow()} to
wait until all connections in use at the
- * time those methods were called have been closed normally. This method accepts a
maximum time duration, after which it will
- * return even if all connections have not been closed.
- *
- * @param timeout the maximum time to wait for all connections to be closed and
returned to the pool
- * @param unit the time unit for <code>timeout</code>
- * @return true if the pool was terminated in the supplied time (or was already
terminated), or false if the timeout occurred
- * before all the connections were closed
- * @throws InterruptedException if the thread was interrupted
- */
- boolean awaitTermination( long timeout,
- TimeUnit unit ) throws InterruptedException;
-
-}
Copied:
trunk/dna-spi/src/main/java/org/jboss/dna/spi/graph/connection/RepositoryConnectionPool.java
(from rev 368,
trunk/dna-spi/src/main/java/org/jboss/dna/spi/graph/connection/BasicRepositoryConnectionPool.java)
===================================================================
---
trunk/dna-spi/src/main/java/org/jboss/dna/spi/graph/connection/RepositoryConnectionPool.java
(rev 0)
+++
trunk/dna-spi/src/main/java/org/jboss/dna/spi/graph/connection/RepositoryConnectionPool.java 2008-07-25
19:37:40 UTC (rev 370)
@@ -0,0 +1,967 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package org.jboss.dna.spi.graph.connection;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+import javax.transaction.xa.XAResource;
+import net.jcip.annotations.GuardedBy;
+import net.jcip.annotations.ThreadSafe;
+import org.jboss.dna.common.util.ArgCheck;
+import org.jboss.dna.common.util.Logger;
+import org.jboss.dna.spi.SpiI18n;
+import org.jboss.dna.spi.cache.CachePolicy;
+import org.jboss.dna.spi.graph.commands.GraphCommand;
+
+/**
+ * @author Randall Hauch
+ */
+@ThreadSafe
+public class RepositoryConnectionPool implements ManagedRepositoryConnectionFactory {
+
+ /**
+ * The core pool size for default-constructed pools is {@value} .
+ */
+ public static final int DEFAULT_CORE_POOL_SIZE = 1;
+
+ /**
+ * The maximum pool size for default-constructed pools is {@value} .
+ */
+ public static final int DEFAULT_MAXIMUM_POOL_SIZE = 10;
+
+ /**
+ * The keep-alive time for connections in default-constructed pools is {@value}
seconds.
+ */
+ public static final long DEFAULT_KEEP_ALIVE_TIME_IN_SECONDS = 30;
+
+ /**
+ * Permission for checking shutdown
+ */
+ private static final RuntimePermission shutdownPerm = new
RuntimePermission("modifyThread");
+
+ /**
+ * The factory that this pool uses to create new connections.
+ */
+ private final RepositoryConnectionFactory connectionFactory;
+
+ /**
+ * Lock held on updates to poolSize, corePoolSize, maximumPoolSize, runState, and the collections of available and
+ * in-use connections.
+ */
+ private final ReentrantLock mainLock = new ReentrantLock();
+
+ /**
+ * Wait condition to support awaitTermination
+ */
+ private final Condition termination = mainLock.newCondition();
+
+ /**
+ * Queue containing all connections that are available for use.
+ */
+ @GuardedBy( "mainLock" )
+ private final BlockingQueue<ConnectionWrapper> availableConnections = new
LinkedBlockingQueue<ConnectionWrapper>();
+
+ /**
+ * The connections that are currently in use.
+ */
+ @GuardedBy( "mainLock" )
+ private final Set<ConnectionWrapper> inUseConnections = new
HashSet<ConnectionWrapper>();
+
+ /**
+ * Timeout in nanoseconds for idle connections waiting to be used. This timeout is intended to apply only when there are
+ * more than corePoolSize connections in the pool; otherwise idle connections wait indefinitely to be used.
+ */
+ private volatile long keepAliveTime;
+
+ /**
+ * The target pool size, updated only while holding mainLock, but volatile to allow
concurrent readability even during
+ * updates.
+ */
+ @GuardedBy( "mainLock" )
+ private volatile int corePoolSize;
+
+ /**
+ * Maximum pool size, updated only while holding mainLock but volatile to allow
concurrent readability even during updates.
+ */
+ @GuardedBy( "mainLock" )
+ private volatile int maximumPoolSize;
+
+ /**
+ * Current pool size, updated only while holding mainLock but volatile to allow
concurrent readability even during updates.
+ */
+ @GuardedBy( "mainLock" )
+ private volatile int poolSize;
+
+ /**
+ * Lifecycle state, updated only while holding mainLock but volatile to allow
concurrent readability even during updates.
+ */
+ @GuardedBy( "mainLock" )
+ private volatile int runState;
+
+ // Special values for runState
+ /** Normal, not-shutdown mode */
+ static final int RUNNING = 0;
+ /** Controlled shutdown mode */
+ static final int SHUTDOWN = 1;
+ /** Immediate shutdown mode */
+ static final int STOP = 2;
+ /** Final state */
+ static final int TERMINATED = 3;
+
+ /**
+ * Flag specifying whether a connection should be validated before returning it from
the {@link #getConnection()} method.
+ */
+ private final AtomicBoolean validateConnectionBeforeUse = new AtomicBoolean(false);
+
+ /**
+ * The time in nanoseconds that ping should wait before timing out and failing.
+ */
+ private final AtomicLong pingTimeout = new AtomicLong(0);
+
+ /**
+ * The number of times an attempt to obtain a connection should fail with invalid
connections before throwing an exception.
+ */
+ private final AtomicInteger maxFailedAttemptsBeforeError = new AtomicInteger(10);
+
+ private final AtomicLong totalConnectionsCreated = new AtomicLong(0);
+
+ private final AtomicLong totalConnectionsUsed = new AtomicLong(0);
+
+ private final Logger logger = Logger.getLogger(this.getClass());
+
+ /**
+  * Create a pool around the supplied connection factory (typically a {@link RepositorySource}) using the default
+  * settings: {@link #DEFAULT_CORE_POOL_SIZE}, {@link #DEFAULT_MAXIMUM_POOL_SIZE}, and a keep-alive time of
+  * {@link #DEFAULT_KEEP_ALIVE_TIME_IN_SECONDS} seconds.
+  *
+  * @param connectionFactory the factory for connections
+  * @throws IllegalArgumentException if the connection factory is null or any of the supplied arguments are invalid
+  */
+ public RepositoryConnectionPool( RepositoryConnectionFactory connectionFactory ) {
+     this(connectionFactory,
+          DEFAULT_CORE_POOL_SIZE,
+          DEFAULT_MAXIMUM_POOL_SIZE,
+          DEFAULT_KEEP_ALIVE_TIME_IN_SECONDS,
+          TimeUnit.SECONDS);
+ }
+
+ /**
+  * Create the pool to use the supplied connection factory, which is typically a {@link RepositorySource}.
+  *
+  * @param connectionFactory the factory for connections
+  * @param corePoolSize the number of connections to keep in the pool, even if they are idle.
+  * @param maximumPoolSize the maximum number of connections to allow in the pool.
+  * @param keepAliveTime when the number of connections is greater than the core, this is the maximum time that excess
+  *        idle connections will be kept before terminating.
+  * @param unit the time unit for the keepAliveTime argument.
+  * @throws IllegalArgumentException if the connection factory or time unit is null, or any of the supplied arguments are
+  *         invalid
+  */
+ public RepositoryConnectionPool( RepositoryConnectionFactory connectionFactory,
+                                  int corePoolSize,
+                                  int maximumPoolSize,
+                                  long keepAliveTime,
+                                  TimeUnit unit ) {
+     ArgCheck.isNonNegative(corePoolSize, "corePoolSize");
+     ArgCheck.isPositive(maximumPoolSize, "maximumPoolSize");
+     ArgCheck.isNonNegative(keepAliveTime, "keepAliveTime");
+     ArgCheck.isNotNull(connectionFactory, "repository connection factory");
+     // Reject a null unit up front rather than failing with a NullPointerException during the conversion below
+     ArgCheck.isNotNull(unit, "unit");
+     if (maximumPoolSize < corePoolSize) {
+         throw new IllegalArgumentException(SpiI18n.maximumPoolSizeMayNotBeSmallerThanCorePoolSize.text());
+     }
+     this.connectionFactory = connectionFactory;
+     this.corePoolSize = corePoolSize;
+     this.maximumPoolSize = maximumPoolSize;
+     this.keepAliveTime = unit.toNanos(keepAliveTime);
+     // Default ping timeout of 100 milliseconds; set directly so that no overridable method runs during construction
+     this.pingTimeout.set(TimeUnit.MILLISECONDS.toNanos(100));
+ }
+
+ /**
+  * Get the name of this pool, which is the name reported by the connection factory that this pool wraps.
+  *
+  * @return the name of the underlying connection factory
+  * @see org.jboss.dna.spi.graph.connection.RepositoryConnectionFactory#getName()
+  */
+ public String getName() {
+     final String factoryName = connectionFactory.getName();
+     return factoryName;
+ }
+
+ // -------------------------------------------------
+ // Property settings ...
+ // -------------------------------------------------
+
+ /**
+  * Report whether connections are validated before {@link #getConnection()} hands them out.
+  *
+  * @return true if connections are validated before use, or false otherwise
+  */
+ public boolean getValidateConnectionBeforeUse() {
+     final boolean validate = validateConnectionBeforeUse.get();
+     return validate;
+ }
+
+ /**
+  * Specify whether connections should be validated before {@link #getConnection()} hands them out.
+  *
+  * @param validateConnectionBeforeUse true if connections should be validated before use, or false otherwise
+  */
+ public void setValidateConnectionBeforeUse( boolean validateConnectionBeforeUse ) {
+     final AtomicBoolean flag = this.validateConnectionBeforeUse;
+     flag.set(validateConnectionBeforeUse);
+ }
+
+ /**
+  * Get the time that a ping is allowed to take before it is considered to have failed.
+  *
+  * @return the ping timeout, in nanoseconds
+  */
+ public long getPingTimeoutInNanos() {
+     final long timeoutInNanos = pingTimeout.get();
+     return timeoutInNanos;
+ }
+
+ /**
+  * Set the time that a ping is allowed to take before it is considered to have failed. The value is stored in
+  * nanoseconds.
+  *
+  * @param pingTimeout the time to wait for a ping to complete; may not be negative
+  * @param unit the time unit of the time argument; may not be null
+  * @throws IllegalArgumentException if the time is negative or the unit is null
+  */
+ public void setPingTimeout( long pingTimeout,
+                             TimeUnit unit ) {
+     ArgCheck.isNonNegative(pingTimeout, "time");
+     // Reject a null unit explicitly rather than failing with a NullPointerException in the conversion
+     ArgCheck.isNotNull(unit, "unit");
+     this.pingTimeout.set(unit.toNanos(pingTimeout));
+ }
+
+ /**
+  * Get the number of attempts {@link #getConnection()} makes to obtain a usable connection before it gives up with an
+  * exception.
+  *
+  * @return the maximum number of failed attempts
+  */
+ public int getMaxFailedAttemptsBeforeError() {
+     final int maxAttempts = maxFailedAttemptsBeforeError.get();
+     return maxAttempts;
+ }
+
+ /**
+  * Set the number of attempts {@link #getConnection()} makes to obtain a usable connection before it gives up with an
+  * exception.
+  *
+  * @param maxFailedAttemptsBeforeError the new maximum number of failed attempts
+  */
+ public void setMaxFailedAttemptsBeforeError( int maxFailedAttemptsBeforeError ) {
+     final AtomicInteger holder = this.maxFailedAttemptsBeforeError;
+     holder.set(maxFailedAttemptsBeforeError);
+ }
+
+ /**
+  * Sets the time limit for which connections may remain idle before being closed. If there are more than the core number
+  * of connections currently in the pool, after waiting this amount of time without being used, excess connections will be
+  * closed. This overrides any value set in the constructor.
+  *
+  * @param time the time to wait. A time value of zero will cause excess connections to terminate immediately after being
+  *        returned.
+  * @param unit the time unit of the time argument
+  * @throws IllegalArgumentException if time less than zero
+  * @see #getKeepAliveTime
+  */
+ public void setKeepAliveTime( long time,
+                               TimeUnit unit ) {
+     ArgCheck.isNonNegative(time, "time");
+     // The keep-alive time is always stored in nanoseconds
+     final long timeInNanos = unit.toNanos(time);
+     this.keepAliveTime = timeInNanos;
+ }
+
+ /**
+  * Returns the connection keep-alive time, which is the amount of time which connections in excess of the core pool size
+  * may remain idle before being closed. The value is stored in nanoseconds and converted to the requested unit.
+  *
+  * @param unit the desired time unit of the result; may not be null
+  * @return the time limit
+  * @see #setKeepAliveTime
+  */
+ public long getKeepAliveTime( TimeUnit unit ) {
+     assert unit != null;
+     final long timeInNanos = this.keepAliveTime;
+     return unit.convert(timeInNanos, TimeUnit.NANOSECONDS);
+ }
+
+ /**
+  * Returns the maximum allowed number of connections.
+  *
+  * @return the maximum number of connections
+  * @see #setMaximumPoolSize(int)
+  */
+ public int getMaximumPoolSize() {
+     final int maximum = maximumPoolSize;
+     return maximum;
+ }
+
+ /**
+  * Sets the maximum allowed number of connections. This overrides any value set in the constructor. If the new value is
+  * smaller than the current value, excess existing but unused connections will be closed.
+  *
+  * @param maximumPoolSize the new maximum
+  * @throws IllegalArgumentException if maximumPoolSize is not positive or is less than the {@link #getCorePoolSize() core
+  *         pool size}
+  * @see #getMaximumPoolSize
+  */
+ public void setMaximumPoolSize( int maximumPoolSize ) {
+     ArgCheck.isPositive(maximumPoolSize, "maximum pool size");
+     final ReentrantLock mainLock = this.mainLock;
+     // Acquire the lock before entering the try block, so that the finally block only ever unlocks a lock that is held
+     mainLock.lock();
+     try {
+         // Compare against the core size while holding the lock, so a concurrent setCorePoolSize cannot slip in between
+         if (maximumPoolSize < this.corePoolSize) {
+             throw new IllegalArgumentException(SpiI18n.maximumPoolSizeMayNotBeSmallerThanCorePoolSize.text());
+         }
+         final boolean reduced = maximumPoolSize < this.maximumPoolSize;
+         this.maximumPoolSize = maximumPoolSize;
+         // Only close as many idle connections as the pool actually exceeds the new maximum by
+         // (not the difference between the old and new maximum, which may be far larger)
+         final int excess = this.poolSize - maximumPoolSize;
+         if (reduced && excess > 0) {
+             drainUnusedConnections(excess);
+         }
+     } finally {
+         mainLock.unlock();
+     }
+ }
+
+ /**
+  * Returns the core number of connections, which is the target size of the pool.
+  *
+  * @return the core number of connections
+  * @see #setCorePoolSize(int)
+  */
+ public int getCorePoolSize() {
+     final int core = corePoolSize;
+     return core;
+ }
+
+ /**
+  * Sets the core number of connections. This overrides any value set in the constructor. If the new value is smaller than
+  * the current value, excess existing and unused connections will be closed. If larger, new connections will, if needed,
+  * be created.
+  *
+  * @param corePoolSize the new core size
+  * @throws RepositorySourceException if there was an error obtaining the new connection
+  * @throws InterruptedException if the thread was interrupted during the operation
+  * @throws IllegalArgumentException if <tt>corePoolSize</tt> less than zero or greater than the
+  *         {@link #getMaximumPoolSize() maximum pool size}
+  * @see #getCorePoolSize()
+  */
+ public void setCorePoolSize( int corePoolSize ) throws RepositorySourceException, InterruptedException {
+     ArgCheck.isNonNegative(corePoolSize, "core pool size");
+     final ReentrantLock mainLock = this.mainLock;
+     // Acquire the lock before entering the try block, so that the finally block only ever unlocks a lock that is held
+     mainLock.lock();
+     try {
+         // Compare against the maximum while holding the lock, so a concurrent setMaximumPoolSize cannot slip in between
+         if (this.maximumPoolSize < corePoolSize) {
+             throw new IllegalArgumentException(SpiI18n.maximumPoolSizeMayNotBeSmallerThanCorePoolSize.text());
+         }
+         final int reduction = this.corePoolSize - corePoolSize;
+         this.corePoolSize = corePoolSize;
+         if (reduction < 0) {
+             // The core size grew, so add connections ...
+             addConnectionsIfUnderCorePoolSize();
+         } else if (reduction > 0 && this.poolSize > corePoolSize) {
+             // The core size shrank: close at most 'reduction' idle connections, but never more than the number
+             // by which the pool currently exceeds the new core size ...
+             drainUnusedConnections(Math.min(reduction, this.poolSize - corePoolSize));
+         }
+     } finally {
+         mainLock.unlock();
+     }
+ }
+
+ // -------------------------------------------------
+ // Statistics ...
+ // -------------------------------------------------
+
+ /**
+  * Returns the current number of connections in the pool, counting both the connections that are checked out (in use)
+  * and those that are not being used.
+  *
+  * @return the number of connections
+  */
+ public int getPoolSize() {
+     final int currentSize = this.poolSize;
+     return currentSize;
+ }
+
+ /**
+  * Returns the approximate number of connections that are currently checked out from the pool.
+  *
+  * @return the number of checked-out connections
+  */
+ public int getInUseCount() {
+     final ReentrantLock mainLock = this.mainLock;
+     // Acquire the lock before entering the try block, so that the finally block only ever unlocks a lock that is held
+     mainLock.lock();
+     try {
+         return this.inUseConnections.size();
+     } finally {
+         mainLock.unlock();
+     }
+ }
+
+ /**
+  * Get the total number of connections that this pool has created.
+  *
+  * @return the total number of connections created by this pool
+  */
+ public long getTotalConnectionsCreated() {
+     final long created = totalConnectionsCreated.get();
+     return created;
+ }
+
+ /**
+  * Get the total number of times connections have been {@link #getConnection() used}.
+  *
+  * @return the total number
+  */
+ public long getTotalConnectionsUsed() {
+     final long used = totalConnectionsUsed.get();
+     return used;
+ }
+
+ // -------------------------------------------------
+ // State management methods ...
+ // -------------------------------------------------
+
+ /**
+  * Starts a core connection, causing it to idly wait for use. This overrides the default policy of starting core
+  * connections only when they are {@link #getConnection() needed}. This method will return <tt>false</tt> if all core
+  * connections have already been started.
+  *
+  * @return true if a connection was started
+  * @throws RepositorySourceException if there was an error obtaining the new connection
+  * @throws InterruptedException if the thread was interrupted during the operation
+  */
+ public boolean prestartCoreConnection() throws RepositorySourceException, InterruptedException {
+     final ReentrantLock mainLock = this.mainLock;
+     // Acquire the lock before entering the try block, so that the finally block only ever unlocks a lock that is held
+     mainLock.lock();
+     try {
+         return addConnectionIfUnderCorePoolSize();
+     } finally {
+         mainLock.unlock();
+     }
+ }
+
+ /**
+  * Starts all core connections, causing them to idly wait for use. This overrides the default policy of starting core
+  * connections only when they are {@link #getConnection() needed}.
+  *
+  * @return the number of connections started.
+  * @throws RepositorySourceException if there was an error obtaining the new connection
+  * @throws InterruptedException if the thread was interrupted during the operation
+  */
+ public int prestartAllCoreConnections() throws RepositorySourceException, InterruptedException {
+     final ReentrantLock mainLock = this.mainLock;
+     // Acquire the lock before entering the try block, so that the finally block only ever unlocks a lock that is held
+     mainLock.lock();
+     try {
+         return addConnectionsIfUnderCorePoolSize();
+     } finally {
+         mainLock.unlock();
+     }
+ }
+
+ /**
+  * Initiates an orderly shutdown: idle connections are closed immediately, connections that are currently in use may be
+  * used and closed as normal, and no new connections are handed out. If no connections are in use the pool terminates
+  * right away. Invocation has no additional effect if the pool has already terminated.
+  *
+  * @throws SecurityException if a security manager exists and the caller does not hold the
+  *         {@link java.lang.RuntimePermission}<tt>("modifyThread")</tt> permission
+  * @see org.jboss.dna.spi.graph.connection.ManagedRepositoryConnectionFactory#shutdown()
+  */
+ public void shutdown() {
+     // Fail if caller doesn't have modifyThread permission. We
+     // explicitly check permissions directly because we can't trust
+     // implementations of SecurityManager to correctly override
+     // the "check access" methods such that our documented
+     // security policy is implemented.
+     SecurityManager security = System.getSecurityManager();
+     if (security != null) java.security.AccessController.checkPermission(shutdownPerm);
+
+     this.logger.debug("Shutting down repository connection pool for {0}", getName());
+     boolean fullyTerminated = false;
+     final ReentrantLock mainLock = this.mainLock;
+     // Acquire the lock before entering the try block, so that the finally block only ever unlocks a lock that is held
+     mainLock.lock();
+     try {
+         // Already terminated: do nothing, so that terminated() is not invoked again on repeated calls
+         // (for example, when finalize() calls shutdown() on a pool that was shut down explicitly)
+         if (this.runState == TERMINATED) return;
+
+         if (this.runState == RUNNING) {
+             // don't override shutdownNow
+             this.runState = SHUTDOWN;
+         }
+
+         // Remove and close all available connections ...
+         if (!this.availableConnections.isEmpty()) {
+             drainUnusedConnections(this.availableConnections.size());
+         }
+
+         // If there are no connections being used, trigger full termination now ...
+         if (this.inUseConnections.isEmpty()) {
+             fullyTerminated = true;
+             this.logger.trace("Signalling termination of repository connection pool for {0}", getName());
+             this.runState = TERMINATED;
+             this.termination.signalAll();
+             this.logger.debug("Terminated repository connection pool for {0}", getName());
+         }
+         // Otherwise the last connection that is closed will transition the runState to TERMINATED ...
+     } finally {
+         mainLock.unlock();
+     }
+     // Invoke the callback outside of the lock
+     if (fullyTerminated) terminated();
+ }
+
+ /**
+  * Attempts to close all connections, including those connections currently in use, and prevents the use of other
+  * connections. If no connections are in use the pool terminates right away. Invocation has no additional effect if the
+  * pool has already terminated.
+  *
+  * @throws SecurityException if a security manager exists and the caller does not hold the
+  *         {@link java.lang.RuntimePermission}<tt>("modifyThread")</tt> permission
+  * @see org.jboss.dna.spi.graph.connection.ManagedRepositoryConnectionFactory#shutdownNow()
+  */
+ public void shutdownNow() {
+     // Almost the same code as shutdown()
+     SecurityManager security = System.getSecurityManager();
+     if (security != null) java.security.AccessController.checkPermission(shutdownPerm);
+
+     this.logger.debug("Shutting down (immediately) repository connection pool for {0}", getName());
+     boolean fullyTerminated = false;
+     boolean interrupted = false;
+     final ReentrantLock mainLock = this.mainLock;
+     // Acquire the lock before entering the try block, so that the finally block only ever unlocks a lock that is held
+     mainLock.lock();
+     try {
+         // Already terminated: do nothing, so that terminated() is not invoked again on repeated calls
+         if (this.runState == TERMINATED) return;
+
+         // Move to immediate-shutdown mode, even if an orderly shutdown is already in progress
+         this.runState = STOP;
+
+         // Remove and close all available connections ...
+         if (!this.availableConnections.isEmpty()) {
+             drainUnusedConnections(this.availableConnections.size());
+         }
+
+         if (!this.inUseConnections.isEmpty()) {
+             // There are connections being used, so close the underlying connections now ...
+             for (ConnectionWrapper connectionInUse : this.inUseConnections) {
+                 try {
+                     this.logger.trace("Closing repository connection to {0}", getName());
+                     connectionInUse.getOriginal().close();
+                 } catch (InterruptedException e) {
+                     // Keep closing the remaining connections, but remember the interrupt so it is not lost
+                     interrupted = true;
+                 }
+             }
+             // NOTE(review): the wrappers remain in the in-use set, and the pool size is decremented again when each
+             // wrapper is eventually closed by its user - confirm the intended accounting
+             this.poolSize -= this.inUseConnections.size();
+             // The last connection that is closed will transition the runState to TERMINATED ...
+         } else {
+             // There are no connections in use, so trigger full termination now ...
+             fullyTerminated = true;
+             this.logger.trace("Signalling termination of repository connection pool for {0}", getName());
+             this.runState = TERMINATED;
+             this.termination.signalAll();
+             this.logger.debug("Terminated repository connection pool for {0}", getName());
+         }
+     } finally {
+         mainLock.unlock();
+         // Restore the interrupt status that was cleared when the InterruptedException was caught
+         if (interrupted) Thread.currentThread().interrupt();
+     }
+     // Invoke the callback outside of the lock
+     if (fullyTerminated) terminated();
+ }
+
+ /**
+  * Return whether this pool is in its normal running state, i.e. no shutdown has been requested.
+  *
+  * @return true if this pool is running, or false otherwise
+  * @see org.jboss.dna.spi.graph.connection.ManagedRepositoryConnectionFactory#isRunning()
+  */
+ public boolean isRunning() {
+     final int state = this.runState;
+     return state == RUNNING;
+ }
+
+ /**
+  * Return whether this pool has left the running state, either because it is shutting down or because it has already
+  * terminated.
+  *
+  * @return true if this pool has been shut down, or false otherwise
+  * @see org.jboss.dna.spi.graph.connection.ManagedRepositoryConnectionFactory#isShutdown()
+  */
+ public boolean isShutdown() {
+     final int state = this.runState;
+     return state != RUNNING;
+ }
+
+ /**
+  * Returns true if this pool is in the process of terminating after {@link #shutdown()} or {@link #shutdownNow()} has
+  * been called but has not completely terminated.
+  *
+  * @return true if terminating but not yet terminated, or false otherwise
+  * @see org.jboss.dna.spi.graph.connection.ManagedRepositoryConnectionFactory#isTerminating()
+  */
+ public boolean isTerminating() {
+     // Read the volatile state once so both comparisons see the same value
+     final int state = this.runState;
+     // Both the orderly (SHUTDOWN) and the immediate (STOP) shutdown modes are "terminating" states
+     return state == SHUTDOWN || state == STOP;
+ }
+
+ /**
+  * Return whether this pool has reached its final, terminated state.
+  *
+  * @return true if terminated, or false otherwise
+  * @see org.jboss.dna.spi.graph.connection.ManagedRepositoryConnectionFactory#isTerminated()
+  */
+ public boolean isTerminated() {
+     final int state = this.runState;
+     return state == TERMINATED;
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @see
org.jboss.dna.spi.graph.connection.ManagedRepositoryConnectionFactory#awaitTermination(long,
java.util.concurrent.TimeUnit)
+ */
+ public boolean awaitTermination( long timeout,
+ TimeUnit unit ) throws InterruptedException {
+ long nanos = unit.toNanos(timeout);
+ final ReentrantLock mainLock = this.mainLock;
+ try {
+ mainLock.lock();
+ for (;;) {
+ // this.logger.debug("---> Run state = {}; condition = {}",
runState, termination);
+ if (runState == TERMINATED) return true;
+ if (nanos <= 0) return false;
+ nanos = termination.awaitNanos(nanos);
+ // this.logger.debug("---> Done waiting: run state = {};
condition = {}", runState, termination);
+ }
+ } finally {
+ mainLock.unlock();
+ }
+ }
+
+ /**
+ * Method invoked when the pool has terminated. Default implementation does nothing. Note: To properly nest multiple
+ * overridings, subclasses should generally invoke <tt>super.terminated</tt> within this method.
+ */
+ protected void terminated() {
+ }
+
+ /**
+  * Performs an orderly {@link #shutdown() shutdown} of this pool when it is no longer referenced.
+  */
+ @Override
+ protected void finalize() {
+     this.shutdown();
+ }
+
+ // -------------------------------------------------
+ // Connection management methods ...
+ // -------------------------------------------------
+
+ /**
+ * {@inheritDoc}
+ */
+ public RepositoryConnection getConnection() throws RepositorySourceException,
InterruptedException {
+ int attemptsAllowed = this.maxFailedAttemptsBeforeError.get();
+ ConnectionWrapper connection = null;
+ // Do this until we get a good connection ...
+ int attemptsRemaining = attemptsAllowed;
+ while (connection == null && attemptsRemaining > 0) {
+ --attemptsRemaining;
+ ReentrantLock mainLock = this.mainLock;
+ try {
+ mainLock.lock();
+ // If we're shutting down the pool, then just close the connection
...
+ if (this.runState != RUNNING) {
+ throw new
IllegalStateException(SpiI18n.repositoryConnectionPoolIsNotRunning.text());
+ }
+ // If there are fewer total connections than the core size ...
+ if (this.poolSize < this.corePoolSize) {
+ // Immediately create a wrapped connection and return it ...
+ connection = newWrappedConnection();
+ }
+ // Peek to see if there is a connection available ...
+ else if (this.availableConnections.peek() != null) {
+ // There is, so take it and return it ...
+ connection = this.availableConnections.take();
+ }
+ // There is no connection available. If there are fewer total connections
than the maximum size ...
+ else if (this.poolSize < this.maximumPoolSize) {
+ // Immediately create a wrapped connection and return it ...
+ connection = newWrappedConnection();
+ }
+ if (connection != null) {
+ this.inUseConnections.add(connection);
+ }
+ } finally {
+ mainLock.unlock();
+ }
+ if (connection == null) {
+ // There are not enough connections, so wait in line for the next
available connection ...
+ this.logger.trace("Waiting for a repository connection from pool
{0}", getName());
+ connection = this.availableConnections.take();
+ mainLock = this.mainLock;
+ mainLock.lock();
+ try {
+ if (connection != null) {
+ this.inUseConnections.add(connection);
+ }
+ } finally {
+ mainLock.unlock();
+ }
+ this.logger.trace("Recieved a repository connection from pool
{0}", getName());
+ }
+ if (connection != null && this.validateConnectionBeforeUse.get()) {
+ connection = validateConnection(connection);
+ }
+ }
+ if (connection == null) {
+ // We were unable to obtain a usable connection, so fail ...
+ throw new
RepositorySourceException(SpiI18n.unableToObtainValidRepositoryAfterAttempts.text(attemptsAllowed));
+ }
+ this.totalConnectionsUsed.incrementAndGet();
+ return connection;
+ }
+
+ /**
+  * This method is automatically called by the {@link ConnectionWrapper} when it is {@link ConnectionWrapper#close()
+  * closed}. The connection is removed from the in-use set and is either made available again (wrapped in a fresh
+  * {@link ConnectionWrapper}) or, if the pool is not running or holds more connections than the maximum, closed.
+  *
+  * @param wrapper the wrapper to the connection that is being returned to the pool
+  */
+ protected void returnConnection( ConnectionWrapper wrapper ) {
+     assert wrapper != null;
+     ConnectionWrapper wrapperToClose = null;
+     final ReentrantLock mainLock = this.mainLock;
+     // Acquire the lock before entering the try block, so that the finally block only ever unlocks a lock that is held
+     mainLock.lock();
+     try {
+         // Remove the connection from the in-use set ...
+         boolean removed = this.inUseConnections.remove(wrapper);
+         assert removed;
+
+         // If we're shutting down the pool, then just close the connection ...
+         if (this.runState != RUNNING) {
+             wrapperToClose = wrapper;
+         }
+         // If there are more connections than the maximum size...
+         else if (this.poolSize > this.maximumPoolSize) {
+             // Immediately close this connection ...
+             wrapperToClose = wrapper;
+         }
+         // Attempt to make the connection available (this should generally work, unless there is an upper limit
+         // to the number of available connections) ...
+         else if (!this.availableConnections.offer(new ConnectionWrapper(wrapper.getOriginal()))) {
+             // The pool of available connection is full, so release it ...
+             wrapperToClose = wrapper;
+         }
+     } finally {
+         mainLock.unlock();
+     }
+     // Close the connection if we're supposed to (do it outside of the main lock)...
+     if (wrapperToClose != null) {
+         try {
+             closeConnection(wrapperToClose);
+         } catch (InterruptedException e) {
+             // The caller cannot handle this exception, so log it and restore the interrupt status so that the
+             // interruption is not silently lost
+             this.logger.trace(e, "Interrupted while closing a repository connection");
+             Thread.currentThread().interrupt();
+         }
+     }
+ }
+
+ /**
+  * Validate the supplied connection by pinging it with the configured ping timeout. A connection that does not respond
+  * to the ping (or whose ping was interrupted) is handed back via {@link #returnConnection(ConnectionWrapper)}.
+  *
+  * @param connection the connection to be validated; may not be null
+  * @return the validated connection, or null if the connection did not validate and was handed back to the pool
+  */
+ protected ConnectionWrapper validateConnection( ConnectionWrapper connection ) {
+     assert connection != null;
+     boolean valid = false;
+     boolean interrupted = false;
+     try {
+         valid = connection.ping(this.pingTimeout.get(), TimeUnit.NANOSECONDS);
+     } catch (InterruptedException e) {
+         // The caller of this method cannot handle this exception, so treat the connection as not validated
+         this.logger.trace(e, "Interrupted while pinging a repository connection");
+         interrupted = true;
+     }
+     if (valid) return connection;
+     try {
+         // NOTE(review): returnConnection may make this connection available again rather than closing it - confirm
+         // whether connections that fail validation should be closed instead
+         returnConnection(connection);
+     } finally {
+         // Restore the interrupt status so that the interruption is not silently lost
+         if (interrupted) Thread.currentThread().interrupt();
+     }
+     return null;
+ }
+
+ /**
+ * Obtain a new connection wrapped in a {@link ConnectionWrapper}. This method does
not check whether creating the new
+ * connection would violate the {@link #maximumPoolSize maximum pool size} nor does
it add the new connection to the
+ * {@link #availableConnections available connections} (as the caller may want it
immediately), but it does increment the
+ * {@link #poolSize pool size}.
+ *
+ * @return the connection wrapper with a new connection
+ * @throws RepositorySourceException if there was an error obtaining the new
connection
+ * @throws InterruptedException if the thread was interrupted during the operation
+ */
+ @GuardedBy( "mainLock" )
+ protected ConnectionWrapper newWrappedConnection() throws RepositorySourceException,
InterruptedException {
+ RepositoryConnection connection = this.connectionFactory.getConnection();
+ ++this.poolSize;
+ this.totalConnectionsCreated.incrementAndGet();
+ return new ConnectionWrapper(connection);
+ }
+
+ /**
+  * Close a connection that is in the pool but no longer in the {@link #availableConnections available connections}. This
+  * method does decrement the {@link #poolSize pool size}. If the pool is shutting down (in either shutdown mode) and
+  * this was the last connection, the pool transitions to the terminated state, any threads waiting in
+  * {@link #awaitTermination(long, TimeUnit)} are signalled, and {@link #terminated()} is invoked.
+  *
+  * @param wrapper the wrapper for the connection to be closed
+  * @throws InterruptedException if the thread was interrupted during the operation
+  */
+ protected void closeConnection( ConnectionWrapper wrapper ) throws InterruptedException {
+     assert wrapper != null;
+     RepositoryConnection original = wrapper.getOriginal();
+     assert original != null;
+     boolean nowTerminated = false;
+     try {
+         this.logger.debug("Closing repository connection to {0}", getName());
+         original.close();
+     } finally {
+         final ReentrantLock mainLock = this.mainLock;
+         // Acquire the lock before entering the try block, so the finally block only ever unlocks a lock that is held
+         mainLock.lock();
+         try {
+             // No matter what reduce the pool size count
+             --this.poolSize;
+             // And if shutting down (orderly or immediately) and this was the last connection being used...
+             final int state = this.runState;
+             final boolean shuttingDown = state == SHUTDOWN || state == STOP;
+             if (shuttingDown && this.poolSize <= 0 && this.inUseConnections.isEmpty()) {
+                 // then signal anybody that has called "awaitTermination(...)"
+                 this.logger.trace("Signalling termination of repository connection pool for {0}", getName());
+                 this.runState = TERMINATED;
+                 this.termination.signalAll();
+                 this.logger.trace("Terminated repository connection pool for {0}", getName());
+                 nowTerminated = true;
+             }
+         } finally {
+             mainLock.unlock();
+         }
+         // Invoke the callback outside of the lock
+         if (nowTerminated) terminated();
+     }
+ }
+
+ /**
+  * Remove up to the supplied number of connections from the {@link #availableConnections available connections}, close
+  * the underlying connection of each, and reduce the {@link #poolSize pool size} by the number removed.
+  *
+  * @param count the maximum number of connections to drain; nothing is done if this is not positive
+  * @return the number of connections that were removed from the pool
+  */
+ @GuardedBy( "mainLock" )
+ protected int drainUnusedConnections( int count ) {
+     if (count <= 0) return 0;
+     this.logger.trace("Draining up to {0} unused repository connections to {1}", count, getName());
+     // Drain the extra connections from those available ...
+     Collection<ConnectionWrapper> extraConnections = new LinkedList<ConnectionWrapper>();
+     this.availableConnections.drainTo(extraConnections, count);
+     // The drained connections are no longer available, so account for them right away; this keeps the
+     // pool size accurate even if one of the close() calls below fails unexpectedly
+     int numClosed = extraConnections.size();
+     this.poolSize -= numClosed;
+     boolean interrupted = false;
+     try {
+         for (ConnectionWrapper connection : extraConnections) {
+             try {
+                 this.logger.trace("Closing repository connection to {0}", getName());
+                 connection.getOriginal().close();
+             } catch (InterruptedException e) {
+                 // Keep closing the remaining connections, but remember that an interrupt was requested
+                 interrupted = true;
+             }
+         }
+     } finally {
+         // Restore the interrupt status that was cleared when the InterruptedException was thrown
+         if (interrupted) Thread.currentThread().interrupt();
+     }
+     this.logger.trace("Drained {0} unused connections", numClosed);
+     return numClosed;
+ }
+
+ /**
+  * Add a single new connection to the {@link #availableConnections available connections} when the current
+  * {@link #poolSize pool size} is below the {@link #corePoolSize core pool size}.
+  *
+  * @return true if a connection was created and offered to the available connections, or false if the pool already
+  *         has at least the core number of connections
+  * @throws RepositorySourceException if there was an error obtaining the new connection
+  * @throws InterruptedException if the thread was interrupted during the operation
+  */
+ @GuardedBy( "mainLock" )
+ protected boolean addConnectionIfUnderCorePoolSize() throws RepositorySourceException, InterruptedException {
+     // Nothing to do when the pool already holds the core number of connections
+     if (this.poolSize >= this.corePoolSize) {
+         return false;
+     }
+     this.availableConnections.offer(newWrappedConnection());
+     this.logger.trace("Added connection to {0} in undersized pool", getName());
+     return true;
+ }
+
+ /**
+  * Keep adding new connections to the {@link #availableConnections available connections} until the
+  * {@link #poolSize pool size} is no longer below the {@link #corePoolSize core pool size}.
+  *
+  * @return the number of connections that were created; may be 0
+  * @throws RepositorySourceException if there was an error obtaining a new connection
+  * @throws InterruptedException if the thread was interrupted during the operation
+  */
+ @GuardedBy( "mainLock" )
+ protected int addConnectionsIfUnderCorePoolSize() throws RepositorySourceException, InterruptedException {
+     int added = 0;
+     // newWrappedConnection() increments the pool size, which is what eventually ends this loop
+     for (; this.poolSize < this.corePoolSize; ++added) {
+         this.availableConnections.offer(newWrappedConnection());
+     }
+     this.logger.trace("Added {0} connection(s) to {1} in undersized pool", added, getName());
+     return added;
+ }
+
+ /**
+  * A {@link RepositoryConnection} that delegates to an original connection. Once {@link #close() closed}, the wrapper
+  * refuses further use (most methods throw {@link IllegalStateException}), and closing it hands the wrapper to the
+  * enclosing pool's <code>returnConnection</code> method.
+  */
+ protected class ConnectionWrapper implements RepositoryConnection {
+
+     private final RepositoryConnection original;
+     private final long timeCreated;
+     private long lastUsed;
+     private boolean closed = false;
+
+     /**
+      * Wrap the supplied connection, recording the current time as the creation time.
+      *
+      * @param connection the connection to wrap; may not be null
+      */
+     protected ConnectionWrapper( RepositoryConnection connection ) {
+         assert connection != null;
+         this.original = connection;
+         this.timeCreated = System.currentTimeMillis();
+     }
+
+     /**
+      * Fail if this wrapper has already been closed.
+      *
+      * @throws IllegalStateException if {@link #close()} has already completed on this wrapper
+      */
+     private void failIfClosed() {
+         if (closed) {
+             throw new IllegalStateException(SpiI18n.closedConnectionMayNotBeUsed.text());
+         }
+     }
+
+     /**
+      * @return the wrapped connection; never null
+      */
+     protected RepositoryConnection getOriginal() {
+         return this.original;
+     }
+
+     /**
+      * @return the time in milliseconds recorded when this wrapper was closed, or 0 if it has not been closed
+      */
+     public long getTimeLastUsed() {
+         return this.lastUsed;
+     }
+
+     /**
+      * @return the time in milliseconds at which this wrapper was created
+      */
+     public long getTimeCreated() {
+         return this.timeCreated;
+     }
+
+     /**
+      * {@inheritDoc}
+      */
+     public String getSourceName() {
+         // The source name remains available even after the wrapper has been closed
+         return this.original.getSourceName();
+     }
+
+     /**
+      * {@inheritDoc}
+      */
+     public XAResource getXAResource() {
+         failIfClosed();
+         return this.original.getXAResource();
+     }
+
+     /**
+      * {@inheritDoc}
+      */
+     public CachePolicy getDefaultCachePolicy() {
+         failIfClosed();
+         return this.original.getDefaultCachePolicy();
+     }
+
+     /**
+      * {@inheritDoc}
+      */
+     public void execute( ExecutionEnvironment env,
+                          GraphCommand... commands ) throws RepositorySourceException, InterruptedException {
+         failIfClosed();
+         this.original.execute(env, commands);
+     }
+
+     /**
+      * {@inheritDoc}
+      */
+     public boolean ping( long time,
+                          TimeUnit unit ) throws InterruptedException {
+         failIfClosed();
+         return this.original.ping(time, unit);
+     }
+
+     /**
+      * {@inheritDoc}
+      */
+     public void close() throws InterruptedException {
+         // Closing an already-closed wrapper is a no-op
+         if (closed) return;
+         this.lastUsed = System.currentTimeMillis();
+         this.original.close();
+         this.closed = true;
+         returnConnection(this);
+     }
+
+     /**
+      * {@inheritDoc}
+      */
+     public void setListener( RepositorySourceListener listener ) {
+         // Unlike the other operations, setting a listener on a closed wrapper is silently ignored
+         if (closed) return;
+         this.original.setListener(listener);
+     }
+
+ }
+
+}
Modified:
trunk/dna-spi/src/main/java/org/jboss/dna/spi/graph/connection/RepositorySource.java
===================================================================
---
trunk/dna-spi/src/main/java/org/jboss/dna/spi/graph/connection/RepositorySource.java 2008-07-24
19:52:13 UTC (rev 369)
+++
trunk/dna-spi/src/main/java/org/jboss/dna/spi/graph/connection/RepositorySource.java 2008-07-25
19:37:40 UTC (rev 370)
@@ -38,29 +38,28 @@
* repository (e.g., in a configuration area), and needs to be reinstantiated.
* </p>
* <p>
- * Objects that implement this <code>RepositorySource</code> interface are
typically registered with a naming service such as
- * Java Naming and Directory Interface<sup><font
size=-3>TM</font></sup> (JNDI). This interface extends both
- * {@link Referenceable} and {@link Serializable} so that such objects can be stored in
any JNDI naming context and enable proper
- * system recovery,
+ * Objects that implement this <code>RepositorySource</code> interface are
typically registered with a naming service such as Java
+ * Naming and Directory Interface<sup><font
size=-3>TM</font></sup> (JNDI). This interface extends both {@link
Referenceable} and
+ * {@link Serializable} so that such objects can be stored in any JNDI naming context and
enable proper system recovery.
* </p>
*
* @author Randall Hauch
*/
-public interface RepositorySource extends RepositoryConnectionFactory, Referenceable,
Serializable {
+public interface RepositorySource extends ManagedRepositoryConnectionFactory,
Referenceable, Serializable {
/**
- * Get the maximum number of retries that may be performed on a given operation when
using
- * {@link #getConnection() connections} created by this source. This value does not
constitute a minimum number of retries; in
- * fact, the connection user is not required to retry any operations.
+ * Get the maximum number of retries that may be performed on a given operation when
using {@link #getConnection()
+ * connections} created by this source. This value does not constitute a minimum
number of retries; in fact, the connection
+ * user is not required to retry any operations.
*
* @return the maximum number of allowable retries, or 0 if the source has no limit
*/
int getRetryLimit();
/**
- * Set the maximum number of retries that may be performed on a given operation when
using
- * {@link #getConnection() connections} created by this source. This value does not
constitute a minimum number of retries; in
- * fact, the connection user is not required to retry any operations.
+ * Set the maximum number of retries that may be performed on a given operation when
using {@link #getConnection()
+ * connections} created by this source. This value does not constitute a minimum
number of retries; in fact, the connection
+ * user is not required to retry any operations.
*
* @param limit the maximum number of allowable retries, or 0 if the source has no
limit
*/
Deleted:
trunk/dna-spi/src/test/java/org/jboss/dna/spi/graph/connection/BasicRepositoryConnectionPoolTest.java
===================================================================
---
trunk/dna-spi/src/test/java/org/jboss/dna/spi/graph/connection/BasicRepositoryConnectionPoolTest.java 2008-07-24
19:52:13 UTC (rev 369)
+++
trunk/dna-spi/src/test/java/org/jboss/dna/spi/graph/connection/BasicRepositoryConnectionPoolTest.java 2008-07-25
19:37:40 UTC (rev 370)
@@ -1,218 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * Copyright 2008, Red Hat Middleware LLC, and individual contributors
- * as indicated by the @author tags. See the copyright.txt file in the
- * distribution for a full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
- */
-package org.jboss.dna.spi.graph.connection;
-
-import static org.hamcrest.core.Is.is;
-import static org.hamcrest.core.IsNull.notNullValue;
-import static
org.jboss.dna.spi.graph.connection.RepositorySourceLoadHarness.runLoadTest;
-import static org.junit.Assert.assertThat;
-import java.util.List;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
-
-/**
- * @author Randall Hauch
- */
-public class BasicRepositoryConnectionPoolTest {
-
- private BasicRepositoryConnectionPool pool;
- private TimeDelayingRepositorySource repositorySource;
- private ExecutionEnvironment env;
-
- @Before
- public void beforeEach() {
- this.repositorySource = new TimeDelayingRepositorySource("source 1");
- this.pool = new BasicRepositoryConnectionPool(this.repositorySource, 1, 1, 100,
TimeUnit.SECONDS);
- this.env = null;
- }
-
- @After
- public void afterEach() throws Exception {
- pool.shutdown();
- pool.awaitTermination(2, TimeUnit.SECONDS);
- }
-
- @Test
- public void shouldBeCreatedInRunningState() {
- assertThat(pool.isShutdown(), is(false));
- assertThat(pool.isTerminating(), is(false));
- assertThat(pool.isTerminated(), is(false));
- assertThat(pool.getTotalConnectionsCreated(), is(0l));
- assertThat(pool.getTotalConnectionsUsed(), is(0l));
- }
-
- @Test
- public void shouldShutdownWhenNoConnectionsWereCreatedOrUsed() throws
InterruptedException {
- assertThat(pool.isShutdown(), is(false));
- assertThat(pool.isTerminating(), is(false));
- assertThat(pool.isTerminated(), is(false));
- assertThat(pool.getTotalConnectionsCreated(), is(0l));
- assertThat(pool.getTotalConnectionsUsed(), is(0l));
-
- for (int i = 0; i != 4; ++i) {
- pool.shutdown();
- assertThat(pool.isShutdown() || pool.isTerminating() || pool.isTerminated(),
is(true));
- pool.awaitTermination(2, TimeUnit.SECONDS);
- assertThat(pool.isTerminated(), is(true));
- assertThat(pool.getTotalConnectionsCreated(), is(0l));
- assertThat(pool.getTotalConnectionsUsed(), is(0l));
- }
- }
-
- @Test
- public void shouldCreateConnectionAndRecoverWhenClosed() throws
RepositorySourceException, InterruptedException {
- assertThat(pool.getTotalConnectionsCreated(), is(0l));
- assertThat(pool.getTotalConnectionsUsed(), is(0l));
-
- RepositoryConnection conn = pool.getConnection();
- assertThat(conn, is(notNullValue()));
- assertThat(pool.getTotalConnectionsCreated(), is(1l));
- assertThat(pool.getTotalConnectionsUsed(), is(1l));
- assertThat(pool.getPoolSize(), is(1));
-
- conn.close();
-
- assertThat(pool.getTotalConnectionsCreated(), is(1l));
- assertThat(pool.getTotalConnectionsUsed(), is(1l));
- assertThat(pool.getPoolSize(), is(1));
- }
-
- @Test
- public void shouldAllowShutdownToBeCalledMultipleTimesEvenWhenShutdown()
- throws RepositorySourceException, InterruptedException {
- assertThat(pool.getTotalConnectionsCreated(), is(0l));
- assertThat(pool.getTotalConnectionsUsed(), is(0l));
-
- RepositoryConnection conn = pool.getConnection();
- assertThat(conn, is(notNullValue()));
- assertThat(pool.getTotalConnectionsCreated(), is(1l));
- assertThat(pool.getTotalConnectionsUsed(), is(1l));
- assertThat(pool.getPoolSize(), is(1));
-
- conn.close();
-
- assertThat(pool.getTotalConnectionsCreated(), is(1l));
- assertThat(pool.getTotalConnectionsUsed(), is(1l));
- assertThat(pool.getPoolSize(), is(1));
-
- pool.shutdown();
- pool.shutdown();
- pool.awaitTermination(2, TimeUnit.SECONDS);
- assertThat(pool.isTerminated(), is(true));
- assertThat(pool.getTotalConnectionsCreated(), is(1l));
- assertThat(pool.getTotalConnectionsUsed(), is(1l));
- pool.shutdown();
- pool.awaitTermination(2, TimeUnit.SECONDS);
- pool.shutdown();
- pool.awaitTermination(2, TimeUnit.SECONDS);
- }
-
- @Test( expected = IllegalStateException.class )
- public void shouldNotCreateConnectionIfPoolIsNotRunning() throws
RepositorySourceException, InterruptedException {
- pool.shutdown();
- pool.awaitTermination(2, TimeUnit.SECONDS);
- assertThat(pool.isTerminated(), is(true));
- assertThat(pool.getTotalConnectionsCreated(), is(0l));
- assertThat(pool.getTotalConnectionsUsed(), is(0l));
- pool.getConnection(); // this should fail with illegal state
- }
-
- @Test
- public void shouldAllowConnectionsToBeClosedMoreThanOnceWithNoIllEffects()
- throws RepositorySourceException, InterruptedException {
- assertThat(pool.getTotalConnectionsCreated(), is(0l));
- assertThat(pool.getTotalConnectionsUsed(), is(0l));
-
- RepositoryConnection conn = pool.getConnection();
- assertThat(conn, is(notNullValue()));
- assertThat(pool.getTotalConnectionsCreated(), is(1l));
- assertThat(pool.getTotalConnectionsUsed(), is(1l));
- assertThat(pool.getPoolSize(), is(1));
-
- conn.close();
- conn.close();
- }
-
- @Test
- public void shouldBlockClientsWhenNotEnoughConnections() throws Exception {
- int numConnectionsInPool = 1;
- int numClients = 2;
- BasicRepositoryConnectionPool pool = new
BasicRepositoryConnectionPool(repositorySource);
- pool.setCorePoolSize(numConnectionsInPool);
- pool.setMaximumPoolSize(numConnectionsInPool);
- RepositoryOperation.Factory<Integer> operationFactory =
RepositorySourceLoadHarness.createMultipleLoadOperationFactory(10);
- runLoadTest(env, pool, numClients, 100, TimeUnit.MILLISECONDS,
operationFactory);
- pool.shutdown();
- pool.awaitTermination(4, TimeUnit.SECONDS);
- }
-
- @Test
- public void shouldLimitClientsToRunSequentiallyWithOneConnectionInPool() throws
Exception {
- int numConnectionsInPool = 1;
- int numClients = 3;
- BasicRepositoryConnectionPool pool = new
BasicRepositoryConnectionPool(repositorySource);
- pool.setCorePoolSize(numConnectionsInPool);
- pool.setMaximumPoolSize(numConnectionsInPool);
- RepositoryOperation.Factory<Integer> operationFactory =
RepositorySourceLoadHarness.createMultipleLoadOperationFactory(10);
- runLoadTest(env, pool, numClients, 100, TimeUnit.MILLISECONDS,
operationFactory);
- pool.shutdown();
- pool.awaitTermination(4, TimeUnit.SECONDS);
- }
-
- @Test
- public void shouldClientsToRunConncurrentlyWithTwoConnectionsInPool() throws
Exception {
- int numConnectionsInPool = 2;
- int numClients = 10;
- BasicRepositoryConnectionPool pool = new
BasicRepositoryConnectionPool(repositorySource);
- pool.setCorePoolSize(numConnectionsInPool);
- pool.setMaximumPoolSize(numConnectionsInPool);
- RepositoryOperation.Factory<Integer> operationFactory =
RepositorySourceLoadHarness.createMultipleLoadOperationFactory(10);
- runLoadTest(env, pool, numClients, 100, TimeUnit.MILLISECONDS,
operationFactory);
- pool.shutdown();
- pool.awaitTermination(4, TimeUnit.SECONDS);
- }
-
- @Ignore( "doesn't run on hudson" )
- @Test
- public void shouldClientsToRunConncurrentlyWithMultipleConnectionInPool() throws
Exception {
- int numConnectionsInPool = 10;
- int numClients = 50;
- BasicRepositoryConnectionPool pool = new
BasicRepositoryConnectionPool(repositorySource);
- pool.setCorePoolSize(numConnectionsInPool);
- pool.setMaximumPoolSize(numConnectionsInPool);
- RepositoryOperation.Factory<Integer> operationFactory =
RepositorySourceLoadHarness.createMultipleLoadOperationFactory(20);
- List<Future<Integer>> results = runLoadTest(env, pool, numClients,
200, TimeUnit.MILLISECONDS, operationFactory);
- int total = 0;
- for (Future<Integer> result : results) {
- assertThat(result.isDone(), is(true));
- if (result.isDone()) total += result.get();
- }
- assertThat(total, is(20 * numClients));
- pool.shutdown();
- pool.awaitTermination(4, TimeUnit.SECONDS);
- }
-
-}
Copied:
trunk/dna-spi/src/test/java/org/jboss/dna/spi/graph/connection/RepositoryConnectionPoolTest.java
(from rev 368,
trunk/dna-spi/src/test/java/org/jboss/dna/spi/graph/connection/BasicRepositoryConnectionPoolTest.java)
===================================================================
---
trunk/dna-spi/src/test/java/org/jboss/dna/spi/graph/connection/RepositoryConnectionPoolTest.java
(rev 0)
+++
trunk/dna-spi/src/test/java/org/jboss/dna/spi/graph/connection/RepositoryConnectionPoolTest.java 2008-07-25
19:37:40 UTC (rev 370)
@@ -0,0 +1,218 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package org.jboss.dna.spi.graph.connection;
+
+import static org.hamcrest.core.Is.is;
+import static org.hamcrest.core.IsNull.notNullValue;
+import static
org.jboss.dna.spi.graph.connection.RepositorySourceLoadHarness.runLoadTest;
+import static org.junit.Assert.assertThat;
+import java.util.List;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * @author Randall Hauch
+ */
+public class RepositoryConnectionPoolTest {
+
+ private RepositoryConnectionPool pool;
+ private TimeDelayingRepositorySource repositorySource;
+ private ExecutionEnvironment env;
+
+ @Before
+ public void beforeEach() {
+ this.repositorySource = new TimeDelayingRepositorySource("source 1");
+ this.pool = new RepositoryConnectionPool(this.repositorySource, 1, 1, 100,
TimeUnit.SECONDS);
+ this.env = null;
+ }
+
+ @After
+ public void afterEach() throws Exception {
+ pool.shutdown();
+ pool.awaitTermination(2, TimeUnit.SECONDS);
+ }
+
+ @Test
+ public void shouldBeCreatedInRunningState() {
+ assertThat(pool.isShutdown(), is(false));
+ assertThat(pool.isTerminating(), is(false));
+ assertThat(pool.isTerminated(), is(false));
+ assertThat(pool.getTotalConnectionsCreated(), is(0l));
+ assertThat(pool.getTotalConnectionsUsed(), is(0l));
+ }
+
+ @Test
+ public void shouldShutdownWhenNoConnectionsWereCreatedOrUsed() throws
InterruptedException {
+ assertThat(pool.isShutdown(), is(false));
+ assertThat(pool.isTerminating(), is(false));
+ assertThat(pool.isTerminated(), is(false));
+ assertThat(pool.getTotalConnectionsCreated(), is(0l));
+ assertThat(pool.getTotalConnectionsUsed(), is(0l));
+
+ for (int i = 0; i != 4; ++i) {
+ pool.shutdown();
+ assertThat(pool.isShutdown() || pool.isTerminating() || pool.isTerminated(),
is(true));
+ pool.awaitTermination(2, TimeUnit.SECONDS);
+ assertThat(pool.isTerminated(), is(true));
+ assertThat(pool.getTotalConnectionsCreated(), is(0l));
+ assertThat(pool.getTotalConnectionsUsed(), is(0l));
+ }
+ }
+
+ @Test
+ public void shouldCreateConnectionAndRecoverWhenClosed() throws
RepositorySourceException, InterruptedException {
+ assertThat(pool.getTotalConnectionsCreated(), is(0l));
+ assertThat(pool.getTotalConnectionsUsed(), is(0l));
+
+ RepositoryConnection conn = pool.getConnection();
+ assertThat(conn, is(notNullValue()));
+ assertThat(pool.getTotalConnectionsCreated(), is(1l));
+ assertThat(pool.getTotalConnectionsUsed(), is(1l));
+ assertThat(pool.getPoolSize(), is(1));
+
+ conn.close();
+
+ assertThat(pool.getTotalConnectionsCreated(), is(1l));
+ assertThat(pool.getTotalConnectionsUsed(), is(1l));
+ assertThat(pool.getPoolSize(), is(1));
+ }
+
+ @Test
+ public void shouldAllowShutdownToBeCalledMultipleTimesEvenWhenShutdown()
+ throws RepositorySourceException, InterruptedException {
+ assertThat(pool.getTotalConnectionsCreated(), is(0l));
+ assertThat(pool.getTotalConnectionsUsed(), is(0l));
+
+ RepositoryConnection conn = pool.getConnection();
+ assertThat(conn, is(notNullValue()));
+ assertThat(pool.getTotalConnectionsCreated(), is(1l));
+ assertThat(pool.getTotalConnectionsUsed(), is(1l));
+ assertThat(pool.getPoolSize(), is(1));
+
+ conn.close();
+
+ assertThat(pool.getTotalConnectionsCreated(), is(1l));
+ assertThat(pool.getTotalConnectionsUsed(), is(1l));
+ assertThat(pool.getPoolSize(), is(1));
+
+ pool.shutdown();
+ pool.shutdown();
+ pool.awaitTermination(2, TimeUnit.SECONDS);
+ assertThat(pool.isTerminated(), is(true));
+ assertThat(pool.getTotalConnectionsCreated(), is(1l));
+ assertThat(pool.getTotalConnectionsUsed(), is(1l));
+ pool.shutdown();
+ pool.awaitTermination(2, TimeUnit.SECONDS);
+ pool.shutdown();
+ pool.awaitTermination(2, TimeUnit.SECONDS);
+ }
+
+ @Test( expected = IllegalStateException.class )
+ public void shouldNotCreateConnectionIfPoolIsNotRunning() throws
RepositorySourceException, InterruptedException {
+ pool.shutdown();
+ pool.awaitTermination(2, TimeUnit.SECONDS);
+ assertThat(pool.isTerminated(), is(true));
+ assertThat(pool.getTotalConnectionsCreated(), is(0l));
+ assertThat(pool.getTotalConnectionsUsed(), is(0l));
+ pool.getConnection(); // this should fail with illegal state
+ }
+
+ @Test
+ public void shouldAllowConnectionsToBeClosedMoreThanOnceWithNoIllEffects()
+ throws RepositorySourceException, InterruptedException {
+ assertThat(pool.getTotalConnectionsCreated(), is(0l));
+ assertThat(pool.getTotalConnectionsUsed(), is(0l));
+
+ RepositoryConnection conn = pool.getConnection();
+ assertThat(conn, is(notNullValue()));
+ assertThat(pool.getTotalConnectionsCreated(), is(1l));
+ assertThat(pool.getTotalConnectionsUsed(), is(1l));
+ assertThat(pool.getPoolSize(), is(1));
+
+ conn.close();
+ conn.close();
+ }
+
+ @Test
+ public void shouldBlockClientsWhenNotEnoughConnections() throws Exception {
+ int numConnectionsInPool = 1;
+ int numClients = 2;
+ RepositoryConnectionPool pool = new RepositoryConnectionPool(repositorySource);
+ pool.setCorePoolSize(numConnectionsInPool);
+ pool.setMaximumPoolSize(numConnectionsInPool);
+ RepositoryOperation.Factory<Integer> operationFactory =
RepositorySourceLoadHarness.createMultipleLoadOperationFactory(10);
+ runLoadTest(env, pool, numClients, 100, TimeUnit.MILLISECONDS,
operationFactory);
+ pool.shutdown();
+ pool.awaitTermination(4, TimeUnit.SECONDS);
+ }
+
+ @Test
+ public void shouldLimitClientsToRunSequentiallyWithOneConnectionInPool() throws
Exception {
+ int numConnectionsInPool = 1;
+ int numClients = 3;
+ RepositoryConnectionPool pool = new RepositoryConnectionPool(repositorySource);
+ pool.setCorePoolSize(numConnectionsInPool);
+ pool.setMaximumPoolSize(numConnectionsInPool);
+ RepositoryOperation.Factory<Integer> operationFactory =
RepositorySourceLoadHarness.createMultipleLoadOperationFactory(10);
+ runLoadTest(env, pool, numClients, 100, TimeUnit.MILLISECONDS,
operationFactory);
+ pool.shutdown();
+ pool.awaitTermination(4, TimeUnit.SECONDS);
+ }
+
+ @Test
+ public void shouldClientsToRunConncurrentlyWithTwoConnectionsInPool() throws
Exception {
+ int numConnectionsInPool = 2;
+ int numClients = 10;
+ RepositoryConnectionPool pool = new RepositoryConnectionPool(repositorySource);
+ pool.setCorePoolSize(numConnectionsInPool);
+ pool.setMaximumPoolSize(numConnectionsInPool);
+ RepositoryOperation.Factory<Integer> operationFactory =
RepositorySourceLoadHarness.createMultipleLoadOperationFactory(10);
+ runLoadTest(env, pool, numClients, 100, TimeUnit.MILLISECONDS,
operationFactory);
+ pool.shutdown();
+ pool.awaitTermination(4, TimeUnit.SECONDS);
+ }
+
+ @Ignore( "doesn't run on hudson" )
+ @Test
+ public void shouldClientsToRunConncurrentlyWithMultipleConnectionInPool() throws
Exception {
+ int numConnectionsInPool = 10;
+ int numClients = 50;
+ RepositoryConnectionPool pool = new RepositoryConnectionPool(repositorySource);
+ pool.setCorePoolSize(numConnectionsInPool);
+ pool.setMaximumPoolSize(numConnectionsInPool);
+ RepositoryOperation.Factory<Integer> operationFactory =
RepositorySourceLoadHarness.createMultipleLoadOperationFactory(20);
+ List<Future<Integer>> results = runLoadTest(env, pool, numClients,
200, TimeUnit.MILLISECONDS, operationFactory);
+ int total = 0;
+ for (Future<Integer> result : results) {
+ assertThat(result.isDone(), is(true));
+ if (result.isDone()) total += result.get();
+ }
+ assertThat(total, is(20 * numClients));
+ pool.shutdown();
+ pool.awaitTermination(4, TimeUnit.SECONDS);
+ }
+
+}
Property changes on:
trunk/dna-spi/src/test/java/org/jboss/dna/spi/graph/connection/RepositoryConnectionPoolTest.java
___________________________________________________________________
Name: svn:mime-type
+ text/plain
Modified:
trunk/dna-spi/src/test/java/org/jboss/dna/spi/graph/connection/SimpleRepositorySource.java
===================================================================
---
trunk/dna-spi/src/test/java/org/jboss/dna/spi/graph/connection/SimpleRepositorySource.java 2008-07-24
19:52:13 UTC (rev 369)
+++
trunk/dna-spi/src/test/java/org/jboss/dna/spi/graph/connection/SimpleRepositorySource.java 2008-07-25
19:37:40 UTC (rev 370)
@@ -28,6 +28,7 @@
import java.util.concurrent.TimeUnit;
import javax.naming.Reference;
import javax.transaction.xa.XAResource;
+import net.jcip.annotations.ThreadSafe;
import org.jboss.dna.spi.cache.CachePolicy;
import org.jboss.dna.spi.graph.InvalidPathException;
import org.jboss.dna.spi.graph.Name;
@@ -37,48 +38,27 @@
import org.jboss.dna.spi.graph.commands.GetChildrenCommand;
import org.jboss.dna.spi.graph.commands.GetPropertiesCommand;
import org.jboss.dna.spi.graph.commands.GraphCommand;
-import org.jboss.dna.spi.graph.connection.ExecutionEnvironment;
-import org.jboss.dna.spi.graph.connection.RepositoryConnection;
-import org.jboss.dna.spi.graph.connection.RepositorySource;
-import org.jboss.dna.spi.graph.connection.RepositorySourceException;
-import org.jboss.dna.spi.graph.connection.RepositorySourceListener;
/**
* A {@link RepositorySource} for a {@link SimpleRepository simple repository}.
*
* @author Randall Hauch
*/
-public class SimpleRepositorySource implements RepositorySource {
+@ThreadSafe
+public class SimpleRepositorySource extends AbstractRepositorySource {
private static final long serialVersionUID = 1L;
- public static final int DEFAULT_RETRY_LIMIT = 5;
-
private String repositoryName;
private String name;
- private int retryLimit = DEFAULT_RETRY_LIMIT;
- /**
- * {@inheritDoc}
- *
- * @see org.jboss.dna.spi.graph.connection.RepositorySource#getRetryLimit()
- */
- public int getRetryLimit() {
- return retryLimit;
+ public SimpleRepositorySource() {
+ super();
}
/**
* {@inheritDoc}
*
- * @see org.jboss.dna.spi.graph.connection.RepositorySource#setRetryLimit(int)
- */
- public void setRetryLimit( int limit ) {
- retryLimit = limit;
- }
-
- /**
- * {@inheritDoc}
- *
* @see org.jboss.dna.spi.graph.connection.RepositoryConnectionFactory#getName()
*/
public String getName() {
@@ -118,20 +98,6 @@
/**
* {@inheritDoc}
*
- * @see
org.jboss.dna.spi.graph.connection.RepositoryConnectionFactory#getConnection()
- */
- public RepositoryConnection getConnection() throws RepositorySourceException {
- String reposName = this.repositoryName;
- SimpleRepository repository = SimpleRepository.get(reposName);
- if (repository == null) {
- throw new RepositorySourceException(this.getName(), "Unable to find
repository \"" + reposName + "\"");
- }
- return new Connection(repository);
- }
-
- /**
- * {@inheritDoc}
- *
* @see java.lang.Object#hashCode()
*/
@Override
@@ -156,6 +122,21 @@
return false;
}
+ /**
+  * {@inheritDoc}
+  *
+  * @see org.jboss.dna.spi.graph.connection.AbstractRepositorySource#createConnection()
+  */
+ @Override
+ protected synchronized RepositoryConnection createConnection() throws RepositorySourceException {
+     final String repositoryToFind = this.getRepositoryName();
+     final SimpleRepository found = SimpleRepository.get(repositoryToFind);
+     if (found != null) {
+         return new Connection(found);
+     }
+     throw new RepositorySourceException(this.getName(), "Unable to find repository \"" + repositoryToFind + "\"");
+ }
+
protected class Connection implements RepositoryConnection {
private RepositorySourceListener listener;
Modified:
trunk/dna-spi/src/test/java/org/jboss/dna/spi/graph/connection/TimeDelayingRepositorySource.java
===================================================================
---
trunk/dna-spi/src/test/java/org/jboss/dna/spi/graph/connection/TimeDelayingRepositorySource.java 2008-07-24
19:52:13 UTC (rev 369)
+++
trunk/dna-spi/src/test/java/org/jboss/dna/spi/graph/connection/TimeDelayingRepositorySource.java 2008-07-25
19:37:40 UTC (rev 370)
@@ -36,16 +36,17 @@
import org.jboss.dna.spi.graph.commands.GraphCommand;
/**
+ * A simple {@link RepositorySource} that simulates an imaginary source with a built-in
delay mechanism.
+ *
* @author Randall Hauch
*/
@ThreadSafe
-public class TimeDelayingRepositorySource implements RepositorySource {
+public class TimeDelayingRepositorySource extends AbstractRepositorySource {
/**
*/
private static final long serialVersionUID = -2756725117087437347L;
private String name;
- private final AtomicInteger retryLimit = new AtomicInteger(0);
private final AtomicInteger connectionsOpenedCount = new AtomicInteger(0);
private final AtomicInteger connectionsClosedCount = new AtomicInteger(0);
private final Set<Connection> openConnections = new
CopyOnWriteArraySet<Connection>();
@@ -56,6 +57,7 @@
private CachePolicy defaultCachePolicy;
public TimeDelayingRepositorySource( String identifier ) {
+ super();
this.name = identifier;
}
@@ -73,20 +75,6 @@
this.name = name;
}
- /**
- * {@inheritDoc}
- */
- public int getRetryLimit() {
- return this.retryLimit.get();
- }
-
- /**
- * {@inheritDoc}
- */
- public void setRetryLimit( int limit ) {
- this.retryLimit.set(limit);
- }
-
public CachePolicy getDefaultCachePolicy() {
return defaultCachePolicy;
}
@@ -135,8 +123,11 @@
/**
* {@inheritDoc}
+ *
+ * @see
org.jboss.dna.spi.graph.connection.AbstractRepositorySource#createConnection()
*/
- public RepositoryConnection getConnection() throws RepositorySourceException {
+ @Override
+ protected RepositoryConnection createConnection() throws RepositorySourceException {
int connectionNumber = this.connectionsOpenedCount.incrementAndGet();
String connectionName = "Connection " + connectionNumber;
XAResource xaResource = newXaResource(connectionName);