Author: rhauch
Date: 2008-06-30 17:42:54 -0400 (Mon, 30 Jun 2008)
New Revision: 320
Added:
trunk/dna-spi/src/main/java/org/jboss/dna/spi/graph/connection/BasicRepositoryConnectionPool.java
trunk/dna-spi/src/test/java/org/jboss/dna/spi/graph/connection/BasicRepositoryConnectionPoolTest.java
Removed:
trunk/dna-repository/src/main/java/org/jboss/dna/repository/federation/FederatedSource.java
trunk/dna-repository/src/test/java/org/jboss/dna/repository/federation/FederatedSourceTest.java
trunk/dna-spi/src/main/java/org/jboss/dna/spi/graph/connection/RepositoryConnectionPool.java
trunk/dna-spi/src/test/java/org/jboss/dna/spi/graph/connection/RepositoryConnectionPoolTest.java
Modified:
trunk/
trunk/dna-repository/src/main/java/org/jboss/dna/repository/RepositoryI18n.java
trunk/dna-repository/src/main/java/org/jboss/dna/repository/federation/FederatedRepository.java
trunk/dna-repository/src/main/java/org/jboss/dna/repository/federation/FederatedRepositoryConnection.java
trunk/dna-repository/src/main/java/org/jboss/dna/repository/federation/FederationService.java
trunk/dna-repository/src/main/java/org/jboss/dna/repository/util/SimpleExecutionContext.java
trunk/dna-repository/src/main/resources/org/jboss/dna/repository/RepositoryI18n.properties
trunk/dna-repository/src/test/java/org/jboss/dna/repository/federation/FederatedRepositoryTest.java
trunk/dna-spi/src/main/java/org/jboss/dna/spi/graph/PathFactory.java
trunk/dna-spi/src/main/java/org/jboss/dna/spi/graph/PropertyType.java
trunk/dna-spi/src/main/java/org/jboss/dna/spi/graph/commands/impl/BasicGetPropertiesCommand.java
trunk/dna-spi/src/main/java/org/jboss/dna/spi/graph/connection/RepositoryOperation.java
trunk/dna-spi/src/main/java/org/jboss/dna/spi/graph/connection/RepositoryOperations.java
trunk/dna-spi/src/main/java/org/jboss/dna/spi/graph/impl/PathValueFactory.java
trunk/dna-spi/src/main/java/org/jboss/dna/spi/graph/impl/StandardValueFactories.java
trunk/dna-spi/src/test/java/org/jboss/dna/spi/graph/connection/RepositorySourceLoadHarness.java
Log:
DNA-115: Create federation service - Completed another round of refactoring to simplify
configuration that also allows distribution/clustering
Property changes on: trunk
___________________________________________________________________
Name: svn:ignore
- target
+ .metadata
target
Modified: trunk/dna-repository/src/main/java/org/jboss/dna/repository/RepositoryI18n.java
===================================================================
---
trunk/dna-repository/src/main/java/org/jboss/dna/repository/RepositoryI18n.java 2008-06-30
21:34:41 UTC (rev 319)
+++
trunk/dna-repository/src/main/java/org/jboss/dna/repository/RepositoryI18n.java 2008-06-30
21:42:54 UTC (rev 320)
@@ -31,6 +31,7 @@
*/
public final class RepositoryI18n {
+ // Services and Repository
public static I18n invalidStateString;
public static I18n serviceShutdowAndMayNotBeStarted;
public static I18n serviceShutdowAndMayNotBePaused;
@@ -45,6 +46,7 @@
public static I18n errorFindingPropertyNameInPropertyChangedEvent;
public static I18n errorFindingPropertyNameInPropertyRemovedEvent;
+ // Rules
public static I18n unableToObtainJsr94RuleAdministrator;
public static I18n errorUsingJsr94RuleAdministrator;
public static I18n unableToObtainJsr94ServiceProvider;
@@ -66,6 +68,7 @@
public static I18n observationServiceName;
public static I18n ruleServiceName;
+ // Sequencing
public static I18n sequencingServiceName;
public static I18n unableToChangeExecutionContextWhileRunning;
public static I18n unableToStartSequencingServiceWithoutExecutionContext;
@@ -80,6 +83,7 @@
public static I18n sequencingPropertyOnNode;
public static I18n writingOutputSequencedFromPropertyOnNodes;
+ // Properties
public static I18n errorReadingPropertiesFromContainerNode;
public static I18n requiredPropertyOnNodeWasExpectedToBeStringValue;
public static I18n optionalPropertyOnNodeWasExpectedToBeStringValue;
@@ -95,14 +99,20 @@
public static I18n errorGettingNodeRelativeToNode;
public static I18n unknownPropertyValueType;
+ // Path expressions
public static I18n pathExpressionIsInvalid;
public static I18n pathExpressionMayNotBeBlank;
public static I18n pathExpressionHasInvalidSelect;
public static I18n pathExpressionHasInvalidMatch;
+ // Observation
public static I18n
errorUnregisteringWorkspaceListenerWhileShuttingDownObservationService;
+ // General
public static I18n invalidRepositoryNodePath;
+ public static I18n unableToLoadClassUsingClasspath;
+ public static I18n unableToInstantiateClassUsingClasspath;
+ public static I18n unableToAccessClassUsingClasspath;
// XML Sequencer
public static I18n errorSequencingXmlDocument;
@@ -114,9 +124,11 @@
public static I18n interruptedWhileConnectingToFederationConfigurationRepository;
public static I18n interruptedWhileUsingFederationConfigurationRepository;
public static I18n
interruptedWhileClosingConnectionToFederationConfigurationRepository;
+ public static I18n unableToFindRepositorySourceByName;
public static I18n unableToCreateConnectionToFederatedRepository;
public static I18n unableToAuthenticateConnectionToFederatedRepository;
public static I18n repositoryHasBeenShutDown;
+ public static I18n repositoryPathInFederationBindingIsNotAbsolute;
static {
try {
Modified:
trunk/dna-repository/src/main/java/org/jboss/dna/repository/federation/FederatedRepository.java
===================================================================
---
trunk/dna-repository/src/main/java/org/jboss/dna/repository/federation/FederatedRepository.java 2008-06-30
21:34:41 UTC (rev 319)
+++
trunk/dna-repository/src/main/java/org/jboss/dna/repository/federation/FederatedRepository.java 2008-06-30
21:42:54 UTC (rev 320)
@@ -25,21 +25,20 @@
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
import net.jcip.annotations.ThreadSafe;
import org.jboss.dna.common.util.ArgCheck;
import org.jboss.dna.repository.RepositoryI18n;
import org.jboss.dna.repository.services.AbstractServiceAdministrator;
import org.jboss.dna.repository.services.ServiceAdministrator;
import org.jboss.dna.spi.cache.CachePolicy;
+import org.jboss.dna.spi.graph.connection.ExecutionEnvironment;
import org.jboss.dna.spi.graph.connection.RepositoryConnection;
-import org.jboss.dna.spi.graph.connection.RepositoryConnectionPool;
+import org.jboss.dna.spi.graph.connection.RepositorySource;
import org.jboss.dna.spi.graph.connection.RepositorySourceListener;
/**
* The component in the {@link FederationService} that represents a single federated
repository. The federated repository manages
- * a set of {@link FederatedSource federated sources}, and provides the logic of
interacting with those sources and presenting a
+ * a set of {@link RepositorySource federated sources}, and provides the logic of
interacting with those sources and presenting a
* single unified graph.
*
* @author Randall Hauch
@@ -96,35 +95,32 @@
private final ServiceAdministrator administrator = new Administrator();
private final String name;
- private final FederationService service;
- private final Lock sourcesWriteLock = new ReentrantLock();
- private final List<FederatedSource> sources = new
CopyOnWriteArrayList<FederatedSource>();
+ private final ExecutionEnvironment env;
+ private final RepositoryConnectionFactories connectionFactories;
+ private final CopyOnWriteArrayList<FederatedRegion> regions = new
CopyOnWriteArrayList<FederatedRegion>();
private final CopyOnWriteArrayList<RepositorySourceListener> listeners = new
CopyOnWriteArrayList<RepositorySourceListener>();
private CachePolicy defaultCachePolicy;
/**
* Create a federated repository instance, as managed by the supplied {@link
FederationService}.
*
- * @param service the federation service that is managing this instance
- * @param name the name of the repository
- * @throws IllegalArgumentException if the service is null or the name is null or
blank
+ * @param repositoryName the name of the repository
+ * @param env the execution environment
+ * @param connectionFactories the set of connection factories that should be used
+ * @throws IllegalArgumentException if any of the parameters are null, or if the name
is blank
*/
- public FederatedRepository( FederationService service,
- String name ) {
- ArgCheck.isNotNull(service, "service");
- ArgCheck.isNotEmpty(name, "name");
- this.name = name;
- this.service = service;
+ public FederatedRepository( String repositoryName,
+ ExecutionEnvironment env,
+ RepositoryConnectionFactories connectionFactories ) {
+ ArgCheck.isNotNull(connectionFactories, "connectionFactories");
+ ArgCheck.isNotNull(env, "env");
+ ArgCheck.isNotEmpty(repositoryName, "repositoryName");
+ this.name = repositoryName;
+ this.env = env;
+ this.connectionFactories = connectionFactories;
}
/**
- * @return service
- */
- protected FederationService getService() {
- return this.service;
- }
-
- /**
* Get the name of this repository
*
* @return name
@@ -141,9 +137,25 @@
}
/**
+ * @return the execution environment
+ */
+ public ExecutionEnvironment getExecutionEnvironment() {
+ return env;
+ }
+
+ /**
+ * @return connectionFactories
+ */
+ protected RepositoryConnectionFactories getConnectionFactories() {
+ return connectionFactories;
+ }
+
+ /**
* Utility method called by the administrator.
*/
protected void startRepository() {
+ // Look for the sources in the repository, creating any that are missing
+ // Look for the
// Do not establish connections to the sources; these will be established as
needed
}
@@ -152,17 +164,7 @@
* Utility method called by the administrator.
*/
protected void shutdownRepository() {
- // Close all connections to the sources. This is done inside the sources write
lock.
- try {
- this.sourcesWriteLock.lock();
- for (FederatedSource source : this.sources) {
- source.getConnectionPool().shutdown();
- }
- } finally {
- this.sourcesWriteLock.unlock();
- }
- // Connections to this repository check before doing anything with this, so just
remove it from the service ...
- this.service.removeRepository(this);
+ // Connections to this repository check before doing anything with this, so no
need to do anything to them ...
}
/**
@@ -176,18 +178,7 @@
*/
protected boolean awaitTermination( long timeout,
TimeUnit unit ) throws InterruptedException {
- // Check whether all source pools are shut down. This is done inside the sources
write lock.
- try {
- this.sourcesWriteLock.lock();
- for (FederatedSource source : this.sources) {
- if (!source.getConnectionPool().awaitTermination(timeout, unit)) {
- return false;
- }
- }
- return true;
- } finally {
- this.sourcesWriteLock.unlock();
- }
+ return true;
}
/**
@@ -201,17 +192,7 @@
* @see #isTerminated()
*/
public boolean isTerminating() {
- try {
- this.sourcesWriteLock.lock();
- for (FederatedSource source : this.sources) {
- if (source.getConnectionPool().isTerminating()) {
- return true;
- }
- }
- return false;
- } finally {
- this.sourcesWriteLock.unlock();
- }
+ return false;
}
/**
@@ -221,143 +202,36 @@
* @see #isTerminating()
*/
public boolean isTerminated() {
- try {
- this.sourcesWriteLock.lock();
- for (FederatedSource source : this.sources) {
- if (!source.getConnectionPool().isTerminated()) {
- return false;
- }
- }
- return true;
- } finally {
- this.sourcesWriteLock.unlock();
- }
+ return false;
}
/**
- * Get an unmodifiable collection of {@link FederatedSource federated sources}.
- * <p>
- * This method can safely be called while the federation repository is in use.
- * </p>
+ * Return the unmodifiable list of bindings.
*
- * @return the sources
+ * @return the bindings
*/
- public List<FederatedSource> getSources() {
- return Collections.unmodifiableList(this.sources);
+ public List<FederatedRegion> getRegions() {
+ return Collections.unmodifiableList(regions);
}
/**
- * Add the supplied federated source. This method returns false if the source is
null.
- * <p>
- * This method can safely be called while the federation repository is in use.
- * </p>
+ * Add the supplied federation region to this repository, if it is not already in the
repository. . This method does not
+ * attempt to check whether this region would result in a duplicate region.
*
- * @param source the source to add
- * @return true if the source is added, or false if the reference is null or if there
is already an existing source with the
- * supplied name.
+ * @param region the region to be added
+ * @return true if the region was added, or false if there was already a duplicate
region
+ * @throws IllegalArgumentException if the binding reference is null
*/
- public boolean addSource( FederatedSource source ) {
- if (source == null) return false;
- try {
- this.sourcesWriteLock.lock();
- for (FederatedSource existingSource : this.sources) {
- if (existingSource.getName().equals(source.getName())) return false;
- }
- this.sources.add(source);
- } finally {
- this.sourcesWriteLock.unlock();
- }
- return true;
+ protected boolean addRegionIfAbsent( FederatedRegion region ) {
+ ArgCheck.isNotNull(region, "region");
+ return this.regions.addIfAbsent(region);
}
- /**
- * Add the supplied federated source. This method returns false if the source is
null.
- * <p>
- * This method can safely be called while the federation repository is in use.
- * </p>
- *
- * @param source the source to add
- * @param index the index at which the source should be added
- * @return true if the source is added, or false if the reference is null or if there
is already an existing source with the
- * supplied name.
- * @throws IndexOutOfBoundsException if the index is out of bounds
- */
- public boolean addSource( FederatedSource source,
- int index ) {
- if (source == null) return false;
- try {
- this.sourcesWriteLock.lock();
- for (FederatedSource existingSource : this.sources) {
- if (existingSource.getName().equals(source.getName())) return false;
- }
- this.sources.add(index, source);
- } finally {
- this.sourcesWriteLock.unlock();
- }
- return true;
+ protected boolean removeBinding( FederatedRegion region ) {
+ return this.regions.remove(region);
}
/**
- * Remove from this federated repository the supplied source (or a source with the
same name as that supplied). This call
- * shuts down the connections in the source in an orderly fashion, allowing those
connection currently in use to be used and
- * closed normally, but preventing further connections from being used.
- * <p>
- * This method can safely be called while the federation repository is in use.
- * </p>
- *
- * @param source the source to be removed
- * @param timeToAwait the amount of time to wait while all of the source's
connections are closed, or non-positive if the call
- * should not wait at all
- * @param unit the time unit to be used for <code>timeToAwait</code>
- * @return true if the source was removed, or false if the source was not a source
for this repository.
- * @throws InterruptedException if the thread is interrupted while awaiting closing
of the connections
- */
- public boolean removeSource( FederatedSource source,
- long timeToAwait,
- TimeUnit unit ) throws InterruptedException {
- // Use the name; don't use the object equality ...
- return removeSource(source.getName(), timeToAwait, unit) != null;
- }
-
- /**
- * Remove from this federated repository the source with the supplied name. This call
shuts down the connections in the source
- * in an orderly fashion, allowing those connection currently in use to be used and
closed normally, but preventing further
- * connections from being used.
- * <p>
- * This method can safely be called while the federation repository is in use.
- * </p>
- *
- * @param name the name of the source to be removed
- * @param timeToAwait the amount of time to wait while all of the source's
connections are closed, or non-positive if the call
- * should not wait at all
- * @param unit the time unit to be used for <code>timeToAwait</code>
- * @return the source with the supplied name that was removed, or null if no existing
source matching the supplied name could
- * be found
- * @throws InterruptedException if the thread is interrupted while awaiting closing
of the connections
- */
- public FederatedSource removeSource( String name,
- long timeToAwait,
- TimeUnit unit ) throws InterruptedException {
- try {
- this.sourcesWriteLock.lock();
- for (FederatedSource existingSource : this.sources) {
- if (existingSource.getName().equals(name)) {
- boolean removed = this.sources.remove(existingSource);
- assert removed;
- // Shut down the connection pool for the source ...
- RepositoryConnectionPool pool = existingSource.getConnectionPool();
- pool.shutdown();
- if (timeToAwait > 0l) pool.awaitTermination(timeToAwait, unit);
- return existingSource;
- }
- }
- } finally {
- this.sourcesWriteLock.unlock();
- }
- return null;
- }
-
- /**
* Add a listener that is to receive notifications to changes to content within this
repository. This method does nothing if
* the supplied listener is null.
*
Modified:
trunk/dna-repository/src/main/java/org/jboss/dna/repository/federation/FederatedRepositoryConnection.java
===================================================================
---
trunk/dna-repository/src/main/java/org/jboss/dna/repository/federation/FederatedRepositoryConnection.java 2008-06-30
21:34:41 UTC (rev 319)
+++
trunk/dna-repository/src/main/java/org/jboss/dna/repository/federation/FederatedRepositoryConnection.java 2008-06-30
21:42:54 UTC (rev 320)
@@ -40,7 +40,8 @@
protected static final RepositorySourceListener NO_OP_LISTENER = new
RepositorySourceListener() {
- public void notify( String sourceName, Object... events ) {
+ public void notify( String sourceName,
+ Object... events ) {
// do nothing
}
};
@@ -49,7 +50,8 @@
private final FederatedRepositorySource source;
private RepositorySourceListener listener = NO_OP_LISTENER;
- protected FederatedRepositoryConnection( FederatedRepository repository,
FederatedRepositorySource source ) {
+ protected FederatedRepositoryConnection( FederatedRepository repository,
+ FederatedRepositorySource source ) {
assert source != null;
assert repository != null;
this.source = source;
@@ -106,14 +108,16 @@
/**
* {@inheritDoc}
*/
- public boolean ping( long time, TimeUnit unit ) {
+ public boolean ping( long time,
+ TimeUnit unit ) {
return this.repository.getAdministrator().isStarted();
}
/**
* {@inheritDoc}
*/
- public void execute( ExecutionEnvironment env, GraphCommand... commands ) throws
RepositorySourceException {
+ public void execute( ExecutionEnvironment env,
+ GraphCommand... commands ) throws RepositorySourceException {
if (!this.repository.getAdministrator().isStarted()) {
throw new
RepositorySourceException(RepositoryI18n.repositoryHasBeenShutDown.text(this.repository.getName()));
}
Deleted:
trunk/dna-repository/src/main/java/org/jboss/dna/repository/federation/FederatedSource.java
===================================================================
---
trunk/dna-repository/src/main/java/org/jboss/dna/repository/federation/FederatedSource.java 2008-06-30
21:34:41 UTC (rev 319)
+++
trunk/dna-repository/src/main/java/org/jboss/dna/repository/federation/FederatedSource.java 2008-06-30
21:42:54 UTC (rev 320)
@@ -1,146 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * Copyright 2008, Red Hat Middleware LLC, and individual contributors
- * as indicated by the @author tags. See the copyright.txt file in the
- * distribution for a full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
- */
-package org.jboss.dna.repository.federation;
-
-import java.util.concurrent.TimeUnit;
-import net.jcip.annotations.ThreadSafe;
-import org.jboss.dna.common.util.ArgCheck;
-import org.jboss.dna.spi.graph.connection.RepositoryConnection;
-import org.jboss.dna.spi.graph.connection.RepositoryConnectionFactory;
-import org.jboss.dna.spi.graph.connection.RepositoryConnectionPool;
-import org.jboss.dna.spi.graph.connection.RepositorySource;
-import org.jboss.dna.spi.graph.connection.RepositorySourceException;
-
-/**
- * A component that represents a {@link RepositorySource repository source} (with its
state) being federated in a
- * {@link FederatedRepository}.
- *
- * @author Randall Hauch
- */
-@ThreadSafe
-public class FederatedSource implements RepositoryConnectionFactory {
-
- private final RepositorySource source;
- private final RepositoryConnectionPool connectionPool;
-
- public FederatedSource( RepositorySource source ) {
- ArgCheck.isNotNull(source, "source");
- this.source = source;
- this.connectionPool = new RepositoryConnectionPool(source);
- }
-
- protected FederatedSource( RepositorySource source, RepositoryConnectionPool
connectionPool ) {
- ArgCheck.isNotNull(source, "source");
- this.source = source;
- this.connectionPool = connectionPool;
- }
-
- /**
- * Get the name for this federated repository source.
- *
- * @return the name; never null or empty
- */
- public String getName() {
- return this.source.getName();
- }
-
- /**
- * Get the RepositorySource repository source information for this federated source.
- *
- * @return the repository source; never null
- */
- public RepositorySource getRepositorySource() {
- return this.source;
- }
-
- /**
- * Get the connection pool used by this federated source.
- *
- * @return the connection pool; never null
- */
- protected RepositoryConnectionPool getConnectionPool() {
- return this.connectionPool;
- }
-
- /**
- * {@inheritDoc}
- */
- public RepositoryConnection getConnection() throws RepositorySourceException,
InterruptedException {
- return this.connectionPool.getConnection();
- }
-
- /**
- * Determine whether the federated source is available by attempting to connect to
the source and
- * {@link RepositoryConnection#ping(long, TimeUnit) pinging} the source.
- *
- * @param timeToWait the time the caller is willing to wait to establish a
connection
- * @param unit the time unit for the <code>timeToWait</code> parameter;
may not be null
- * @return true if the federated source is available, or false if the source is not
available in the allotted time period
- */
- public boolean isAvailable( long timeToWait, TimeUnit unit ) {
- RepositoryConnection connection = null;
- try {
- connection = this.connectionPool.getConnection();
- return connection.ping(timeToWait, unit);
- } catch (IllegalStateException e) {
- // The connection pool is not running, so return false ..
- return false;
- } catch (InterruptedException e) {
- // Consider an attempt to get a connection and being interrupted as NOT being
connected
- return false;
- } catch (Throwable e) {
- // Consider any other failure, including RepositorySourceException, as
meaning not available
- return false;
- } finally {
- if (connection != null) {
- try {
- connection.close();
- } catch (InterruptedException e) {
- // Consider an attempt to get a connection and being interrupted as
NOT being connected
- return false;
- }
- }
- }
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public int hashCode() {
- return source.getName().hashCode();
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public boolean equals( Object obj ) {
- if (obj instanceof FederatedSource) {
- FederatedSource that = (FederatedSource)obj;
- if (!this.source.getName().equals(that.source.getName())) return false;
- return true;
- }
- return false;
- }
-
-}
Modified:
trunk/dna-repository/src/main/java/org/jboss/dna/repository/federation/FederationService.java
===================================================================
---
trunk/dna-repository/src/main/java/org/jboss/dna/repository/federation/FederationService.java 2008-06-30
21:34:41 UTC (rev 319)
+++
trunk/dna-repository/src/main/java/org/jboss/dna/repository/federation/FederationService.java 2008-06-30
21:42:54 UTC (rev 320)
@@ -21,22 +21,40 @@
*/
package org.jboss.dna.repository.federation;
+import java.lang.reflect.InvocationTargetException;
import java.util.Collections;
+import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import net.jcip.annotations.ThreadSafe;
+import org.jboss.dna.common.collection.Problems;
+import org.jboss.dna.common.collection.SimpleProblems;
import org.jboss.dna.common.component.ClassLoaderFactory;
import org.jboss.dna.common.component.StandardClassLoaderFactory;
import org.jboss.dna.common.i18n.I18n;
import org.jboss.dna.common.util.ArgCheck;
+import org.jboss.dna.common.util.Reflection;
import org.jboss.dna.repository.RepositoryI18n;
import org.jboss.dna.repository.services.AbstractServiceAdministrator;
import org.jboss.dna.repository.services.AdministeredService;
import org.jboss.dna.repository.services.ServiceAdministrator;
-import org.jboss.dna.repository.util.ExecutionContext;
+import org.jboss.dna.spi.graph.Name;
+import org.jboss.dna.spi.graph.NameFactory;
+import org.jboss.dna.spi.graph.Path;
+import org.jboss.dna.spi.graph.PathFactory;
+import org.jboss.dna.spi.graph.Property;
+import org.jboss.dna.spi.graph.ValueFactories;
+import org.jboss.dna.spi.graph.ValueFactory;
+import org.jboss.dna.spi.graph.commands.GraphCommand;
+import org.jboss.dna.spi.graph.commands.impl.BasicCompositeCommand;
+import org.jboss.dna.spi.graph.commands.impl.BasicGetChildrenCommand;
+import org.jboss.dna.spi.graph.commands.impl.BasicGetNodeCommand;
+import org.jboss.dna.spi.graph.connection.ExecutionEnvironment;
import org.jboss.dna.spi.graph.connection.RepositoryConnection;
+import org.jboss.dna.spi.graph.connection.RepositoryConnectionFactory;
import org.jboss.dna.spi.graph.connection.RepositorySource;
/**
@@ -45,6 +63,9 @@
@ThreadSafe
public class FederationService implements AdministeredService {
+ protected static final String CLASSNAME_PROPERTY_NAME = "dna:classname";
+ protected static final String CLASSPATH_PROPERTY_NAME = "dna:classpath";
+
/**
* The administrative component for this service.
*
@@ -92,8 +113,9 @@
}
private final ClassLoaderFactory classLoaderFactory;
- private final ExecutionContext executionContext;
- private final RepositorySource configurationSource;
+ private final ExecutionEnvironment env;
+ private final RepositorySourceManager sources;
+ private final FederatedRegion configurationRegion;
private final Administrator administrator = new Administrator();
private final ConcurrentMap<String, FederatedRepository> repositories = new
ConcurrentHashMap<String, FederatedRepository>();
private RepositoryConnection configurationConnection;
@@ -101,22 +123,26 @@
/**
* Create a federation service instance
*
- * @param configurationSource the repository source that contains the configuration
for this federation service (including the
- * respositories and the sources used by the federated repositories)
- * @param executionContext the context in which this service should run
+ * @param sources the source manager
+ * @param configurationRegion the repository region defining where the service can
find configuration information for the
+ * different repositories that it is to manage
+ * @param env the execution environment in which this service should run
* @param classLoaderFactory the class loader factory used to instantiate {@link
RepositorySource} instances; may be null if
* this instance should use a default factory that attempts to load classes
first from the
* {@link Thread#getContextClassLoader() thread's current context class
loader} and then from the class loader that
* loaded this class.
* @throws IllegalArgumentException if the bootstrap source is null or the execution
context is null
*/
- public FederationService( RepositorySource configurationSource,
- ExecutionContext executionContext,
+ public FederationService( RepositorySourceManager sources,
+ FederatedRegion configurationRegion,
+ ExecutionEnvironment env,
ClassLoaderFactory classLoaderFactory ) {
- ArgCheck.isNotNull(configurationSource, "configurationSource");
- ArgCheck.isNotNull(executionContext, "executionContext");
- this.configurationSource = configurationSource;
- this.executionContext = executionContext;
+ ArgCheck.isNotNull(configurationRegion, "configurationRegion");
+ ArgCheck.isNotNull(sources, "sources");
+ ArgCheck.isNotNull(env, "env");
+ this.sources = sources;
+ this.configurationRegion = configurationRegion;
+ this.env = env;
this.classLoaderFactory = classLoaderFactory != null ? classLoaderFactory : new
StandardClassLoaderFactory();
}
@@ -128,22 +154,27 @@
}
/**
- * Get the source for the repository containing the configuration for this federation
service.
- *
- * @return the configuration repository source; never null
+ * @return configurationRegion
*/
- public RepositorySource getConfigurationSource() {
- return this.configurationSource;
+ public FederatedRegion getConfigurationRegion() {
+ return configurationRegion;
}
/**
- * @return executionContext
+ * @return sources
*/
- public ExecutionContext getExecutionContext() {
- return this.executionContext;
+ public RepositorySourceManager getRepositorySourceManager() {
+ return sources;
}
/**
+ * @return env
+ */
+ public ExecutionEnvironment getExecutionEnvironment() {
+ return env;
+ }
+
+ /**
* @return classLoaderFactory
*/
public ClassLoaderFactory getClassLoaderFactory() {
@@ -157,115 +188,137 @@
protected synchronized void startService() {
if (this.configurationConnection == null) {
+ Problems problems = new SimpleProblems();
+
+ //
------------------------------------------------------------------------------------
+ // Establish a connection to the configuration source ...
+ //
------------------------------------------------------------------------------------
+ final String configurationSourceName = configurationRegion.getSourceName();
+ RepositoryConnectionFactory factory =
sources.getConnectionFactory(configurationSourceName);
+ if (factory == null) {
+ I18n msg = RepositoryI18n.unableToFindRepositorySourceByName;
+ throw new FederationException(msg.text(configurationSourceName));
+ }
try {
- this.configurationConnection = this.configurationSource.getConnection();
+ this.configurationConnection = factory.getConnection();
} catch (InterruptedException err) {
I18n msg =
RepositoryI18n.interruptedWhileConnectingToFederationConfigurationRepository;
- throw new FederationException(msg.text(configurationSource.getName()));
+ throw new FederationException(msg.text(configurationSourceName));
}
- // // Read the configuration and obtain the RepositorySource instances for
each of the
- // // federated repositories. Each repository configuration is rooted at
- // // "/dna:repositories/[repositoryName]", and under this node at
"dna:federation"
- // // is the RepositorySource for the integrated repository, and under the
- // // "dna:sources/[sourceName]" are the nodes representing the
inital RepositorySource
- // // instances for each of the sources.
- // //
- // // The integrated repository for each federated repository contains the
complete unified
- // // graph merged from all of the sources. It also contains the
configuration for the federated
- // // repository at "/dna:system/dna:federation", including under
"dna:sources" a node for each of the current
- // // sources (e.g.,
"/dna:system/dna:federation/dna:sources/[sourceName]"). If this area of the
- // // graph does not yet exist, the sources are copied from the
- // // "/dna:repositories/[repositoryName]/dna:sources" area of the
service's configuration repository.
- // // However, after that, the federated repository manages its own sources.
- //
- // ValueFactories valueFactories = executionContext.getValueFactories();
- // PathFactory pathFactory = valueFactories.getPathFactory();
- //
- // // Get the list of repositories in the configuration, and create a
FederatedRepository for each one ...
- // try {
- // Path repositoriesNode = pathFactory.create("dna:repositories");
- // BasicGetChildrenCommand getRepositories = new
BasicGetChildrenCommand(repositoriesNode);
- // configurationConnection.execute(executionContext, getRepositories);
- //
- // // For each repository ...
- // for (Path.Segment child : getRepositories.getChildren()) {
- //
- // // Get the repository's name ...
- // final String repositoryName = child.getUnencodedString();
- // final Path pathToRepository = pathFactory.create(repositoriesNode,
child);
- //
- // // Record the initial sources ...
- // final Path.Segment sourcesSegment =
pathFactory.createSegment("dna:sources");
- // final Path pathToRepositorySourcesNode =
pathFactory.create(pathToRepository, sourcesSegment);
- // BasicRecordBranchCommand getSources = new
BasicRecordBranchCommand(pathToRepositorySourcesNode,
- // NodeConflictBehavior.DO_NOT_REPLACE);
- //
- // // Get the source of the integrated repository ...
- // final Path.Segment integratedRepositorySegment =
pathFactory.createSegment("dna:federatedRepository");
- // final Path pathToIntegratedRepositoryNode =
pathFactory.create(pathToRepository, integratedRepositorySegment);
- // BasicGetNodeCommand getSource = new
BasicGetNodeCommand(pathToIntegratedRepositoryNode);
- // configurationConnection.execute(executionContext, getSource);
- // RepositorySource integratedRepositorySource =
createRepositorySource(valueFactories,
- // getSource.getProperties());
- //
- // // Copy these to the federated repository ...
- // RepositoryConnection integratedConnection = integratedReposi
- // // Look for the
- // // Read the initial sources ...
- //
- // // Get the repository source information for the integrated repository
...
- //
- // // Look for existing sources and load them ...
- //
- // // Otherwise, read the intial sources from the
- //
- // }
- // } catch (InterruptedException err) {
- // I18n msg =
RepositoryI18n.interruptedWhileUsingFederationConfigurationRepository;
- // throw new FederationException(msg.text(configurationSource.getName()));
- // }
- // // TODO
+
+ //
------------------------------------------------------------------------------------
+ // Read the configuration ...
+ //
------------------------------------------------------------------------------------
+ ValueFactories valueFactories = env.getValueFactories();
+ PathFactory pathFactory = valueFactories.getPathFactory();
+
+ // Read the configuration and the repository sources, located as child
nodes/branches under "/dna:sources",
+ // and then instantiate and register each in the "sources" manager
+ try {
+ Path sourcesNode = pathFactory.create("/dna:sources");
+ BasicGetChildrenCommand getSources = new
BasicGetChildrenCommand(sourcesNode);
+ configurationConnection.execute(env, getSources);
+
+ // Build the commands to get each of the children ...
+ List<Path.Segment> children = getSources.getChildren();
+ if (children.isEmpty()) {
+ BasicCompositeCommand commands = new BasicCompositeCommand();
+ for (Path.Segment child : getSources.getChildren()) {
+ final Path pathToSource = pathFactory.create(sourcesNode,
child);
+ commands.add(new BasicGetNodeCommand(pathToSource));
+ }
+ configurationConnection.execute(env, commands);
+
+ // Iterate over each source node obtained ...
+ for (GraphCommand command : commands) {
+ BasicGetNodeCommand getSourceCommand =
(BasicGetNodeCommand)command;
+ RepositorySource source =
createRepositorySource(getSourceCommand.getPath(),
+
getSourceCommand.getProperties(),
+ problems);
+ if (source != null) sources.addSource(source);
+ }
+ }
+ } catch (InterruptedException err) {
+ I18n msg =
RepositoryI18n.interruptedWhileUsingFederationConfigurationRepository;
+ throw new FederationException(msg.text(configurationSourceName));
+ }
+
}
}
- // protected RepositorySource createRepositorySource( ValueFactories values,
- // Iterable<Property> properties ) {
- // // Put the properties in a map so we can find them by name ...
- // Map<Name, Property> byName = new HashMap<Name, Property>();
- // for (Property property : properties) {
- // byName.put(property.getName(), property);
- // }
- //
- // // Get the concrete class ...
- // Name classnameName = values.getNameFactory().create("dna:className");
- // Property classProperty = byName.get(classnameName);
- // if (classProperty == null) return null;
- // if (classProperty.isEmpty()) return null;
- // String className =
values.getStringFactory().create(classProperty.getValues().next());
- // if (className == null) return null;
- //
- // Name classpathName = values.getNameFactory().create("dna:classpath");
- // Property classpathProperty = byName.get(classpathName);
- // String[] classpath = null;
- // if (classpathProperty != null) {
- // classpath =
values.getStringFactory().create(classpathProperty.getValuesAsArray());
- // }
- //
- // // Load the class and look for the constructors ...
- // RepositorySource source = null;
- // try {
- // ClassLoader loader = getClassLoaderFactory().getClassLoader(classpath);
- // Class<?> sourceClass = loader.loadClass(className);
- // source = (RepositorySource)sourceClass.newInstance();
- // } catch (ClassNotFoundException err) {
- // return null;
- // } catch (InstantiationException err) {
- //
- // } catch (IllegalAccessException err) {
- // }
- // return null;
- // }
+ /**
+ * Instantiate the {@link RepositorySource} described by the supplied properties.
+ *
+ * @param path the path to the node where these properties were found; never null
+ * @param properties the properties; never null
+ * @param problems the problems container in which any problems should be reported;
never null
+ * @return the repository source instance, or null if it could not be created
+ */
+ @SuppressWarnings( "null" )
+ protected RepositorySource createRepositorySource( Path path,
+ Map<Name, Property>
properties,
+ Problems problems ) {
+ ValueFactories valueFactories = env.getValueFactories();
+ NameFactory nameFactory = valueFactories.getNameFactory();
+ ValueFactory<String> stringFactory = valueFactories.getStringFactory();
+ // Get the classname and classpath ...
+ Property classnameProperty =
properties.get(nameFactory.create(CLASSNAME_PROPERTY_NAME));
+ Property classpathProperty =
properties.get(nameFactory.create(CLASSPATH_PROPERTY_NAME));
+ if (classnameProperty == null) {
+ problems.addError(RepositoryI18n.requiredPropertyIsMissingFromNode,
CLASSNAME_PROPERTY_NAME, path);
+ }
+ if (classpathProperty == null) {
+ problems.addError(RepositoryI18n.requiredPropertyIsMissingFromNode,
CLASSPATH_PROPERTY_NAME, path);
+ }
+ if (problems.hasErrors()) return null;
+
+ // Create the instance ...
+ String classname = stringFactory.create(classnameProperty.getValues().next());
+ String[] classpath = stringFactory.create(classpathProperty.getValuesAsArray());
+ ClassLoader classLoader = this.classLoaderFactory.getClassLoader(classpath);
+ RepositorySource source = null;
+ try {
+ Class<?> sourceClass = classLoader.loadClass(classname);
+ source = (RepositorySource)sourceClass.newInstance();
+ } catch (ClassNotFoundException err) {
+ problems.addError(err, RepositoryI18n.unableToLoadClassUsingClasspath,
classname, classpath);
+ } catch (IllegalAccessException err) {
+ problems.addError(err, RepositoryI18n.unableToAccessClassUsingClasspath,
classname, classpath);
+ } catch (Throwable err) {
+ problems.addError(err, RepositoryI18n.unableToInstantiateClassUsingClasspath,
classname, classpath);
+ }
+
+ // Now set all the properties that we can, ignoring any property that doesn't
fit pattern ...
+ Reflection reflection = new Reflection(source.getClass());
+ for (Map.Entry<Name, Property> entry : properties.entrySet()) {
+ Name propertyName = entry.getKey();
+ Property property = entry.getValue();
+ String javaPropertyName = propertyName.getLocalName();
+ if (property.isEmpty()) continue;
+ Object value = null;
+ if (property.isSingle()) {
+ value = property.getValues().next();
+ } else if (property.isMultiple()) {
+ value = property.getValuesAsArray();
+ }
+ try {
+ reflection.invokeSetterMethodOnTarget(javaPropertyName, source, value);
+ } catch (SecurityException err) {
+ // Do nothing ... assume not a JavaBean property
+ } catch (NoSuchMethodException err) {
+ // Do nothing ... assume not a JavaBean property
+ } catch (IllegalArgumentException err) {
+ // Do nothing ... assume not a JavaBean property
+ } catch (IllegalAccessException err) {
+ // Do nothing ... assume not a JavaBean property
+ } catch (InvocationTargetException err) {
+ // Do nothing ... assume not a JavaBean property
+ }
+ }
+ return source;
+ }
+
/**
* Get the federated repository object with the given name. The resulting repository
will be started and ready to use.
*
@@ -273,14 +326,17 @@
* @return the repository instance
*/
protected FederatedRepository getRepository( String name ) {
+ if (this.configurationConnection == null) startService();
+
// Look for an existing repository ...
FederatedRepository repository = this.repositories.get(name);
if (repository == null) {
// Look up the node representing the repository in the configuration ...
- // New up a repository and configure it ...
- repository = new FederatedRepository(this, name);
- // Now register it, being careful to not overwrite any added since previous
call ..
+ // // New up a repository and configure it ...
+ // Repository
+ repository = new FederatedRepository(name, env, sources);
+ // Now register it, being careful to not overwrite any added since
"get" call above ..
FederatedRepository existingRepository = this.repositories.putIfAbsent(name,
repository);
if (existingRepository != null) repository = existingRepository;
}
@@ -294,8 +350,8 @@
try {
this.configurationConnection.close();
} catch (InterruptedException err) {
- throw new FederationException(
-
RepositoryI18n.interruptedWhileClosingConnectionToFederationConfigurationRepository.text(this.configurationSource.getName()));
+ I18n msg =
RepositoryI18n.interruptedWhileClosingConnectionToFederationConfigurationRepository;
+ throw new
FederationException(msg.text(configurationRegion.getSourceName()));
}
// Now shut down all repositories ...
for (String repositoryName : this.repositories.keySet()) {
Modified:
trunk/dna-repository/src/main/java/org/jboss/dna/repository/util/SimpleExecutionContext.java
===================================================================
---
trunk/dna-repository/src/main/java/org/jboss/dna/repository/util/SimpleExecutionContext.java 2008-06-30
21:34:41 UTC (rev 319)
+++
trunk/dna-repository/src/main/java/org/jboss/dna/repository/util/SimpleExecutionContext.java 2008-06-30
21:42:54 UTC (rev 320)
@@ -25,19 +25,15 @@
import org.jboss.dna.spi.graph.NamespaceRegistry;
import org.jboss.dna.spi.graph.PropertyFactory;
import org.jboss.dna.spi.graph.ValueFactories;
-import org.jboss.dna.spi.graph.impl.BasicPropertyFactory;
-import org.jboss.dna.spi.graph.impl.StandardValueFactories;
+import org.jboss.dna.spi.graph.connection.BasicExecutionEnvironment;
/**
* @author Randall Hauch
*/
-public class SimpleExecutionContext implements ExecutionContext {
+public class SimpleExecutionContext extends BasicExecutionEnvironment implements
ExecutionContext {
private final JcrTools tools = new JcrTools();
- private final PropertyFactory propertyFactory;
private final SessionFactory sessionFactory;
- private final ValueFactories valueFactories;
- private final NamespaceRegistry namespaceRegistry;
public SimpleExecutionContext( SessionFactory sessionFactory,
String repositoryWorkspaceForNamespaceRegistry ) {
@@ -53,38 +49,14 @@
NamespaceRegistry namespaceRegistry,
ValueFactories valueFactories,
PropertyFactory propertyFactory ) {
+ super(namespaceRegistry, valueFactories, propertyFactory);
ArgCheck.isNotNull(sessionFactory, "session factory");
- ArgCheck.isNotNull(namespaceRegistry, "namespace registry");
this.sessionFactory = sessionFactory;
- this.namespaceRegistry = namespaceRegistry;
- this.valueFactories = valueFactories != null ? valueFactories : new
StandardValueFactories(this.namespaceRegistry);
- this.propertyFactory = propertyFactory != null ? propertyFactory : new
BasicPropertyFactory(this.valueFactories);
}
/**
* {@inheritDoc}
*/
- public NamespaceRegistry getNamespaceRegistry() {
- return this.namespaceRegistry;
- }
-
- /**
- * {@inheritDoc}
- */
- public ValueFactories getValueFactories() {
- return this.valueFactories;
- }
-
- /**
- * {@inheritDoc}
- */
- public PropertyFactory getPropertyFactory() {
- return this.propertyFactory;
- }
-
- /**
- * {@inheritDoc}
- */
public SessionFactory getSessionFactory() {
return this.sessionFactory;
}
Modified:
trunk/dna-repository/src/main/resources/org/jboss/dna/repository/RepositoryI18n.properties
===================================================================
---
trunk/dna-repository/src/main/resources/org/jboss/dna/repository/RepositoryI18n.properties 2008-06-30
21:34:41 UTC (rev 319)
+++
trunk/dna-repository/src/main/resources/org/jboss/dna/repository/RepositoryI18n.properties 2008-06-30
21:42:54 UTC (rev 320)
@@ -91,6 +91,9 @@
errorUnregisteringWorkspaceListenerWhileShuttingDownObservationService = Error
unregistering workspace listener while shutting down observation service
invalidRepositoryNodePath = The repository node path "{0}" is not valid: {1}
+unableToLoadClassUsingClasspath = Unable to load class "{0}" using classpath
"{1}"
+unableToInstantiateClassUsingClasspath = Unable to instantiate class "{0}"
using classpath "{1}"
+unableToAccessClassUsingClasspath = Unable to access class "{0}" using
classpath "{1}"
errorSequencingXmlDocument = An error was received while sequencing XML: {0}
fatalErrorSequencingXmlDocument = A fatal error was received while sequencing XML: {0}
@@ -101,6 +104,8 @@
interruptedWhileConnectingToFederationConfigurationRepository = Interrupted while
connecting to federation configuration repository "{0}"
interruptedWhileUsingFederationConfigurationRepository = Interrupted while using
federation configuration repository "{0}"
interruptedWhileClosingConnectionToFederationConfigurationRepository = Interrupted while
closing connection to federation configuration repository "{0}"
+unableToFindRepositorySourceByName = Unable to find the repository source
"{0}"
unableToCreateConnectionToFederatedRepository = Unable to create a connection to the
repository "{0}". Check the Federation Service configuration.
unableToAuthenticateConnectionToFederatedRepository = Unable to authenticate
"{1}" for repository "{0}"
repositoryHasBeenShutDown = The "{0}" repository has been shut down and may no
longer be used.
+repositoryPathInFederationBindingIsNotAbsolute = The repository path in a federation
binding must be absolute, but was "{0}"
Modified:
trunk/dna-repository/src/test/java/org/jboss/dna/repository/federation/FederatedRepositoryTest.java
===================================================================
---
trunk/dna-repository/src/test/java/org/jboss/dna/repository/federation/FederatedRepositoryTest.java 2008-06-30
21:34:41 UTC (rev 319)
+++
trunk/dna-repository/src/test/java/org/jboss/dna/repository/federation/FederatedRepositoryTest.java 2008-06-30
21:42:54 UTC (rev 320)
@@ -27,13 +27,9 @@
import static org.junit.Assert.assertThat;
import static org.junit.matchers.JUnitMatchers.hasItems;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.stub;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import java.util.concurrent.TimeUnit;
-import org.jboss.dna.spi.graph.connection.RepositoryConnection;
+import org.jboss.dna.spi.graph.connection.BasicExecutionEnvironment;
+import org.jboss.dna.spi.graph.connection.ExecutionEnvironment;
import org.jboss.dna.spi.graph.connection.RepositorySourceListener;
-import org.jboss.dna.spi.graph.connection.TimeDelayingRepositorySource;
import org.junit.Before;
import org.junit.Test;
import org.mockito.MockitoAnnotations;
@@ -44,28 +40,24 @@
*/
public class FederatedRepositoryTest {
+ private ExecutionEnvironment env;
private FederatedRepository repository;
private String name;
@Mock
- private FederationService service;
- @Mock
private RepositorySourceListener listener1;
@Mock
private RepositorySourceListener listener2;
@Mock
- private FederatedSource source1;
- @Mock
- private FederatedSource source2;
+ private RepositoryConnectionFactories connectionFactories;
- // private RepositoryConnectionPool connectionPool;
+ // private BasicRepositoryConnectionPool connectionPool;
@Before
public void beforeEach() {
MockitoAnnotations.initMocks(this);
+ env = new BasicExecutionEnvironment();
name = "Test repository";
- repository = new FederatedRepository(service, name);
- stub(source1.getName()).toReturn("soure 1");
- stub(source2.getName()).toReturn("soure 2");
+ repository = new FederatedRepository(name, env, connectionFactories);
}
@Test
@@ -74,11 +66,6 @@
}
@Test
- public void shouldHaveFederationServicePassedIntoConstructor() {
- assertThat(repository.getService(), is(sameInstance(service)));
- }
-
- @Test
public void shouldHaveAdministrator() {
assertThat(repository.getAdministrator(), is(notNullValue()));
}
@@ -150,73 +137,32 @@
}
@Test
- public void shouldHaveNoSourcesAfterInitialization() {
- assertThat(repository.getSources(), is(notNullValue()));
- assertThat(repository.getSources().isEmpty(), is(true));
+ public void shouldHaveNoFederationRegionsAfterInitialization() {
+ assertThat(repository.getRegions(), is(notNullValue()));
+ assertThat(repository.getRegions().isEmpty(), is(true));
}
@Test
- public void shouldAddSourceThatIsNotAlreadyRegistered() {
- assertThat(repository.getSources(), hasItems(new FederatedSource[] {}));
- assertThat(repository.addSource(source1), is(true));
- assertThat(repository.getSources(), hasItems(source1));
- assertThat(repository.addSource(source2), is(true));
- assertThat(repository.getSources(), hasItems(source1, source2));
+ public void shouldAddFederationRegionThatIsNotAlreadyRegistered() {
+ FederatedRegion region1 = mock(FederatedRegion.class);
+ FederatedRegion region2 = mock(FederatedRegion.class);
+
+ assertThat(repository.getRegions().isEmpty(), is(true));
+ assertThat(repository.addRegionIfAbsent(region1), is(true));
+ assertThat(repository.getRegions(), hasItems(region1));
+ assertThat(repository.addRegionIfAbsent(region2), is(true));
+ assertThat(repository.getRegions(), hasItems(region1, region2));
}
@Test
public void shouldNotAddSourceThatIsAlreadyRegistered() {
- String source1Name = source1.getName();
- FederatedSource source1a = mock(FederatedSource.class);
- stub(source1a.getName()).toReturn(source1Name);
+ FederatedRegion region1 = mock(FederatedRegion.class);
- assertThat(repository.getSources(), hasItems(new FederatedSource[] {}));
- assertThat(repository.addSource(source1), is(true));
- assertThat(repository.getSources(), hasItems(source1));
- assertThat(repository.addSource(source2), is(true));
- assertThat(repository.getSources(), hasItems(source1, source2));
- assertThat(repository.addSource(source1a), is(false));
- assertThat(repository.getSources(), hasItems(source1, source2));
+ assertThat(repository.getRegions().isEmpty(), is(true));
+ assertThat(repository.addRegionIfAbsent(region1), is(true));
+ assertThat(repository.getRegions(), hasItems(region1));
+ assertThat(repository.addRegionIfAbsent(region1), is(false));
+ assertThat(repository.getRegions(), hasItems(region1));
}
- @Test
- public void shouldShutdownAndRemoveRepositoryFromFederationService() {
- repository.getAdministrator().shutdown();
- verify(service, times(1)).removeRepository(repository);
- }
-
- @Test
- public void shouldShutdownAllSourceConnectionPoolsWhenShuttingDownRepository() throws
Exception {
- // Create the source instances that wait during termination ...
- TimeDelayingRepositorySource timeDelaySource1 = new
TimeDelayingRepositorySource("time delay source 1");
- source1 = new FederatedSource(timeDelaySource1);
- TimeDelayingRepositorySource timeDelaySource2 = new
TimeDelayingRepositorySource("time delay source 2");
- source2 = new FederatedSource(timeDelaySource2);
- repository.addSource(source1);
- repository.addSource(source2);
- assertThat(repository.getSources(), hasItems(source1, source2));
-
- // Get a connection from one source ...
- RepositoryConnection connection = source2.getConnection();
- assertThat(connection, is(notNullValue()));
-
- // Shut down the repository, which will shut down each of the sources ...
- repository.getAdministrator().shutdown();
- assertThat(repository.getAdministrator().isShutdown(), is(true));
- assertThat(repository.getAdministrator().isTerminated(), is(false));
-
- // Source 1 should be shut down AND terminated ...
- assertThat(source1.getConnectionPool().isShutdown(), is(true));
- assertThat(source1.getConnectionPool().isTerminated(), is(true));
-
- // Source 2 should be shutdown but not terminated, since we still have a
connection ...
- assertThat(source2.getConnectionPool().isShutdown(), is(true));
- assertThat(source2.getConnectionPool().isTerminated(), is(false));
-
- // Close the connection ...
- connection.close();
- assertThat(repository.getAdministrator().awaitTermination(1, TimeUnit.SECONDS),
is(true));
- assertThat(repository.getAdministrator().isShutdown(), is(true));
- assertThat(repository.getAdministrator().isTerminated(), is(true));
- }
}
Deleted:
trunk/dna-repository/src/test/java/org/jboss/dna/repository/federation/FederatedSourceTest.java
===================================================================
---
trunk/dna-repository/src/test/java/org/jboss/dna/repository/federation/FederatedSourceTest.java 2008-06-30
21:34:41 UTC (rev 319)
+++
trunk/dna-repository/src/test/java/org/jboss/dna/repository/federation/FederatedSourceTest.java 2008-06-30
21:42:54 UTC (rev 320)
@@ -1,179 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * Copyright 2008, Red Hat Middleware LLC, and individual contributors
- * as indicated by the @author tags. See the copyright.txt file in the
- * distribution for a full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
- */
-package org.jboss.dna.repository.federation;
-
-import static org.hamcrest.core.Is.is;
-import static org.hamcrest.core.IsNull.notNullValue;
-import static org.junit.Assert.assertThat;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.stub;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import java.util.concurrent.TimeUnit;
-import org.jboss.dna.spi.graph.connection.RepositoryConnection;
-import org.jboss.dna.spi.graph.connection.RepositoryConnectionPool;
-import org.jboss.dna.spi.graph.connection.RepositorySource;
-import org.jboss.dna.spi.graph.connection.RepositorySourceException;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.MockitoAnnotations;
-import org.mockito.MockitoAnnotations.Mock;
-
-/**
- * @author Randall Hauch
- */
-public class FederatedSourceTest {
-
- private FederatedSource source;
- @Mock
- private RepositoryConnection connection;
- @Mock
- private RepositorySource repositorySource;
-
- /**
- * @throws java.lang.Exception
- */
- @Before
- public void beforeEach() throws Exception {
- MockitoAnnotations.initMocks(this);
- this.repositorySource = mock(RepositorySource.class);
- this.source = new FederatedSource(this.repositorySource);
- this.connection = mock(RepositoryConnection.class);
- }
-
- @After
- public void afterEach() throws Exception {
- RepositoryConnectionPool pool = source.getConnectionPool();
- pool.shutdown();
- pool.awaitTermination(1, TimeUnit.SECONDS);
- }
-
- @Test
- public void shouldConstructWithNonNullRepositorySource() {
- assertThat(source, is(notNullValue()));
- }
-
- @Test( expected = IllegalArgumentException.class )
- public void shouldFailToConstructWithNullRepositorySource() {
- new FederatedSource(null);
- }
-
- @Test
- public void shouldHaveConnectionPoolAfterConstruction() {
- assertThat(source.getConnectionPool(), is(notNullValue()));
- }
-
- @Test
- public void shouldGetTheNameFromTheRepositorySource() {
- String theName = "My Repository";
- stub(repositorySource.getName()).toReturn(theName);
- assertThat(source.getName(), is(theName));
- verify(repositorySource, times(1)).getName();
-
- // Change the name of the source, and try again ...
- theName = theName + " part deux";
- stub(repositorySource.getName()).toReturn(theName);
- assertThat(source.getName(), is(theName));
- verify(repositorySource, times(2)).getName();
- }
-
- @Test
- public void shouldCheckAvailabilityByCreatingConnectionAndPing() throws Exception {
- // Set up the connection mock to return value from ping, and then
- // set up the source mock to return the connection ...
- stub(this.repositorySource.getConnection()).toReturn(connection);
- stub(connection.ping(1, TimeUnit.SECONDS)).toReturn(true);
- assertThat(source.isAvailable(1, TimeUnit.SECONDS), is(true));
- }
-
- @Test
- public void shouldConsiderSourceToNotBeAvailabilityWhenUnableToPingConnection()
throws Exception {
- // Set up the connection mock to return value from ping, and then
- // set up the source mock to return the connection ...
- stub(this.repositorySource.getConnection()).toReturn(connection);
- stub(connection.ping(1, TimeUnit.SECONDS)).toReturn(false);
- assertThat(source.isAvailable(1, TimeUnit.SECONDS), is(false));
- }
-
- @Test
- public void shouldConsiderSourceToNotBeAvailabilityWhenPingThrowsException() throws
Exception {
- // Set up the connection mock to return value from ping, and then
- // set up the source mock to return the connection ...
- stub(this.repositorySource.getConnection()).toReturn(connection);
- stub(connection.ping(1, TimeUnit.SECONDS)).toThrow(new
NullPointerException("sorry"));
- assertThat(source.isAvailable(1, TimeUnit.SECONDS), is(false));
- }
-
- @Test
- public void
shouldConsiderSourceToNotBeAvailabilityWhenGetConnectionThrowsRepositorySourceException()
throws Exception {
- // Set up the connection mock to return value from ping, and then
- // set up the source mock to return the connection ...
- stub(this.repositorySource.getConnection()).toThrow(new
RepositorySourceException("sorry"));
- assertThat(source.isAvailable(1, TimeUnit.SECONDS), is(false));
- }
-
- @Test
- public void
shouldConsiderSourceToNotBeAvailabilityWhenGetConnectionThrowsIllegalStateException()
throws Exception {
- stub(this.repositorySource.getConnection()).toThrow(new
IllegalStateException("sorry"));
- assertThat(source.isAvailable(1, TimeUnit.SECONDS), is(false));
- }
-
- @Test
- public void
shouldConsiderSourceToNotBeAvailabilityWhenGetConnectionThrowsInterruptedException()
throws Exception {
- stub(this.repositorySource.getConnection()).toThrow(new
InterruptedException("sorry"));
- assertThat(source.isAvailable(1, TimeUnit.SECONDS), is(false));
- }
-
- @Test
- public void
shouldConsiderSourceToNotBeAvailabilityWhenGetConnectionThrowsAnyException() throws
Exception {
- stub(this.repositorySource.getConnection()).toThrow(new
NullPointerException("sorry"));
- assertThat(source.isAvailable(1, TimeUnit.SECONDS), is(false));
- }
-
- @Test
- public void shouldConsiderEqualAnyFederatedSourcesWithSameName() {
- RepositorySource repositorySource2 = mock(RepositorySource.class);
- FederatedSource source2 = new FederatedSource(repositorySource2);
-
- String theName = "Some name";
- stub(repositorySource.getName()).toReturn(theName);
- stub(repositorySource2.getName()).toReturn(theName);
- assertThat(source.equals(source2), is(true));
- }
-
- @Test
- public void shouldNotConsiderEqualAnyFederatedSourcesWithDifferentNames() {
- RepositorySource repositorySource2 = mock(RepositorySource.class);
- FederatedSource source2 = new FederatedSource(repositorySource2);
-
- String theName = "Some name";
- stub(repositorySource.getName()).toReturn(theName + " ");
- stub(repositorySource2.getName()).toReturn(theName);
- assertThat(source.equals(source2), is(false));
-
- stub(repositorySource.getName()).toReturn(theName.toLowerCase());
- stub(repositorySource2.getName()).toReturn(theName.toUpperCase());
- assertThat(source.equals(source2), is(false));
- }
-
-}
Modified: trunk/dna-spi/src/main/java/org/jboss/dna/spi/graph/PathFactory.java
===================================================================
--- trunk/dna-spi/src/main/java/org/jboss/dna/spi/graph/PathFactory.java 2008-06-30
21:34:41 UTC (rev 319)
+++ trunk/dna-spi/src/main/java/org/jboss/dna/spi/graph/PathFactory.java 2008-06-30
21:42:54 UTC (rev 320)
@@ -87,6 +87,18 @@
Path createRelativePath( Path.Segment... segments );
/**
+ * Create a path by appending the supplied relative path to the supplied parent path.
The resulting path will be
+ * {@link Path#isAbsolute() absolute} if the supplied parent path is absolute.
+ *
+ * @param parentPath the path that is to provide the basis for the new path
+ * @param childPath the path that should be appended to the parent path
+ * @return the new path
+ * @throws IllegalArgumentException if the parent path reference or the child path
reference is null
+ */
+ Path create( Path parentPath,
+ Path childPath );
+
+ /**
* Create a path by appending the supplied names to the parent path.
*
* @param parentPath the path that is to provide the basis for the new path
Modified: trunk/dna-spi/src/main/java/org/jboss/dna/spi/graph/PropertyType.java
===================================================================
--- trunk/dna-spi/src/main/java/org/jboss/dna/spi/graph/PropertyType.java 2008-06-30
21:34:41 UTC (rev 319)
+++ trunk/dna-spi/src/main/java/org/jboss/dna/spi/graph/PropertyType.java 2008-06-30
21:42:54 UTC (rev 320)
@@ -23,8 +23,11 @@
import java.math.BigDecimal;
import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
+import java.util.List;
import net.jcip.annotations.Immutable;
import org.jboss.dna.spi.SpiI18n;
@@ -48,6 +51,15 @@
URI("URI", ValueComparators.URI_COMPARATOR, URI.class),
OBJECT("Object", ValueComparators.OBJECT_COMPARATOR, Object.class);
+ private static final List<PropertyType> ALL_PROPERTY_TYPES;
+ static {
+ List<PropertyType> types = new ArrayList<PropertyType>();
+ for (PropertyType type : PropertyType.values()) {
+ types.add(type);
+ }
+ ALL_PROPERTY_TYPES = Collections.unmodifiableList(types);
+ }
+
private final String name;
private final Comparator<?> comparator;
private final Class<?> valueClass;
@@ -101,4 +113,13 @@
}
return OBJECT;
}
+
+ /**
+ * Return an iterator over all the property type enumeration literals.
+ *
+ * @return an immutable iterator
+ */
+ public static Iterator<PropertyType> iterator() {
+ return ALL_PROPERTY_TYPES.iterator();
+ }
}
Modified:
trunk/dna-spi/src/main/java/org/jboss/dna/spi/graph/commands/impl/BasicGetPropertiesCommand.java
===================================================================
---
trunk/dna-spi/src/main/java/org/jboss/dna/spi/graph/commands/impl/BasicGetPropertiesCommand.java 2008-06-30
21:34:41 UTC (rev 319)
+++
trunk/dna-spi/src/main/java/org/jboss/dna/spi/graph/commands/impl/BasicGetPropertiesCommand.java 2008-06-30
21:42:54 UTC (rev 320)
@@ -67,10 +67,14 @@
*
* @return the map of property name to values
*/
- public Iterable<Property> getProperties() {
+ public Iterable<Property> getPropertyIterator() {
return this.properties.values();
}
+ public Map<Name, Property> getProperties() {
+ return this.properties;
+ }
+
/**
* {@inheritDoc}
*/
Copied:
trunk/dna-spi/src/main/java/org/jboss/dna/spi/graph/connection/BasicRepositoryConnectionPool.java
(from rev 307,
trunk/dna-spi/src/main/java/org/jboss/dna/spi/graph/connection/RepositoryConnectionPool.java)
===================================================================
---
trunk/dna-spi/src/main/java/org/jboss/dna/spi/graph/connection/BasicRepositoryConnectionPool.java
(rev 0)
+++
trunk/dna-spi/src/main/java/org/jboss/dna/spi/graph/connection/BasicRepositoryConnectionPool.java 2008-06-30
21:42:54 UTC (rev 320)
@@ -0,0 +1,967 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package org.jboss.dna.spi.graph.connection;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+import javax.transaction.xa.XAResource;
+import net.jcip.annotations.GuardedBy;
+import net.jcip.annotations.ThreadSafe;
+import org.jboss.dna.common.util.ArgCheck;
+import org.jboss.dna.common.util.Logger;
+import org.jboss.dna.spi.SpiI18n;
+import org.jboss.dna.spi.cache.CachePolicy;
+import org.jboss.dna.spi.graph.commands.GraphCommand;
+
+/**
+ * @author Randall Hauch
+ */
+@ThreadSafe
+public class BasicRepositoryConnectionPool implements RepositoryConnectionPool {
+
+ /**
+ * The core pool size for default-constructed pools is {@value} .
+ */
+ public static final int DEFAULT_CORE_POOL_SIZE = 1;
+
+ /**
+ * The maximum pool size for default-constructed pools is {@value} .
+ */
+ public static final int DEFAULT_MAXIMUM_POOL_SIZE = 10;
+
+ /**
+ * The keep-alive time for connections in default-constructed pools is {@value}
seconds.
+ */
+ public static final long DEFAULT_KEEP_ALIVE_TIME_IN_SECONDS = 30;
+
+ /**
+ * Permission for checking shutdown
+ */
+ private static final RuntimePermission shutdownPerm = new
RuntimePermission("modifyThread");
+
+ /**
+ * The factory that this pool uses to create new connections.
+ */
+ private final RepositoryConnectionFactory connectionFactory;
+
+ /**
+ * Lock held on updates to poolSize, corePoolSize, maximumPoolSize, and workers set.
+ */
+ private final ReentrantLock mainLock = new ReentrantLock();
+
+ /**
+ * Wait condition to support awaitTermination
+ */
+ private final Condition termination = mainLock.newCondition();
+
+ /**
+ * Set containing all connections that are available for use.
+ */
+ @GuardedBy( "mainLock" )
+ private final BlockingQueue<ConnectionWrapper> availableConnections = new
LinkedBlockingQueue<ConnectionWrapper>();
+
+ /**
+ * The connections that are currently in use.
+ */
+ @GuardedBy( "mainLock" )
+ private final Set<ConnectionWrapper> inUseConnections = new
HashSet<ConnectionWrapper>();
+
+ /**
+ * Timeout in nanoseconds for idle connections waiting to be used. Threads use this
timeout only when there are more than
+ * corePoolSize present. Otherwise they wait forever to be used.
+ */
+ private volatile long keepAliveTime;
+
+ /**
+ * The target pool size, updated only while holding mainLock, but volatile to allow
concurrent readability even during
+ * updates.
+ */
+ @GuardedBy( "mainLock" )
+ private volatile int corePoolSize;
+
+ /**
+ * Maximum pool size, updated only while holding mainLock but volatile to allow
concurrent readability even during updates.
+ */
+ @GuardedBy( "mainLock" )
+ private volatile int maximumPoolSize;
+
+ /**
+ * Current pool size, updated only while holding mainLock but volatile to allow
concurrent readability even during updates.
+ */
+ @GuardedBy( "mainLock" )
+ private volatile int poolSize;
+
+ /**
+ * Lifecycle state, updated only while holding mainLock but volatile to allow
concurrent readability even during updates.
+ */
+ @GuardedBy( "mainLock" )
+ private volatile int runState;
+
+ // Special values for runState
+ /** Normal, not-shutdown mode */
+ static final int RUNNING = 0;
+ /** Controlled shutdown mode */
+ static final int SHUTDOWN = 1;
+ /** Immediate shutdown mode */
+ static final int STOP = 2;
+ /** Final state */
+ static final int TERMINATED = 3;
+
+ /**
+ * Flag specifying whether a connection should be validated before returning it from
the {@link #getConnection()} method.
+ */
+ private final AtomicBoolean validateConnectionBeforeUse = new AtomicBoolean(false);
+
+ /**
+ * The time in nanoseconds that ping should wait before timing out and failing.
+ */
+ private final AtomicLong pingTimeout = new AtomicLong(0);
+
+ /**
+ * The number of times an attempt to obtain a connection should fail with invalid
connections before throwing an exception.
+ */
+ private final AtomicInteger maxFailedAttemptsBeforeError = new AtomicInteger(10);
+
+ private final AtomicLong totalConnectionsCreated = new AtomicLong(0);
+
+ private final AtomicLong totalConnectionsUsed = new AtomicLong(0);
+
+ private final Logger logger = Logger.getLogger(this.getClass());
+
+ /**
+ * Create the pool to use the supplied connection factory, which is typically a
{@link RepositorySource}. This constructor
+ * uses the {@link #DEFAULT_CORE_POOL_SIZE default core pool size}, {@link
#DEFAULT_MAXIMUM_POOL_SIZE default maximum pool
+ * size}, and {@link #DEFAULT_KEEP_ALIVE_TIME_IN_SECONDS default keep-alive time (in
seconds)}.
+ *
+ * @param connectionFactory the factory for connections
+ * @throws IllegalArgumentException if the connection factory is null or any of the
supplied arguments are invalid
+ */
+ public BasicRepositoryConnectionPool( RepositoryConnectionFactory connectionFactory )
{
+ this(connectionFactory, DEFAULT_CORE_POOL_SIZE, DEFAULT_MAXIMUM_POOL_SIZE,
DEFAULT_KEEP_ALIVE_TIME_IN_SECONDS,
+ TimeUnit.SECONDS);
+ }
+
+ /**
+ * Create the pool to use the supplied connection factory, which is typically a
{@link RepositorySource}.
+ *
+ * @param connectionFactory the factory for connections
+ * @param corePoolSize the number of connections to keep in the pool, even if they
are idle.
+ * @param maximumPoolSize the maximum number of connections to allow in the pool.
+ * @param keepAliveTime when the number of connection is greater than the core, this
is the maximum time that excess idle
+ * connections will be kept before terminating.
+ * @param unit the time unit for the keepAliveTime argument.
+ * @throws IllegalArgumentException if the connection factory is null or any of the
supplied arguments are invalid
+ */
+ public BasicRepositoryConnectionPool( RepositoryConnectionFactory connectionFactory,
+ int corePoolSize,
+ int maximumPoolSize,
+ long keepAliveTime,
+ TimeUnit unit ) {
+ ArgCheck.isNonNegative(corePoolSize, "corePoolSize");
+ ArgCheck.isPositive(maximumPoolSize, "maximumPoolSize");
+ ArgCheck.isNonNegative(keepAliveTime, "keepAliveTime");
+ ArgCheck.isNotNull(connectionFactory, "repository connection
factory");
+ if (maximumPoolSize < corePoolSize) {
+ throw new
IllegalArgumentException(SpiI18n.maximumPoolSizeMayNotBeSmallerThanCorePoolSize.text());
+ }
+ this.connectionFactory = connectionFactory;
+ this.corePoolSize = corePoolSize;
+ this.maximumPoolSize = maximumPoolSize;
+ this.keepAliveTime = unit.toNanos(keepAliveTime);
+ this.setPingTimeout(100, TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @see org.jboss.dna.spi.graph.connection.RepositoryConnectionFactory#getName()
+ */
+ public String getName() {
+ return this.connectionFactory.getName();
+ }
+
+ // -------------------------------------------------
+ // Property settings ...
+ // -------------------------------------------------
+
+ /**
+ * @return validateConnectionBeforeUse
+ */
+ public boolean getValidateConnectionBeforeUse() {
+ return this.validateConnectionBeforeUse.get();
+ }
+
+ /**
+ * @param validateConnectionBeforeUse Sets validateConnectionBeforeUse to the
specified value.
+ */
+ public void setValidateConnectionBeforeUse( boolean validateConnectionBeforeUse ) {
+ this.validateConnectionBeforeUse.set(validateConnectionBeforeUse);
+ }
+
+ /**
+ * @return pingTimeout
+ */
+ public long getPingTimeoutInNanos() {
+ return this.pingTimeout.get();
+ }
+
+ /**
+ * @param pingTimeout the time to wait for a ping to complete
+ * @param unit the time unit of the time argument
+ */
+ public void setPingTimeout( long pingTimeout,
+ TimeUnit unit ) {
+ ArgCheck.isNonNegative(pingTimeout, "time");
+ this.pingTimeout.set(unit.toNanos(pingTimeout));
+ }
+
+ /**
+ * @return maxFailedAttemptsBeforeError
+ */
+ public int getMaxFailedAttemptsBeforeError() {
+ return this.maxFailedAttemptsBeforeError.get();
+ }
+
+ /**
+ * @param maxFailedAttemptsBeforeError Sets maxFailedAttemptsBeforeError to the
specified value.
+ */
+ public void setMaxFailedAttemptsBeforeError( int maxFailedAttemptsBeforeError ) {
+ this.maxFailedAttemptsBeforeError.set(maxFailedAttemptsBeforeError);
+ }
+
+ /**
+ * Sets the time limit for which connections may remain idle before being closed. If
there are more than the core number of
+ * connections currently in the pool, after waiting this amount of time without being
used, excess threads will be terminated.
+ * This overrides any value set in the constructor.
+ *
+ * @param time the time to wait. A time value of zero will cause excess connections
to terminate immediately after being
+ * returned.
+ * @param unit the time unit of the time argument
+ * @throws IllegalArgumentException if time less than zero
+ * @see #getKeepAliveTime
+ */
+ public void setKeepAliveTime( long time,
+ TimeUnit unit ) {
+ ArgCheck.isNonNegative(time, "time");
+ this.keepAliveTime = unit.toNanos(time);
+ }
+
+ /**
+ * Returns the connection keep-alive time, which is the amount of time which
connections in excess of the core pool size may
+ * remain idle before being closed.
+ *
+ * @param unit the desired time unit of the result
+ * @return the time limit
+ * @see #setKeepAliveTime
+ */
+ public long getKeepAliveTime( TimeUnit unit ) {
+ assert unit != null;
+ return unit.convert(keepAliveTime, TimeUnit.NANOSECONDS);
+ }
+
+ /**
+ * @return maximumPoolSize
+ */
+ public int getMaximumPoolSize() {
+ return this.maximumPoolSize;
+ }
+
+ /**
+ * Sets the maximum allowed number of connections. This overrides any value set in
the constructor. If the new value is
+ * smaller than the current value, excess existing but unused connections will be
closed.
+ *
+ * @param maximumPoolSize the new maximum
+ * @throws IllegalArgumentException if maximumPoolSize less than zero or the {@link
#getCorePoolSize() core pool size}
+ * @see #getMaximumPoolSize
+ */
+ public void setMaximumPoolSize( int maximumPoolSize ) {
+ ArgCheck.isPositive(maximumPoolSize, "maximum pool size");
+ if (maximumPoolSize < corePoolSize) {
+ throw new
IllegalArgumentException(SpiI18n.maximumPoolSizeMayNotBeSmallerThanCorePoolSize.text());
+ }
+ final ReentrantLock mainLock = this.mainLock;
+ try {
+ mainLock.lock();
+ int extra = this.maximumPoolSize - maximumPoolSize;
+ this.maximumPoolSize = maximumPoolSize;
+ if (extra > 0 && poolSize > maximumPoolSize) {
+ // Drain the extra connections from those available ...
+ drainUnusedConnections(extra);
+ }
+ } finally {
+ mainLock.unlock();
+ }
+ }
+
+ /**
+ * Returns the core number of connections.
+ *
+ * @return the core number of connections
+ * @see #setCorePoolSize(int)
+ */
+ public int getCorePoolSize() {
+ return this.corePoolSize;
+ }
+
+ /**
+ * Sets the core number of connections. This overrides any value set in the
constructor. If the new value is smaller than the
+ * current value, excess existing and unused connections will be closed. If larger,
new connections will, if needed, be
+ * created.
+ *
+ * @param corePoolSize the new core size
+ * @throws RepositorySourceException if there was an error obtaining the new
connection
+ * @throws InterruptedException if the thread was interrupted during the operation
+ * @throws IllegalArgumentException if <tt>corePoolSize</tt> less than
zero
+ * @see #getCorePoolSize()
+ */
+ public void setCorePoolSize( int corePoolSize ) throws RepositorySourceException,
InterruptedException {
+ ArgCheck.isNonNegative(corePoolSize, "core pool size");
+ if (maximumPoolSize < corePoolSize) {
+ throw new
IllegalArgumentException(SpiI18n.maximumPoolSizeMayNotBeSmallerThanCorePoolSize.text());
+ }
+ final ReentrantLock mainLock = this.mainLock;
+ try {
+ mainLock.lock();
+ int extra = this.corePoolSize - corePoolSize;
+ this.corePoolSize = corePoolSize;
+ if (extra < 0) {
+ // Add connections ...
+ addConnectionsIfUnderCorePoolSize();
+ } else if (extra > 0 && poolSize > corePoolSize) {
+ // Drain the extra connections from those available ...
+ drainUnusedConnections(extra);
+ }
+ } finally {
+ mainLock.unlock();
+ }
+ }
+
+ // -------------------------------------------------
+ // Statistics ...
+ // -------------------------------------------------
+
+ /**
+ * Returns the current number of connections in the pool, including those that are
checked out (in use) and those that are not
+ * being used.
+ *
+ * @return the number of connections
+ */
+ public int getPoolSize() {
+ return poolSize;
+ }
+
+ /**
+ * Returns the approximate number of connections that are currently checked out from
the pool.
+ *
+ * @return the number of checked-out connections
+ */
+ public int getInUseCount() {
+ final ReentrantLock mainLock = this.mainLock;
+ try {
+ mainLock.lock();
+ return this.inUseConnections.size();
+ } finally {
+ mainLock.unlock();
+ }
+ }
+
+ /**
+ * Get the total number of connections that have been created by this pool.
+ *
+ * @return the total number of connections created by this pool
+ */
+ public long getTotalConnectionsCreated() {
+ return this.totalConnectionsCreated.get();
+ }
+
+ /**
+ * Get the total number of times connections have been {@link #getConnection()}
used.
+ *
+ * @return the total number
+ */
+ public long getTotalConnectionsUsed() {
+ return this.totalConnectionsUsed.get();
+ }
+
+ // -------------------------------------------------
+ // State management methods ...
+ // -------------------------------------------------
+
+ /**
+ * Starts a core connection, causing it to idly wait for use. This overrides the
default policy of starting core connections
+ * only when they are {@link #getConnection() needed}. This method will return
<tt>false</tt> if all core connections have
+ * already been started.
+ *
+ * @return true if a connection was started
+ * @throws RepositorySourceException if there was an error obtaining the new
connection
+ * @throws InterruptedException if the thread was interrupted during the operation
+ */
+ public boolean prestartCoreConnection() throws RepositorySourceException,
InterruptedException {
+ final ReentrantLock mainLock = this.mainLock;
+ try {
+ mainLock.lock();
+ return addConnectionIfUnderCorePoolSize();
+ } finally {
+ mainLock.unlock();
+ }
+ }
+
+ /**
+ * Starts all core connections, causing them to idly wait for use. This overrides the
default policy of starting core
+ * connections only when they are {@link #getConnection() needed}.
+ *
+ * @return the number of connections started.
+ * @throws RepositorySourceException if there was an error obtaining the new
connection
+ * @throws InterruptedException if the thread was interrupted during the operation
+ */
+ public int prestartAllCoreConnections() throws RepositorySourceException,
InterruptedException {
+ final ReentrantLock mainLock = this.mainLock;
+ try {
+ mainLock.lock();
+ return addConnectionsIfUnderCorePoolSize();
+ } finally {
+ mainLock.unlock();
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @see org.jboss.dna.spi.graph.connection.RepositoryConnectionPool#shutdown()
+ */
+ public void shutdown() {
+ // Fail if caller doesn't have modifyThread permission. We
+ // explicitly check permissions directly because we can't trust
+ // implementations of SecurityManager to correctly override
+ // the "check access" methods such that our documented
+ // security policy is implemented.
+ SecurityManager security = System.getSecurityManager();
+ if (security != null)
java.security.AccessController.checkPermission(shutdownPerm);
+
+ this.logger.debug("Shutting down repository connection pool for {0}",
getName());
+ boolean fullyTerminated = false;
+ final ReentrantLock mainLock = this.mainLock;
+ try {
+ mainLock.lock();
+ int state = this.runState;
+ if (state == RUNNING) {
+ // don't override shutdownNow
+ this.runState = SHUTDOWN;
+ }
+
+ // Kill the maintenance thread ...
+
+ // Remove and close all available connections ...
+ if (!this.availableConnections.isEmpty()) {
+ // Drain the extra connections from those available ...
+ drainUnusedConnections(this.availableConnections.size());
+ }
+
+ // If there are no connections being used, trigger full termination now ...
+ if (this.inUseConnections.isEmpty()) {
+ fullyTerminated = true;
+ this.logger.trace("Signalling termination of repository connection
pool for {0}", getName());
+ runState = TERMINATED;
+ termination.signalAll();
+ this.logger.debug("Terminated repository connection pool for
{0}", getName());
+ }
+ // Otherwise the last connection that is closed will transition the runState
to TERMINATED ...
+ } finally {
+ mainLock.unlock();
+ }
+ if (fullyTerminated) terminated();
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @see org.jboss.dna.spi.graph.connection.RepositoryConnectionPool#shutdownNow()
+ */
+ public void shutdownNow() {
+ // Almost the same code as shutdown()
+ SecurityManager security = System.getSecurityManager();
+ if (security != null)
java.security.AccessController.checkPermission(shutdownPerm);
+
+ this.logger.debug("Shutting down (immediately) repository connection pool
for {0}", getName());
+ boolean fullyTerminated = false;
+ final ReentrantLock mainLock = this.mainLock;
+ try {
+ mainLock.lock();
+ int state = this.runState;
+ if (state != TERMINATED) {
+ // don't override shutdownNow
+ this.runState = STOP;
+ }
+
+ // Kill the maintenance thread ...
+
+ // Remove and close all available connections ...
+ if (!this.availableConnections.isEmpty()) {
+ // Drain the extra connections from those available ...
+ drainUnusedConnections(this.availableConnections.size());
+ }
+
+ // If there are connections being used, close them now ...
+ if (!this.inUseConnections.isEmpty()) {
+ for (ConnectionWrapper connectionInUse : this.inUseConnections) {
+ try {
+ this.logger.trace("Closing repository connection to
{0}", getName());
+ connectionInUse.getOriginal().close();
+ } catch (InterruptedException e) {
+ // Ignore this ...
+ }
+ }
+ this.poolSize -= this.inUseConnections.size();
+ // The last connection that is closed will transition the runState to
TERMINATED ...
+ } else {
+ // There are no connections in use, so trigger full termination now ...
+ fullyTerminated = true;
+ this.logger.trace("Signalling termination of repository connection
pool for {0}", getName());
+ runState = TERMINATED;
+ termination.signalAll();
+ this.logger.debug("Terminated repository connection pool for
{0}", getName());
+ }
+
+ } finally {
+ mainLock.unlock();
+ }
+ if (fullyTerminated) terminated();
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @see org.jboss.dna.spi.graph.connection.RepositoryConnectionPool#isRunning()
+ */
+ public boolean isRunning() {
+ return runState == RUNNING;
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @see org.jboss.dna.spi.graph.connection.RepositoryConnectionPool#isShutdown()
+ */
+ public boolean isShutdown() {
+ return runState != RUNNING;
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @see org.jboss.dna.spi.graph.connection.RepositoryConnectionPool#isTerminating()
+ */
+ public boolean isTerminating() {
+ return runState == STOP;
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @see org.jboss.dna.spi.graph.connection.RepositoryConnectionPool#isTerminated()
+ */
+ public boolean isTerminated() {
+ return runState == TERMINATED;
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @see
org.jboss.dna.spi.graph.connection.RepositoryConnectionPool#awaitTermination(long,
java.util.concurrent.TimeUnit)
+ */
+ public boolean awaitTermination( long timeout,
+ TimeUnit unit ) throws InterruptedException {
+ long nanos = unit.toNanos(timeout);
+ final ReentrantLock mainLock = this.mainLock;
+ try {
+ mainLock.lock();
+ for (;;) {
+ // this.logger.debug("---> Run state = {}; condition = {}",
runState, termination);
+ if (runState == TERMINATED) return true;
+ if (nanos <= 0) return false;
+ nanos = termination.awaitNanos(nanos);
+ // this.logger.debug("---> Done waiting: run state = {};
condition = {}", runState, termination);
+ }
+ } finally {
+ mainLock.unlock();
+ }
+ }
+
+ /**
+ * Method invoked when the pool has terminated. Default implementation does nothing.
Note: To properly nest multiple
+ * overridings, subclasses should generally invoke
<tt>super.terminated</tt> within this method.
+ */
+ protected void terminated() {
+ }
+
+ /**
+ * Invokes <tt>shutdown</tt> when this pool is no longer referenced.
+ */
+ @Override
+ protected void finalize() {
+ shutdown();
+ }
+
+ // -------------------------------------------------
+ // Connection management methods ...
+ // -------------------------------------------------
+
+ /**
+ * {@inheritDoc}
+ */
+ public RepositoryConnection getConnection() throws RepositorySourceException,
InterruptedException {
+ int attemptsAllowed = this.maxFailedAttemptsBeforeError.get();
+ ConnectionWrapper connection = null;
+ // Do this until we get a good connection ...
+ int attemptsRemaining = attemptsAllowed;
+ while (connection == null && attemptsRemaining > 0) {
+ --attemptsRemaining;
+ ReentrantLock mainLock = this.mainLock;
+ try {
+ mainLock.lock();
+ // If we're shutting down the pool, then just close the connection
...
+ if (this.runState != RUNNING) {
+ throw new
IllegalStateException(SpiI18n.repositoryConnectionPoolIsNotRunning.text());
+ }
+ // If there are fewer total connections than the core size ...
+ if (this.poolSize < this.corePoolSize) {
+ // Immediately create a wrapped connection and return it ...
+ connection = newWrappedConnection();
+ }
+ // Peek to see if there is a connection available ...
+ else if (this.availableConnections.peek() != null) {
+ // There is, so take it and return it ...
+ connection = this.availableConnections.take();
+ }
+ // There is no connection available. If there are fewer total connections
than the maximum size ...
+ else if (this.poolSize < this.maximumPoolSize) {
+ // Immediately create a wrapped connection and return it ...
+ connection = newWrappedConnection();
+ }
+ if (connection != null) {
+ this.inUseConnections.add(connection);
+ }
+ } finally {
+ mainLock.unlock();
+ }
+ if (connection == null) {
+ // There are not enough connections, so wait in line for the next
available connection ...
+ this.logger.trace("Waiting for a repository connection from pool
{0}", getName());
+ connection = this.availableConnections.take();
+ mainLock = this.mainLock;
+ mainLock.lock();
+ try {
+ if (connection != null) {
+ this.inUseConnections.add(connection);
+ }
+ } finally {
+ mainLock.unlock();
+ }
+ this.logger.trace("Recieved a repository connection from pool
{0}", getName());
+ }
+ if (connection != null && this.validateConnectionBeforeUse.get()) {
+ connection = validateConnection(connection);
+ }
+ }
+ if (connection == null) {
+ // We were unable to obtain a usable connection, so fail ...
+ throw new
RepositorySourceException(SpiI18n.unableToObtainValidRepositoryAfterAttempts.text(attemptsAllowed));
+ }
+ this.totalConnectionsUsed.incrementAndGet();
+ return connection;
+ }
+
+ /**
+ * This method is automatically called by the {@link ConnectionWrapper} when it is
{@link ConnectionWrapper#close() closed}.
+ *
+ * @param wrapper the wrapper to the connection that is being returned to the pool
+ */
+ protected void returnConnection( ConnectionWrapper wrapper ) {
+ assert wrapper != null;
+ ConnectionWrapper wrapperToClose = null;
+ final ReentrantLock mainLock = this.mainLock;
+ try {
+ mainLock.lock();
+ // Remove the connection from the in-use set ...
+ boolean removed = this.inUseConnections.remove(wrapper);
+ assert removed;
+
+ // If we're shutting down the pool, then just close the connection ...
+ if (this.runState != RUNNING) {
+ wrapperToClose = wrapper;
+ }
+ // If there are more connections than the maximum size...
+ else if (this.poolSize > this.maximumPoolSize) {
+ // Immediately close this connection ...
+ wrapperToClose = wrapper;
+ }
+ // Attempt to make the connection available (this should generally work,
unless there is an upper limit
+ // to the number of available connections) ...
+ else if (!this.availableConnections.offer(new
ConnectionWrapper(wrapper.getOriginal()))) {
+ // The pool of available connection is full, so release it ...
+ wrapperToClose = wrapper;
+ }
+ } finally {
+ mainLock.unlock();
+ }
+ // Close the connection if we're supposed to (do it outside of the main
lock)...
+ if (wrapperToClose != null) {
+ try {
+ closeConnection(wrapper);
+ } catch (InterruptedException e) {
+ // catch this, as there's not much we can do and the caller
doesn't care or know how to handle it
+ this.logger.trace(e, "Interrupted while closing a repository
connection");
+ }
+ }
+ }
+
+ /**
+ * Validate the supplied connection, returning the connection if valid or null if the
connection is not valid.
+ *
+ * @param connection the connection to be validated; may not be null
+ * @return the validated connection, or null if the connection did not validate and
was removed from the pool
+ */
+ protected ConnectionWrapper validateConnection( ConnectionWrapper connection ) {
+ assert connection != null;
+ ConnectionWrapper invalidConnection = null;
+ try {
+ if (!connection.ping(this.pingTimeout.get(), TimeUnit.NANOSECONDS)) {
+ invalidConnection = connection;
+ }
+ } catch (InterruptedException e) {
+ // catch this, as there's not much we can do and the caller doesn't
care or know how to handle it
+ this.logger.trace(e, "Interrupted while pinging a repository
connection");
+ invalidConnection = connection;
+ } finally {
+ if (invalidConnection != null) {
+ connection = null;
+ returnConnection(invalidConnection);
+ }
+ }
+ return connection;
+ }
+
+ /**
+ * Obtain a new connection wrapped in a {@link ConnectionWrapper}. This method does
not check whether creating the new
+ * connection would violate the {@link #maximumPoolSize maximum pool size} nor does
it add the new connection to the
+ * {@link #availableConnections available connections} (as the caller may want it
immediately), but it does increment the
+ * {@link #poolSize pool size}.
+ *
+ * @return the connection wrapper with a new connection
+ * @throws RepositorySourceException if there was an error obtaining the new
connection
+ * @throws InterruptedException if the thread was interrupted during the operation
+ */
+ @GuardedBy( "mainLock" )
+ protected ConnectionWrapper newWrappedConnection() throws RepositorySourceException,
InterruptedException {
+ RepositoryConnection connection = this.connectionFactory.getConnection();
+ ++this.poolSize;
+ this.totalConnectionsCreated.incrementAndGet();
+ return new ConnectionWrapper(connection);
+ }
+
+ /**
+ * Close a connection that is in the pool but no longer in the {@link
#availableConnections available connections}. This
+ * method does decrement the {@link #poolSize pool size}.
+ *
+ * @param wrapper the wrapper for the connection to be closed
+ * @throws InterruptedException if the thread was interrupted during the operation
+ */
+ protected void closeConnection( ConnectionWrapper wrapper ) throws
InterruptedException {
+ assert wrapper != null;
+ RepositoryConnection original = wrapper.getOriginal();
+ assert original != null;
+ try {
+ this.logger.debug("Closing repository connection to {0}",
getName());
+ original.close();
+ } finally {
+ final ReentrantLock mainLock = this.mainLock;
+ try {
+ mainLock.lock();
+ // No matter what reduce the pool size count
+ --this.poolSize;
+ // And if shutting down and this was the last connection being used...
+ if (this.runState == SHUTDOWN && this.poolSize <= 0) {
+ // then signal anybody that has called
"awaitTermination(...)"
+ this.logger.trace("Signalling termination of repository
connection pool for {0}", getName());
+ this.runState = TERMINATED;
+ this.termination.signalAll();
+ this.logger.trace("Terminated repository connection pool for
{0}", getName());
+
+ // fall through to call terminate() outside of lock.
+ }
+ } finally {
+ mainLock.unlock();
+ }
+ }
+ }
+
+ @GuardedBy( "mainLock" )
+ protected int drainUnusedConnections( int count ) {
+ if (count <= 0) return 0;
+ this.logger.trace("Draining up to {0} unused repository connections to
{1}", count, getName());
+ // Drain the extra connections from those available ...
+ Collection<ConnectionWrapper> extraConnections = new
LinkedList<ConnectionWrapper>();
+ this.availableConnections.drainTo(extraConnections, count);
+ for (ConnectionWrapper connection : extraConnections) {
+ try {
+ this.logger.trace("Closing repository connection to {0}",
getName());
+ connection.getOriginal().close();
+ } catch (InterruptedException e) {
+ // Ignore this ...
+ }
+ }
+ int numClosed = extraConnections.size();
+ this.poolSize -= numClosed;
+ this.logger.trace("Drained {0} unused connections", numClosed);
+ return numClosed;
+ }
+
+ @GuardedBy( "mainLock" )
+ protected boolean addConnectionIfUnderCorePoolSize() throws
RepositorySourceException, InterruptedException {
+ // Add connection ...
+ if (this.poolSize < this.corePoolSize) {
+ this.availableConnections.offer(newWrappedConnection());
+ this.logger.trace("Added connection to {0} in undersized pool",
getName());
+ return true;
+ }
+ return false;
+ }
+
+ @GuardedBy( "mainLock" )
+ protected int addConnectionsIfUnderCorePoolSize() throws RepositorySourceException,
InterruptedException {
+ // Add connections ...
+ int n = 0;
+ while (this.poolSize < this.corePoolSize) {
+ this.availableConnections.offer(newWrappedConnection());
+ ++n;
+ }
+ this.logger.trace("Added {0} connection(s) to {1} in undersized pool",
n, getName());
+ return n;
+ }
+
+ protected class ConnectionWrapper implements RepositoryConnection {
+
+ private final RepositoryConnection original;
+ private final long timeCreated;
+ private long lastUsed;
+ private boolean closed = false;
+
+ protected ConnectionWrapper( RepositoryConnection connection ) {
+ assert connection != null;
+ this.original = connection;
+ this.timeCreated = System.currentTimeMillis();
+ }
+
+ /**
+ * @return original
+ */
+ protected RepositoryConnection getOriginal() {
+ return this.original;
+ }
+
+ /**
+ * @return lastUsed
+ */
+ public long getTimeLastUsed() {
+ return this.lastUsed;
+ }
+
+ /**
+ * @return timeCreated
+ */
+ public long getTimeCreated() {
+ return this.timeCreated;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public String getSourceName() {
+ return this.original.getSourceName();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public XAResource getXAResource() {
+ if (closed) throw new
IllegalStateException(SpiI18n.closedConnectionMayNotBeUsed.text());
+ return this.original.getXAResource();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public CachePolicy getDefaultCachePolicy() {
+ if (closed) throw new
IllegalStateException(SpiI18n.closedConnectionMayNotBeUsed.text());
+ return this.original.getDefaultCachePolicy();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public void execute( ExecutionEnvironment env,
+ GraphCommand... commands ) throws RepositorySourceException,
InterruptedException {
+ if (closed) throw new
IllegalStateException(SpiI18n.closedConnectionMayNotBeUsed.text());
+ this.original.execute(env, commands);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public boolean ping( long time,
+ TimeUnit unit ) throws InterruptedException {
+ if (closed) throw new
IllegalStateException(SpiI18n.closedConnectionMayNotBeUsed.text());
+ return this.original.ping(time, unit);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public void close() throws InterruptedException {
+ if (!closed) {
+ this.lastUsed = System.currentTimeMillis();
+ this.original.close();
+ this.closed = true;
+ returnConnection(this);
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public void setListener( RepositorySourceListener listener ) {
+ if (!closed) this.original.setListener(listener);
+ }
+
+ }
+
+}
Property changes on:
trunk/dna-spi/src/main/java/org/jboss/dna/spi/graph/connection/BasicRepositoryConnectionPool.java
___________________________________________________________________
Name: svn:mime-type
+ text/plain
Deleted:
trunk/dna-spi/src/main/java/org/jboss/dna/spi/graph/connection/RepositoryConnectionPool.java
===================================================================
---
trunk/dna-spi/src/main/java/org/jboss/dna/spi/graph/connection/RepositoryConnectionPool.java 2008-06-30
21:34:41 UTC (rev 319)
+++
trunk/dna-spi/src/main/java/org/jboss/dna/spi/graph/connection/RepositoryConnectionPool.java 2008-06-30
21:42:54 UTC (rev 320)
@@ -1,989 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * Copyright 2008, Red Hat Middleware LLC, and individual contributors
- * as indicated by the @author tags. See the copyright.txt file in the
- * distribution for a full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
- */
-package org.jboss.dna.spi.graph.connection;
-
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.Set;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
-import javax.transaction.xa.XAResource;
-import net.jcip.annotations.GuardedBy;
-import net.jcip.annotations.ThreadSafe;
-import org.jboss.dna.common.util.ArgCheck;
-import org.jboss.dna.common.util.Logger;
-import org.jboss.dna.spi.SpiI18n;
-import org.jboss.dna.spi.cache.CachePolicy;
-import org.jboss.dna.spi.graph.commands.GraphCommand;
-
-/**
- * @author Randall Hauch
- */
-@ThreadSafe
-public class RepositoryConnectionPool implements RepositoryConnectionFactory {
-
- /**
- * The core pool size for default-constructed pools is {@value}.
- */
- public static final int DEFAULT_CORE_POOL_SIZE = 1;
-
- /**
- * The maximum pool size for default-constructed pools is {@value}.
- */
- public static final int DEFAULT_MAXIMUM_POOL_SIZE = 10;
-
- /**
- * The keep-alive time for connections in default-constructed pools is {@value}
seconds.
- */
- public static final long DEFAULT_KEEP_ALIVE_TIME_IN_SECONDS = 30;
-
- /**
- * Permission for checking shutdown
- */
- private static final RuntimePermission shutdownPerm = new
RuntimePermission("modifyThread");
-
- /**
- * The factory that this pool uses to create new connections.
- */
- private final RepositoryConnectionFactory connectionFactory;
-
- /**
- * Lock held on updates to poolSize, corePoolSize, maximumPoolSize, and workers set.
- */
- private final ReentrantLock mainLock = new ReentrantLock();
-
- /**
- * Wait condition to support awaitTermination
- */
- private final Condition termination = mainLock.newCondition();
-
- /**
- * Set containing all connections that are available for use.
- */
- @GuardedBy( "mainLock" )
- private final BlockingQueue<ConnectionWrapper> availableConnections = new
LinkedBlockingQueue<ConnectionWrapper>();
-
- /**
- * The connections that are currently in use.
- */
- @GuardedBy( "mainLock" )
- private final Set<ConnectionWrapper> inUseConnections = new
HashSet<ConnectionWrapper>();
-
- /**
- * Timeout in nanoseconds for idle connections waiting to be used. Threads use this
timeout only when there are more than
- * corePoolSize present. Otherwise they wait forever to be used.
- */
- private volatile long keepAliveTime;
-
- /**
- * The target pool size, updated only while holding mainLock, but volatile to allow
concurrent readability even during
- * updates.
- */
- @GuardedBy( "mainLock" )
- private volatile int corePoolSize;
-
- /**
- * Maximum pool size, updated only while holding mainLock but volatile to allow
concurrent readability even during updates.
- */
- @GuardedBy( "mainLock" )
- private volatile int maximumPoolSize;
-
- /**
- * Current pool size, updated only while holding mainLock but volatile to allow
concurrent readability even during updates.
- */
- @GuardedBy( "mainLock" )
- private volatile int poolSize;
-
- /**
- * Lifecycle state, updated only while holding mainLock but volatile to allow
concurrent readability even during updates.
- */
- @GuardedBy( "mainLock" )
- private volatile int runState;
-
- // Special values for runState
- /** Normal, not-shutdown mode */
- static final int RUNNING = 0;
- /** Controlled shutdown mode */
- static final int SHUTDOWN = 1;
- /** Immediate shutdown mode */
- static final int STOP = 2;
- /** Final state */
- static final int TERMINATED = 3;
-
- /**
- * Flag specifying whether a connection should be validated before returning it from
the {@link #getConnection()} method.
- */
- private final AtomicBoolean validateConnectionBeforeUse = new AtomicBoolean(false);
-
- /**
- * The time in nanoseconds that ping should wait before timing out and failing.
- */
- private final AtomicLong pingTimeout = new AtomicLong(0);
-
- /**
- * The number of times an attempt to obtain a connection should fail with invalid
connections before throwing an exception.
- */
- private final AtomicInteger maxFailedAttemptsBeforeError = new AtomicInteger(10);
-
- private final AtomicLong totalConnectionsCreated = new AtomicLong(0);
-
- private final AtomicLong totalConnectionsUsed = new AtomicLong(0);
-
- private final Logger logger = Logger.getLogger(this.getClass());
-
- /**
- * Create the pool to use the supplied connection factory, which is typically a
{@link RepositorySource}. This constructor
- * uses the {@link #DEFAULT_CORE_POOL_SIZE default core pool size}, {@link
#DEFAULT_MAXIMUM_POOL_SIZE default maximum pool
- * size}, and {@link #DEFAULT_KEEP_ALIVE_TIME_IN_SECONDS default keep-alive time (in
seconds)}.
- *
- * @param connectionFactory the factory for connections
- * @throws IllegalArgumentException if the connection factory is null or any of the
supplied arguments are invalid
- */
- public RepositoryConnectionPool( RepositoryConnectionFactory connectionFactory ) {
- this(connectionFactory, DEFAULT_CORE_POOL_SIZE, DEFAULT_MAXIMUM_POOL_SIZE,
DEFAULT_KEEP_ALIVE_TIME_IN_SECONDS,
- TimeUnit.SECONDS);
- }
-
- /**
- * Create the pool to use the supplied connection factory, which is typically a
{@link RepositorySource}.
- *
- * @param connectionFactory the factory for connections
- * @param corePoolSize the number of connections to keep in the pool, even if they
are idle.
- * @param maximumPoolSize the maximum number of connections to allow in the pool.
- * @param keepAliveTime when the number of connection is greater than the core, this
is the maximum time that excess idle
- * connections will be kept before terminating.
- * @param unit the time unit for the keepAliveTime argument.
- * @throws IllegalArgumentException if the connection factory is null or any of the
supplied arguments are invalid
- */
- public RepositoryConnectionPool( RepositoryConnectionFactory connectionFactory,
- int corePoolSize,
- int maximumPoolSize,
- long keepAliveTime,
- TimeUnit unit ) {
- ArgCheck.isNonNegative(corePoolSize, "corePoolSize");
- ArgCheck.isPositive(maximumPoolSize, "maximumPoolSize");
- ArgCheck.isNonNegative(keepAliveTime, "keepAliveTime");
- ArgCheck.isNotNull(connectionFactory, "repository connection
factory");
- if (maximumPoolSize < corePoolSize) {
- throw new
IllegalArgumentException(SpiI18n.maximumPoolSizeMayNotBeSmallerThanCorePoolSize.text());
- }
- this.connectionFactory = connectionFactory;
- this.corePoolSize = corePoolSize;
- this.maximumPoolSize = maximumPoolSize;
- this.keepAliveTime = unit.toNanos(keepAliveTime);
- this.setPingTimeout(100, TimeUnit.MILLISECONDS);
- }
-
- public String getName() {
- return this.connectionFactory.getName();
- }
-
- // -------------------------------------------------
- // Property settings ...
- // -------------------------------------------------
-
- /**
- * @return validateConnectionBeforeUse
- */
- public boolean getValidateConnectionBeforeUse() {
- return this.validateConnectionBeforeUse.get();
- }
-
- /**
- * @param validateConnectionBeforeUse Sets validateConnectionBeforeUse to the
specified value.
- */
- public void setValidateConnectionBeforeUse( boolean validateConnectionBeforeUse ) {
- this.validateConnectionBeforeUse.set(validateConnectionBeforeUse);
- }
-
- /**
- * @return pingTimeout
- */
- public long getPingTimeoutInNanos() {
- return this.pingTimeout.get();
- }
-
- /**
- * @param pingTimeout the time to wait for a ping to complete
- * @param unit the time unit of the time argument
- */
- public void setPingTimeout( long pingTimeout,
- TimeUnit unit ) {
- ArgCheck.isNonNegative(pingTimeout, "time");
- this.pingTimeout.set(unit.toNanos(pingTimeout));
- }
-
- /**
- * @return maxFailedAttemptsBeforeError
- */
- public int getMaxFailedAttemptsBeforeError() {
- return this.maxFailedAttemptsBeforeError.get();
- }
-
- /**
- * @param maxFailedAttemptsBeforeError Sets maxFailedAttemptsBeforeError to the
specified value.
- */
- public void setMaxFailedAttemptsBeforeError( int maxFailedAttemptsBeforeError ) {
- this.maxFailedAttemptsBeforeError.set(maxFailedAttemptsBeforeError);
- }
-
- /**
- * Sets the time limit for which connections may remain idle before being closed. If
there are more than the core number of
- * connections currently in the pool, after waiting this amount of time without being
used, excess threads will be terminated.
- * This overrides any value set in the constructor.
- *
- * @param time the time to wait. A time value of zero will cause excess connections
to terminate immediately after being
- * returned.
- * @param unit the time unit of the time argument
- * @throws IllegalArgumentException if time less than zero
- * @see #getKeepAliveTime
- */
- public void setKeepAliveTime( long time,
- TimeUnit unit ) {
- ArgCheck.isNonNegative(time, "time");
- this.keepAliveTime = unit.toNanos(time);
- }
-
- /**
- * Returns the connection keep-alive time, which is the amount of time which
connections in excess of the core pool size may
- * remain idle before being closed.
- *
- * @param unit the desired time unit of the result
- * @return the time limit
- * @see #setKeepAliveTime
- */
- public long getKeepAliveTime( TimeUnit unit ) {
- assert unit != null;
- return unit.convert(keepAliveTime, TimeUnit.NANOSECONDS);
- }
-
- /**
- * @return maximumPoolSize
- */
- public int getMaximumPoolSize() {
- return this.maximumPoolSize;
- }
-
- /**
- * Sets the maximum allowed number of connections. This overrides any value set in
the constructor. If the new value is
- * smaller than the current value, excess existing but unused connections will be
closed.
- *
- * @param maximumPoolSize the new maximum
- * @throws IllegalArgumentException if maximumPoolSize less than zero or the {@link
#getCorePoolSize() core pool size}
- * @see #getMaximumPoolSize
- */
- public void setMaximumPoolSize( int maximumPoolSize ) {
- ArgCheck.isPositive(maximumPoolSize, "maximum pool size");
- if (maximumPoolSize < corePoolSize) {
- throw new
IllegalArgumentException(SpiI18n.maximumPoolSizeMayNotBeSmallerThanCorePoolSize.text());
- }
- final ReentrantLock mainLock = this.mainLock;
- try {
- mainLock.lock();
- int extra = this.maximumPoolSize - maximumPoolSize;
- this.maximumPoolSize = maximumPoolSize;
- if (extra > 0 && poolSize > maximumPoolSize) {
- // Drain the extra connections from those available ...
- drainUnusedConnections(extra);
- }
- } finally {
- mainLock.unlock();
- }
- }
-
- /**
- * Returns the core number of connections.
- *
- * @return the core number of connections
- * @see #setCorePoolSize(int)
- */
- public int getCorePoolSize() {
- return this.corePoolSize;
- }
-
- /**
- * Sets the core number of connections. This overrides any value set in the
constructor. If the new value is smaller than the
- * current value, excess existing and unused connections will be closed. If larger,
new connections will, if needed, be
- * created.
- *
- * @param corePoolSize the new core size
- * @throws RepositorySourceException if there was an error obtaining the new
connection
- * @throws InterruptedException if the thread was interrupted during the operation
- * @throws IllegalArgumentException if <tt>corePoolSize</tt> less than
zero
- * @see #getCorePoolSize()
- */
- public void setCorePoolSize( int corePoolSize ) throws RepositorySourceException,
InterruptedException {
- ArgCheck.isNonNegative(corePoolSize, "core pool size");
- if (maximumPoolSize < corePoolSize) {
- throw new
IllegalArgumentException(SpiI18n.maximumPoolSizeMayNotBeSmallerThanCorePoolSize.text());
- }
- final ReentrantLock mainLock = this.mainLock;
- try {
- mainLock.lock();
- int extra = this.corePoolSize - corePoolSize;
- this.corePoolSize = corePoolSize;
- if (extra < 0) {
- // Add connections ...
- addConnectionsIfUnderCorePoolSize();
- } else if (extra > 0 && poolSize > corePoolSize) {
- // Drain the extra connections from those available ...
- drainUnusedConnections(extra);
- }
- } finally {
- mainLock.unlock();
- }
- }
-
- // -------------------------------------------------
- // Statistics ...
- // -------------------------------------------------
-
- /**
- * Returns the current number of connections in the pool, including those that are
checked out (in use) and those that are not
- * being used.
- *
- * @return the number of connections
- */
- public int getPoolSize() {
- return poolSize;
- }
-
- /**
- * Returns the approximate number of connections that are currently checked out from
the pool.
- *
- * @return the number of checked-out connections
- */
- public int getInUseCount() {
- final ReentrantLock mainLock = this.mainLock;
- try {
- mainLock.lock();
- return this.inUseConnections.size();
- } finally {
- mainLock.unlock();
- }
- }
-
- /**
- * Get the total number of connections that have been created by this pool.
- *
- * @return the total number of connections created by this pool
- */
- public long getTotalConnectionsCreated() {
- return this.totalConnectionsCreated.get();
- }
-
- /**
- * Get the total number of times connections have been {@link #getConnection()}
used.
- *
- * @return the total number
- */
- public long getTotalConnectionsUsed() {
- return this.totalConnectionsUsed.get();
- }
-
- // -------------------------------------------------
- // State management methods ...
- // -------------------------------------------------
-
- /**
- * Starts a core connection, causing it to idly wait for use. This overrides the
default policy of starting core connections
- * only when they are {@link #getConnection() needed}. This method will return
<tt>false</tt> if all core connections have
- * already been started.
- *
- * @return true if a connection was started
- * @throws RepositorySourceException if there was an error obtaining the new
connection
- * @throws InterruptedException if the thread was interrupted during the operation
- */
- public boolean prestartCoreConnection() throws RepositorySourceException,
InterruptedException {
- final ReentrantLock mainLock = this.mainLock;
- try {
- mainLock.lock();
- return addConnectionIfUnderCorePoolSize();
- } finally {
- mainLock.unlock();
- }
- }
-
- /**
- * Starts all core connections, causing them to idly wait for use. This overrides the
default policy of starting core
- * connections only when they are {@link #getConnection() needed}.
- *
- * @return the number of connections started.
- * @throws RepositorySourceException if there was an error obtaining the new
connection
- * @throws InterruptedException if the thread was interrupted during the operation
- */
- public int prestartAllCoreConnections() throws RepositorySourceException,
InterruptedException {
- final ReentrantLock mainLock = this.mainLock;
- try {
- mainLock.lock();
- return addConnectionsIfUnderCorePoolSize();
- } finally {
- mainLock.unlock();
- }
- }
-
- /**
- * Initiates an orderly shutdown in which connections that are currently in use are
allowed to be used and closed as normal,
- * but no new connections will be created. Invocation has no additional effect if
already shut down.
- *
- * @throws SecurityException if a security manager exists and shutting down this pool
may manipulate threads that the caller
- * is not permitted to modify because it does not hold {@link
java.lang.RuntimePermission}<tt>("modifyThread")</tt>,
- * or the security manager's <tt>checkAccess</tt> method
denies access.
- * @see #shutdownNow()
- */
- public void shutdown() {
- // Fail if caller doesn't have modifyThread permission. We
- // explicitly check permissions directly because we can't trust
- // implementations of SecurityManager to correctly override
- // the "check access" methods such that our documented
- // security policy is implemented.
- SecurityManager security = System.getSecurityManager();
- if (security != null)
java.security.AccessController.checkPermission(shutdownPerm);
-
- this.logger.debug("Shutting down repository connection pool for {0}",
getName());
- boolean fullyTerminated = false;
- final ReentrantLock mainLock = this.mainLock;
- try {
- mainLock.lock();
- int state = this.runState;
- if (state == RUNNING) {
- // don't override shutdownNow
- this.runState = SHUTDOWN;
- }
-
- // Kill the maintenance thread ...
-
- // Remove and close all available connections ...
- if (!this.availableConnections.isEmpty()) {
- // Drain the extra connections from those available ...
- drainUnusedConnections(this.availableConnections.size());
- }
-
- // If there are no connections being used, trigger full termination now ...
- if (this.inUseConnections.isEmpty()) {
- fullyTerminated = true;
- this.logger.trace("Signalling termination of repository connection
pool for {0}", getName());
- runState = TERMINATED;
- termination.signalAll();
- this.logger.debug("Terminated repository connection pool for
{0}", getName());
- }
- // Otherwise the last connection that is closed will transition the runState
to TERMINATED ...
- } finally {
- mainLock.unlock();
- }
- if (fullyTerminated) terminated();
- }
-
- /**
- * Attempts to close all connections, including those connections currently in use,
and prevent the use of other connections.
- *
- * @throws SecurityException if a security manager exists and shutting down this pool
may manipulate threads that the caller
- * is not permitted to modify because it does not hold {@link
java.lang.RuntimePermission}<tt>("modifyThread")</tt>,
- * or the security manager's <tt>checkAccess</tt> method
denies access.
- * @see #shutdown()
- */
- public void shutdownNow() {
- // Almost the same code as shutdown()
- SecurityManager security = System.getSecurityManager();
- if (security != null)
java.security.AccessController.checkPermission(shutdownPerm);
-
- this.logger.debug("Shutting down (immediately) repository connection pool
for {0}", getName());
- boolean fullyTerminated = false;
- final ReentrantLock mainLock = this.mainLock;
- try {
- mainLock.lock();
- int state = this.runState;
- if (state != TERMINATED) {
- // don't override shutdownNow
- this.runState = STOP;
- }
-
- // Kill the maintenance thread ...
-
- // Remove and close all available connections ...
- if (!this.availableConnections.isEmpty()) {
- // Drain the extra connections from those available ...
- drainUnusedConnections(this.availableConnections.size());
- }
-
- // If there are connections being used, close them now ...
- if (!this.inUseConnections.isEmpty()) {
- for (ConnectionWrapper connectionInUse : this.inUseConnections) {
- try {
- this.logger.trace("Closing repository connection to
{0}", getName());
- connectionInUse.getOriginal().close();
- } catch (InterruptedException e) {
- // Ignore this ...
- }
- }
- this.poolSize -= this.inUseConnections.size();
- // The last connection that is closed will transition the runState to
TERMINATED ...
- } else {
- // There are no connections in use, so trigger full termination now ...
- fullyTerminated = true;
- this.logger.trace("Signalling termination of repository connection
pool for {0}", getName());
- runState = TERMINATED;
- termination.signalAll();
- this.logger.debug("Terminated repository connection pool for
{0}", getName());
- }
-
- } finally {
- mainLock.unlock();
- }
- if (fullyTerminated) terminated();
- }
-
- /**
- * Return whether this connection pool is running and is able to {@link
#getConnection() provide connections}. Note that this
- * method is effectively <code>!isShutdown()</code>.
- *
- * @return true if this pool is running, or false otherwise
- * @see #isShutdown()
- * @see #isTerminated()
- * @see #isTerminating()
- */
- public boolean isRunning() {
- return runState == RUNNING;
- }
-
- /**
- * Return whether this connection pool is in the process of shutting down or has
already been shut down. A result of
- * <code>true</code> signals that the pool may no longer be used. Note
that this method is effectively
- * <code>!isRunning()</code>.
- *
- * @return true if this pool has been shut down, or false otherwise
- * @see #isShutdown()
- * @see #isTerminated()
- * @see #isTerminating()
- */
- public boolean isShutdown() {
- return runState != RUNNING;
- }
-
- /**
- * Returns true if this pool is in the process of terminating after {@link
#shutdown()} or {@link #shutdownNow()} has been
- * called but has not completely terminated. This method may be useful for debugging.
A return of <tt>true</tt> reported a
- * sufficient period after shutdown may indicate that submitted tasks have ignored or
suppressed interruption, causing this
- * executor not to properly terminate.
- *
- * @return true if terminating but not yet terminated, or false otherwise
- * @see #isTerminated()
- */
- public boolean isTerminating() {
- return runState == STOP;
- }
-
- /**
- * Return true if this pool has completed its termination and no longer has any open
connections.
- *
- * @return true if terminated, or false otherwise
- * @see #isTerminating()
- */
- public boolean isTerminated() {
- return runState == TERMINATED;
- }
-
- /**
- * Method that can be called after {@link #shutdown()} or {@link #shutdownNow()} to
wait until all connections in use at the
- * time those methods were called have been closed normally. This method accepts a
maximum time duration, after which it will
- * return even if all connections have not been closed.
- *
- * @param timeout the maximum time to wait for all connections to be closed and
returned to the pool
- * @param unit the time unit for <code>timeout</code>
- * @return true if the pool was terminated in the supplied time (or was already
terminated), or false if the timeout occurred
- * before all the connections were closed
- * @throws InterruptedException if the thread was interrupted
- */
- public boolean awaitTermination( long timeout,
- TimeUnit unit ) throws InterruptedException {
- long nanos = unit.toNanos(timeout);
- final ReentrantLock mainLock = this.mainLock;
- try {
- mainLock.lock();
- for (;;) {
- // this.logger.debug("---> Run state = {}; condition = {}",
runState, termination);
- if (runState == TERMINATED) return true;
- if (nanos <= 0) return false;
- nanos = termination.awaitNanos(nanos);
- // this.logger.debug("---> Done waiting: run state = {};
condition = {}", runState, termination);
- }
- } finally {
- mainLock.unlock();
- }
- }
-
- /**
- * Method invoked when the pool has terminated. Default implementation does nothing.
Note: To properly nest multiple
- * overridings, subclasses should generally invoke
<tt>super.terminated</tt> within this method.
- */
- protected void terminated() {
- }
-
- /**
- * Invokes <tt>shutdown</tt> when this pool is no longer referenced.
- */
- @Override
- protected void finalize() {
- shutdown();
- }
-
- // -------------------------------------------------
- // Connection management methods ...
- // -------------------------------------------------
-
- /**
- * {@inheritDoc}
- */
- public RepositoryConnection getConnection() throws RepositorySourceException,
InterruptedException {
- int attemptsAllowed = this.maxFailedAttemptsBeforeError.get();
- ConnectionWrapper connection = null;
- // Do this until we get a good connection ...
- int attemptsRemaining = attemptsAllowed;
- while (connection == null && attemptsRemaining > 0) {
- --attemptsRemaining;
- ReentrantLock mainLock = this.mainLock;
- try {
- mainLock.lock();
- // If we're shutting down the pool, then just close the connection
...
- if (this.runState != RUNNING) {
- throw new
IllegalStateException(SpiI18n.repositoryConnectionPoolIsNotRunning.text());
- }
- // If there are fewer total connections than the core size ...
- if (this.poolSize < this.corePoolSize) {
- // Immediately create a wrapped connection and return it ...
- connection = newWrappedConnection();
- }
- // Peek to see if there is a connection available ...
- else if (this.availableConnections.peek() != null) {
- // There is, so take it and return it ...
- connection = this.availableConnections.take();
- }
- // There is no connection available. If there are fewer total connections
than the maximum size ...
- else if (this.poolSize < this.maximumPoolSize) {
- // Immediately create a wrapped connection and return it ...
- connection = newWrappedConnection();
- }
- if (connection != null) {
- this.inUseConnections.add(connection);
- }
- } finally {
- mainLock.unlock();
- }
- if (connection == null) {
- // There are not enough connections, so wait in line for the next
available connection ...
- this.logger.trace("Waiting for a repository connection from pool
{0}", getName());
- connection = this.availableConnections.take();
- mainLock = this.mainLock;
- mainLock.lock();
- try {
- if (connection != null) {
- this.inUseConnections.add(connection);
- }
- } finally {
- mainLock.unlock();
- }
- this.logger.trace("Recieved a repository connection from pool
{0}", getName());
- }
- if (connection != null && this.validateConnectionBeforeUse.get()) {
- connection = validateConnection(connection);
- }
- }
- if (connection == null) {
- // We were unable to obtain a usable connection, so fail ...
- throw new
RepositorySourceException(SpiI18n.unableToObtainValidRepositoryAfterAttempts.text(attemptsAllowed));
- }
- this.totalConnectionsUsed.incrementAndGet();
- return connection;
- }
-
- /**
- * This method is automatically called by the {@link ConnectionWrapper} when it is
{@link ConnectionWrapper#close() closed}.
- *
- * @param wrapper the wrapper to the connection that is being returned to the pool
- */
- protected void returnConnection( ConnectionWrapper wrapper ) {
- assert wrapper != null;
- ConnectionWrapper wrapperToClose = null;
- final ReentrantLock mainLock = this.mainLock;
- try {
- mainLock.lock();
- // Remove the connection from the in-use set ...
- boolean removed = this.inUseConnections.remove(wrapper);
- assert removed;
-
- // If we're shutting down the pool, then just close the connection ...
- if (this.runState != RUNNING) {
- wrapperToClose = wrapper;
- }
- // If there are more connections than the maximum size...
- else if (this.poolSize > this.maximumPoolSize) {
- // Immediately close this connection ...
- wrapperToClose = wrapper;
- }
- // Attempt to make the connection available (this should generally work,
unless there is an upper limit
- // to the number of available connections) ...
- else if (!this.availableConnections.offer(new
ConnectionWrapper(wrapper.getOriginal()))) {
- // The pool of available connection is full, so release it ...
- wrapperToClose = wrapper;
- }
- } finally {
- mainLock.unlock();
- }
- // Close the connection if we're supposed to (do it outside of the main
lock)...
- if (wrapperToClose != null) {
- try {
- closeConnection(wrapper);
- } catch (InterruptedException e) {
- // catch this, as there's not much we can do and the caller
doesn't care or know how to handle it
- this.logger.trace(e, "Interrupted while closing a repository
connection");
- }
- }
- }
-
- /**
- * Validate the supplied connection, returning the connection if valid or null if the
connection is not valid.
- *
- * @param connection the connection to be validated; may not be null
- * @return the validated connection, or null if the connection did not validate and
was removed from the pool
- */
- protected ConnectionWrapper validateConnection( ConnectionWrapper connection ) {
- assert connection != null;
- ConnectionWrapper invalidConnection = null;
- try {
- if (!connection.ping(this.pingTimeout.get(), TimeUnit.NANOSECONDS)) {
- invalidConnection = connection;
- }
- } catch (InterruptedException e) {
- // catch this, as there's not much we can do and the caller doesn't
care or know how to handle it
- this.logger.trace(e, "Interrupted while pinging a repository
connection");
- invalidConnection = connection;
- } finally {
- if (invalidConnection != null) {
- connection = null;
- returnConnection(invalidConnection);
- }
- }
- return connection;
- }
-
- /**
- * Obtain a new connection wrapped in a {@link ConnectionWrapper}. This method does
not check whether creating the new
- * connection would violate the {@link #maximumPoolSize maximum pool size} nor does
it add the new connection to the
- * {@link #availableConnections available connections} (as the caller may want it
immediately), but it does increment the
- * {@link #poolSize pool size}.
- *
- * @return the connection wrapper with a new connection
- * @throws RepositorySourceException if there was an error obtaining the new
connection
- * @throws InterruptedException if the thread was interrupted during the operation
- */
- @GuardedBy( "mainLock" )
- protected ConnectionWrapper newWrappedConnection() throws RepositorySourceException,
InterruptedException {
- RepositoryConnection connection = this.connectionFactory.getConnection();
- ++this.poolSize;
- this.totalConnectionsCreated.incrementAndGet();
- return new ConnectionWrapper(connection);
- }
-
- /**
- * Close a connection that is in the pool but no longer in the {@link
#availableConnections available connections}. This
- * method does decrement the {@link #poolSize pool size}.
- *
- * @param wrapper the wrapper for the connection to be closed
- * @throws InterruptedException if the thread was interrupted during the operation
- */
- protected void closeConnection( ConnectionWrapper wrapper ) throws
InterruptedException {
- assert wrapper != null;
- RepositoryConnection original = wrapper.getOriginal();
- assert original != null;
- try {
- this.logger.debug("Closing repository connection to {0}",
getName());
- original.close();
- } finally {
- final ReentrantLock mainLock = this.mainLock;
- try {
- mainLock.lock();
- // No matter what reduce the pool size count
- --this.poolSize;
- // And if shutting down and this was the last connection being used...
- if (this.runState == SHUTDOWN && this.poolSize <= 0) {
- // then signal anybody that has called
"awaitTermination(...)"
- this.logger.trace("Signalling termination of repository
connection pool for {0}", getName());
- this.runState = TERMINATED;
- this.termination.signalAll();
- this.logger.trace("Terminated repository connection pool for
{0}", getName());
-
- // fall through to call terminate() outside of lock.
- }
- } finally {
- mainLock.unlock();
- }
- }
- }
-
- @GuardedBy( "mainLock" )
- protected int drainUnusedConnections( int count ) {
- if (count <= 0) return 0;
- this.logger.trace("Draining up to {0} unused repository connections to
{1}", count, getName());
- // Drain the extra connections from those available ...
- Collection<ConnectionWrapper> extraConnections = new
LinkedList<ConnectionWrapper>();
- this.availableConnections.drainTo(extraConnections, count);
- for (ConnectionWrapper connection : extraConnections) {
- try {
- this.logger.trace("Closing repository connection to {0}",
getName());
- connection.getOriginal().close();
- } catch (InterruptedException e) {
- // Ignore this ...
- }
- }
- int numClosed = extraConnections.size();
- this.poolSize -= numClosed;
- this.logger.trace("Drained {0} unused connections", numClosed);
- return numClosed;
- }
-
- @GuardedBy( "mainLock" )
- protected boolean addConnectionIfUnderCorePoolSize() throws
RepositorySourceException, InterruptedException {
- // Add connection ...
- if (this.poolSize < this.corePoolSize) {
- this.availableConnections.offer(newWrappedConnection());
- this.logger.trace("Added connection to {0} in undersized pool",
getName());
- return true;
- }
- return false;
- }
-
- @GuardedBy( "mainLock" )
- protected int addConnectionsIfUnderCorePoolSize() throws RepositorySourceException,
InterruptedException {
- // Add connections ...
- int n = 0;
- while (this.poolSize < this.corePoolSize) {
- this.availableConnections.offer(newWrappedConnection());
- ++n;
- }
- this.logger.trace("Added {0} connection(s) to {1} in undersized pool",
n, getName());
- return n;
- }
-
- protected class ConnectionWrapper implements RepositoryConnection {
-
- private final RepositoryConnection original;
- private final long timeCreated;
- private long lastUsed;
- private boolean closed = false;
-
- protected ConnectionWrapper( RepositoryConnection connection ) {
- assert connection != null;
- this.original = connection;
- this.timeCreated = System.currentTimeMillis();
- }
-
- /**
- * @return original
- */
- protected RepositoryConnection getOriginal() {
- return this.original;
- }
-
- /**
- * @return lastUsed
- */
- public long getTimeLastUsed() {
- return this.lastUsed;
- }
-
- /**
- * @return timeCreated
- */
- public long getTimeCreated() {
- return this.timeCreated;
- }
-
- /**
- * {@inheritDoc}
- */
- public String getSourceName() {
- return this.original.getSourceName();
- }
-
- /**
- * {@inheritDoc}
- */
- public XAResource getXAResource() {
- if (closed) throw new
IllegalStateException(SpiI18n.closedConnectionMayNotBeUsed.text());
- return this.original.getXAResource();
- }
-
- /**
- * {@inheritDoc}
- */
- public CachePolicy getDefaultCachePolicy() {
- if (closed) throw new
IllegalStateException(SpiI18n.closedConnectionMayNotBeUsed.text());
- return this.original.getDefaultCachePolicy();
- }
-
- /**
- * {@inheritDoc}
- */
- public void execute( ExecutionEnvironment env,
- GraphCommand... commands ) throws RepositorySourceException,
InterruptedException {
- if (closed) throw new
IllegalStateException(SpiI18n.closedConnectionMayNotBeUsed.text());
- this.original.execute(env, commands);
- }
-
- /**
- * {@inheritDoc}
- */
- public boolean ping( long time,
- TimeUnit unit ) throws InterruptedException {
- if (closed) throw new
IllegalStateException(SpiI18n.closedConnectionMayNotBeUsed.text());
- return this.original.ping(time, unit);
- }
-
- /**
- * {@inheritDoc}
- */
- public void close() throws InterruptedException {
- if (!closed) {
- this.lastUsed = System.currentTimeMillis();
- this.original.close();
- this.closed = true;
- returnConnection(this);
- }
- }
-
- /**
- * {@inheritDoc}
- */
- public void setListener( RepositorySourceListener listener ) {
- if (!closed) this.original.setListener(listener);
- }
-
- }
-
-}
Modified:
trunk/dna-spi/src/main/java/org/jboss/dna/spi/graph/connection/RepositoryOperation.java
===================================================================
---
trunk/dna-spi/src/main/java/org/jboss/dna/spi/graph/connection/RepositoryOperation.java 2008-06-30
21:34:41 UTC (rev 319)
+++
trunk/dna-spi/src/main/java/org/jboss/dna/spi/graph/connection/RepositoryOperation.java 2008-06-30
21:42:54 UTC (rev 320)
@@ -39,12 +39,14 @@
/**
* Run the operation using the supplied connection.
*
+ * @param env the environment in which this operation is executing; may not be null
* @param connection the connection; may not be null
* @return the result of the operation
* @throws RepositorySourceException if there is a problem with the connection
* @throws InterruptedException if this thread was interrupted
*/
- T run( RepositoryConnection connection ) throws RepositorySourceException,
InterruptedException;
+ T run( ExecutionEnvironment env,
+ RepositoryConnection connection ) throws RepositorySourceException,
InterruptedException;
/**
* A factory interface for creating repository operations.
Modified:
trunk/dna-spi/src/main/java/org/jboss/dna/spi/graph/connection/RepositoryOperations.java
===================================================================
---
trunk/dna-spi/src/main/java/org/jboss/dna/spi/graph/connection/RepositoryOperations.java 2008-06-30
21:34:41 UTC (rev 319)
+++
trunk/dna-spi/src/main/java/org/jboss/dna/spi/graph/connection/RepositoryOperations.java 2008-06-30
21:42:54 UTC (rev 320)
@@ -37,17 +37,20 @@
* Call the supplied operation, using a connection from this pool.
*
* @param <T> the return type for the operation
+ * @param env the environment in which the operation is to execute; may not be null
* @param connectionFactory the factory for the connection to use
* @param operation the operation to be run using a new connection obtained from the
factory
* @return the results from the operation
* @throws RepositorySourceException if there was an error obtaining the new
connection
* @throws InterruptedException if the thread was interrupted during the operation
* @throws IllegalArgumentException if the operation is null
- * @see #createCallable(RepositoryConnectionFactory, RepositoryOperation)
- * @see #createCallables(RepositoryConnectionFactory, Iterable)
- * @see #createCallables(RepositoryConnectionFactory, RepositoryOperation...)
+ * @see #createCallable(ExecutionEnvironment, RepositoryConnectionFactory,
RepositoryOperation)
+ * @see #createCallables(ExecutionEnvironment, RepositoryConnectionFactory,
Iterable)
+ * @see #createCallables(ExecutionEnvironment, RepositoryConnectionFactory,
RepositoryOperation...)
*/
- public static <T> T call( RepositoryConnectionFactory connectionFactory,
RepositoryOperation<T> operation ) throws RepositorySourceException,
InterruptedException {
+ public static <T> T call( ExecutionEnvironment env,
+ RepositoryConnectionFactory connectionFactory,
+ RepositoryOperation<T> operation ) throws
RepositorySourceException, InterruptedException {
ArgCheck.isNotNull(operation, "repository operation");
// Get a connection ...
T result = null;
@@ -55,7 +58,7 @@
RepositoryConnection conn = connectionFactory.getConnection();
try {
// And run the client with the connection ...
- result = operation.run(conn);
+ result = operation.run(env, conn);
} finally {
conn.close();
}
@@ -68,14 +71,17 @@
* supplied factory.
*
* @param <T> the return type for the operation
+ * @param env the environment in which the operation is to execute; may not be null
* @param connectionFactory the factory for the connection to use
* @param operation the operation to be run using a new connection obtained from the
factory
* @return the callable
- * @see #call(RepositoryConnectionFactory, RepositoryOperation)
- * @see #createCallables(RepositoryConnectionFactory, Iterable)
- * @see #createCallables(RepositoryConnectionFactory, RepositoryOperation...)
+ * @see #call(ExecutionEnvironment, RepositoryConnectionFactory,
RepositoryOperation)
+ * @see #createCallables(ExecutionEnvironment, RepositoryConnectionFactory,
Iterable)
+ * @see #createCallables(ExecutionEnvironment, RepositoryConnectionFactory,
RepositoryOperation...)
*/
- public static <T> Callable<T> createCallable( final
RepositoryConnectionFactory connectionFactory, final RepositoryOperation<T>
operation ) {
+ public static <T> Callable<T> createCallable( final ExecutionEnvironment
env,
+ final RepositoryConnectionFactory
connectionFactory,
+ final RepositoryOperation<T>
operation ) {
ArgCheck.isNotNull(operation, "repository operation");
return new Callable<T>() {
@@ -86,7 +92,7 @@
* @throws Exception
*/
public T call() throws Exception {
- return RepositoryOperations.call(connectionFactory, operation);
+ return RepositoryOperations.call(env, connectionFactory, operation);
}
};
}
@@ -96,17 +102,20 @@
* this pool.
*
* @param <T> the return type for the operations
+ * @param env the environment in which the operation is to execute; may not be null
* @param connectionFactory the factory for the connection to use
* @param operations the operations to be run using connections from the factory
* @return the collection of callables
- * @see #call(RepositoryConnectionFactory, RepositoryOperation)
- * @see #createCallable(RepositoryConnectionFactory, RepositoryOperation)
- * @see #createCallables(RepositoryConnectionFactory, Iterable)
+ * @see #call(ExecutionEnvironment, RepositoryConnectionFactory,
RepositoryOperation)
+ * @see #createCallable(ExecutionEnvironment, RepositoryConnectionFactory,
RepositoryOperation)
+ * @see #createCallables(ExecutionEnvironment, RepositoryConnectionFactory,
Iterable)
*/
- public static <T> List<Callable<T>> createCallables( final
RepositoryConnectionFactory connectionFactory, final RepositoryOperation<T>...
operations ) {
+ public static <T> List<Callable<T>> createCallables( final
ExecutionEnvironment env,
+ final
RepositoryConnectionFactory connectionFactory,
+ final
RepositoryOperation<T>... operations ) {
List<Callable<T>> callables = new
ArrayList<Callable<T>>();
for (final RepositoryOperation<T> operation : operations) {
- callables.add(createCallable(connectionFactory, operation));
+ callables.add(createCallable(env, connectionFactory, operation));
}
return callables;
}
@@ -116,17 +125,20 @@
* this pool.
*
* @param <T> the return type for the operations
+ * @param env the environment in which the operation is to execute; may not be null
* @param connectionFactory the factory for the connection to use
* @param operations the operations to be run using connections from the factory
* @return the collection of callables
- * @see #call(RepositoryConnectionFactory, RepositoryOperation)
- * @see #createCallable(RepositoryConnectionFactory, RepositoryOperation)
- * @see #createCallables(RepositoryConnectionFactory, RepositoryOperation...)
+ * @see #call(ExecutionEnvironment, RepositoryConnectionFactory,
RepositoryOperation)
+ * @see #createCallable(ExecutionEnvironment, RepositoryConnectionFactory,
RepositoryOperation)
+ * @see #createCallables(ExecutionEnvironment, RepositoryConnectionFactory,
RepositoryOperation...)
*/
- public static <T> List<Callable<T>> createCallables( final
RepositoryConnectionFactory connectionFactory,
Iterable<RepositoryOperation<T>> operations ) {
+ public static <T> List<Callable<T>> createCallables( final
ExecutionEnvironment env,
+ final
RepositoryConnectionFactory connectionFactory,
+
Iterable<RepositoryOperation<T>> operations ) {
List<Callable<T>> callables = new
ArrayList<Callable<T>>();
for (final RepositoryOperation<T> operation : operations) {
- callables.add(createCallable(connectionFactory, operation));
+ callables.add(createCallable(env, connectionFactory, operation));
}
return callables;
}
@@ -136,18 +148,21 @@
* this pool.
*
* @param <T> the return type for the operations
+ * @param env the environment in which the operation is to execute; may not be null
* @param connectionFactory the factory for the connection to use
* @param operations the operations to be run using connections from the factory
* @return the collection of callables
- * @see #call(RepositoryConnectionFactory, RepositoryOperation)
- * @see #createCallable(RepositoryConnectionFactory, RepositoryOperation)
- * @see #createCallables(RepositoryConnectionFactory, RepositoryOperation...)
+ * @see #call(ExecutionEnvironment, RepositoryConnectionFactory,
RepositoryOperation)
+ * @see #createCallable(ExecutionEnvironment, RepositoryConnectionFactory,
RepositoryOperation)
+ * @see #createCallables(ExecutionEnvironment, RepositoryConnectionFactory,
RepositoryOperation...)
*/
- public static <T> List<Callable<T>> createCallables( final
RepositoryConnectionFactory connectionFactory,
Iterator<RepositoryOperation<T>> operations ) {
+ public static <T> List<Callable<T>> createCallables( final
ExecutionEnvironment env,
+ final
RepositoryConnectionFactory connectionFactory,
+
Iterator<RepositoryOperation<T>> operations ) {
List<Callable<T>> callables = new
ArrayList<Callable<T>>();
while (operations.hasNext()) {
final RepositoryOperation<T> operation = operations.next();
- callables.add(createCallable(connectionFactory, operation));
+ callables.add(createCallable(env, connectionFactory, operation));
}
return callables;
}
Modified: trunk/dna-spi/src/main/java/org/jboss/dna/spi/graph/impl/PathValueFactory.java
===================================================================
---
trunk/dna-spi/src/main/java/org/jboss/dna/spi/graph/impl/PathValueFactory.java 2008-06-30
21:34:41 UTC (rev 319)
+++
trunk/dna-spi/src/main/java/org/jboss/dna/spi/graph/impl/PathValueFactory.java 2008-06-30
21:42:54 UTC (rev 320)
@@ -321,8 +321,30 @@
/**
* {@inheritDoc}
+ *
+ * @see org.jboss.dna.spi.graph.PathFactory#create(org.jboss.dna.spi.graph.Path,
org.jboss.dna.spi.graph.Path)
*/
public Path create( Path parentPath,
+ Path childPath ) {
+ ArgCheck.isNotNull(parentPath, "parent path");
+ ArgCheck.isNotNull(childPath, "child path");
+ if (childPath.size() == 0) return parentPath;
+ if (parentPath.size() == 0) {
+ // Just need to return the child path, but it must be absolute if the parent
is ...
+ if (childPath.isAbsolute() == parentPath.isAbsolute()) return childPath;
+ // They aren't the same absoluteness, so create a new one ...
+ return new BasicPath(childPath.getSegmentsList(), parentPath.isAbsolute());
+ }
+ List<Segment> segments = new ArrayList<Segment>(parentPath.size() +
childPath.size());
+ segments.addAll(parentPath.getSegmentsList());
+ segments.addAll(childPath.getSegmentsList());
+ return new BasicPath(segments, parentPath.isAbsolute());
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public Path create( Path parentPath,
Name segmentName,
int index ) {
ArgCheck.isNotNull(parentPath, "parent path");
Modified:
trunk/dna-spi/src/main/java/org/jboss/dna/spi/graph/impl/StandardValueFactories.java
===================================================================
---
trunk/dna-spi/src/main/java/org/jboss/dna/spi/graph/impl/StandardValueFactories.java 2008-06-30
21:34:41 UTC (rev 319)
+++
trunk/dna-spi/src/main/java/org/jboss/dna/spi/graph/impl/StandardValueFactories.java 2008-06-30
21:42:54 UTC (rev 320)
@@ -23,9 +23,7 @@
import java.math.BigDecimal;
import java.net.URI;
-import java.util.Collections;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.Map;
import net.jcip.annotations.Immutable;
import org.jboss.dna.common.text.TextDecoder;
@@ -38,7 +36,6 @@
import org.jboss.dna.spi.graph.PathFactory;
import org.jboss.dna.spi.graph.PropertyType;
import org.jboss.dna.spi.graph.Reference;
-import org.jboss.dna.spi.graph.ValueFactories;
import org.jboss.dna.spi.graph.ValueFactory;
/**
@@ -47,7 +44,7 @@
* @author Randall Hauch
*/
@Immutable
-public class StandardValueFactories implements ValueFactories {
+public class StandardValueFactories extends AbstractValueFactories {
// This class is implemented with separate members for each factory so that the
typical usage is optimized.
private final ValueFactory<String> stringFactory;
@@ -62,7 +59,6 @@
private final ValueFactory<Reference> referenceFactory;
private final ValueFactory<URI> uriFactory;
private final ValueFactory<Object> objectFactory;
- private final Map<PropertyType, ValueFactory<?>> factories;
private final NamespaceRegistry namespaceRegistry;
private final TextDecoder decoder;
@@ -85,10 +81,13 @@
* @param decoder the decoder that should be used; if null, the {@link
ValueFactory#DEFAULT_DECODER default decoder} is used.
* @param encoder the encoder that should be used; if null, the {@link
ValueFactory#DEFAULT_ENCODER default encoder} is used.
* @param extraFactories any extra factories that should be used; any factory will
override the standard factories based upon
- * the {@link ValueFactory#getPropertyType() factory's property type}.
+ * the {@link ValueFactory#getPropertyType() factory's property type}.
* @throws IllegalArgumentException if the namespace registry is null
*/
- public StandardValueFactories( NamespaceRegistry namespaceRegistry, TextDecoder
decoder, TextEncoder encoder, ValueFactory<?>... extraFactories ) {
+ public StandardValueFactories( NamespaceRegistry namespaceRegistry,
+ TextDecoder decoder,
+ TextEncoder encoder,
+ ValueFactory<?>... extraFactories ) {
ArgCheck.isNotNull(namespaceRegistry, "namespaceRegistry");
this.namespaceRegistry = namespaceRegistry;
this.decoder = decoder != null ? decoder : ValueFactory.DEFAULT_DECODER;
@@ -109,18 +108,18 @@
this.decimalFactory = getFactory(factories, new DecimalValueFactory(this.decoder,
this.stringFactory));
this.doubleFactory = getFactory(factories, new DoubleValueFactory(this.decoder,
this.stringFactory));
this.longFactory = getFactory(factories, new LongValueFactory(this.decoder,
this.stringFactory));
- this.nameFactory = (NameFactory)getFactory(factories, new
NameValueFactory(this.namespaceRegistry, this.decoder, this.stringFactory));
- this.pathFactory = (PathFactory)getFactory(factories, new
PathValueFactory(this.decoder, this.stringFactory, this.nameFactory));
+ this.nameFactory = (NameFactory)getFactory(factories, new
NameValueFactory(this.namespaceRegistry, this.decoder,
+
this.stringFactory));
+ this.pathFactory = (PathFactory)getFactory(factories, new
PathValueFactory(this.decoder, this.stringFactory,
+
this.nameFactory));
this.referenceFactory = getFactory(factories, new
UuidReferenceValueFactory(this.decoder, this.stringFactory));
this.uriFactory = getFactory(factories, new
UriValueFactory(this.namespaceRegistry, this.decoder, this.stringFactory));
this.objectFactory = getFactory(factories, new ObjectValueFactory(this.decoder,
this.stringFactory, this.binaryFactory));
-
- // Wrap the factories with an unmodifiable ...
- this.factories = Collections.unmodifiableMap(factories);
}
@SuppressWarnings( "unchecked" )
- private static <T> ValueFactory<T> getFactory( Map<PropertyType,
ValueFactory<?>> factories, ValueFactory<T> defaultFactory ) {
+ private static <T> ValueFactory<T> getFactory( Map<PropertyType,
ValueFactory<?>> factories,
+ ValueFactory<T> defaultFactory )
{
PropertyType type = defaultFactory.getPropertyType();
ValueFactory<?> factory = factories.get(type);
if (factory == null) {
@@ -145,13 +144,6 @@
}
/**
- * @return factories
- */
- public Map<PropertyType, ValueFactory<?>> getMapOfValueFactories() {
- return this.factories;
- }
-
- /**
* {@inheritDoc}
*/
public ValueFactory<Binary> getBinaryFactory() {
@@ -235,29 +227,4 @@
return this.objectFactory;
}
- /**
- * {@inheritDoc}
- */
- public Iterator<ValueFactory<?>> iterator() {
- return this.factories.values().iterator();
- }
-
- /**
- * {@inheritDoc}
- */
- public ValueFactory<?> getValueFactory( PropertyType type ) {
- ArgCheck.isNotNull(type, "type");
- return this.factories.get(type);
- }
-
- /**
- * {@inheritDoc}
- */
- public ValueFactory<?> getValueFactory( Object prototype ) {
- ArgCheck.isNotNull(prototype, "prototype");
- PropertyType inferredType = PropertyType.discoverType(prototype);
- assert inferredType != null;
- return this.factories.get(inferredType);
- }
-
}
Copied:
trunk/dna-spi/src/test/java/org/jboss/dna/spi/graph/connection/BasicRepositoryConnectionPoolTest.java
(from rev 307,
trunk/dna-spi/src/test/java/org/jboss/dna/spi/graph/connection/RepositoryConnectionPoolTest.java)
===================================================================
---
trunk/dna-spi/src/test/java/org/jboss/dna/spi/graph/connection/BasicRepositoryConnectionPoolTest.java
(rev 0)
+++
trunk/dna-spi/src/test/java/org/jboss/dna/spi/graph/connection/BasicRepositoryConnectionPoolTest.java 2008-06-30
21:42:54 UTC (rev 320)
@@ -0,0 +1,218 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package org.jboss.dna.spi.graph.connection;
+
+import static org.hamcrest.core.Is.is;
+import static org.hamcrest.core.IsNull.notNullValue;
+import static
org.jboss.dna.spi.graph.connection.RepositorySourceLoadHarness.runLoadTest;
+import static org.junit.Assert.assertThat;
+import java.util.List;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * @author Randall Hauch
+ */
+public class BasicRepositoryConnectionPoolTest {
+
+ private BasicRepositoryConnectionPool pool;
+ private TimeDelayingRepositorySource repositorySource;
+ private ExecutionEnvironment env;
+
+ @Before
+ public void beforeEach() {
+ this.repositorySource = new TimeDelayingRepositorySource("source 1");
+ this.pool = new BasicRepositoryConnectionPool(this.repositorySource, 1, 1, 100,
TimeUnit.SECONDS);
+ this.env = null;
+ }
+
+ @After
+ public void afterEach() throws Exception {
+ pool.shutdown();
+ pool.awaitTermination(2, TimeUnit.SECONDS);
+ }
+
+ @Test
+ public void shouldBeCreatedInRunningState() {
+ assertThat(pool.isShutdown(), is(false));
+ assertThat(pool.isTerminating(), is(false));
+ assertThat(pool.isTerminated(), is(false));
+ assertThat(pool.getTotalConnectionsCreated(), is(0l));
+ assertThat(pool.getTotalConnectionsUsed(), is(0l));
+ }
+
+ @Test
+ public void shouldShutdownWhenNoConnectionsWereCreatedOrUsed() throws
InterruptedException {
+ assertThat(pool.isShutdown(), is(false));
+ assertThat(pool.isTerminating(), is(false));
+ assertThat(pool.isTerminated(), is(false));
+ assertThat(pool.getTotalConnectionsCreated(), is(0l));
+ assertThat(pool.getTotalConnectionsUsed(), is(0l));
+
+ for (int i = 0; i != 4; ++i) {
+ pool.shutdown();
+ assertThat(pool.isShutdown() || pool.isTerminating() || pool.isTerminated(),
is(true));
+ pool.awaitTermination(2, TimeUnit.SECONDS);
+ assertThat(pool.isTerminated(), is(true));
+ assertThat(pool.getTotalConnectionsCreated(), is(0l));
+ assertThat(pool.getTotalConnectionsUsed(), is(0l));
+ }
+ }
+
+ @Test
+ public void shouldCreateConnectionAndRecoverWhenClosed() throws
RepositorySourceException, InterruptedException {
+ assertThat(pool.getTotalConnectionsCreated(), is(0l));
+ assertThat(pool.getTotalConnectionsUsed(), is(0l));
+
+ RepositoryConnection conn = pool.getConnection();
+ assertThat(conn, is(notNullValue()));
+ assertThat(pool.getTotalConnectionsCreated(), is(1l));
+ assertThat(pool.getTotalConnectionsUsed(), is(1l));
+ assertThat(pool.getPoolSize(), is(1));
+
+ conn.close();
+
+ assertThat(pool.getTotalConnectionsCreated(), is(1l));
+ assertThat(pool.getTotalConnectionsUsed(), is(1l));
+ assertThat(pool.getPoolSize(), is(1));
+ }
+
+ @Test
+ public void shouldAllowShutdownToBeCalledMultipleTimesEvenWhenShutdown()
+ throws RepositorySourceException, InterruptedException {
+ assertThat(pool.getTotalConnectionsCreated(), is(0l));
+ assertThat(pool.getTotalConnectionsUsed(), is(0l));
+
+ RepositoryConnection conn = pool.getConnection();
+ assertThat(conn, is(notNullValue()));
+ assertThat(pool.getTotalConnectionsCreated(), is(1l));
+ assertThat(pool.getTotalConnectionsUsed(), is(1l));
+ assertThat(pool.getPoolSize(), is(1));
+
+ conn.close();
+
+ assertThat(pool.getTotalConnectionsCreated(), is(1l));
+ assertThat(pool.getTotalConnectionsUsed(), is(1l));
+ assertThat(pool.getPoolSize(), is(1));
+
+ pool.shutdown();
+ pool.shutdown();
+ pool.awaitTermination(2, TimeUnit.SECONDS);
+ assertThat(pool.isTerminated(), is(true));
+ assertThat(pool.getTotalConnectionsCreated(), is(1l));
+ assertThat(pool.getTotalConnectionsUsed(), is(1l));
+ pool.shutdown();
+ pool.awaitTermination(2, TimeUnit.SECONDS);
+ pool.shutdown();
+ pool.awaitTermination(2, TimeUnit.SECONDS);
+ }
+
+ @Test( expected = IllegalStateException.class )
+ public void shouldNotCreateConnectionIfPoolIsNotRunning() throws
RepositorySourceException, InterruptedException {
+ pool.shutdown();
+ pool.awaitTermination(2, TimeUnit.SECONDS);
+ assertThat(pool.isTerminated(), is(true));
+ assertThat(pool.getTotalConnectionsCreated(), is(0l));
+ assertThat(pool.getTotalConnectionsUsed(), is(0l));
+ pool.getConnection(); // this should fail with illegal state
+ }
+
+ @Test
+ public void shouldAllowConnectionsToBeClosedMoreThanOnceWithNoIllEffects()
+ throws RepositorySourceException, InterruptedException {
+ assertThat(pool.getTotalConnectionsCreated(), is(0l));
+ assertThat(pool.getTotalConnectionsUsed(), is(0l));
+
+ RepositoryConnection conn = pool.getConnection();
+ assertThat(conn, is(notNullValue()));
+ assertThat(pool.getTotalConnectionsCreated(), is(1l));
+ assertThat(pool.getTotalConnectionsUsed(), is(1l));
+ assertThat(pool.getPoolSize(), is(1));
+
+ conn.close();
+ conn.close();
+ }
+
+ @Test
+ public void shouldBlockClientsWhenNotEnoughConnections() throws Exception {
+ int numConnectionsInPool = 1;
+ int numClients = 2;
+ BasicRepositoryConnectionPool pool = new
BasicRepositoryConnectionPool(repositorySource);
+ pool.setCorePoolSize(numConnectionsInPool);
+ pool.setMaximumPoolSize(numConnectionsInPool);
+ RepositoryOperation.Factory<Integer> operationFactory =
RepositorySourceLoadHarness.createMultipleLoadOperationFactory(10);
+ runLoadTest(env, pool, numClients, 100, TimeUnit.MILLISECONDS,
operationFactory);
+ pool.shutdown();
+ pool.awaitTermination(4, TimeUnit.SECONDS);
+ }
+
+ @Test
+ public void shouldLimitClientsToRunSequentiallyWithOneConnectionInPool() throws
Exception {
+ int numConnectionsInPool = 1;
+ int numClients = 3;
+ BasicRepositoryConnectionPool pool = new
BasicRepositoryConnectionPool(repositorySource);
+ pool.setCorePoolSize(numConnectionsInPool);
+ pool.setMaximumPoolSize(numConnectionsInPool);
+ RepositoryOperation.Factory<Integer> operationFactory =
RepositorySourceLoadHarness.createMultipleLoadOperationFactory(10);
+ runLoadTest(env, pool, numClients, 100, TimeUnit.MILLISECONDS,
operationFactory);
+ pool.shutdown();
+ pool.awaitTermination(4, TimeUnit.SECONDS);
+ }
+
+ @Test
+ public void shouldClientsToRunConncurrentlyWithTwoConnectionsInPool() throws
Exception {
+ int numConnectionsInPool = 2;
+ int numClients = 10;
+ BasicRepositoryConnectionPool pool = new
BasicRepositoryConnectionPool(repositorySource);
+ pool.setCorePoolSize(numConnectionsInPool);
+ pool.setMaximumPoolSize(numConnectionsInPool);
+ RepositoryOperation.Factory<Integer> operationFactory =
RepositorySourceLoadHarness.createMultipleLoadOperationFactory(10);
+ runLoadTest(env, pool, numClients, 100, TimeUnit.MILLISECONDS,
operationFactory);
+ pool.shutdown();
+ pool.awaitTermination(4, TimeUnit.SECONDS);
+ }
+
+ @Ignore( "doesn't run on hudson" )
+ @Test
+ public void shouldClientsToRunConncurrentlyWithMultipleConnectionInPool() throws
Exception {
+ int numConnectionsInPool = 10;
+ int numClients = 50;
+ BasicRepositoryConnectionPool pool = new
BasicRepositoryConnectionPool(repositorySource);
+ pool.setCorePoolSize(numConnectionsInPool);
+ pool.setMaximumPoolSize(numConnectionsInPool);
+ RepositoryOperation.Factory<Integer> operationFactory =
RepositorySourceLoadHarness.createMultipleLoadOperationFactory(20);
+ List<Future<Integer>> results = runLoadTest(env, pool, numClients,
200, TimeUnit.MILLISECONDS, operationFactory);
+ int total = 0;
+ for (Future<Integer> result : results) {
+ assertThat(result.isDone(), is(true));
+ if (result.isDone()) total += result.get();
+ }
+ assertThat(total, is(20 * numClients));
+ pool.shutdown();
+ pool.awaitTermination(4, TimeUnit.SECONDS);
+ }
+
+}
Property changes on:
trunk/dna-spi/src/test/java/org/jboss/dna/spi/graph/connection/BasicRepositoryConnectionPoolTest.java
___________________________________________________________________
Name: svn:mime-type
+ text/plain
Deleted:
trunk/dna-spi/src/test/java/org/jboss/dna/spi/graph/connection/RepositoryConnectionPoolTest.java
===================================================================
---
trunk/dna-spi/src/test/java/org/jboss/dna/spi/graph/connection/RepositoryConnectionPoolTest.java 2008-06-30
21:34:41 UTC (rev 319)
+++
trunk/dna-spi/src/test/java/org/jboss/dna/spi/graph/connection/RepositoryConnectionPoolTest.java 2008-06-30
21:42:54 UTC (rev 320)
@@ -1,222 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * Copyright 2008, Red Hat Middleware LLC, and individual contributors
- * as indicated by the @author tags. See the copyright.txt file in the
- * distribution for a full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
- */
-package org.jboss.dna.spi.graph.connection;
-
-import static org.hamcrest.core.Is.is;
-import static org.hamcrest.core.IsNull.notNullValue;
-import static
org.jboss.dna.spi.graph.connection.RepositorySourceLoadHarness.runLoadTest;
-import static org.junit.Assert.assertThat;
-import java.util.List;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
-
-/**
- * @author Randall Hauch
- */
-public class RepositoryConnectionPoolTest {
-
- private RepositoryConnectionPool pool;
- private TimeDelayingRepositorySource repositorySource;
- private ExecutionEnvironment env;
-
- @Before
- public void beforeEach() {
- this.repositorySource = new TimeDelayingRepositorySource("source 1");
- this.pool = new RepositoryConnectionPool(this.repositorySource, 1, 1, 100,
TimeUnit.SECONDS);
- this.env = null;
- }
-
- @After
- public void afterEach() throws Exception {
- pool.shutdown();
- pool.awaitTermination(2, TimeUnit.SECONDS);
- }
-
- @Test
- public void shouldBeCreatedInRunningState() {
- assertThat(pool.isShutdown(), is(false));
- assertThat(pool.isTerminating(), is(false));
- assertThat(pool.isTerminated(), is(false));
- assertThat(pool.getTotalConnectionsCreated(), is(0l));
- assertThat(pool.getTotalConnectionsUsed(), is(0l));
- }
-
- @Test
- public void shouldShutdownWhenNoConnectionsWereCreatedOrUsed() throws
InterruptedException {
- assertThat(pool.isShutdown(), is(false));
- assertThat(pool.isTerminating(), is(false));
- assertThat(pool.isTerminated(), is(false));
- assertThat(pool.getTotalConnectionsCreated(), is(0l));
- assertThat(pool.getTotalConnectionsUsed(), is(0l));
-
- for (int i = 0; i != 4; ++i) {
- pool.shutdown();
- assertThat(pool.isShutdown() || pool.isTerminating() || pool.isTerminated(),
is(true));
- pool.awaitTermination(2, TimeUnit.SECONDS);
- assertThat(pool.isTerminated(), is(true));
- assertThat(pool.getTotalConnectionsCreated(), is(0l));
- assertThat(pool.getTotalConnectionsUsed(), is(0l));
- }
- }
-
- @Test
- public void shouldCreateConnectionAndRecoverWhenClosed() throws
RepositorySourceException, InterruptedException {
- assertThat(pool.getTotalConnectionsCreated(), is(0l));
- assertThat(pool.getTotalConnectionsUsed(), is(0l));
-
- RepositoryConnection conn = pool.getConnection();
- assertThat(conn, is(notNullValue()));
- assertThat(pool.getTotalConnectionsCreated(), is(1l));
- assertThat(pool.getTotalConnectionsUsed(), is(1l));
- assertThat(pool.getPoolSize(), is(1));
-
- conn.close();
-
- assertThat(pool.getTotalConnectionsCreated(), is(1l));
- assertThat(pool.getTotalConnectionsUsed(), is(1l));
- assertThat(pool.getPoolSize(), is(1));
- }
-
- @Test
- public void shouldAllowShutdownToBeCalledMultipleTimesEvenWhenShutdown()
- throws RepositorySourceException, InterruptedException {
- assertThat(pool.getTotalConnectionsCreated(), is(0l));
- assertThat(pool.getTotalConnectionsUsed(), is(0l));
-
- RepositoryConnection conn = pool.getConnection();
- assertThat(conn, is(notNullValue()));
- assertThat(pool.getTotalConnectionsCreated(), is(1l));
- assertThat(pool.getTotalConnectionsUsed(), is(1l));
- assertThat(pool.getPoolSize(), is(1));
-
- conn.close();
-
- assertThat(pool.getTotalConnectionsCreated(), is(1l));
- assertThat(pool.getTotalConnectionsUsed(), is(1l));
- assertThat(pool.getPoolSize(), is(1));
-
- pool.shutdown();
- pool.shutdown();
- pool.awaitTermination(2, TimeUnit.SECONDS);
- assertThat(pool.isTerminated(), is(true));
- assertThat(pool.getTotalConnectionsCreated(), is(1l));
- assertThat(pool.getTotalConnectionsUsed(), is(1l));
- pool.shutdown();
- pool.awaitTermination(2, TimeUnit.SECONDS);
- pool.shutdown();
- pool.awaitTermination(2, TimeUnit.SECONDS);
- }
-
- @Test( expected = IllegalStateException.class )
- public void shouldNotCreateConnectionIfPoolIsNotRunning() throws
RepositorySourceException, InterruptedException {
- pool.shutdown();
- pool.awaitTermination(2, TimeUnit.SECONDS);
- assertThat(pool.isTerminated(), is(true));
- assertThat(pool.getTotalConnectionsCreated(), is(0l));
- assertThat(pool.getTotalConnectionsUsed(), is(0l));
- pool.getConnection(); // this should fail with illegal state
- }
-
- @Test
- public void shouldAllowConnectionsToBeClosedMoreThanOnceWithNoIllEffects()
- throws RepositorySourceException, InterruptedException {
- assertThat(pool.getTotalConnectionsCreated(), is(0l));
- assertThat(pool.getTotalConnectionsUsed(), is(0l));
-
- RepositoryConnection conn = pool.getConnection();
- assertThat(conn, is(notNullValue()));
- assertThat(pool.getTotalConnectionsCreated(), is(1l));
- assertThat(pool.getTotalConnectionsUsed(), is(1l));
- assertThat(pool.getPoolSize(), is(1));
-
- conn.close();
- conn.close();
- }
-
- @Test
- public void shouldBlockClientsWhenNotEnoughConnections() throws Exception {
- int numConnectionsInPool = 1;
- int numClients = 2;
- RepositoryConnectionPool pool = new RepositoryConnectionPool(repositorySource);
- pool.setCorePoolSize(numConnectionsInPool);
- pool.setMaximumPoolSize(numConnectionsInPool);
- RepositoryOperation.Factory<Integer> operationFactory =
RepositorySourceLoadHarness.createMultipleLoadOperationFactory(env,
-
10);
- runLoadTest(pool, numClients, 100, TimeUnit.MILLISECONDS, operationFactory);
- pool.shutdown();
- pool.awaitTermination(4, TimeUnit.SECONDS);
- }
-
- @Test
- public void shouldLimitClientsToRunSequentiallyWithOneConnectionInPool() throws
Exception {
- int numConnectionsInPool = 1;
- int numClients = 3;
- RepositoryConnectionPool pool = new RepositoryConnectionPool(repositorySource);
- pool.setCorePoolSize(numConnectionsInPool);
- pool.setMaximumPoolSize(numConnectionsInPool);
- RepositoryOperation.Factory<Integer> operationFactory =
RepositorySourceLoadHarness.createMultipleLoadOperationFactory(env,
-
10);
- runLoadTest(pool, numClients, 100, TimeUnit.MILLISECONDS, operationFactory);
- pool.shutdown();
- pool.awaitTermination(4, TimeUnit.SECONDS);
- }
-
- @Test
- public void shouldClientsToRunConncurrentlyWithTwoConnectionsInPool() throws
Exception {
- int numConnectionsInPool = 2;
- int numClients = 10;
- RepositoryConnectionPool pool = new RepositoryConnectionPool(repositorySource);
- pool.setCorePoolSize(numConnectionsInPool);
- pool.setMaximumPoolSize(numConnectionsInPool);
- RepositoryOperation.Factory<Integer> operationFactory =
RepositorySourceLoadHarness.createMultipleLoadOperationFactory(env,
-
10);
- runLoadTest(pool, numClients, 100, TimeUnit.MILLISECONDS, operationFactory);
- pool.shutdown();
- pool.awaitTermination(4, TimeUnit.SECONDS);
- }
-
- @Ignore( "doesn't run on hudson" )
- @Test
- public void shouldClientsToRunConncurrentlyWithMultipleConnectionInPool() throws
Exception {
- int numConnectionsInPool = 10;
- int numClients = 50;
- RepositoryConnectionPool pool = new RepositoryConnectionPool(repositorySource);
- pool.setCorePoolSize(numConnectionsInPool);
- pool.setMaximumPoolSize(numConnectionsInPool);
- RepositoryOperation.Factory<Integer> operationFactory =
RepositorySourceLoadHarness.createMultipleLoadOperationFactory(env,
-
20);
- List<Future<Integer>> results = runLoadTest(pool, numClients, 200,
TimeUnit.MILLISECONDS, operationFactory);
- int total = 0;
- for (Future<Integer> result : results) {
- assertThat(result.isDone(), is(true));
- if (result.isDone()) total += result.get();
- }
- assertThat(total, is(20 * numClients));
- pool.shutdown();
- pool.awaitTermination(4, TimeUnit.SECONDS);
- }
-
-}
Modified:
trunk/dna-spi/src/test/java/org/jboss/dna/spi/graph/connection/RepositorySourceLoadHarness.java
===================================================================
---
trunk/dna-spi/src/test/java/org/jboss/dna/spi/graph/connection/RepositorySourceLoadHarness.java 2008-06-30
21:34:41 UTC (rev 319)
+++
trunk/dna-spi/src/test/java/org/jboss/dna/spi/graph/connection/RepositorySourceLoadHarness.java 2008-06-30
21:42:54 UTC (rev 320)
@@ -41,16 +41,23 @@
*/
public class RepositorySourceLoadHarness {
- public static Future<Integer> execute( RepositoryConnectionFactory
connectionFactory, ExecutionEnvironment env, long maxTime, TimeUnit maxTimeUnit ) throws
InterruptedException {
+ public static Future<Integer> execute( RepositoryConnectionFactory
connectionFactory,
+ ExecutionEnvironment env,
+ long maxTime,
+ TimeUnit maxTimeUnit ) throws
InterruptedException {
int numTimes = 1;
int numClients = 1;
- RepositoryOperation.Factory<Integer> operationFactory =
RepositorySourceLoadHarness.createMultipleLoadOperationFactory(env, numTimes);
- List<Future<Integer>> results = runLoadTest(connectionFactory,
numClients, maxTime, maxTimeUnit, operationFactory);
+ RepositoryOperation.Factory<Integer> operationFactory =
RepositorySourceLoadHarness.createMultipleLoadOperationFactory(numTimes);
+ List<Future<Integer>> results = runLoadTest(env, connectionFactory,
numClients, maxTime, maxTimeUnit, operationFactory);
return results.get(0);
}
- public static <T> List<Future<T>> runLoadTest(
RepositoryConnectionFactory connectionFactory, int numClients, long maxTime, TimeUnit
maxTimeUnit, RepositoryOperation.Factory<T> clientFactory )
- throws InterruptedException {
+ public static <T> List<Future<T>> runLoadTest( ExecutionEnvironment
env,
+ RepositoryConnectionFactory
connectionFactory,
+ int numClients,
+ long maxTime,
+ TimeUnit maxTimeUnit,
+ RepositoryOperation.Factory<T>
clientFactory ) throws InterruptedException {
// Create the clients ...
Collection<RepositoryOperation<T>> clients = new
ArrayList<RepositoryOperation<T>>();
for (int i = 0; i != numClients; ++i) {
@@ -58,21 +65,28 @@
}
// and run the test ...
- return runLoadTest(connectionFactory, maxTime, maxTimeUnit, clients);
+ return runLoadTest(env, connectionFactory, maxTime, maxTimeUnit, clients);
}
- public static <T> List<Future<T>> runLoadTest(
RepositoryConnectionFactory connectionFactory, long maxTime, TimeUnit maxTimeUnit,
RepositoryOperation<T>... clients ) throws InterruptedException {
+ public static <T> List<Future<T>> runLoadTest( ExecutionEnvironment
env,
+ RepositoryConnectionFactory
connectionFactory,
+ long maxTime,
+ TimeUnit maxTimeUnit,
+ RepositoryOperation<T>...
clients ) throws InterruptedException {
// Create the client collection ...
Collection<RepositoryOperation<T>> clientCollection = new
ArrayList<RepositoryOperation<T>>();
for (RepositoryOperation<T> client : clients) {
if (client != null) clientCollection.add(client);
}
// and run the test ...
- return runLoadTest(connectionFactory, maxTime, maxTimeUnit, clientCollection);
+ return runLoadTest(env, connectionFactory, maxTime, maxTimeUnit,
clientCollection);
}
- public static <T> List<Future<T>> runLoadTest(
RepositoryConnectionFactory connectionFactory, long maxTime, TimeUnit maxTimeUnit,
Collection<RepositoryOperation<T>> clients )
- throws InterruptedException {
+ public static <T> List<Future<T>> runLoadTest( ExecutionEnvironment
env,
+ RepositoryConnectionFactory
connectionFactory,
+ long maxTime,
+ TimeUnit maxTimeUnit,
+
Collection<RepositoryOperation<T>> clients ) throws InterruptedException {
assert connectionFactory != null;
assert clients != null;
assert clients.size() > 0;
@@ -88,7 +102,7 @@
try {
// Wrap each client by a callable and by another that uses a latch ...
- List<Callable<T>> callables =
RepositoryOperations.createCallables(connectionFactory, clients);
+ List<Callable<T>> callables =
RepositoryOperations.createCallables(env, connectionFactory, clients);
// Run the tests ...
List<Future<T>> futures = clientPool.invokeAll(callables,
maxTime, maxTimeUnit);
@@ -153,15 +167,14 @@
* {@link RepositoryConnection#execute(ExecutionEnvironment,
org.jboss.dna.spi.graph.commands.GraphCommand...)} the supplied
* number of times, intermixed with random math operations and {@link Thread#yield()
yielding}.
*
- * @param env the environment
* @param callsPerOperation the number of <code>load</code> calls per
RepositoryOperation
* @return the factory
*/
- public static RepositoryOperation.Factory<Integer>
createMultipleLoadOperationFactory( final ExecutionEnvironment env, final int
callsPerOperation ) {
+ public static RepositoryOperation.Factory<Integer>
createMultipleLoadOperationFactory( final int callsPerOperation ) {
return new RepositoryOperation.Factory<Integer>() {
public RepositoryOperation<Integer> create() {
- return new CallLoadMultipleTimes(env, callsPerOperation);
+ return new CallLoadMultipleTimes(callsPerOperation);
}
};
}
@@ -169,12 +182,10 @@
public static class CallLoadMultipleTimes implements
RepositoryOperation<Integer> {
private final int count;
- private final ExecutionEnvironment env;
- public CallLoadMultipleTimes( ExecutionEnvironment env, int count ) {
+ public CallLoadMultipleTimes( int count ) {
Logger.getLogger(RepositorySourceLoadHarness.class).debug("Creating
repository operation to call {0} times", count);
this.count = count;
- this.env = env;
}
/**
@@ -186,8 +197,12 @@
/**
* {@inheritDoc}
+ *
+ * @see
org.jboss.dna.spi.graph.connection.RepositoryOperation#run(org.jboss.dna.spi.graph.connection.ExecutionEnvironment,
+ * org.jboss.dna.spi.graph.connection.RepositoryConnection)
*/
- public Integer run( RepositoryConnection connection ) throws
RepositorySourceException, InterruptedException {
+ public Integer run( ExecutionEnvironment env,
+ RepositoryConnection connection ) throws
RepositorySourceException, InterruptedException {
Logger.getLogger(RepositorySourceLoadHarness.class).debug("Running {0}
operation", this.getClass().getSimpleName());
int total = count;
for (int i = 0; i != count; ++i) {