[jboss-svn-commits] JBL Code SVN: r24248 - in labs/jbossrules/trunk: drools-core/src/main/java/org/drools/agent/impl and 1 other directories.

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Fri Dec 5 08:05:33 EST 2008


Author: mark.proctor at jboss.com
Date: 2008-12-05 08:05:33 -0500 (Fri, 05 Dec 2008)
New Revision: 24248

Modified:
   labs/jbossrules/trunk/drools-compiler/src/test/java/org/drools/xml/changeset/ChangeSetTest.java
   labs/jbossrules/trunk/drools-core/src/main/java/org/drools/agent/impl/KnowledgeAgentConfigurationImpl.java
   labs/jbossrules/trunk/drools-core/src/main/java/org/drools/agent/impl/KnowledgeAgentImpl.java
   labs/jbossrules/trunk/drools-core/src/main/java/org/drools/io/impl/ResourceChangeNotifierImpl.java
   labs/jbossrules/trunk/drools-core/src/main/java/org/drools/io/impl/ResourceChangeScannerImpl.java
   labs/jbossrules/trunk/drools-core/src/main/java/org/drools/io/impl/ResourceProviderImpl.java
Log:
JBRULES-1885 Update KnowledgeAgent to use new Resource API
- should now be thread safe

JBRULES-1874 Create Resource Framework

JBRULES-1875 Load ChangeSet XML files for multiple resources

Modified: labs/jbossrules/trunk/drools-compiler/src/test/java/org/drools/xml/changeset/ChangeSetTest.java
===================================================================
--- labs/jbossrules/trunk/drools-compiler/src/test/java/org/drools/xml/changeset/ChangeSetTest.java	2008-12-05 11:30:47 UTC (rev 24247)
+++ labs/jbossrules/trunk/drools-compiler/src/test/java/org/drools/xml/changeset/ChangeSetTest.java	2008-12-05 13:05:33 UTC (rev 24248)
@@ -219,6 +219,7 @@
                       list.size() );
         assertTrue( list.contains( "rule3" ) );
         assertTrue( list.contains( "rule2" ) );
+        kagent.monitorResourceChangeEvents( false );
     }
 
     public void testModifyDirectory() throws IOException,
@@ -346,6 +347,8 @@
                       list.size() );
         assertTrue( list.contains( "rule2" ) );
         assertTrue( list.contains( "rule3" ) );
+        
+        kagent.monitorResourceChangeEvents( false );
     }
 
 }

Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/agent/impl/KnowledgeAgentConfigurationImpl.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/agent/impl/KnowledgeAgentConfigurationImpl.java	2008-12-05 11:30:47 UTC (rev 24247)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/agent/impl/KnowledgeAgentConfigurationImpl.java	2008-12-05 13:05:33 UTC (rev 24248)
@@ -38,9 +38,9 @@
             return;
         }
 
