Author: rhauch
Date: 2009-05-20 16:30:39 -0400 (Wed, 20 May 2009)
New Revision: 914
Added:
trunk/dna-graph/src/main/java/org/jboss/dna/graph/observe/Observer.java
Modified:
trunk/dna-graph/src/main/java/org/jboss/dna/graph/connector/RepositoryContext.java
trunk/dna-graph/src/main/java/org/jboss/dna/graph/observe/ChangeObserver.java
trunk/dna-graph/src/main/java/org/jboss/dna/graph/observe/ChangeObservers.java
trunk/dna-graph/src/main/java/org/jboss/dna/graph/observe/Changes.java
trunk/dna-graph/src/main/java/org/jboss/dna/graph/observe/Observable.java
trunk/dna-graph/src/main/java/org/jboss/dna/graph/observe/package-info.java
trunk/dna-graph/src/test/java/org/jboss/dna/graph/connector/test/AbstractConnectorTest.java
trunk/dna-repository/src/main/java/org/jboss/dna/repository/RepositoryLibrary.java
Log:
DNA-252 Complete support for events to the connector framework
Additional changes.
Modified:
trunk/dna-graph/src/main/java/org/jboss/dna/graph/connector/RepositoryContext.java
===================================================================
---
trunk/dna-graph/src/main/java/org/jboss/dna/graph/connector/RepositoryContext.java 2009-05-19
20:15:05 UTC (rev 913)
+++
trunk/dna-graph/src/main/java/org/jboss/dna/graph/connector/RepositoryContext.java 2009-05-20
20:30:39 UTC (rev 914)
@@ -24,6 +24,7 @@
package org.jboss.dna.graph.connector;
import org.jboss.dna.graph.ExecutionContext;
+import org.jboss.dna.graph.observe.Observer;
/**
* The context for a repository. This interface need not be implemented by a {@link
RepositorySource}, as it is normally provided
@@ -47,4 +48,10 @@
*/
RepositoryConnectionFactory getRepositoryConnectionFactory();
+ /**
+ * Get the observer that the connector may use to publish changes.
+ *
+ * @return the observer, or null if there are no listeners and publishing is not
required/requested
+ */
+ Observer getObserver();
}
Modified: trunk/dna-graph/src/main/java/org/jboss/dna/graph/observe/ChangeObserver.java
===================================================================
---
trunk/dna-graph/src/main/java/org/jboss/dna/graph/observe/ChangeObserver.java 2009-05-19
20:15:05 UTC (rev 913)
+++
trunk/dna-graph/src/main/java/org/jboss/dna/graph/observe/ChangeObserver.java 2009-05-20
20:30:39 UTC (rev 914)
@@ -38,7 +38,7 @@
* </p>
*/
@ThreadSafe
-public abstract class ChangeObserver {
+public abstract class ChangeObserver implements Observer {
private final CopyOnWriteArraySet<ChangeSourceReference> sources = new
CopyOnWriteArraySet<ChangeSourceReference>();
@@ -155,5 +155,4 @@
return false;
}
}
-
}
Modified: trunk/dna-graph/src/main/java/org/jboss/dna/graph/observe/ChangeObservers.java
===================================================================
---
trunk/dna-graph/src/main/java/org/jboss/dna/graph/observe/ChangeObservers.java 2009-05-19
20:15:05 UTC (rev 913)
+++
trunk/dna-graph/src/main/java/org/jboss/dna/graph/observe/ChangeObservers.java 2009-05-20
20:30:39 UTC (rev 914)
@@ -24,7 +24,9 @@
package org.jboss.dna.graph.observe;
import java.lang.ref.WeakReference;
+import java.util.Iterator;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
import net.jcip.annotations.ThreadSafe;
import org.jboss.dna.common.util.CheckArg;
import org.jboss.dna.common.util.Logger;
@@ -35,7 +37,8 @@
@ThreadSafe
public class ChangeObservers implements Observable {
- private CopyOnWriteArrayList<ObserverReference> observers = new
CopyOnWriteArrayList<ObserverReference>();
+ private final CopyOnWriteArrayList<ObserverReference> observers = new
CopyOnWriteArrayList<ObserverReference>();
+ private final AtomicBoolean shutdown = new AtomicBoolean(false);
public ChangeObservers() {
}
@@ -46,7 +49,7 @@
* @see
org.jboss.dna.graph.observe.Observable#register(org.jboss.dna.graph.observe.ChangeObserver)
*/
public boolean register( ChangeObserver observer ) {
- if (observer != null && observers.addIfAbsent(new
ObserverReference(observer))) {
+ if (observer != null && !shutdown.get() &&
observers.addIfAbsent(new ObserverReference(observer))) {
observer.registeredWith(this);
return true;
}
@@ -67,6 +70,30 @@
}
/**
+ * Unregister all registered observers, and mark this as no longer accepting new
registered observers.
+ */
+ public void shutdown() {
+ shutdown.set(true);
+ while (!observers.isEmpty()) {
+ Iterator<ObserverReference> iter = observers.iterator(); // gets
snapshot
+ observers.clear();
+ while (iter.hasNext()) {
+ ObserverReference reference = iter.next();
+ if (reference.get() != null) reference.get().unregisteredWith(this);
+ }
+ }
+ }
+
+ /**
+ * Determine whether there are any observers at the time this method is called.
+ *
+ * @return true if there are currently no observers, or false if there is at least
one observer
+ */
+ public boolean isEmpty() {
+ return observers.isEmpty();
+ }
+
+ /**
* Broadcast the supplied changes to the registered observers.
*
* @param changes the changes to broadcast
Modified: trunk/dna-graph/src/main/java/org/jboss/dna/graph/observe/Changes.java
===================================================================
--- trunk/dna-graph/src/main/java/org/jboss/dna/graph/observe/Changes.java 2009-05-19
20:15:05 UTC (rev 913)
+++ trunk/dna-graph/src/main/java/org/jboss/dna/graph/observe/Changes.java 2009-05-20
20:30:39 UTC (rev 914)
@@ -23,6 +23,9 @@
*/
package org.jboss.dna.graph.observe;
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.List;
import javax.security.auth.Subject;
import net.jcip.annotations.Immutable;
import org.jboss.dna.graph.property.DateTime;
@@ -32,26 +35,109 @@
* A set of changes that were made atomically. Each change is in the form of a frozen
{@link ChangeRequest}.
*/
@Immutable
-public interface Changes extends Iterable<ChangeRequest> {
+public final class Changes implements Iterable<ChangeRequest>,
Comparable<Changes>, Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final String processId;
+ private final Subject subject;
+ private final String sourceName;
+ private final DateTime timestamp;
+ private final List<ChangeRequest> changeRequests;
+
+ public Changes( String processId,
+ Subject subject,
+ String sourceName,
+ DateTime timestamp,
+ List<ChangeRequest> requests ) {
+ this.subject = subject;
+ this.sourceName = sourceName;
+ this.timestamp = timestamp;
+ this.changeRequests = requests;
+ this.processId = processId != null ? processId : "";
+ }
+
/**
* Get the user that made these changes.
*
* @return the user; never null
*/
- public Subject getUser();
+ public Subject getSubject() {
+ return this.subject;
+ }
/**
* Get the name of the source that was changed.
*
* @return the source name; never null
*/
- public String getSourceName();
+ public String getSourceName() {
+ return this.sourceName;
+ }
/**
* Get the timestamp that the changes were made. All changes within the change set
were all made at this instant in time.
*
* @return the timestamp of the changes; never null
*/
- public DateTime getTimestamp();
+ public DateTime getTimestamp() {
+ return this.timestamp;
+ }
+
+ /**
+ * Get the identifier of the process where these changes originated. This identifier
may be useful in preventing feedback loops.
+ *
+ * @return the process identifier; never null
+ */
+ public String getProcessId() {
+ return processId;
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @see java.lang.Iterable#iterator()
+ */
+ public Iterator<ChangeRequest> iterator() {
+ return this.changeRequests.iterator();
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @see java.lang.Object#hashCode()
+ */
+ @Override
+ public int hashCode() {
+ return getTimestamp().hashCode();
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @see java.lang.Comparable#compareTo(java.lang.Object)
+ */
+ public int compareTo( Changes that ) {
+ if (this == that) return 0;
+ return this.getTimestamp().compareTo(that.getTimestamp());
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @see java.lang.Object#equals(java.lang.Object)
+ */
+ @Override
+ public boolean equals( Object obj ) {
+ if (obj == this) return true;
+ if (obj instanceof Changes) {
+ Changes that = (Changes)obj;
+ if (!this.getProcessId().equals(that.getProcessId())) return false;
+ if (!this.getSourceName().equals(that.getSourceName())) return false;
+ if (!this.getTimestamp().equals(that.getTimestamp())) return false;
+ if (!this.getSubject().equals(that.getSubject())) return false;
+ return true;
+ }
+ return false;
+ }
}
Modified: trunk/dna-graph/src/main/java/org/jboss/dna/graph/observe/Observable.java
===================================================================
--- trunk/dna-graph/src/main/java/org/jboss/dna/graph/observe/Observable.java 2009-05-19
20:15:05 UTC (rev 913)
+++ trunk/dna-graph/src/main/java/org/jboss/dna/graph/observe/Observable.java 2009-05-20
20:30:39 UTC (rev 914)
@@ -36,7 +36,8 @@
* Register the supplied observer. This method does nothing if the observer reference
is null.
*
* @param observer the observer to be added; may be null
- * @return true if the observer was added, or false if the observer was null or if
the observer was already registered
+ * @return true if the observer was added, or false if the observer was null, if the
observer was already registered, or if
+ * the observer could not be added
*/
boolean register( ChangeObserver observer );
Copied: trunk/dna-graph/src/main/java/org/jboss/dna/graph/observe/Observer.java (from rev
913, trunk/dna-graph/src/main/java/org/jboss/dna/graph/observe/Changes.java)
===================================================================
--- trunk/dna-graph/src/main/java/org/jboss/dna/graph/observe/Observer.java
(rev 0)
+++ trunk/dna-graph/src/main/java/org/jboss/dna/graph/observe/Observer.java 2009-05-20
20:30:39 UTC (rev 914)
@@ -0,0 +1,38 @@
+/*
+ * JBoss DNA (
http://www.jboss.org/dna)
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ * See the AUTHORS.txt file in the distribution for a full listing of
+ * individual contributors.
+ *
+ * JBoss DNA is free software. Unless otherwise indicated, all code in JBoss DNA
+ * is licensed to you under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * JBoss DNA is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package org.jboss.dna.graph.observe;
+
+/**
+ *
+ */
+public interface Observer {
+
+ /**
+ * Method that is called for each {@link Changes set of changes} from the {@link
Observable} instance(s) with which this
+ * observer is registered.
+ *
+ * @param changes the changes that are being published
+ */
+ void notify( Changes changes );
+}
Modified: trunk/dna-graph/src/main/java/org/jboss/dna/graph/observe/package-info.java
===================================================================
--- trunk/dna-graph/src/main/java/org/jboss/dna/graph/observe/package-info.java 2009-05-19
20:15:05 UTC (rev 913)
+++ trunk/dna-graph/src/main/java/org/jboss/dna/graph/observe/package-info.java 2009-05-20
20:30:39 UTC (rev 914)
@@ -87,20 +87,6 @@
* Third, the requests that make up a {@link Changes} instance can actually be replayed.
Consider the case of a cache
* that is backed by a {@link org.jboss.dna.graph.connector.RepositorySource}, which
might use an observer to keep the cache in sync.
* As the cache is notified of Changes, the cache can simply replay the changes against
its source.
- * </p>
- * <h3>Change feed and timelines</h3>
- * <p>
- * While it is often sufficient to register observers and receive notifications of the
changes as they occur, it
- * is also often useful to be able to ask for the changes that have occurred since some
time or date. This allows
- * a component to poll for changes or, for example, to catch up on the changes that
occurred previous to registering
- * an observer.
- * </p>
- * <p>
- * Components that implement {@link Observable} may also implement the {@link ChangeFeed}
interface to allow for
- * components to {@link ChangeFeed#changesSince(org.jboss.dna.graph.property.DateTime)
poll for changes}. The result
- * of this method is a {@link ChangeTimeline} object that contains the {@link Changes} as
well as the time period during which
- * the change sets occurred. Note that an empty {@link ChangeTimeline} object implies
there were no changes made during the time period.
- * </p>
*/
package org.jboss.dna.graph.observe;
Modified:
trunk/dna-graph/src/test/java/org/jboss/dna/graph/connector/test/AbstractConnectorTest.java
===================================================================
---
trunk/dna-graph/src/test/java/org/jboss/dna/graph/connector/test/AbstractConnectorTest.java 2009-05-19
20:15:05 UTC (rev 913)
+++
trunk/dna-graph/src/test/java/org/jboss/dna/graph/connector/test/AbstractConnectorTest.java 2009-05-20
20:30:39 UTC (rev 914)
@@ -51,6 +51,7 @@
import org.jboss.dna.graph.connector.RepositoryContext;
import org.jboss.dna.graph.connector.RepositorySource;
import org.jboss.dna.graph.connector.RepositorySourceException;
+import org.jboss.dna.graph.observe.Observer;
import org.jboss.dna.graph.property.DateTime;
import org.jboss.dna.graph.property.Name;
import org.jboss.dna.graph.property.Path;
@@ -123,6 +124,10 @@
public RepositoryConnectionFactory getRepositoryConnectionFactory() {
return connectionFactory;
}
+
+ public Observer getObserver() {
+ return null; // no observers here
+ }
});
// And set up the graph instance ...
Modified:
trunk/dna-repository/src/main/java/org/jboss/dna/repository/RepositoryLibrary.java
===================================================================
---
trunk/dna-repository/src/main/java/org/jboss/dna/repository/RepositoryLibrary.java 2009-05-19
20:15:05 UTC (rev 913)
+++
trunk/dna-repository/src/main/java/org/jboss/dna/repository/RepositoryLibrary.java 2009-05-20
20:30:39 UTC (rev 914)
@@ -41,6 +41,11 @@
import org.jboss.dna.graph.connector.RepositoryConnectionPool;
import org.jboss.dna.graph.connector.RepositoryContext;
import org.jboss.dna.graph.connector.RepositorySource;
+import org.jboss.dna.graph.observe.ChangeObserver;
+import org.jboss.dna.graph.observe.ChangeObservers;
+import org.jboss.dna.graph.observe.Changes;
+import org.jboss.dna.graph.observe.Observable;
+import org.jboss.dna.graph.observe.Observer;
import org.jboss.dna.repository.mimetype.MimeTypeDetectors;
import org.jboss.dna.repository.service.AbstractServiceAdministrator;
import org.jboss.dna.repository.service.ServiceAdministrator;
@@ -52,7 +57,7 @@
* @author Randall Hauch
*/
@ThreadSafe
-public class RepositoryLibrary implements RepositoryConnectionFactory {
+public class RepositoryLibrary implements RepositoryConnectionFactory, Observable {
/**
* The administrative component for this service.
@@ -106,8 +111,9 @@
private final ReadWriteLock sourcesLock = new ReentrantReadWriteLock();
private final CopyOnWriteArrayList<RepositoryConnectionPool> pools = new
CopyOnWriteArrayList<RepositoryConnectionPool>();
private RepositoryConnectionFactory delegate;
- protected final ExecutionContext executionContext;
+ private final ExecutionContext executionContext;
private final RepositoryContext repositoryContext;
+ private final ObservationBus observationBus = new InMemoryObservationBus();
/**
* Create a new manager instance.
@@ -146,11 +152,12 @@
* this manager; may be null if there is no delegate
* @throws IllegalArgumentException if the
<code>executionContextFactory</code> reference is null
*/
- public RepositoryLibrary( ExecutionContext executionContext,
+ public RepositoryLibrary( final ExecutionContext executionContext,
RepositoryConnectionFactory delegate ) {
CheckArg.isNotNull(executionContext, "executionContext");
this.delegate = delegate;
this.executionContext = executionContext;
+ final ObservationBus observationBus = this.observationBus;
this.repositoryContext = new RepositoryContext() {
/**
* {@inheritDoc}
@@ -158,7 +165,7 @@
* @see
org.jboss.dna.graph.connector.RepositoryContext#getExecutionContext()
*/
public ExecutionContext getExecutionContext() {
- return RepositoryLibrary.this.executionContext;
+ return executionContext;
}
/**
@@ -170,10 +177,37 @@
return RepositoryLibrary.this;
}
+ /**
+ * {@inheritDoc}
+ *
+ * @see org.jboss.dna.graph.connector.RepositoryContext#getObserver()
+ */
+ public Observer getObserver() {
+ return observationBus.hasObservers() ? observationBus : null;
+ }
+
};
}
/**
+ * {@inheritDoc}
+ *
+ * @see
org.jboss.dna.graph.observe.Observable#register(org.jboss.dna.graph.observe.ChangeObserver)
+ */
+ public boolean register( ChangeObserver observer ) {
+ return observationBus.register(observer);
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @see
org.jboss.dna.graph.observe.Observable#unregister(org.jboss.dna.graph.observe.ChangeObserver)
+ */
+ public boolean unregister( ChangeObserver observer ) {
+ return observationBus.unregister(observer);
+ }
+
+ /**
* @return executionContextFactory
*/
public ExecutionContext getExecutionContext() {
@@ -235,6 +269,8 @@
} finally {
this.sourcesLock.readLock().unlock();
}
+ // Remove all listeners ...
+ this.observationBus.shutdown();
}
/**
@@ -462,4 +498,65 @@
}
return null;
}
+
+ protected interface ObservationBus extends Observable, Observer {
+ boolean hasObservers();
+
+ void shutdown();
+ }
+
+ protected class InMemoryObservationBus implements ObservationBus {
+ private final ChangeObservers observers = new ChangeObservers();
+
+ protected InMemoryObservationBus() {
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @see
org.jboss.dna.graph.observe.Observable#register(org.jboss.dna.graph.observe.ChangeObserver)
+ */
+ public boolean register( ChangeObserver observer ) {
+ return observers.register(observer);
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @see
org.jboss.dna.graph.observe.Observable#unregister(org.jboss.dna.graph.observe.ChangeObserver)
+ */
+ public boolean unregister( ChangeObserver observer ) {
+ return observers.unregister(observer);
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @see
org.jboss.dna.graph.observe.Observer#notify(org.jboss.dna.graph.observe.Changes)
+ */
+ public void notify( Changes changes ) {
+ if (changes != null) {
+ // Broadcast the changes to the registered observers ...
+ observers.broadcast(changes);
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @see org.jboss.dna.repository.RepositoryLibrary.ObservationBus#hasObservers()
+ */
+ public boolean hasObservers() {
+ return !observers.isEmpty();
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @see org.jboss.dna.repository.RepositoryLibrary.ObservationBus#shutdown()
+ */
+ public void shutdown() {
+ observers.shutdown();
+ }
+ }
}