Author: rhauch
Date: 2008-05-29 15:19:49 -0400 (Thu, 29 May 2008)
New Revision: 211
Added:
branches/federation/dna-spi/src/main/java/org/jboss/dna/spi/graph/commands/ActsAsUpdate.java
branches/federation/dna-spi/src/main/java/org/jboss/dna/spi/graph/commands/SetPropertiesCommand.java
branches/federation/dna-spi/src/main/java/org/jboss/dna/spi/graph/connection/RepositoryConnectionPool.java
branches/federation/dna-spi/src/main/java/org/jboss/dna/spi/graph/connection/RepositoryOperation.java
branches/federation/dna-spi/src/test/java/org/jboss/dna/spi/graph/connection/
branches/federation/dna-spi/src/test/java/org/jboss/dna/spi/graph/connection/MockRepositorySource.java
branches/federation/dna-spi/src/test/java/org/jboss/dna/spi/graph/connection/RepositoryConnectionPoolTest.java
branches/federation/dna-spi/src/test/java/org/jboss/dna/spi/graph/connection/RepositorySourceLoadHarness.java
branches/federation/dna-spi/src/test/java/org/jboss/dna/spi/graph/connection/RepositoryTestOperations.java
Modified:
branches/federation/dna-spi/src/main/java/org/jboss/dna/spi/SpiI18n.java
branches/federation/dna-spi/src/main/java/org/jboss/dna/spi/graph/commands/ActsOnPath.java
branches/federation/dna-spi/src/main/java/org/jboss/dna/spi/graph/commands/ActsOnProperties.java
branches/federation/dna-spi/src/main/java/org/jboss/dna/spi/graph/commands/CreateNodeCommand.java
branches/federation/dna-spi/src/main/java/org/jboss/dna/spi/graph/commands/DeleteBranchCommand.java
branches/federation/dna-spi/src/main/java/org/jboss/dna/spi/graph/commands/GetChildrenCommand.java
branches/federation/dna-spi/src/main/java/org/jboss/dna/spi/graph/commands/GetPropertiesCommand.java
branches/federation/dna-spi/src/main/java/org/jboss/dna/spi/graph/commands/MoveBranchCommand.java
branches/federation/dna-spi/src/main/java/org/jboss/dna/spi/graph/connection/RepositoryConnection.java
branches/federation/dna-spi/src/main/java/org/jboss/dna/spi/graph/connection/RepositoryConnectionFactory.java
branches/federation/dna-spi/src/main/java/org/jboss/dna/spi/graph/connection/RepositorySource.java
branches/federation/dna-spi/src/main/resources/org/jboss/dna/spi/SpiI18n.properties
Log:
DNA-68: Create connector API
http://jira.jboss.org/jira/browse/DNA-68
Refactored some of the graph commands, the some of the connection framework, and added
some unit tests.
Modified: branches/federation/dna-spi/src/main/java/org/jboss/dna/spi/SpiI18n.java
===================================================================
--- branches/federation/dna-spi/src/main/java/org/jboss/dna/spi/SpiI18n.java 2008-05-29
19:18:34 UTC (rev 210)
+++ branches/federation/dna-spi/src/main/java/org/jboss/dna/spi/SpiI18n.java 2008-05-29
19:19:49 UTC (rev 211)
@@ -51,6 +51,11 @@
public static I18n invalidQualifiedNameString;
+ public static I18n maximumPoolSizeMayNotBeSmallerThanCorePoolSize;
+ public static I18n repositoryConnectionPoolIsNotRunning;
+ public static I18n unableToObtainValidRepositoryAfterAttempts;
+ public static I18n closedConnectionMayNotBeUsed;
+
static {
try {
I18n.initialize(SpiI18n.class);
Added:
branches/federation/dna-spi/src/main/java/org/jboss/dna/spi/graph/commands/ActsAsUpdate.java
===================================================================
---
branches/federation/dna-spi/src/main/java/org/jboss/dna/spi/graph/commands/ActsAsUpdate.java
(rev 0)
+++
branches/federation/dna-spi/src/main/java/org/jboss/dna/spi/graph/commands/ActsAsUpdate.java 2008-05-29
19:19:49 UTC (rev 211)
@@ -0,0 +1,30 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package org.jboss.dna.spi.graph.commands;
+
+/**
+ * Marker interface that signals that a command may update or modify information in a
repository.
+ * @author Randall Hauch
+ */
+public interface ActsAsUpdate {
+
+}
Property changes on:
branches/federation/dna-spi/src/main/java/org/jboss/dna/spi/graph/commands/ActsAsUpdate.java
___________________________________________________________________
Name: svn:mime-type
+ text/plain
Modified:
branches/federation/dna-spi/src/main/java/org/jboss/dna/spi/graph/commands/ActsOnPath.java
===================================================================
---
branches/federation/dna-spi/src/main/java/org/jboss/dna/spi/graph/commands/ActsOnPath.java 2008-05-29
19:18:34 UTC (rev 210)
+++
branches/federation/dna-spi/src/main/java/org/jboss/dna/spi/graph/commands/ActsOnPath.java 2008-05-29
19:19:49 UTC (rev 211)
@@ -28,7 +28,7 @@
* recipient to obtain the path that the command applies to.
* @author Randall Hauch
*/
-public interface ActsOnPath extends GraphCommand {
+public interface ActsOnPath {
/**
* Get the path to which this command applies.
Modified:
branches/federation/dna-spi/src/main/java/org/jboss/dna/spi/graph/commands/ActsOnProperties.java
===================================================================
---
branches/federation/dna-spi/src/main/java/org/jboss/dna/spi/graph/commands/ActsOnProperties.java 2008-05-29
19:18:34 UTC (rev 210)
+++
branches/federation/dna-spi/src/main/java/org/jboss/dna/spi/graph/commands/ActsOnProperties.java 2008-05-29
19:19:49 UTC (rev 211)
@@ -22,7 +22,7 @@
package org.jboss.dna.spi.graph.commands;
import org.jboss.dna.spi.cache.Cacheable;
-import org.jboss.dna.spi.graph.Name;
+import org.jboss.dna.spi.graph.Property;
/**
* Aspect interface for any repository command that acts upon or updates properties on a
given node. This aspect also allows for
@@ -33,11 +33,9 @@
public interface ActsOnProperties extends ActsOnPath, Cacheable {
/**
- * Set the values for the named property. Any existing property values, if previously
set, will be overwritten. If there are
- * no property vlaues or if all of the property values are null, the property will be
removed.
- * @param propertyName the name of the property
- * @param values the property values
+ * Get the properties. Any property with no values will be removed.
+ * @return the properties
*/
- void setProperty( Name propertyName, Object... values );
+ Iterable<Property> getProperties();
}
Modified:
branches/federation/dna-spi/src/main/java/org/jboss/dna/spi/graph/commands/CreateNodeCommand.java
===================================================================
---
branches/federation/dna-spi/src/main/java/org/jboss/dna/spi/graph/commands/CreateNodeCommand.java 2008-05-29
19:18:34 UTC (rev 210)
+++
branches/federation/dna-spi/src/main/java/org/jboss/dna/spi/graph/commands/CreateNodeCommand.java 2008-05-29
19:19:49 UTC (rev 211)
@@ -23,26 +23,18 @@
import java.util.Iterator;
import org.jboss.dna.spi.cache.Cacheable;
-import org.jboss.dna.spi.graph.Name;
import org.jboss.dna.spi.graph.Property;
/**
* A command to get the children of a single node given its path.
* @author Randall Hauch
*/
-public interface CreateNodeCommand extends GraphCommand, ActsOnPath, Cacheable,
ActsOnProperties {
+public interface CreateNodeCommand extends GraphCommand, ActsOnPath, Cacheable,
ActsOnProperties, ActsAsUpdate {
/**
- * Get the names of the children for this new node. The recipient of the command
should {@link Iterator#remove() remove} any
- * child name that will not be stored.
- * @return the names of the node's children; never null, but possibly empty
- */
- Iterator<Name> getChildren();
-
- /**
* Get the properties for this new node. The recipient of the command should {@link
Iterator#remove() remove} any property
* that will not be stored.
* @return the property iterator; never null, but possibly empty
*/
- Iterator<Property> getProperties();
+ Iterable<Property> getProperties();
}
Modified:
branches/federation/dna-spi/src/main/java/org/jboss/dna/spi/graph/commands/DeleteBranchCommand.java
===================================================================
---
branches/federation/dna-spi/src/main/java/org/jboss/dna/spi/graph/commands/DeleteBranchCommand.java 2008-05-29
19:18:34 UTC (rev 210)
+++
branches/federation/dna-spi/src/main/java/org/jboss/dna/spi/graph/commands/DeleteBranchCommand.java 2008-05-29
19:19:49 UTC (rev 211)
@@ -25,5 +25,5 @@
* Command that deletes a branch given by a specified path.
* @author Randall Hauch
*/
-public interface DeleteBranchCommand extends GraphCommand, ActsOnPath {
+public interface DeleteBranchCommand extends GraphCommand, ActsOnPath, ActsAsUpdate {
}
Modified:
branches/federation/dna-spi/src/main/java/org/jboss/dna/spi/graph/commands/GetChildrenCommand.java
===================================================================
---
branches/federation/dna-spi/src/main/java/org/jboss/dna/spi/graph/commands/GetChildrenCommand.java 2008-05-29
19:18:34 UTC (rev 210)
+++
branches/federation/dna-spi/src/main/java/org/jboss/dna/spi/graph/commands/GetChildrenCommand.java 2008-05-29
19:19:49 UTC (rev 211)
@@ -46,6 +46,20 @@
void setChildren( Iterator<Name> namesOfChildren );
/**
+ * Set the children of this node using an iterator of names. Any existing child
references already set on this command will be
+ * replaced by those supplied to this method.
+ * <p>
+ * The indexes of the same-name siblings will be determined by the order in which
they appear in the iterator.
+ * </p>
+ * <p>
+ * The caller may supply a custom iterator implementation, which will be called on
this same connection within the same
+ * transaction when the node data is processed and consumed.
+ * </p>
+ * @param namesOfChildren the iterable names of children; may be null if there are no
children
+ */
+ void setChildren( Iterable<Name> namesOfChildren );
+
+ /**
* Set the children of this node using the array of names. Any existing child
references already set on this command will be
* replaced by those supplied to this method.
* <p>
Modified:
branches/federation/dna-spi/src/main/java/org/jboss/dna/spi/graph/commands/GetPropertiesCommand.java
===================================================================
---
branches/federation/dna-spi/src/main/java/org/jboss/dna/spi/graph/commands/GetPropertiesCommand.java 2008-05-29
19:18:34 UTC (rev 210)
+++
branches/federation/dna-spi/src/main/java/org/jboss/dna/spi/graph/commands/GetPropertiesCommand.java 2008-05-29
19:19:49 UTC (rev 211)
@@ -22,11 +22,35 @@
package org.jboss.dna.spi.graph.commands;
import org.jboss.dna.spi.cache.Cacheable;
+import org.jboss.dna.spi.graph.Name;
+import org.jboss.dna.spi.graph.Property;
/**
* A command to obtain from the source the properties for a single node given its path.
* @author Randall Hauch
*/
-public interface GetPropertiesCommand extends GraphCommand, ActsOnPath, Cacheable,
ActsOnProperties {
+public interface GetPropertiesCommand extends GraphCommand, ActsOnPath, Cacheable {
+ /**
+ * Set the properties. Any existing property values, if previously set, will be
overwritten. If there are no property vlaues
+ * or if all of the property values are null, the property will be removed.
+ * @param properties the properties
+ */
+ void setProperties( Iterable<Property> properties );
+
+ /**
+ * Set the values for the property. Any existing property values, if previously set,
will be overwritten. If there are no
+ * property vlaues or if all of the property values are null, the property will be
removed.
+ * @param property the property
+ */
+ void setProperty( Property property );
+
+ /**
+ * Set the values for the named property. Any existing property values, if previously
set, will be overwritten. If there are
+ * no property vlaues or if all of the property values are null, the property will be
removed.
+ * @param propertyName the name of the property
+ * @param values the property values
+ */
+ void setProperty( Name propertyName, Object... values );
+
}
Modified:
branches/federation/dna-spi/src/main/java/org/jboss/dna/spi/graph/commands/MoveBranchCommand.java
===================================================================
---
branches/federation/dna-spi/src/main/java/org/jboss/dna/spi/graph/commands/MoveBranchCommand.java 2008-05-29
19:18:34 UTC (rev 210)
+++
branches/federation/dna-spi/src/main/java/org/jboss/dna/spi/graph/commands/MoveBranchCommand.java 2008-05-29
19:19:49 UTC (rev 211)
@@ -27,7 +27,7 @@
* Command that moves a branch from one path to another.
* @author Randall Hauch
*/
-public interface MoveBranchCommand extends GraphCommand, ActsOnPath {
+public interface MoveBranchCommand extends GraphCommand, ActsOnPath, ActsAsUpdate {
/**
* Get the new path to which the branch is to be moved.
Added:
branches/federation/dna-spi/src/main/java/org/jboss/dna/spi/graph/commands/SetPropertiesCommand.java
===================================================================
---
branches/federation/dna-spi/src/main/java/org/jboss/dna/spi/graph/commands/SetPropertiesCommand.java
(rev 0)
+++
branches/federation/dna-spi/src/main/java/org/jboss/dna/spi/graph/commands/SetPropertiesCommand.java 2008-05-29
19:19:49 UTC (rev 211)
@@ -0,0 +1,30 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package org.jboss.dna.spi.graph.commands;
+
+/**
+ * A command to obtain from the source the properties for a single node given its path.
+ * @author Randall Hauch
+ */
+public interface SetPropertiesCommand extends GraphCommand, ActsOnPath, ActsOnProperties
{
+
+}
Property changes on:
branches/federation/dna-spi/src/main/java/org/jboss/dna/spi/graph/commands/SetPropertiesCommand.java
___________________________________________________________________
Name: svn:mime-type
+ text/plain
Modified:
branches/federation/dna-spi/src/main/java/org/jboss/dna/spi/graph/connection/RepositoryConnection.java
===================================================================
---
branches/federation/dna-spi/src/main/java/org/jboss/dna/spi/graph/connection/RepositoryConnection.java 2008-05-29
19:18:34 UTC (rev 210)
+++
branches/federation/dna-spi/src/main/java/org/jboss/dna/spi/graph/connection/RepositoryConnection.java 2008-05-29
19:19:49 UTC (rev 211)
@@ -69,8 +69,9 @@
* @param env the environment in which the commands are being executed; never null
* @param commands the commands to be executed; never null
* @throws RepositorySourceException if there is a problem loading the node data
+ * @throws InterruptedException if the thread has been interrupted during the
operation
*/
- void execute( ExecutionEnvironment env, GraphCommand... commands ) throws
RepositorySourceException;
+ void execute( ExecutionEnvironment env, GraphCommand... commands ) throws
RepositorySourceException, InterruptedException;
/**
* Close this connection to signal that it is no longer needed and that any
accumulated resources are to be released.
Modified:
branches/federation/dna-spi/src/main/java/org/jboss/dna/spi/graph/connection/RepositoryConnectionFactory.java
===================================================================
---
branches/federation/dna-spi/src/main/java/org/jboss/dna/spi/graph/connection/RepositoryConnectionFactory.java 2008-05-29
19:18:34 UTC (rev 210)
+++
branches/federation/dna-spi/src/main/java/org/jboss/dna/spi/graph/connection/RepositoryConnectionFactory.java 2008-05-29
19:19:49 UTC (rev 211)
@@ -27,6 +27,12 @@
public interface RepositoryConnectionFactory {
/**
+ * Get the name for this repository source.
+ * @return the name; never null or empty
+ */
+ String getName();
+
+ /**
* Get a connection from this factory.
* @return a connection
* @throws RepositorySourceException if there is a problem obtaining a connection
Added:
branches/federation/dna-spi/src/main/java/org/jboss/dna/spi/graph/connection/RepositoryConnectionPool.java
===================================================================
---
branches/federation/dna-spi/src/main/java/org/jboss/dna/spi/graph/connection/RepositoryConnectionPool.java
(rev 0)
+++
branches/federation/dna-spi/src/main/java/org/jboss/dna/spi/graph/connection/RepositoryConnectionPool.java 2008-05-29
19:19:49 UTC (rev 211)
@@ -0,0 +1,994 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package org.jboss.dna.spi.graph.connection;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+import javax.transaction.xa.XAResource;
+import net.jcip.annotations.GuardedBy;
+import net.jcip.annotations.ThreadSafe;
+import org.jboss.dna.common.util.ArgCheck;
+import org.jboss.dna.common.util.LogContext;
+import org.jboss.dna.common.util.Logger;
+import org.jboss.dna.spi.SpiI18n;
+import org.jboss.dna.spi.graph.commands.GraphCommand;
+
+/**
+ * @author Randall Hauch
+ */
+@ThreadSafe
+public class RepositoryConnectionPool implements RepositoryConnectionFactory {
+
+ /**
+ * Permission for checking shutdown
+ */
+ private static final RuntimePermission shutdownPerm = new
RuntimePermission("modifyThread");
+
+ /**
+ * The factory that this pool uses to create new connections.
+ */
+ private final RepositoryConnectionFactory connectionFactory;
+
+ /**
+ * Lock held on updates to poolSize, corePoolSize, maximumPoolSize, and workers set.
+ */
+ private final ReentrantLock mainLock = new ReentrantLock();
+
+ /**
+ * Wait condition to support awaitTermination
+ */
+ private final Condition termination = mainLock.newCondition();
+
+ /**
+ * Set containing all connections that are available for use.
+ */
+ @GuardedBy( "mainLock" )
+ private final BlockingQueue<ConnectionWrapper> availableConnections = new
LinkedBlockingQueue<ConnectionWrapper>();
+
+ /**
+ * The connections that are currently in use.
+ */
+ @GuardedBy( "mainLock" )
+ private final Set<ConnectionWrapper> inUseConnections = new
HashSet<ConnectionWrapper>();
+
+ /**
+ * Timeout in nanoseconds for idle connections waiting to be used. Threads use this
timeout only when there are more than
+ * corePoolSize present. Otherwise they wait forever to be used.
+ */
+ private volatile long keepAliveTime;
+
+ /**
+ * The target pool size, updated only while holding mainLock, but volatile to allow
concurrent readability even during
+ * updates.
+ */
+ @GuardedBy( "mainLock" )
+ private volatile int corePoolSize;
+
+ /**
+ * Maximum pool size, updated only while holding mainLock but volatile to allow
concurrent readability even during updates.
+ */
+ @GuardedBy( "mainLock" )
+ private volatile int maximumPoolSize;
+
+ /**
+ * Current pool size, updated only while holding mainLock but volatile to allow
concurrent readability even during updates.
+ */
+ @GuardedBy( "mainLock" )
+ private volatile int poolSize;
+
+ /**
+ * Lifecycle state, updated only while holding mainLock but volatile to allow
concurrent readability even during updates.
+ */
+ @GuardedBy( "mainLock" )
+ private volatile int runState;
+
+ // Special values for runState
+ /** Normal, not-shutdown mode */
+ static final int RUNNING = 0;
+ /** Controlled shutdown mode */
+ static final int SHUTDOWN = 1;
+ /** Immediate shutdown mode */
+ static final int STOP = 2;
+ /** Final state */
+ static final int TERMINATED = 3;
+
+ /**
+ * Flag specifying whether a connection should be validated before returning it from
the {@link #getConnection()} method.
+ */
+ private final AtomicBoolean validateConnectionBeforeUse = new AtomicBoolean(false);
+
+ /**
+ * The time in nanoseconds that ping should wait before timing out and failing.
+ */
+ private final AtomicLong pingTimeout = new AtomicLong(0);
+
+ /**
+ * The number of times an attempt to obtain a connection should fail with invalid
connections before throwing an exception.
+ */
+ private final AtomicInteger maxFailedAttemptsBeforeError = new AtomicInteger(10);
+
+ private final AtomicLong totalConnectionsCreated = new AtomicLong(0);
+
+ private final AtomicLong totalConnectionsUsed = new AtomicLong(0);
+
+ private final Logger logger = Logger.getLogger(this.getClass());
+
+ /**
+ * Create the pool to use the supplied connection factory, which is typically a
{@link RepositorySource}.
+ * @param connectionFactory the factory for connections
+ * @param corePoolSize the number of connections to keep in the pool, even if they
are idle.
+ * @param maximumPoolSize the maximum number of connections to allow in the pool.
+ * @param keepAliveTime when the number of connection is greater than the core, this
is the maximum time that excess idle
+ * connections will be kept before terminating.
+ * @param unit the time unit for the keepAliveTime argument.
+ * @throws IllegalArgumentException if the connection factory is null or any of the
supplied arguments are invalid
+ */
+ public RepositoryConnectionPool( RepositoryConnectionFactory connectionFactory, int
corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit ) {
+ ArgCheck.isNonNegative(corePoolSize, "corePoolSize");
+ ArgCheck.isPositive(maximumPoolSize, "maximumPoolSize");
+ ArgCheck.isNonNegative(keepAliveTime, "keepAliveTime");
+ ArgCheck.isNotNull(connectionFactory, "repository connection
factory");
+ if (maximumPoolSize < corePoolSize) {
+ throw new
IllegalArgumentException(SpiI18n.maximumPoolSizeMayNotBeSmallerThanCorePoolSize.text());
+ }
+ this.connectionFactory = connectionFactory;
+ this.corePoolSize = corePoolSize;
+ this.maximumPoolSize = maximumPoolSize;
+ this.keepAliveTime = unit.toNanos(keepAliveTime);
+ this.setPingTimeout(100, TimeUnit.MILLISECONDS);
+ }
+
+ public String getName() {
+ return this.connectionFactory.getName();
+ }
+
+ // -------------------------------------------------
+ // Property settings ...
+ // -------------------------------------------------
+
+ /**
+ * @return validateConnectionBeforeUse
+ */
+ public boolean getValidateConnectionBeforeUse() {
+ return this.validateConnectionBeforeUse.get();
+ }
+
+ /**
+ * @param validateConnectionBeforeUse Sets validateConnectionBeforeUse to the
specified value.
+ */
+ public void setValidateConnectionBeforeUse( boolean validateConnectionBeforeUse ) {
+ this.validateConnectionBeforeUse.set(validateConnectionBeforeUse);
+ }
+
+ /**
+ * @return pingTimeout
+ */
+ public long getPingTimeoutInNanos() {
+ return this.pingTimeout.get();
+ }
+
+ /**
+ * @param pingTimeout the time to wait for a ping to complete
+ * @param unit the time unit of the time argument
+ */
+ public void setPingTimeout( long pingTimeout, TimeUnit unit ) {
+ ArgCheck.isNonNegative(pingTimeout, "time");
+ this.pingTimeout.set(unit.toNanos(pingTimeout));
+ }
+
+ /**
+ * @return maxFailedAttemptsBeforeError
+ */
+ public int getMaxFailedAttemptsBeforeError() {
+ return this.maxFailedAttemptsBeforeError.get();
+ }
+
+ /**
+ * @param maxFailedAttemptsBeforeError Sets maxFailedAttemptsBeforeError to the
specified value.
+ */
+ public void setMaxFailedAttemptsBeforeError( int maxFailedAttemptsBeforeError ) {
+ this.maxFailedAttemptsBeforeError.set(maxFailedAttemptsBeforeError);
+ }
+
+ /**
+ * Sets the time limit for which connections may remain idle before being closed. If
there are more than the core number of
+ * connections currently in the pool, after waiting this amount of time without being
used, excess threads will be terminated.
+ * This overrides any value set in the constructor.
+ * @param time the time to wait. A time value of zero will cause excess connections
to terminate immediately after being
+ * returned.
+ * @param unit the time unit of the time argument
+ * @throws IllegalArgumentException if time less than zero
+ * @see #getKeepAliveTime
+ */
+ public void setKeepAliveTime( long time, TimeUnit unit ) {
+ ArgCheck.isNonNegative(time, "time");
+ this.keepAliveTime = unit.toNanos(time);
+ }
+
+ /**
+ * Returns the connection keep-alive time, which is the amount of time which
connections in excess of the core pool size may
+ * remain idle before being closed.
+ * @param unit the desired time unit of the result
+ * @return the time limit
+ * @see #setKeepAliveTime
+ */
+ public long getKeepAliveTime( TimeUnit unit ) {
+ assert unit != null;
+ return unit.convert(keepAliveTime, TimeUnit.NANOSECONDS);
+ }
+
+ /**
+ * @return maximumPoolSize
+ */
+ public int getMaximumPoolSize() {
+ return this.maximumPoolSize;
+ }
+
+ /**
+ * Sets the maximum allowed number of connections. This overrides any value set in
the constructor. If the new value is
+ * smaller than the current value, excess existing but unused connections will be
closed.
+ * @param maximumPoolSize the new maximum
+ * @throws IllegalArgumentException if maximumPoolSize less than zero or the {@link
#getCorePoolSize() core pool size}
+ * @see #getMaximumPoolSize
+ */
+ public void setMaximumPoolSize( int maximumPoolSize ) {
+ ArgCheck.isPositive(maximumPoolSize, "maximum pool size");
+ if (maximumPoolSize < corePoolSize) {
+ throw new
IllegalArgumentException(SpiI18n.maximumPoolSizeMayNotBeSmallerThanCorePoolSize.text());
+ }
+ final ReentrantLock mainLock = this.mainLock;
+ try {
+ mainLock.lock();
+ int extra = this.maximumPoolSize - maximumPoolSize;
+ this.maximumPoolSize = maximumPoolSize;
+ if (extra > 0 && poolSize > maximumPoolSize) {
+ // Drain the extra connections from those available ...
+ drainUnusedConnections(extra);
+ }
+ } finally {
+ mainLock.unlock();
+ }
+ }
+
+ /**
+ * Returns the core number of connections.
+ * @return the core number of connections
+ * @see #setCorePoolSize(int)
+ */
+ public int getCorePoolSize() {
+ return this.corePoolSize;
+ }
+
+ /**
+ * Sets the core number of connections. This overrides any value set in the
constructor. If the new value is smaller than the
+ * current value, excess existing and unused connections will be closed. If larger,
new connections will, if needed, be
+ * created.
+ * @param corePoolSize the new core size
+ * @throws RepositorySourceException if there was an error obtaining the new
connection
+ * @throws InterruptedException if the thread was interrupted during the operation
+ * @throws IllegalArgumentException if <tt>corePoolSize</tt> less than
zero
+ * @see #getCorePoolSize()
+ */
+ public void setCorePoolSize( int corePoolSize ) throws RepositorySourceException,
InterruptedException {
+ ArgCheck.isNonNegative(corePoolSize, "core pool size");
+ if (maximumPoolSize < corePoolSize) {
+ throw new
IllegalArgumentException(SpiI18n.maximumPoolSizeMayNotBeSmallerThanCorePoolSize.text());
+ }
+ final ReentrantLock mainLock = this.mainLock;
+ try {
+ mainLock.lock();
+ int extra = this.corePoolSize - corePoolSize;
+ this.corePoolSize = corePoolSize;
+ if (extra < 0) {
+ // Add connections ...
+ addConnectionsIfUnderCorePoolSize();
+ } else if (extra > 0 && poolSize > corePoolSize) {
+ // Drain the extra connections from those available ...
+ drainUnusedConnections(extra);
+ }
+ } finally {
+ mainLock.unlock();
+ }
+ }
+
+ // -------------------------------------------------
+ // Statistics ...
+ // -------------------------------------------------
+
+ /**
+ * Returns the current number of connections in the pool.
+ * @return the number of connections
+ */
+ public int getPoolSize() {
+ return poolSize;
+ }
+
+ /**
+ * Returns the approximate number of connections that have been checked out from the
pool.
+ * @return the number of checked-out connections
+ */
+ public int getInUseCount() {
+ final ReentrantLock mainLock = this.mainLock;
+ try {
+ mainLock.lock();
+ return this.inUseConnections.size();
+ } finally {
+ mainLock.unlock();
+ }
+ }
+
+ /**
+ * Get the total number of connections that have been created by this pool.
+ * @return the total number of connections created by this pool
+ */
+ public long getTotalConnectionsCreated() {
+ return this.totalConnectionsCreated.get();
+ }
+
+ /**
+ * Get the total number of times connections have been {@link #getConnection()}
used.
+ * @return the total number
+ */
+ public long getTotalConnectionsUsed() {
+ return this.totalConnectionsUsed.get();
+ }
+
+ // -------------------------------------------------
+ // Utility methods ...
+ // -------------------------------------------------
+
+ /**
+ * Call the supplied operation, using a connection from this pool.
+ * @param <T> the return type for the operation
+ * @param operation the operation to be run using a connection in this pool
+ * @return the results from the operation
+ * @throws RepositorySourceException if there was an error obtaining the new
connection
+ * @throws InterruptedException if the thread was interrupted during the operation
+ * @throws IllegalArgumentException if the operation is null
+ * @see #callable(RepositoryOperation)
+ * @see #callables(Collection)
+ * @see #callables(RepositoryOperation...)
+ */
+ public <T> T call( RepositoryOperation<T> operation ) throws
RepositorySourceException, InterruptedException {
+ ArgCheck.isNotNull(operation, "repository operation");
+ // Get a connection ...
+ T result = null;
+ LogContext.set("context", operation.getName());
+ RepositoryConnection conn = this.getConnection();
+ try {
+ // And run the client with the connection ...
+ result = operation.run(conn);
+ } finally {
+ conn.close();
+ }
+ LogContext.clear();
+ return result;
+ }
+
+ /**
+ * Return a callable object that, when run, performs the supplied repository
operation against a connection in this pool.
+ * @param <T> the return type for the operation
+ * @param operation the operation to be run using a connection in this pool
+ * @return the callable
+ * @see #call(RepositoryOperation)
+ * @see #callables(Collection)
+ * @see #callables(RepositoryOperation...)
+ */
+ public <T> Callable<T> callable( final RepositoryOperation<T>
operation ) {
+ ArgCheck.isNotNull(operation, "repository operation");
+ final RepositoryConnectionPool pool = this;
+ return new Callable<T>() {
+
+ /**
+ * Execute by getting a connection from this pool, running the client, and
return the connection to the pool.
+ * @return the operation's result
+ * @throws Exception
+ */
+ public T call() throws Exception {
+ return pool.call(operation);
+ }
+ };
+ }
+
+ /**
+ * Return a collection of callable objects that, when run, perform the supplied
repository operations against connections in
+ * this pool.
+ * @param <T> the return type for the operations
+ * @param operations the operations to be run using connection from this pool
+ * @return the collection of callables
+ * @see #call(RepositoryOperation)
+ * @see #callable(RepositoryOperation)
+ * @see #callables(Collection)
+ */
+ public <T> List<Callable<T>> callables(
RepositoryOperation<T>... operations ) {
+ List<Callable<T>> callables = new
ArrayList<Callable<T>>();
+ for (final RepositoryOperation<T> operation : operations) {
+ callables.add(callable(operation));
+ }
+ return callables;
+ }
+
+ /**
+ * Return a collection of callable objects that, when run, perform the supplied
repository operations against connections in
+ * this pool.
+ * @param <T> the return type for the operations
+ * @param operations the operations to be run using connection from this pool
+ * @return the collection of callables
+ * @see #call(RepositoryOperation)
+ * @see #callable(RepositoryOperation)
+ * @see #callables(RepositoryOperation...)
+ */
+ public <T> List<Callable<T>> callables(
Collection<RepositoryOperation<T>> operations ) {
+ List<Callable<T>> callables = new
ArrayList<Callable<T>>();
+ for (final RepositoryOperation<T> operation : operations) {
+ callables.add(callable(operation));
+ }
+ return callables;
+ }
+
+ // -------------------------------------------------
+ // State management methods ...
+ // -------------------------------------------------
+
+ /**
+ * Starts a core connection, causing it to idly wait for use. This overrides the
default policy of starting core connections
+ * only when they are {@link #getConnection() needed}. This method will return
<tt>false</tt> if all core connections have
+ * already been started.
+ * @return true if a connection was started
+ * @throws RepositorySourceException if there was an error obtaining the new
connection
+ * @throws InterruptedException if the thread was interrupted during the operation
+ */
+ public boolean prestartCoreConnection() throws RepositorySourceException,
InterruptedException {
+ final ReentrantLock mainLock = this.mainLock;
+ try {
+ mainLock.lock();
+ return addConnectionIfUnderCorePoolSize();
+ } finally {
+ mainLock.unlock();
+ }
+ }
+
+ /**
+ * Starts all core connections, causing them to idly wait for use. This overrides the
default policy of starting core
+ * connections only when they are {@link #getConnection() needed}.
+ * @return the number of connections started.
+ * @throws RepositorySourceException if there was an error obtaining the new
connection
+ * @throws InterruptedException if the thread was interrupted during the operation
+ */
+ public int prestartAllCoreConnections() throws RepositorySourceException,
InterruptedException {
+ final ReentrantLock mainLock = this.mainLock;
+ try {
+ mainLock.lock();
+ return addConnectionsIfUnderCorePoolSize();
+ } finally {
+ mainLock.unlock();
+ }
+ }
+
+ /**
+ * Initiates an orderly shutdown in which connections that are currently in use are
allowed to be used and closed as normal,
+ * but no new connections will be created. Invocation has no additional effect if
already shut down.
+ * @throws SecurityException if a security manager exists and shutting down this
ConnectionPool may manipulate threads that
+ * the caller is not permitted to modify because it does not hold {@link
java.lang.RuntimePermission}<tt>("modifyThread")</tt>,
+ * or the security manager's <tt>checkAccess</tt> method denies
access.
+ */
+ public void shutdown() {
+ // Fail if caller doesn't have modifyThread permission. We
+ // explicitly check permissions directly because we can't trust
+ // implementations of SecurityManager to correctly override
+ // the "check access" methods such that our documented
+ // security policy is implemented.
+ SecurityManager security = System.getSecurityManager();
+ if (security != null)
java.security.AccessController.checkPermission(shutdownPerm);
+
+ this.logger.debug("Shutting down repository connection pool for {0}",
getName());
+ boolean fullyTerminated = false;
+ final ReentrantLock mainLock = this.mainLock;
+ try {
+ mainLock.lock();
+ int state = this.runState;
+ if (state == RUNNING) {
+ // don't override shutdownNow
+ this.runState = SHUTDOWN;
+ }
+
+ // Kill the maintenance thread ...
+
+ // Remove and close all available connections ...
+ if (!this.availableConnections.isEmpty()) {
+ // Drain the extra connections from those available ...
+ drainUnusedConnections(this.availableConnections.size());
+ }
+
+ // If there are no connections being used, trigger full termination now ...
+ if (this.inUseConnections.isEmpty()) {
+ fullyTerminated = true;
+ this.logger.trace("Signalling termination of repository connection
pool for {0}", getName());
+ runState = TERMINATED;
+ termination.signalAll();
+ this.logger.debug("Terminated repository connection pool for
{0}", getName());
+ }
+ // Otherwise the last connection that is closed will transition the runState
to TERMINATED ...
+ } finally {
+ mainLock.unlock();
+ }
+ if (fullyTerminated) terminated();
+ }
+
+ /**
+ * Attempts to stop all actively executing tasks, halts the processing of waiting
tasks, and returns a list of the tasks that
+ * were awaiting execution.
+ * <p>
+ * This implementation cancels tasks via {@link Thread#interrupt}, so if any tasks
mask or fail to respond to interrupts,
+ * they may never terminate.
+ * @throws SecurityException if a security manager exists and shutting down this
ExecutorService may manipulate threads that
+ * the caller is not permitted to modify because it does not hold {@link
java.lang.RuntimePermission}<tt>("modifyThread")</tt>,
+ * or the security manager's <tt>checkAccess</tt> method denies
access.
+ */
+ public void shutdownNow() {
+ // Almost the same code as shutdown()
+ SecurityManager security = System.getSecurityManager();
+ if (security != null)
java.security.AccessController.checkPermission(shutdownPerm);
+
+ this.logger.debug("Shutting down (immediately) repository connection pool
for {0}", getName());
+ boolean fullyTerminated = false;
+ final ReentrantLock mainLock = this.mainLock;
+ try {
+ mainLock.lock();
+ int state = this.runState;
+ if (state != TERMINATED) {
+ // don't override shutdownNow
+ this.runState = STOP;
+ }
+
+ // Kill the maintenance thread ...
+
+ // Remove and close all available connections ...
+ if (!this.availableConnections.isEmpty()) {
+ // Drain the extra connections from those available ...
+ drainUnusedConnections(this.availableConnections.size());
+ }
+
+ // If there are connections being used, close them now ...
+ if (!this.inUseConnections.isEmpty()) {
+ for (ConnectionWrapper connectionInUse : this.inUseConnections) {
+ try {
+ this.logger.trace("Closing repository connection to
{0}", getName());
+ connectionInUse.getOriginal().close();
+ } catch (InterruptedException e) {
+ // Ignore this ...
+ }
+ }
+ this.poolSize -= this.inUseConnections.size();
+ // The last connection that is closed will transition the runState to
TERMINATED ...
+ } else {
+ // There are no connections in use, so trigger full termination now ...
+ fullyTerminated = true;
+ this.logger.trace("Signalling termination of repository connection
pool for {0}", getName());
+ runState = TERMINATED;
+ termination.signalAll();
+ this.logger.debug("Terminated repository connection pool for
{0}", getName());
+ }
+
+ } finally {
+ mainLock.unlock();
+ }
+ if (fullyTerminated) terminated();
+ }
+
+ public boolean isShutdown() {
+ return runState != RUNNING;
+ }
+
+ /**
+ * Returns true if this executor is in the process of terminating after
<tt>shutdown</tt> or <tt>shutdownNow</tt> but has
+ * not completely terminated. This method may be useful for debugging. A return of
<tt>true</tt> reported a sufficient
+ * period after shutdown may indicate that submitted tasks have ignored or suppressed
interruption, causing this executor not
+ * to properly terminate.
+ * @return true if terminating but not yet terminated.
+ */
+ public boolean isTerminating() {
+ return runState == STOP;
+ }
+
+ public boolean isTerminated() {
+ return runState == TERMINATED;
+ }
+
+ public boolean awaitTermination( long timeout, TimeUnit unit ) throws
InterruptedException {
+ long nanos = unit.toNanos(timeout);
+ final ReentrantLock mainLock = this.mainLock;
+ try {
+ mainLock.lock();
+ for (;;) {
+ // this.logger.debug("---> Run state = {}; condition = {}",
runState, termination);
+ if (runState == TERMINATED) return true;
+ if (nanos <= 0) return false;
+ nanos = termination.awaitNanos(nanos);
+ // this.logger.debug("---> Done waiting: run state = {};
condition = {}", runState, termination);
+ }
+ } finally {
+ mainLock.unlock();
+ }
+ }
+
+ /**
+ * Method invoked when the Executor has terminated. Default implementation does
nothing. Note: To properly nest multiple
+ * overridings, subclasses should generally invoke
<tt>super.terminated</tt> within this method.
+ */
+ protected void terminated() {
+ }
+
+ /**
+ * Invokes <tt>shutdown</tt> when this executor is no longer referenced.
+ */
+ @Override
+ protected void finalize() {
+ shutdown();
+ }
+
+ // -------------------------------------------------
+ // Connection management methods ...
+ // -------------------------------------------------
+
+ /**
+ * {@inheritDoc}
+ */
+ @SuppressWarnings( "null" )
+ public RepositoryConnection getConnection() throws RepositorySourceException,
InterruptedException {
+ int attemptsAllowed = this.maxFailedAttemptsBeforeError.get();
+ ConnectionWrapper connection = null;
+ try {
+ // Do this until we get a good connection ...
+ int attemptsRemaining = attemptsAllowed;
+ while (connection == null && attemptsRemaining > 0) {
+ --attemptsRemaining;
+ ReentrantLock mainLock = this.mainLock;
+ try {
+ mainLock.lock();
+ // If we're shutting down the pool, then just close the
connection ...
+ if (this.runState != RUNNING) {
+ throw new
IllegalStateException(SpiI18n.repositoryConnectionPoolIsNotRunning.text());
+ }
+ // If there are fewer total connections than the core size ...
+ if (this.poolSize < this.corePoolSize) {
+ // Immediately create a wrapped connection and return it ...
+ connection = newWrappedConnection();
+ }
+ // Peek to see if there is a connection available ...
+ else if (this.availableConnections.peek() != null) {
+ // There is, so take it and return it ...
+ connection = this.availableConnections.take();
+ }
+ // There is no connection available. If there are fewer total
connections than the maximum size ...
+ else if (this.poolSize < this.maximumPoolSize) {
+ // Immediately create a wrapped connection and return it ...
+ connection = newWrappedConnection();
+ }
+ if (connection != null) {
+ this.inUseConnections.add(connection);
+ }
+ } finally {
+ mainLock.unlock();
+ }
+ if (connection == null) {
+ // There are not enough connections, so wait in line for the next
available connection ...
+ this.logger.trace("Waiting for a repository connection from pool
{0}", getName());
+ connection = this.availableConnections.take();
+ mainLock = this.mainLock;
+ mainLock.lock();
+ try {
+ if (connection != null) {
+ this.inUseConnections.add(connection);
+ }
+ } finally {
+ mainLock.unlock();
+ }
+ this.logger.trace("Recieved a repository connection from pool
{0}", getName());
+ }
+ if (connection != null && this.validateConnectionBeforeUse.get())
{
+ connection = validateConnection(connection);
+ }
+ }
+ } catch (InterruptedException e) {
+ this.logger.trace("Thread interrupted while waiting for a repository
connection from pool {0}", getName());
+
+ // If the thread has been interrupted after we've taken a connection from
the pool ...
+ if (connection != null) {
+ // We need to return the connection back into the pool ...
+ returnConnection(connection);
+ }
+ // And rethrow ...
+ throw e;
+ }
+ if (connection == null) {
+ // We were unable to obtain a usable connection, so fail ...
+ throw new
RepositorySourceException(SpiI18n.unableToObtainValidRepositoryAfterAttempts.text(attemptsAllowed));
+ }
+ this.totalConnectionsUsed.incrementAndGet();
+ return connection;
+ }
+
+ /**
+ * This method is automatically called by the {@link ConnectionWrapper} when it is
{@link ConnectionWrapper#close() closed}.
+ * @param wrapper the wrapper to the connection that is being returned to the pool
+ */
+ protected void returnConnection( ConnectionWrapper wrapper ) {
+ assert wrapper != null;
+ ConnectionWrapper wrapperToClose = null;
+ final ReentrantLock mainLock = this.mainLock;
+ try {
+ mainLock.lock();
+ // Remove the connection from the in-use set ...
+ boolean removed = this.inUseConnections.remove(wrapper);
+ assert removed;
+
+ // If we're shutting down the pool, then just close the connection ...
+ if (this.runState != RUNNING) {
+ wrapperToClose = wrapper;
+ }
+ // If there are more connections than the maximum size...
+ else if (this.poolSize > this.maximumPoolSize) {
+ // Immediately close this connection ...
+ wrapperToClose = wrapper;
+ }
+ // Attempt to make the connection available (this should generally work,
unless there is an upper limit
+ // to the number of available connections) ...
+ else if (!this.availableConnections.offer(new
ConnectionWrapper(wrapper.getOriginal()))) {
+ // The pool of available connection is full, so release it ...
+ wrapperToClose = wrapper;
+ }
+ } finally {
+ mainLock.unlock();
+ }
+ // Close the connection if we're supposed to (do it outside of the main
lock)...
+ if (wrapperToClose != null) {
+ try {
+ closeConnection(wrapper);
+ } catch (InterruptedException e) {
+ // catch this, as there's not much we can do and the caller
doesn't care or know how to handle it
+ this.logger.trace(e, "Interrupted while closing a repository
connection");
+ }
+ }
+ }
+
+ /**
+ * Validate the supplied connection, returning the connection if valid or null if the
connection is not valid.
+ * @param connection the connection to be validated; may not be null
+ * @return the validated connection, or null if the connection did not validate and
was removed from the pool
+ */
+ protected ConnectionWrapper validateConnection( ConnectionWrapper connection ) {
+ assert connection != null;
+ ConnectionWrapper invalidConnection = null;
+ try {
+ if (!connection.ping(this.pingTimeout.get(), TimeUnit.NANOSECONDS)) {
+ invalidConnection = connection;
+ }
+ } catch (InterruptedException e) {
+ // catch this, as there's not much we can do and the caller doesn't
care or know how to handle it
+ this.logger.trace(e, "Interrupted while pinging a repository
connection");
+ invalidConnection = connection;
+ } finally {
+ if (invalidConnection != null) {
+ connection = null;
+ returnConnection(invalidConnection);
+ }
+ }
+ return connection;
+ }
+
+ /**
+ * Obtain a new connection wrapped in a {@link ConnectionWrapper}. This method does
not check whether creating the new
+ * connection would violate the {@link #maximumPoolSize maximum pool size} nor does
it add the new connection to the
+ * {@link #availableConnections available connections} (as the caller may want it
immediately), but it does increment the
+ * {@link #poolSize pool size}.
+ * @return the connection wrapper with a new connection
+ * @throws RepositorySourceException if there was an error obtaining the new
connection
+ * @throws InterruptedException if the thread was interrupted during the operation
+ */
+ @GuardedBy( "mainLock" )
+ protected ConnectionWrapper newWrappedConnection() throws RepositorySourceException,
InterruptedException {
+ RepositoryConnection connection = this.connectionFactory.getConnection();
+ ++this.poolSize;
+ this.totalConnectionsCreated.incrementAndGet();
+ return new ConnectionWrapper(connection);
+ }
+
+ /**
+ * Close a connection that is in the pool but no longer in the {@link
#availableConnections available connections}. This
+ * method does decrement the {@link #poolSize pool size}.
+ * @param wrapper the wrapper for the connection to be closed
+ * @throws InterruptedException if the thread was interrupted during the operation
+ */
+ protected void closeConnection( ConnectionWrapper wrapper ) throws
InterruptedException {
+ assert wrapper != null;
+ RepositoryConnection original = wrapper.getOriginal();
+ assert original != null;
+ try {
+ this.logger.debug("Closing repository connection to {0}",
getName());
+ original.close();
+ } finally {
+ final ReentrantLock mainLock = this.mainLock;
+ try {
+ mainLock.lock();
+ // No matter what reduce the pool size count
+ --this.poolSize;
+ // And if shutting down and this was the last connection being used...
+ if (this.runState == SHUTDOWN && this.poolSize <= 0) {
+ // then signal anybody that has called
"awaitTermination(...)"
+ this.logger.trace("Signalling termination of repository
connection pool for {0}", getName());
+ this.runState = TERMINATED;
+ this.termination.signalAll();
+ this.logger.trace("Terminated repository connection pool for
{0}", getName());
+
+ // fall through to call terminate() outside of lock.
+ }
+ } finally {
+ mainLock.unlock();
+ }
+ }
+ }
+
+ @GuardedBy( "mainLock" )
+ protected int drainUnusedConnections( int count ) {
+ if (count <= 0) return 0;
+ this.logger.trace("Draining up to {} unused repository connections to
{0}", count, getName());
+ // Drain the extra connections from those available ...
+ Collection<ConnectionWrapper> extraConnections = new
LinkedList<ConnectionWrapper>();
+ this.availableConnections.drainTo(extraConnections, count);
+ for (ConnectionWrapper connection : extraConnections) {
+ try {
+ this.logger.trace("Closing repository connection to {0}",
getName());
+ connection.getOriginal().close();
+ } catch (InterruptedException e) {
+ // Ignore this ...
+ }
+ }
+ int numClosed = extraConnections.size();
+ this.poolSize -= numClosed;
+ this.logger.trace("Drained {0} unused connections", numClosed);
+ return numClosed;
+ }
+
+ @GuardedBy( "mainLock" )
+ protected boolean addConnectionIfUnderCorePoolSize() throws
RepositorySourceException, InterruptedException {
+ // Add connection ...
+ if (this.poolSize < this.corePoolSize) {
+ this.availableConnections.offer(newWrappedConnection());
+ this.logger.trace("Added connection to {0} in undersized pool",
getName());
+ return true;
+ }
+ return false;
+ }
+
+ @GuardedBy( "mainLock" )
+ protected int addConnectionsIfUnderCorePoolSize() throws RepositorySourceException,
InterruptedException {
+ // Add connections ...
+ int n = 0;
+ while (this.poolSize < this.corePoolSize) {
+ this.availableConnections.offer(newWrappedConnection());
+ ++n;
+ }
+ this.logger.trace("Added {0} connection(s) to {1} in undersized pool",
n, getName());
+ return n;
+ }
+
+ protected class ConnectionWrapper implements RepositoryConnection {
+
+ private final RepositoryConnection original;
+ private final long timeCreated;
+ private long lastUsed;
+ private boolean closed = false;
+
+ protected ConnectionWrapper( RepositoryConnection connection ) {
+ assert connection != null;
+ this.original = connection;
+ this.timeCreated = System.currentTimeMillis();
+ }
+
+ /**
+ * @return original
+ */
+ protected RepositoryConnection getOriginal() {
+ return this.original;
+ }
+
+ /**
+ * @return lastUsed
+ */
+ public long getTimeLastUsed() {
+ return this.lastUsed;
+ }
+
+ /**
+ * @return timeCreated
+ */
+ public long getTimeCreated() {
+ return this.timeCreated;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public String getSourceName() {
+ return this.original.getSourceName();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public XAResource getXAResource() {
+ if (closed) throw new
IllegalStateException(SpiI18n.closedConnectionMayNotBeUsed.text());
+ return this.original.getXAResource();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public void execute( ExecutionEnvironment env, GraphCommand... commands ) throws
RepositorySourceException, InterruptedException {
+ if (closed) throw new
IllegalStateException(SpiI18n.closedConnectionMayNotBeUsed.text());
+ this.original.execute(env, commands);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public boolean ping( long time, TimeUnit unit ) throws InterruptedException {
+ if (closed) throw new
IllegalStateException(SpiI18n.closedConnectionMayNotBeUsed.text());
+ return this.original.ping(time, unit);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public void close() throws InterruptedException {
+ if (!closed) {
+ this.lastUsed = System.currentTimeMillis();
+ this.original.close();
+ this.closed = true;
+ returnConnection(this);
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public void setListener( RepositorySourceListener listener ) {
+ if (!closed) this.original.setListener(listener);
+ }
+
+ }
+
+}
Property changes on:
branches/federation/dna-spi/src/main/java/org/jboss/dna/spi/graph/connection/RepositoryConnectionPool.java
___________________________________________________________________
Name: svn:mime-type
+ text/plain
Added:
branches/federation/dna-spi/src/main/java/org/jboss/dna/spi/graph/connection/RepositoryOperation.java
===================================================================
---
branches/federation/dna-spi/src/main/java/org/jboss/dna/spi/graph/connection/RepositoryOperation.java
(rev 0)
+++
branches/federation/dna-spi/src/main/java/org/jboss/dna/spi/graph/connection/RepositoryOperation.java 2008-05-29
19:19:49 UTC (rev 211)
@@ -0,0 +1,39 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package org.jboss.dna.spi.graph.connection;
+
+/**
+ * @author Randall Hauch
+ * @param <T> the type of result returned by the client
+ */
+public interface RepositoryOperation<T> {
+
+ String getName();
+
+ T run( RepositoryConnection connection ) throws RepositorySourceException,
InterruptedException;
+
+ public static interface Factory<T> {
+
+ RepositoryOperation<T> create();
+ }
+
+}
Property changes on:
branches/federation/dna-spi/src/main/java/org/jboss/dna/spi/graph/connection/RepositoryOperation.java
___________________________________________________________________
Name: svn:mime-type
+ text/plain
Modified:
branches/federation/dna-spi/src/main/java/org/jboss/dna/spi/graph/connection/RepositorySource.java
===================================================================
---
branches/federation/dna-spi/src/main/java/org/jboss/dna/spi/graph/connection/RepositorySource.java 2008-05-29
19:18:34 UTC (rev 210)
+++
branches/federation/dna-spi/src/main/java/org/jboss/dna/spi/graph/connection/RepositorySource.java 2008-05-29
19:19:49 UTC (rev 211)
@@ -44,12 +44,6 @@
public interface RepositorySource extends RepositoryConnectionFactory, Referenceable,
Serializable {
/**
- * Get the name for this repository source.
- * @return the name; never null or empty
- */
- String getName();
-
- /**
* Get the maximum number of retries that may be performed on a given operation when
using
* {@link #getConnection() connections} created by this source. This value does not
constitute a minimum number of retries; in
* fact, the connection user is not required to retry any operations.
Modified:
branches/federation/dna-spi/src/main/resources/org/jboss/dna/spi/SpiI18n.properties
===================================================================
---
branches/federation/dna-spi/src/main/resources/org/jboss/dna/spi/SpiI18n.properties 2008-05-29
19:18:34 UTC (rev 210)
+++
branches/federation/dna-spi/src/main/resources/org/jboss/dna/spi/SpiI18n.properties 2008-05-29
19:19:49 UTC (rev 211)
@@ -16,4 +16,9 @@
unbleToCreateSubpathBeginIndexGreaterThanOrEqualToSize = Unable to create subpath:
fromIndex({0}) >= size({1})
unbleToCreateSubpathBeginIndexGreaterThanOrEqualToEndingIndex = Unable to create subpath:
fromIndex({0}) >= toIndex({1})
-invalidQualifiedNameString = Unable to parse qualified name from "{0}"
\ No newline at end of file
+invalidQualifiedNameString = Unable to parse qualified name from "{0}"
+
+maximumPoolSizeMayNotBeSmallerThanCorePoolSize = The maximum pool size may not be smaller
than the core pool size
+repositoryConnectionPoolIsNotRunning = The repository connection pool is not running
+unableToObtainValidRepositoryAfterAttempts = Unable to obtain a valid repository after
{0} attempts
+closedConnectionMayNotBeUsed = The connection has been closed an may not be used
Added:
branches/federation/dna-spi/src/test/java/org/jboss/dna/spi/graph/connection/MockRepositorySource.java
===================================================================
---
branches/federation/dna-spi/src/test/java/org/jboss/dna/spi/graph/connection/MockRepositorySource.java
(rev 0)
+++
branches/federation/dna-spi/src/test/java/org/jboss/dna/spi/graph/connection/MockRepositorySource.java 2008-05-29
19:19:49 UTC (rev 211)
@@ -0,0 +1,234 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package org.jboss.dna.spi.graph.connection;
+
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import javax.naming.Reference;
+import javax.transaction.xa.XAResource;
+import net.jcip.annotations.ThreadSafe;
+import org.jboss.dna.spi.cache.CachePolicy;
+import org.jboss.dna.spi.graph.commands.GraphCommand;
+import org.jmock.Mockery;
+
+/**
+ * @author Randall Hauch
+ */
+@ThreadSafe
+public class MockRepositorySource implements RepositorySource {
+
+ private final String identifier;
+ private final AtomicInteger retryLimit = new AtomicInteger(0);
+ private final Mockery context;
+ private final AtomicInteger connectionsOpenedCount = new AtomicInteger(0);
+ private final AtomicInteger connectionsClosedCount = new AtomicInteger(0);
+ private final Set<Connection> openConnections = new
CopyOnWriteArraySet<Connection>();
+ private CachePolicy defaultCachePolicy;
+
+ public MockRepositorySource( String identifier, Mockery context ) {
+ this.identifier = identifier;
+ this.context = context;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public String getName() {
+ return this.identifier;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public int getRetryLimit() {
+ return this.retryLimit.get();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public void setRetryLimit( int limit ) {
+ this.retryLimit.set(limit);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public CachePolicy getDefaultCachePolicy() {
+ return defaultCachePolicy;
+ }
+
+ public void setDefaultCachePolicy( CachePolicy defaultCachePolicy ) {
+ this.defaultCachePolicy = defaultCachePolicy;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public RepositoryConnection getConnection() throws RepositorySourceException {
+ int connectionNumber = this.connectionsOpenedCount.incrementAndGet();
+ String connectionName = "Connection " + connectionNumber;
+ XAResource xaResource = context != null ? context.mock(XAResource.class,
connectionName) : null;
+ Connection c = newConnection(connectionName, xaResource);
+ this.openConnections.add(c);
+ return c;
+ }
+
+ protected Connection newConnection( String connectionName, XAResource xaResource )
throws RepositorySourceException {
+ Connection c = new Connection(connectionName);
+ c.setXaResource(xaResource);
+ return c;
+ }
+
+ protected void close( Connection conn ) {
+ if (conn != null && this.openConnections.remove(conn)) {
+ this.connectionsClosedCount.incrementAndGet();
+ }
+ }
+
+ public int getOpenConnectionCount() {
+ return this.openConnections.size();
+ }
+
+ public int getTotalConnectionsCreated() {
+ return this.connectionsOpenedCount.get();
+ }
+
+ public int getTotalConnectionsClosed() {
+ return this.connectionsClosedCount.get();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public Reference getReference() {
+ throw new UnsupportedOperationException();
+ }
+
+ public class Connection implements RepositoryConnection {
+
+ private final String connectionName;
+ private final AtomicBoolean closed = new AtomicBoolean(false);
+ private final AtomicBoolean loadResponse = new AtomicBoolean(true);
+ private final AtomicBoolean pingResponse = new AtomicBoolean(true);
+ private final AtomicLong closeCount = new AtomicLong(0);
+ private final AtomicLong loadCount = new AtomicLong(0);
+ private final AtomicLong loadDelay = new AtomicLong(0);
+ private final AtomicLong pingCount = new AtomicLong(0);
+ private final AtomicLong pingDelay = new AtomicLong(0);
+ private final AtomicReference<XAResource> xaResource = new
AtomicReference<XAResource>();
+
+ protected Connection( String connectionName ) {
+ assert connectionName != null && connectionName.trim().length() !=
0;
+ this.connectionName = connectionName;
+ }
+
+ public String getConnectionName() {
+ return this.connectionName;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public void close() {
+ this.closeCount.incrementAndGet();
+ this.closed.set(true);
+ MockRepositorySource.this.close(this);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public String getSourceName() {
+ return MockRepositorySource.this.getName();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public XAResource getXAResource() {
+ return this.xaResource.get();
+ }
+
+ public void setXaResource( XAResource xaResource ) {
+ this.xaResource.set(xaResource);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public void execute( ExecutionEnvironment env, GraphCommand... commands ) throws
RepositorySourceException, InterruptedException {
+ long delay = this.loadDelay.get();
+ if (delay > 0l) Thread.sleep(delay);
+ this.loadCount.incrementAndGet();
+ }
+
+ public void setLoadResponse( boolean response ) {
+ this.loadResponse.set(response);
+ }
+
+ public void setLoadDelay( long time, TimeUnit unit ) {
+ this.loadDelay.set(unit.toMillis(time));
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public boolean ping( long time, TimeUnit unit ) throws InterruptedException {
+ Thread.sleep(this.pingDelay.get());
+ return this.pingResponse.get();
+ }
+
+ public void setPingResponse( boolean pingResponse ) {
+ this.pingResponse.set(pingResponse);
+ }
+
+ public void setPingDelay( long time, TimeUnit unit ) {
+ this.pingDelay.set(unit.toMillis(time));
+ }
+
+ public long getPingCount() {
+ return this.pingCount.get();
+ }
+
+ public long getLoadCount() {
+ return this.loadCount.get();
+ }
+
+ public long getCloseCount() {
+ return this.closeCount.get();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public void setListener( RepositorySourceListener listener ) {
+ }
+
+ }
+
+}
Property changes on:
branches/federation/dna-spi/src/test/java/org/jboss/dna/spi/graph/connection/MockRepositorySource.java
___________________________________________________________________
Name: svn:mime-type
+ text/plain
Added:
branches/federation/dna-spi/src/test/java/org/jboss/dna/spi/graph/connection/RepositoryConnectionPoolTest.java
===================================================================
---
branches/federation/dna-spi/src/test/java/org/jboss/dna/spi/graph/connection/RepositoryConnectionPoolTest.java
(rev 0)
+++
branches/federation/dna-spi/src/test/java/org/jboss/dna/spi/graph/connection/RepositoryConnectionPoolTest.java 2008-05-29
19:19:49 UTC (rev 211)
@@ -0,0 +1,195 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package org.jboss.dna.spi.graph.connection;
+
+import static org.hamcrest.core.Is.is;
+import static org.hamcrest.core.IsNull.notNullValue;
+import static org.junit.Assert.assertThat;
+import static
org.jboss.dna.spi.graph.connection.RepositorySourceLoadHarness.runLoadTest;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.jmock.Mockery;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * @author Randall Hauch
+ */
+public class RepositoryConnectionPoolTest {
+
+ private RepositoryConnectionPool pool;
+ private MockRepositorySource repositorySource;
+ private ExecutionEnvironment env;
+ private Mockery context;
+
+ @Before
+ public void beforeEach() throws Exception {
+ this.context = new Mockery();
+ this.repositorySource = new MockRepositorySource("source 1",
this.context);
+ this.pool = new RepositoryConnectionPool(this.repositorySource, 1, 1, 100,
TimeUnit.SECONDS);
+ this.env = null;
+ }
+
+ @After
+ public void afterEach() throws Exception {
+ pool.shutdown();
+ pool.awaitTermination(2, TimeUnit.SECONDS);
+ }
+
+ @Test
+ public void shouldBeCreatedInRunningState() {
+ assertThat(pool.isShutdown(), is(false));
+ assertThat(pool.isTerminating(), is(false));
+ assertThat(pool.isTerminated(), is(false));
+ assertThat(pool.getTotalConnectionsCreated(), is(0l));
+ assertThat(pool.getTotalConnectionsUsed(), is(0l));
+ }
+
+ @Test
+ public void shouldShutdownWhenNoConnectionsWereCreatedOrUsed() throws
InterruptedException {
+ assertThat(pool.isShutdown(), is(false));
+ assertThat(pool.isTerminating(), is(false));
+ assertThat(pool.isTerminated(), is(false));
+ assertThat(pool.getTotalConnectionsCreated(), is(0l));
+ assertThat(pool.getTotalConnectionsUsed(), is(0l));
+
+ for (int i = 0; i != 4; ++i) {
+ pool.shutdown();
+ assertThat(pool.isShutdown() || pool.isTerminating() || pool.isTerminated(),
is(true));
+ pool.awaitTermination(2, TimeUnit.SECONDS);
+ assertThat(pool.isTerminated(), is(true));
+ assertThat(pool.getTotalConnectionsCreated(), is(0l));
+ assertThat(pool.getTotalConnectionsUsed(), is(0l));
+ }
+ }
+
+ @Test
+ public void shouldCreateConnectionAndRecoverWhenClosed() throws
RepositorySourceException, InterruptedException {
+ assertThat(pool.getTotalConnectionsCreated(), is(0l));
+ assertThat(pool.getTotalConnectionsUsed(), is(0l));
+
+ RepositoryConnection conn = pool.getConnection();
+ assertThat(conn, is(notNullValue()));
+ assertThat(pool.getTotalConnectionsCreated(), is(1l));
+ assertThat(pool.getTotalConnectionsUsed(), is(1l));
+ assertThat(pool.getPoolSize(), is(1));
+
+ conn.close();
+
+ assertThat(pool.getTotalConnectionsCreated(), is(1l));
+ assertThat(pool.getTotalConnectionsUsed(), is(1l));
+ assertThat(pool.getPoolSize(), is(1));
+ }
+
+ @Test
+ public void shouldAllowShutdownToBeCalledMultipleTimesEvenWhenShutdown() throws
RepositorySourceException, InterruptedException {
+ assertThat(pool.getTotalConnectionsCreated(), is(0l));
+ assertThat(pool.getTotalConnectionsUsed(), is(0l));
+
+ RepositoryConnection conn = pool.getConnection();
+ assertThat(conn, is(notNullValue()));
+ assertThat(pool.getTotalConnectionsCreated(), is(1l));
+ assertThat(pool.getTotalConnectionsUsed(), is(1l));
+ assertThat(pool.getPoolSize(), is(1));
+
+ conn.close();
+
+ assertThat(pool.getTotalConnectionsCreated(), is(1l));
+ assertThat(pool.getTotalConnectionsUsed(), is(1l));
+ assertThat(pool.getPoolSize(), is(1));
+
+ pool.shutdown();
+ pool.shutdown();
+ pool.awaitTermination(2, TimeUnit.SECONDS);
+ assertThat(pool.isTerminated(), is(true));
+ assertThat(pool.getTotalConnectionsCreated(), is(1l));
+ assertThat(pool.getTotalConnectionsUsed(), is(1l));
+ pool.shutdown();
+ pool.awaitTermination(2, TimeUnit.SECONDS);
+ pool.shutdown();
+ pool.awaitTermination(2, TimeUnit.SECONDS);
+ }
+
+ @Test( expected = IllegalStateException.class )
+ public void shouldNotCreateConnectionIfPoolIsNotRunning() throws
RepositorySourceException, InterruptedException {
+ pool.shutdown();
+ pool.awaitTermination(2, TimeUnit.SECONDS);
+ assertThat(pool.isTerminated(), is(true));
+ assertThat(pool.getTotalConnectionsCreated(), is(0l));
+ assertThat(pool.getTotalConnectionsUsed(), is(0l));
+ pool.getConnection(); // this should fail with illegal state
+ }
+
+ @Test
+ public void shouldAllowConnectionsToBeClosedMoreThanOnceWithNoIllEffects() throws
RepositorySourceException, InterruptedException {
+ assertThat(pool.getTotalConnectionsCreated(), is(0l));
+ assertThat(pool.getTotalConnectionsUsed(), is(0l));
+
+ RepositoryConnection conn = pool.getConnection();
+ assertThat(conn, is(notNullValue()));
+ assertThat(pool.getTotalConnectionsCreated(), is(1l));
+ assertThat(pool.getTotalConnectionsUsed(), is(1l));
+ assertThat(pool.getPoolSize(), is(1));
+
+ conn.close();
+ conn.close();
+ }
+
+ @Test
+ public void shouldBlockClientsWhenNotEnoughConnections() throws Exception {
+ int numConnectionsInPool = 1;
+ int numClients = 2;
+ RepositoryOperation.Factory<Integer> operationFactory =
RepositoryTestOperations.createMultipleLoadOperationFactory(env, 10);
+ runLoadTest(repositorySource, numConnectionsInPool, numClients, 2,
TimeUnit.SECONDS, operationFactory);
+ }
+
+ @Test
+ public void shouldLimitClientsToRunSequentiallyWithOneConnectionInPool() throws
Exception {
+ int numConnectionsInPool = 1;
+ int numClients = 3;
+ RepositoryOperation.Factory<Integer> operationFactory =
RepositoryTestOperations.createMultipleLoadOperationFactory(env, 10);
+ runLoadTest(repositorySource, numConnectionsInPool, numClients, 2,
TimeUnit.SECONDS, operationFactory);
+ }
+
+ @Test
+ public void shouldClientsToRunConncurrentlyWithTwoConnectionsInPool() throws
Exception {
+ int numConnectionsInPool = 2;
+ int numClients = 10;
+ RepositoryOperation.Factory<Integer> operationFactory =
RepositoryTestOperations.createMultipleLoadOperationFactory(env, 10);
+ runLoadTest(repositorySource, numConnectionsInPool, numClients, 2,
TimeUnit.SECONDS, operationFactory);
+ }
+
+ @Test
+ public void shouldClientsToRunConncurrentlyWithMultipleConnectionInPool() throws
Exception {
+ int numConnectionsInPool = 10;
+ int numClients = 200;
+ RepositoryOperation.Factory<Integer> operationFactory =
RepositoryTestOperations.createMultipleLoadOperationFactory(env, 100);
+ List<Integer> results = runLoadTest(repositorySource, numConnectionsInPool,
numClients, 2, TimeUnit.SECONDS, operationFactory);
+ int total = 0;
+ for (Integer integer : results) {
+ total += integer;
+ }
+ assertThat(total, is(100 * numClients));
+ }
+
+}
Property changes on:
branches/federation/dna-spi/src/test/java/org/jboss/dna/spi/graph/connection/RepositoryConnectionPoolTest.java
___________________________________________________________________
Name: svn:mime-type
+ text/plain
Added:
branches/federation/dna-spi/src/test/java/org/jboss/dna/spi/graph/connection/RepositorySourceLoadHarness.java
===================================================================
---
branches/federation/dna-spi/src/test/java/org/jboss/dna/spi/graph/connection/RepositorySourceLoadHarness.java
(rev 0)
+++
branches/federation/dna-spi/src/test/java/org/jboss/dna/spi/graph/connection/RepositorySourceLoadHarness.java 2008-05-29
19:19:49 UTC (rev 211)
@@ -0,0 +1,165 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package org.jboss.dna.spi.graph.connection;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import org.jboss.dna.common.i18n.MockI18n;
+import org.jboss.dna.common.util.Logger;
+
+/**
+ * A test harness for using repository connections under load.
+ * @author Randall Hauch
+ */
+public class RepositorySourceLoadHarness {
+
+ public static <T> List<T> runLoadTest( RepositoryConnectionFactory
connectionFactory, int numConnectionsInPool, int numClients, long maxTime, TimeUnit
maxTimeUnit,
+ RepositoryOperation.Factory<T>
clientFactory ) throws InterruptedException, ExecutionException {
+ // Create the clients ...
+ Collection<RepositoryOperation<T>> clients = new
ArrayList<RepositoryOperation<T>>();
+ for (int i = 0; i != numClients; ++i) {
+ clients.add(clientFactory.create());
+ }
+
+ // and run the test ...
+ return runLoadTest(connectionFactory, numConnectionsInPool, maxTime, maxTimeUnit,
clients);
+ }
+
+ public static <T> List<T> runLoadTest( RepositoryConnectionFactory
connectionFactory, int numConnectionsInPool, long maxTime, TimeUnit maxTimeUnit,
RepositoryOperation<T>... clients )
+ throws InterruptedException, ExecutionException {
+ // Create the client collection ...
+ Collection<RepositoryOperation<T>> clientCollection = new
ArrayList<RepositoryOperation<T>>();
+ for (RepositoryOperation<T> client : clients) {
+ if (client != null) clientCollection.add(client);
+ }
+ // and run the test ...
+ return runLoadTest(connectionFactory, numConnectionsInPool, maxTime, maxTimeUnit,
clientCollection);
+ }
+
+ public static <T> List<T> runLoadTest( RepositoryConnectionFactory
connectionFactory, int numConnectionsInPool, long maxTime, TimeUnit maxTimeUnit,
Collection<RepositoryOperation<T>> clients )
+ throws InterruptedException, ExecutionException {
+ assert connectionFactory != null;
+ assert numConnectionsInPool > 0;
+ assert clients != null;
+ assert clients.size() > 0;
+
+ // Create a connection pool ...
+ final RepositoryConnectionPool connectionPool = new
RepositoryConnectionPool(connectionFactory, numConnectionsInPool, numConnectionsInPool,
10, TimeUnit.SECONDS);
+
+ // Create an Executor Service, using a thread factory that makes the first
'n' thread all wait for each other ...
+ final ThreadFactory threadFactory = new TestThreadFactory(clients.size(),
connectionPool);
+ final ExecutorService clientPool = Executors.newFixedThreadPool(clients.size(),
threadFactory);
+
+ try {
+ // Wrap each client by a callable and by another that uses a latch ...
+ List<Callable<T>> callables = connectionPool.callables(clients);
+
+ // Run the tests ...
+ List<Future<T>> futures = clientPool.invokeAll(callables,
maxTime, maxTimeUnit);
+
+ // Whether or not all clients completed, process the results ...
+ List<T> results = new ArrayList<T>();
+ for (Future<T> future : futures) {
+ if (future.isDone() && !future.isCancelled()) {
+ // Record the results ...
+ results.add(future.get());
+ } else {
+ // Record the results as null
+ results.add(null);
+ // Cancell any operation that is not completed
+ future.cancel(true);
+ }
+ }
+ // Return the results ...
+ return results;
+ } finally {
+ try {
+ // Shut down the pool of clients ...
+ clientPool.shutdown();
+ if (!clientPool.awaitTermination(5, TimeUnit.SECONDS)) {
+ String msg = "Unable to shutdown clients after 5 seconds";
+
Logger.getLogger(RepositorySourceLoadHarness.class).error(MockI18n.passthrough, msg);
+ }
+ } finally {
+ // Shut down the connections ...
+ connectionPool.shutdown();
+ if (!connectionPool.awaitTermination(5, TimeUnit.SECONDS)) {
+ String msg = "Unable to shutdown connections after 5
seconds";
+
Logger.getLogger(RepositorySourceLoadHarness.class).error(MockI18n.passthrough, msg);
+ }
+ }
+ }
+
+ }
+
+ /**
+ * A thread factory that makes an initial set of threads wait until all of those
threads are created and ready. This is useful
+ * in testing to ensure that the first threads created don't get a jump start.
+ * @author Randall Hauch
+ */
+ protected static class TestThreadFactory implements ThreadFactory {
+
+ protected final CountDownLatch latch;
+ protected final RepositoryConnectionPool pool;
+
+ public TestThreadFactory( int numberOfThreadsToWait, RepositoryConnectionPool
pool ) {
+ this.latch = new CountDownLatch(numberOfThreadsToWait);
+ this.pool = pool;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public Thread newThread( Runnable runnable ) {
+ return new Thread(runnable) {
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void run() {
+ try {
+ // Count down the thread count (if 0, this doesn't do
anything)
+ latch.countDown();
+ // Wait for all threads to reach this point ...
+ latch.await();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ // Check that the number of connections-in-use is smaller than the
maximum pool size
+ if (pool != null) assert pool.getInUseCount() <=
pool.getMaximumPoolSize();
+ super.run();
+ }
+ };
+ }
+ }
+
+}
Property changes on:
branches/federation/dna-spi/src/test/java/org/jboss/dna/spi/graph/connection/RepositorySourceLoadHarness.java
___________________________________________________________________
Name: svn:mime-type
+ text/plain
Added:
branches/federation/dna-spi/src/test/java/org/jboss/dna/spi/graph/connection/RepositoryTestOperations.java
===================================================================
---
branches/federation/dna-spi/src/test/java/org/jboss/dna/spi/graph/connection/RepositoryTestOperations.java
(rev 0)
+++
branches/federation/dna-spi/src/test/java/org/jboss/dna/spi/graph/connection/RepositoryTestOperations.java 2008-05-29
19:19:49 UTC (rev 211)
@@ -0,0 +1,102 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package org.jboss.dna.spi.graph.connection;
+
+import org.jboss.dna.common.util.Logger;
+
+/**
+ * @author Randall Hauch
+ */
+public class RepositoryTestOperations {
+
+ /**
+ * Return an operation factory that produces {@link RepositoryOperation} instances
that each call
+ * {@link RepositoryConnection#execute(ExecutionEnvironment,
org.jboss.dna.spi.graph.commands.GraphCommand...)} the supplied
+ * number of times, intermixed with random math operations and {@link Thread#yield()
yielding}.
+ * @param env the environment
+ * @param callsPerOperation the number of <code>load</code> calls per
RepositoryOperation
+ * @return the factory
+ */
+ public static RepositoryOperation.Factory<Integer>
createMultipleLoadOperationFactory( final ExecutionEnvironment env, final int
callsPerOperation ) {
+ return new RepositoryOperation.Factory<Integer>() {
+
+ public RepositoryOperation<Integer> create() {
+ return new CallLoadMultipleTimes(env, callsPerOperation);
+ }
+ };
+ }
+
+ public static class CallLoadMultipleTimes implements
RepositoryOperation<Integer> {
+
+ private final int count;
+ private final ExecutionEnvironment env;
+
+ public CallLoadMultipleTimes( ExecutionEnvironment env, int count ) {
+ this.count = count;
+ this.env = env;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public String getName() {
+ return Thread.currentThread().getName() +
"-CallLoadMultipleTimes";
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public Integer run( RepositoryConnection connection ) throws InterruptedException
{
+ Logger.getLogger(RepositoryTestOperations.class).debug("Running {}
operation", this.getClass().getSimpleName());
+ int total = count;
+ for (int i = 0; i != count; ++i) {
+ // Add two random numbers ...
+ int int1 = random(this.hashCode() ^ (int)System.nanoTime() * i);
+ if (i % 2 == 0) {
+ Thread.yield();
+ }
+ connection.execute(env);
+ int int2 = random(this.hashCode() ^ (int)System.nanoTime() + i);
+ total += Math.min(Math.abs(Math.max(int1, int2) + int1 * int2 / 3),
count);
+ }
+ Logger.getLogger(RepositoryTestOperations.class).debug("Finishing {}
operation", this.getClass().getSimpleName());
+ return total < count ? total : count; // should really always return
count
+ }
+ }
+
+ /**
+ * A "random-enough" number generator that is cheap and that has no
synchronization issues (like some other random number
+ * generators).
+ * <p>
+ * This was taken from <a href="http://wwww.jcip.org">Java
Concurrency In Practice</a> (page 253).
+ * </p>
+ * @param seed the seed, typically based on a hash code and nanoTime
+ * @return a number that is "random enough"
+ */
+ public static int random( int seed ) {
+ seed ^= (seed << 6);
+ seed ^= (seed >>> 21);
+ seed ^= (seed << 7);
+ return seed;
+ }
+
+}
Property changes on:
branches/federation/dna-spi/src/test/java/org/jboss/dna/spi/graph/connection/RepositoryTestOperations.java
___________________________________________________________________
Name: svn:mime-type
+ text/plain