-        if ( name.equals( "drools.agent.scanResources" ) ) {
+        if ( name.equals( "drools.agent.monitorChangeSetEvents" ) ) {
             setMonitorChangeSetEvents( StringUtils.isEmpty( value ) ? true : Boolean.parseBoolean( value ) );
-        }  else if ( name.equals( "drools.agent.monitorChangeSetEvents" ) ) {
+        }  else if ( name.equals( "drools.agent.scanResources" ) ) {
             boolean bool = StringUtils.isEmpty( value ) ? true : Boolean.parseBoolean( value );
             setScanResources(  bool );
             if ( bool ) {
@@ -59,6 +59,8 @@
 
         if ( name.equals( "drools.agent.scanResources " ) ) {
             return Boolean.toString( this.scanResources );
+        } else if ( name.equals( "drools.agent.monitorChangeSetEvents " ) ) {
+            return Boolean.toString( this.monitorChangeSetEvents );
         } else if ( name.equals( "drools.agent.newInstance " ) ) {
             return Boolean.toString( this.newInstance );
         }

Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/agent/impl/KnowledgeAgentImpl.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/agent/impl/KnowledgeAgentImpl.java	2008-12-05 11:30:47 UTC (rev 24247)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/agent/impl/KnowledgeAgentImpl.java	2008-12-05 13:05:33 UTC (rev 24248)
@@ -55,13 +55,21 @@
         if ( configuration != null ) {
             //this.newInstance = ((KnowledgeAgentConfigurationImpl) configuration).isNewInstance();
             this.notifier = (ResourceChangeNotifierImpl) ResourceFactory.getResourceChangeNotifierService();
+            if ( ((KnowledgeAgentConfigurationImpl) configuration).isMonitorChangeSetEvents() ) {
+                this.monitor = true;
+            }
             if ( ((KnowledgeAgentConfigurationImpl) configuration).isScanResources() ) {
                 this.notifier.addResourceChangeMonitor( ResourceFactory.getResourceChangeScannerService() );
+                this.monitor = true; // if scanning, monitor must be true;
             }
-            if ( ((KnowledgeAgentConfigurationImpl) configuration).isMonitorChangeSetEvents() ) {
-                this.monitor = true;
-            }
         }
+        
+        if ( this.monitor ) {
+            this.queue = new LinkedBlockingQueue<ChangeSet>();
+            thread = new Thread( this );
+            thread.start();
+        }
+        
         buildResourceMapping( kbase );
     }
 
@@ -172,6 +180,7 @@
 
     public void resourcesChanged(ChangeSet changeSet) {
         try {
+            System.out.println( "agent resource changed" );
             this.queue.put( changeSet );
         } catch ( InterruptedException e ) {
             // @TODO add proper error message

Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/io/impl/ResourceChangeNotifierImpl.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/io/impl/ResourceChangeNotifierImpl.java	2008-12-05 11:30:47 UTC (rev 24247)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/io/impl/ResourceChangeNotifierImpl.java	2008-12-05 13:05:33 UTC (rev 24248)
@@ -20,17 +20,17 @@
 
 public class ResourceChangeNotifierImpl
     implements
-    ResourceChangeNotifier,
-    Runnable {
+    ResourceChangeNotifier {
     private Map<Resource, Set<ResourceChangeListener>> subscriptions;
     private List<ResourceChangeMonitor>                monitors;
-    private volatile boolean notify;
+
     
     private LinkedBlockingQueue<ChangeSet> queue;
 
     public ResourceChangeNotifierImpl() {
         this.subscriptions = new HashMap<Resource, Set<ResourceChangeListener>>();
         this.monitors = new CopyOnWriteArrayList<ResourceChangeMonitor>();
+        this.queue = new LinkedBlockingQueue<ChangeSet>();
     }
 
     public void addResourceChangeMonitor(ResourceChangeMonitor monitor) {
@@ -101,7 +101,7 @@
         
     }
     
-    private void processChangeSet(ChangeSet changeSet) {
+    public void processChangeSet(ChangeSet changeSet) {
         // this provides the complete published change set for this notifier.
         // however different listeners might be listening to different resources, so provide
         // listener change specified change sets.
@@ -180,22 +180,54 @@
     }
     
     public void start() {
-        this.notify = true;
+        if ( this.processChangeSet == null ) {
+            this.processChangeSet = new ProcessChangeSet( this.queue, this );
+        }
+        
+        if ( ! this.processChangeSet.isRunning() ) {
+            this.processChangeSet.setNotify( true );
+            thread = new Thread( this.processChangeSet );
+            thread.start();
+        }
     }
 
     public void stop() {
-        this.notify = false;
+//        this.processChangeSet.stop();
+//        this.queue.
     }    
-
-    public void run() {
-        while ( this.notify ) {           
-            try {
-                processChangeSet( this.queue.take() );
-            } catch ( InterruptedException e ) {
-                // @TODO print proper error message
-                e.printStackTrace();
+    
+    private Thread thread;
+    private ProcessChangeSet processChangeSet;
+    
+    public static class ProcessChangeSet implements Runnable {
+        private volatile boolean notify;
+        private LinkedBlockingQueue<ChangeSet> queue;
+        private ResourceChangeNotifierImpl notifier;
+        
+        ProcessChangeSet(LinkedBlockingQueue<ChangeSet> queue, ResourceChangeNotifierImpl notifier) {
+            this.queue = queue;
+            this.notifier = notifier;            
+        }
+        
+        public void setNotify( boolean notify ) {
+            this.notify = notify;
+        }
+        
+        public boolean isRunning() {
+            return this.notify;
+        }
+        
+        public void run() {
+            while ( this.notify ) {           
+                try {
+                    System.out.println( "notifier queueing" );
+                    this.notifier.processChangeSet( this.queue.take() );
+                } catch ( InterruptedException e ) {
+                    // @TODO print proper error message
+                    e.printStackTrace();
+                }
+                Thread.yield();
             }
-            Thread.yield();
         }
     }
 

Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/io/impl/ResourceChangeScannerImpl.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/io/impl/ResourceChangeScannerImpl.java	2008-12-05 11:30:47 UTC (rev 24247)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/io/impl/ResourceChangeScannerImpl.java	2008-12-05 13:05:33 UTC (rev 24248)
@@ -1,10 +1,6 @@
 package org.drools.io.impl;
 
-import java.io.File;
-import java.io.IOException;
-import java.net.URISyntaxException;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -19,30 +15,26 @@
 import org.drools.io.ResourceChangeNotifier;
 import org.drools.io.ResourceChangeScanner;
 import org.drools.io.ResourceChangeScannerConfiguration;
-import org.drools.util.StringUtils;
 
 public class ResourceChangeScannerImpl
     implements
-    ResourceChangeScanner,
-    Runnable {
+    ResourceChangeScanner {
 
     private Map<Resource, Set<ResourceChangeNotifier>> resources;
-    private Set<Resource>               directories;
+    private Set<Resource>                              directories;
 
-    private volatile boolean                           scan;
-
-    private volatile long                              interval;
-
     public ResourceChangeScannerImpl() {
         this.resources = new HashMap<Resource, Set<ResourceChangeNotifier>>();
         this.directories = new HashSet<Resource>();
+        this.scannerScheduler = new ProcessChangeSet( this.resources,
+                                                      this );
         setInterval( 60 );
-        this.scan = true;
     }
 
     public void configure(ResourceChangeScannerConfiguration configuration) {
-        this.interval = ((ResourceChangeScannerConfigurationImpl) configuration).getInterval();
-        System.out.println( this.interval );
+        this.scannerScheduler.setInterval( ((ResourceChangeScannerConfigurationImpl) configuration).getInterval() );
+        System.out.println( getInterval() );
+        
         synchronized ( this.resources ) {
             this.resources.notify(); // notify wait, so that it will wait again
         }
@@ -60,7 +52,7 @@
                                   Resource resource) {
         System.out.println( "scanner : " + resource );
         synchronized ( this.resources ) {
-            if ( ((InternalResource)resource).isDirectory() ) {
+            if ( ((InternalResource) resource).isDirectory() ) {
                 this.directories.add( resource );
             }
             Set<ResourceChangeNotifier> notifiers = this.resources.get( resource );
@@ -91,124 +83,170 @@
     public void scan() {
         System.out.println( "attempt scan : " + this.resources.size() );
 
-        if ( this.resources.size() > 0 ) {
-            System.out.println( "x" );
-        }
-        Map<ResourceChangeNotifier, ChangeSet> notifications = new HashMap<ResourceChangeNotifier, ChangeSet>();
-        
-    
-        List<Resource> removed = new ArrayList<Resource>();        
+        synchronized ( this.resources ) {
 
-        // detect modified and added
-        for ( Resource resource : this.directories ) {
-            for ( Resource child : ((InternalResource)resource).listResources() ) {
-                if ( !this.resources.containsKey( child ) ) {
-                    System.out.println( "found new file : " + child );
-                    // child is new
-                    ((InternalResource)child).setResourceType( ((InternalResource)resource).getResourceType() );
-                    Set<ResourceChangeNotifier> notifiers = this.resources.get( resource ); // get notifiers for this directory
-                    for ( ResourceChangeNotifier notifier : notifiers ) {
-                        ChangeSetImpl changeSet = (ChangeSetImpl) notifications.get( notifier );
-                        if ( changeSet == null ) {
-                            // lazy initialise changeSet
-                            changeSet = new ChangeSetImpl();
-                            notifications.put( notifier,
-                                               changeSet );
+            if ( this.resources.size() > 0 ) {
+                System.out.println( "x" );
+            }
+            Map<ResourceChangeNotifier, ChangeSet> notifications = new HashMap<ResourceChangeNotifier, ChangeSet>();
+
+            List<Resource> removed = new ArrayList<Resource>();
+
+            // detect modified and added
+            for ( Resource resource : this.directories ) {
+                for ( Resource child : ((InternalResource) resource).listResources() ) {
+                    if ( !this.resources.containsKey( child ) ) {
+                        System.out.println( "found new file : " + child );
+                        // child is new
+                        ((InternalResource) child).setResourceType( ((InternalResource) resource).getResourceType() );
+                        Set<ResourceChangeNotifier> notifiers = this.resources.get( resource ); // get notifiers for this directory
+                        for ( ResourceChangeNotifier notifier : notifiers ) {
+                            ChangeSetImpl changeSet = (ChangeSetImpl) notifications.get( notifier );
+                            if ( changeSet == null ) {
+                                // lazy initialise changeSet
+                                changeSet = new ChangeSetImpl();
+                                notifications.put( notifier,
+                                                   changeSet );
+                            }
+                            if ( changeSet.getResourcesAdded().isEmpty() ) {
+                                changeSet.setResourcesAdded( new ArrayList<Resource>() );
+                            }
+                            changeSet.getResourcesAdded().add( child );
+                            notifier.subscribeChildResource( resource,
+                                                             child );
                         }
-                        if ( changeSet.getResourcesAdded().isEmpty() ) {
-                            changeSet.setResourcesAdded( new ArrayList<Resource>() );
-                        }
-                        changeSet.getResourcesAdded().add( child );
-                        notifier.subscribeChildResource( resource, child );
-                    }                
+                    }
                 }
             }
-        }
 
-        for ( Entry<Resource, Set<ResourceChangeNotifier>> entry : this.resources.entrySet() ) {
-            Resource resource = entry.getKey();
-            Set<ResourceChangeNotifier> notifiers = entry.getValue();            
-            
-            if ( !((InternalResource)resource).isDirectory() ) {
-                // detect if Resource has been modified
-                System.out.println( "scan " + resource + ": " + ((InternalResource)resource).getLastModified() + " : " + ((InternalResource)resource).getLastRead() );
-                long lastModified = ((InternalResource)resource).getLastModified();
-                if ( lastModified == 0 ) {
-                    removed.add( resource );
-                    // resource is no longer present
-                    // iterate notifiers for this resource and add to each removed
-                    for ( ResourceChangeNotifier notifier : notifiers ) {
-                        ChangeSetImpl changeSet = (ChangeSetImpl) notifications.get( notifier );
-                        if ( changeSet == null ) {
-                            // lazy initialise changeSet
-                            changeSet = new ChangeSetImpl();
-                            notifications.put( notifier,
-                                               changeSet );
+            for ( Entry<Resource, Set<ResourceChangeNotifier>> entry : this.resources.entrySet() ) {
+                Resource resource = entry.getKey();
+                Set<ResourceChangeNotifier> notifiers = entry.getValue();
+
+                if ( !((InternalResource) resource).isDirectory() ) {
+                    // detect if Resource has been modified
+                    System.out.println( "scan " + resource + ": " + ((InternalResource) resource).getLastModified() + " : " + ((InternalResource) resource).getLastRead() );
+                    long lastModified = ((InternalResource) resource).getLastModified();
+                    if ( lastModified == 0 ) {
+                        removed.add( resource );
+                        // resource is no longer present
+                        // iterate notifiers for this resource and add to each removed
+                        for ( ResourceChangeNotifier notifier : notifiers ) {
+                            ChangeSetImpl changeSet = (ChangeSetImpl) notifications.get( notifier );
+                            if ( changeSet == null ) {
+                                // lazy initialise changeSet
+                                changeSet = new ChangeSetImpl();
+                                notifications.put( notifier,
+                                                   changeSet );
+                            }
+                            if ( changeSet.getResourcesRemoved().isEmpty() ) {
+                                changeSet.setResourcesRemoved( new ArrayList<Resource>() );
+                            }
+                            changeSet.getResourcesRemoved().add( resource );
                         }
-                        if ( changeSet.getResourcesRemoved().isEmpty() ) {
-                            changeSet.setResourcesRemoved( new ArrayList<Resource>() );
+                    } else if ( ((InternalResource) resource).getLastRead() < lastModified ) {
+                        // it's modified
+                        // iterate notifiers for this resource and add to each modified
+                        for ( ResourceChangeNotifier notifier : notifiers ) {
+                            ChangeSetImpl changeSet = (ChangeSetImpl) notifications.get( notifier );
+                            if ( changeSet == null ) {
+                                // lazy initialise changeSet
+                                changeSet = new ChangeSetImpl();
+                                notifications.put( notifier,
+                                                   changeSet );
+                            }
+                            if ( changeSet.getResourcesModified().isEmpty() ) {
+                                changeSet.setResourcesModified( new ArrayList<Resource>() );
+                            }
+                            changeSet.getResourcesModified().add( resource );
                         }
-                        changeSet.getResourcesRemoved().add( resource );
                     }
-                } else if ( ((InternalResource)resource).getLastRead() <  lastModified ) {
-                    // it's modified
-                    // iterate notifiers for this resource and add to each modified
-                    for ( ResourceChangeNotifier notifier : notifiers ) {
-                        ChangeSetImpl changeSet = (ChangeSetImpl) notifications.get( notifier );
-                        if ( changeSet == null ) {
-                            // lazy initialise changeSet
-                            changeSet = new ChangeSetImpl();
-                            notifications.put( notifier,
-                                               changeSet );
-                        }
-                        if ( changeSet.getResourcesModified().isEmpty() ) {
-                            changeSet.setResourcesModified( new ArrayList<Resource>() );
-                        }
-                        changeSet.getResourcesModified().add( resource );
-                    }
                 }
-            }                
-        }
-        
-        // now iterate and removed the removed resources, we do this so as not to mutate the foreach loop while iterating
-        for ( Resource resource : removed ) {
-            this.resources.remove( resource );
-        }
+            }
 
-        for ( Entry<ResourceChangeNotifier, ChangeSet> entry : notifications.entrySet() ) {
-            ResourceChangeNotifier notifier = entry.getKey();
-            ChangeSet changeSet = entry.getValue();
-            notifier.publishKnowledgeBaseChangeSet( changeSet );
+            // now iterate and removed the removed resources, we do this so as not to mutate the foreach loop while iterating
+            for ( Resource resource : removed ) {
+                this.resources.remove( resource );
+            }
+
+            for ( Entry<ResourceChangeNotifier, ChangeSet> entry : notifications.entrySet() ) {
+                ResourceChangeNotifier notifier = entry.getKey();
+                ChangeSet changeSet = entry.getValue();
+                notifier.publishKnowledgeBaseChangeSet( changeSet );
+            }
         }
     }
-    
+
+    public void setInterval(int interval) {
+
+        this.scannerScheduler.setInterval( interval * 1000 );
+    }
+
+    public int getInterval() {
+        return this.scannerScheduler.getInterval();
+    }
+
     public void start() {
-        this.scan = true;
+        if ( !this.scannerScheduler.isRunning() ) {
+            this.scannerScheduler.setScan( true );
+            thread = new Thread( this.scannerScheduler );
+            thread.start();
+        }
     }
 
     public void stop() {
-        this.scan = false;
+        this.scannerScheduler.setScan( false );
+        synchronized ( this.resources ) {
+            this.resources.notify(); // notify wait, so that it will wait again
+        }
     }
 
-    public void run() {
-        synchronized ( this.resources ) {
-            while ( scan ) {
-                scan();
-                try {
-                    this.resources.wait( this.interval );
-                } catch ( InterruptedException e ) {
-                    System.out.println( "wait interrupted, new interval is " + this.interval + "s" );
-                    // swallow, this will happen when we are waiting and the interval changes
+    private Thread           thread;
+    private ProcessChangeSet scannerScheduler;
+
+    public static class ProcessChangeSet
+        implements
+        Runnable {
+        private volatile boolean                           scan;
+        private ResourceChangeScannerImpl                  scanner;
+        private long                                       interval;
+        private Map<Resource, Set<ResourceChangeNotifier>> resources;
+
+        ProcessChangeSet(Map<Resource, Set<ResourceChangeNotifier>> resources,
+                         ResourceChangeScannerImpl scanner) {
+            this.resources = resources;
+            this.scanner = scanner;
+        }
+
+        public void setInterval(long interval) {
+            this.interval = interval;
+        }
+
+        public int getInterval() {
+            return (int) this.interval / 1000;
+        }
+
+        public void setScan(boolean scan) {
+            this.scan = scan;
+        }
+
+        public boolean isRunning() {
+            return this.scan;
+        }
+
+        public void run() {
+            synchronized ( this.resources ) {
+                while ( this.scan ) {
+                    this.scanner.scan();
+                    try {
+                        System.out.println( "scanner waiting" );
+                        this.resources.wait( this.interval );
+                    } catch ( InterruptedException e ) {
+                        System.out.println( "wait interrupted, new interval is " + this.interval + "s" );
+                        // swallow, this will happen when we are waiting and the interval changes
+                    }
                 }
             }
         }
     }
-
-    public void setInterval(int interval) {
-        this.interval = interval * 1000;
-    }
-
-    public int getInterval() {
-        return (int) this.interval / 1000;
-    }
 }

Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/io/impl/ResourceProviderImpl.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/io/impl/ResourceProviderImpl.java	2008-12-05 11:30:47 UTC (rev 24247)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/io/impl/ResourceProviderImpl.java	2008-12-05 13:05:33 UTC (rev 24248)
@@ -17,15 +17,11 @@
     private ResourceChangeNotifier notifier;
     private ResourceChangeScanner  scanner;
     private Object                 lock = new Object();
-    private Thread                 scannerThread;
-    private Thread                 notifierThread;
 
     public ResourceChangeNotifier getResourceChangeNotifierService() {
         synchronized ( this.lock ) {
             if ( this.notifier == null ) {
-                this.notifier = new ResourceChangeNotifierImpl();
-                this.notifierThread = new Thread( (ResourceChangeScannerImpl) this.notifier );
-                //this.scannerThread.start();                
+                this.notifier = new ResourceChangeNotifierImpl();               
             }
             return this.notifier;
         }
@@ -34,9 +30,7 @@
     public ResourceChangeScanner getResourceChangeScannerService() {
         synchronized ( this.lock ) {
             if ( scanner == null ) {
-                this.scanner = new ResourceChangeScannerImpl( );
-                this.scannerThread = new Thread( (ResourceChangeScannerImpl) this.scanner );
-                //this.scannerThread.start();
+                this.scanner = new ResourceChangeScannerImpl();
             }
             return this.scanner;
         }




More information about the jboss-svn-commits mailing list