Author: rhauch
Date: 2010-01-07 02:22:26 -0500 (Thu, 07 Jan 2010)
New Revision: 1547
Modified:
trunk/dna-graph/src/main/java/org/jboss/dna/graph/connector/federation/FederatedRepositoryConnection.java
trunk/dna-graph/src/main/java/org/jboss/dna/graph/connector/federation/ForkRequestProcessor.java
trunk/dna-graph/src/main/java/org/jboss/dna/graph/connector/federation/JoinRequestProcessor.java
trunk/docs/examples/gettingstarted/pom.xml
trunk/docs/examples/gettingstarted/repositories/src/test/resources/log4j.properties
Log:
DNA-616 The problem has to do with the fact that this example uses a
FederatedRepositorySource as a source for a JcrRepository (which creates a
FederatedRepositorySource on top of the supplied source). Thus, there is a
FederatedRepositorySource that uses another FederatedRepositorySource, and this is the
condition that causes the deadlock.
When the JcrRepository is created, it uses a SearchIndexer to crawl the content and create
the requisite indexes for query/search. The SearchIndexer creates a
CompositeRequestChannel (which it uses to submit to the designated repository source a
CompositeRequest that remains open, allowing the SearchIndexer to supply requests and then
use those requests to build other requests that are submitted within the same
execution/transaction), and submits a ReadBranchRequest to the source that happens
to be the lower FederatedRepositorySource. The FederatedRepositorySource returns a
connection that has an execute(...) method that implements the work by using the fork-join
algorithm to process the request, which in this case is the CompositeRequest (owned by
the CompositeRequestChannel created by the SearchIndexer).
Now, the ForkRequestProcessor works on the incoming request, which again is the
CompositeRequest. The FRP's process(CompositeRequest) is actually inherited from
RequestProcessor, and simply uses an iterator to get all of the requests within the
CompositeRequest. The iterator promptly returns the first ReadBranchRequest, so the
ForkRequestProcessor handles this via its process(ReadBranchRequest), and like all of
the ForkRequestProcessor.process(...) methods, forks the request into appropriate requests
to submit to the underlying (federated) sources, and then it enqueues the forked requests
into a queue.
Note that the ForkRequestProcessor does NOT mark the requests it processes as being
completed. After all, this is the job of the JoinRequestProcessor.
Meanwhile, the ForkRequestProcessor.process(ReadBranchRequest) completes and returns, and
the iterator in the ForkRequestProcessor.process(CompositeRequest) then attempts to get
the next request.
But the SearchIndexer has added only one ReadBranchRequest and is waiting/blocking until
that request is completed before it continues, while the FederatedRepositoryConnection is
waiting/blocking until the next request is added to the CompositeRequest. Thus the
deadlock.
The only way to break this deadlock is for the ReadBranchRequest to be completed. But,
because of the way that FederatedRepositoryConnection.execute(...) is implemented, the
ForkRequestProcessor.process(request) must complete before the JoinRequestProcessor is
started. So we need to create and start the JoinRequestProcessor without waiting for the
ForkRequestProcessor.process(request) to complete.
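
The blocking pattern described above can be reproduced in a small standalone sketch. This
is not the DNA code: the class ForkJoinDeadlockSketch, the SketchRequest type, the CLOSE
marker and the two queues are hypothetical stand-ins for the CompositeRequestChannel, the
ForkRequestProcessor and the JoinRequestProcessor. A timeout is used only so that the hang
can be observed, and Java 8 or later is assumed.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-ins; none of these are the real DNA types.
class ForkJoinDeadlockSketch {

    /** A request whose submitter can wait for completion. */
    static final class SketchRequest {
        final CountDownLatch completed = new CountDownLatch(1);
    }

    /** Marker that plays the role of closing the open channel. */
    static final SketchRequest CLOSE = new SketchRequest();

    /**
     * Returns true if the submitter sees its first request completed within the timeout.
     * forkSynchronously == true mimics starting the join only after the fork has returned.
     */
    static boolean firstRequestCompletes(boolean forkSynchronously, long timeoutMillis)
            throws InterruptedException {
        BlockingQueue<SketchRequest> channel = new LinkedBlockingQueue<>();
        BlockingQueue<SketchRequest> forked = new LinkedBlockingQueue<>();
        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            // Fork: drain the channel until it is closed; enqueue work but never complete it.
            Runnable fork = () -> {
                try {
                    for (SketchRequest r = channel.take(); r != CLOSE; r = channel.take()) {
                        forked.put(r);
                    }
                    forked.put(CLOSE);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            };
            // Join: mark each forked request as completed.
            Runnable join = () -> {
                try {
                    for (SketchRequest r = forked.take(); r != CLOSE; r = forked.take()) {
                        r.completed.countDown();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            };
            // The "connection": run fork and join either in sequence or concurrently.
            executor.submit(() -> {
                if (forkSynchronously) {
                    fork.run(); // blocks on the still-open channel ...
                    join.run(); // ... so the join cannot start while the submitter waits
                } else {
                    executor.submit(fork);
                    join.run();
                }
            });
            // The "indexer": submit one request and wait for it before doing anything else.
            SketchRequest first = new SketchRequest();
            channel.put(first);
            boolean done = first.completed.await(timeoutMillis, TimeUnit.MILLISECONDS);
            channel.put(CLOSE);
            return done;
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("synchronous fork completes: " + firstRequestCompletes(true, 200));
        System.out.println("asynchronous fork completes: " + firstRequestCompletes(false, 2000));
    }
}
```

With the timeouts shown, the synchronous case should report false (the first request is
never completed while the channel stays open) and the asynchronous case should report true.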
The solution is actually already coded in FederatedRepositoryConnection.execute(...): there
is a 'shouldProcessSynchronously(Request)' method that returns whether the
ForkRequestProcessor should be run synchronously or asynchronously, and it was
implemented to always return 'true' (synchronous). The solution to this problem
requires that the method return 'false' for CompositeRequests that contain more
than one request (or an unknown number of requests). (In other cases, there is just one request,
so the forking can be done synchronously, saving the work of pushing the fork processing
into the executor service.)
Thus, the 'shouldProcessSynchronously(Request)' method was implemented to do this, and
the test completed ... with an error. This error is caused by a bug in the
JoinRequestProcessor.process(ReadBranchRequest) that failed to properly calculate the
projected actual Locations for the ProxyNodes from the source requests. (Apparently the
SearchIndexer reads a branch that is deeper than was previously tested.)
Fixing the JoinRequestProcessor.process(ReadBranchRequest) allows the repository example
test cases to complete successfully.
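
To illustrate the kind of calculation that was missing, here is a minimal sketch of mapping
a location reported by a source back into the federated repository. It assumes a single
projection rule that maps one repository path prefix onto one source path prefix, and it
uses plain strings instead of the real Path, Location and Projection classes.
ProjectionSketch and its methods are hypothetical names, the example paths are invented,
and Java 8 or later is assumed.

```java
import java.util.Optional;

// Hypothetical, string-based stand-in for a single projection rule.
class ProjectionSketch {
    private final String repositoryPrefix; // where the source content appears in the repository
    private final String sourcePrefix;     // the part of the source that is projected

    ProjectionSketch(String repositoryPrefix, String sourcePrefix) {
        this.repositoryPrefix = stripRoot(repositoryPrefix);
        this.sourcePrefix = stripRoot(sourcePrefix);
    }

    /** Represents the root path "/" as an empty prefix so that concatenation works. */
    private static String stripRoot(String path) {
        return path.equals("/") ? "" : path;
    }

    /** Returns the repository path for a source path, or empty if the rule does not cover it. */
    Optional<String> pathInRepository(String pathInSource) {
        String in = stripRoot(pathInSource);
        boolean covered = in.equals(sourcePrefix) || in.startsWith(sourcePrefix + "/");
        if (!covered) return Optional.empty();
        String projected = repositoryPrefix + in.substring(sourcePrefix.length());
        return Optional.of(projected.isEmpty() ? "/" : projected);
    }

    /** Falls back to the source path when no projected path exists. */
    String actualLocation(String pathInSource) {
        return pathInRepository(pathInSource).orElse(pathInSource);
    }

    public static void main(String[] args) {
        ProjectionSketch cars = new ProjectionSketch("/vehicles", "/");
        System.out.println(cars.actualLocation("/Cars/Hybrid"));
    }
}
```

In this sketch a node that the source reports at "/Cars/Hybrid" is recorded under
"/vehicles/Cars/Hybrid", which is the path a client of the federated repository would use.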
See the attached patch, which was committed and verified with a full integration build
(all unit and integration tests pass). The patch also changes the log level of the
repository example back to 'ERROR' (what it was previously), and also re-enables
the 'docs/examples/gettingstarted/repositories' project in the
'docs/examples/gettingstarted/pom.xml'.
Modified:
trunk/dna-graph/src/main/java/org/jboss/dna/graph/connector/federation/FederatedRepositoryConnection.java
===================================================================
--- trunk/dna-graph/src/main/java/org/jboss/dna/graph/connector/federation/FederatedRepositoryConnection.java 2010-01-07 04:05:33 UTC (rev 1546)
+++ trunk/dna-graph/src/main/java/org/jboss/dna/graph/connector/federation/FederatedRepositoryConnection.java 2010-01-07 07:22:26 UTC (rev 1547)
@@ -134,14 +134,18 @@
}
protected boolean shouldProcessSynchronously( Request request ) {
+ if (request instanceof CompositeRequest) {
+ CompositeRequest composite = (CompositeRequest)request;
+ if (composite.size() == 1) return true;
+ // There is more than one request, and the JoinRequestProcessor needs to be able to run even if
+ // the ForkRequestProcessor is processing incoming requests. Normally this would work fine,
+ // but if the incoming request is a CompositeRequestChannel.CompositeRequest, the ForkRequestProcessor
+ // will block if the channel is still open. In a synchronous mode, this prevents the JoinRequestProcessor
+ // from completing the incoming requests. See DNA-616 for details.
+ return false;
+ }
+ // Otherwise, its just a single request ...
return true;
- // if (request instanceof CompositeRequest) {
- // CompositeRequest composite = (CompositeRequest)request;
- // if (composite.size() == 1) return true;
- // return false;
- // }
- // // Otherwise, its just a single request ...
- // return true;
}
/**
Modified:
trunk/dna-graph/src/main/java/org/jboss/dna/graph/connector/federation/ForkRequestProcessor.java
===================================================================
--- trunk/dna-graph/src/main/java/org/jboss/dna/graph/connector/federation/ForkRequestProcessor.java 2010-01-07 04:05:33 UTC (rev 1546)
+++ trunk/dna-graph/src/main/java/org/jboss/dna/graph/connector/federation/ForkRequestProcessor.java 2010-01-07 07:22:26 UTC (rev 1547)
@@ -611,7 +611,14 @@
ReadNodeRequest placeholderRequest = new ReadNodeRequest(placeholder.location(), workspace.getName());
List<Location> children = new ArrayList<Location>(placeholder.children().size());
for (ProjectedNode child : placeholder.children()) {
- children.add(child.location()); // the ProxyNodes will have only a path!
+ if (child instanceof ProxyNode) {
+ ProxyNode proxy = (ProxyNode)child;
+ children.add(proxy.federatedLocation()); // the ProxyNodes will have only a path!
+ proxy.federatedLocation();
+ } else {
+ assert child instanceof PlaceholderNode;
+ children.add(child.location());
+ }
}
placeholderRequest.addChildren(children);
placeholderRequest.addProperties(placeholder.properties().values());
Modified:
trunk/dna-graph/src/main/java/org/jboss/dna/graph/connector/federation/JoinRequestProcessor.java
===================================================================
--- trunk/dna-graph/src/main/java/org/jboss/dna/graph/connector/federation/JoinRequestProcessor.java 2010-01-07 04:05:33 UTC (rev 1546)
+++ trunk/dna-graph/src/main/java/org/jboss/dna/graph/connector/federation/JoinRequestProcessor.java 2010-01-07 07:22:26 UTC (rev 1547)
@@ -497,6 +497,17 @@
return actual;
}
+ protected Location determineActualLocation( Location actualInSource,
+ Projection projection ) {
+ assert projection != null;
+ // Get the projection from the source-specific location ...
+ Path pathInSource = actualInSource.getPath();
+ for (Path path : projection.getPathsInRepository(pathInSource, pathFactory)) {
+ return actualInSource.with(path);
+ }
+ return actualInSource;
+ }
+
/**
* {@inheritDoc}
*
@@ -657,6 +668,8 @@
projectToFederated(actualLocation, projection, request, parent, children, properties);
}
Location locationOfProxy = readFromSource.getActualLocationOfNode();
+ // The location is in terms of the source, so get the projected location ...
+ locationOfProxy = determineActualLocation(locationOfProxy, projection);
actualLocationsOfProxyNodes.put(locationOfProxy.getPath(), locationOfProxy);
}
setCacheableInfo(request, fromSource.getCachePolicy());
Modified: trunk/docs/examples/gettingstarted/pom.xml
===================================================================
--- trunk/docs/examples/gettingstarted/pom.xml 2010-01-07 04:05:33 UTC (rev 1546)
+++ trunk/docs/examples/gettingstarted/pom.xml 2010-01-07 07:22:26 UTC (rev 1547)
@@ -10,7 +10,7 @@
<modules>
<module>sequencers</module>
- <!-- module>repositories</module -->
+ <module>repositories</module>
</modules>
<dependencyManagement>
Modified:
trunk/docs/examples/gettingstarted/repositories/src/test/resources/log4j.properties
===================================================================
--- trunk/docs/examples/gettingstarted/repositories/src/test/resources/log4j.properties 2010-01-07 04:05:33 UTC (rev 1546)
+++ trunk/docs/examples/gettingstarted/repositories/src/test/resources/log4j.properties 2010-01-07 07:22:26 UTC (rev 1547)
@@ -5,7 +5,7 @@
log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} %5p %m%n
# Root logger option
-log4j.rootLogger=WARNING, stdout
+log4j.rootLogger=ERROR, stdout
# Set up the default logging to be INFO level, then override specific units
-log4j.logger.org.jboss.dna=WARNING
+log4j.logger.org.jboss.dna=ERROR