Author: heiko.braun(a)jboss.com
Date: 2008-08-12 10:16:39 -0400 (Tue, 12 Aug 2008)
New Revision: 234
Added:
sam/trunk/modules/core/src/main/java/org/jboss/sam/config/Configurator.java
sam/trunk/modules/core/src/test/resources/database/sam-cfg.xml
Modified:
sam/trunk/modules/core/pom.xml
sam/trunk/modules/core/src/main/java/org/jboss/sam/EventProcessor.java
sam/trunk/modules/core/src/main/java/org/jboss/sam/StreamInput.java
sam/trunk/modules/core/src/main/java/org/jboss/sam/StreamOutput.java
sam/trunk/modules/core/src/main/java/org/jboss/sam/config/StatementDef.java
sam/trunk/modules/core/src/main/java/org/jboss/sam/config/internal/EPLValueAdapter.java
sam/trunk/modules/core/src/main/java/org/jboss/sam/internal/drools/DroolsEventProcessorImpl.java
sam/trunk/modules/core/src/main/java/org/jboss/sam/internal/stream/InVMStreamInput.java
sam/trunk/modules/core/src/test/java/org/jboss/test/sam/BufferedStreamOutput.java
sam/trunk/modules/core/src/test/java/org/jboss/test/sam/config/ConfigParserTestCase.java
sam/trunk/modules/core/src/test/java/org/jboss/test/sam/database/DatabaseTestCase.java
sam/trunk/modules/sim/src/main/java/org/jboss/sam/simulation/EventSinkEntity.java
Log:
Add intermediary that applies confgurations to an event processor
Modified: sam/trunk/modules/core/pom.xml
===================================================================
--- sam/trunk/modules/core/pom.xml 2008-08-12 13:07:17 UTC (rev 233)
+++ sam/trunk/modules/core/pom.xml 2008-08-12 14:16:39 UTC (rev 234)
@@ -101,7 +101,7 @@
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<excludes>
-
<exclude>org/jboss/test/sam/database/DatabaseTestCase.java</exclude>
+
<!--exclude>org/jboss/test/sam/database/DatabaseTestCase.java</exclude-->
</excludes>
</configuration>
</plugin>
Modified: sam/trunk/modules/core/src/main/java/org/jboss/sam/EventProcessor.java
===================================================================
--- sam/trunk/modules/core/src/main/java/org/jboss/sam/EventProcessor.java 2008-08-12
13:07:17 UTC (rev 233)
+++ sam/trunk/modules/core/src/main/java/org/jboss/sam/EventProcessor.java 2008-08-12
14:16:39 UTC (rev 234)
@@ -108,6 +108,8 @@
String getName();
+ void setName(String name);
+
T getUnderlying();
}
Modified: sam/trunk/modules/core/src/main/java/org/jboss/sam/StreamInput.java
===================================================================
--- sam/trunk/modules/core/src/main/java/org/jboss/sam/StreamInput.java 2008-08-12
13:07:17 UTC (rev 233)
+++ sam/trunk/modules/core/src/main/java/org/jboss/sam/StreamInput.java 2008-08-12
14:16:39 UTC (rev 234)
@@ -52,4 +52,6 @@
* @return
*/
String getStreamName();
+
+ void setStreamName(String name);
}
Modified: sam/trunk/modules/core/src/main/java/org/jboss/sam/StreamOutput.java
===================================================================
--- sam/trunk/modules/core/src/main/java/org/jboss/sam/StreamOutput.java 2008-08-12
13:07:17 UTC (rev 233)
+++ sam/trunk/modules/core/src/main/java/org/jboss/sam/StreamOutput.java 2008-08-12
14:16:39 UTC (rev 234)
@@ -47,4 +47,6 @@
* @return
*/
String getStreamName();
+
+ void setStreamName(String name);
}
Added: sam/trunk/modules/core/src/main/java/org/jboss/sam/config/Configurator.java
===================================================================
--- sam/trunk/modules/core/src/main/java/org/jboss/sam/config/Configurator.java
(rev 0)
+++ sam/trunk/modules/core/src/main/java/org/jboss/sam/config/Configurator.java 2008-08-12
14:16:39 UTC (rev 234)
@@ -0,0 +1,100 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2006, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package org.jboss.sam.config;
+
+import org.jboss.sam.EventProcessor;
+import org.jboss.sam.StreamInput;
+import org.jboss.sam.StreamOutput;
+
+/**
+ * An intermediary to configure {@link org.jboss.sam.EventProcessor}'s
+ * from configuration descriptors. Allows for interception/modification
+ * of configuration values.
+ *
+ * @author Heiko.Braun <heiko.braun(a)jboss.com>
+ */
+public class Configurator
+{
+ private ClassLoader loader = Thread.currentThread().getContextClassLoader();
+ private ProcessingNodeDef nodeDef;
+
+ public Configurator(ProcessingNodeDef nodeDef, ClassLoader loader)
+ {
+ this.nodeDef = nodeDef;
+ this.loader = loader;
+ }
+
+ public Configurator(ProcessingNodeDef nodeDef)
+ {
+ this.nodeDef = nodeDef;
+ }
+
+ public void configure(EventProcessor processor)
+ {
+ processor.setName( nodeDef.getName() );
+
+ for(StreamInputDef def : nodeDef.getInputs())
+ {
+ StreamInput input = (StreamInput)newInstance(def.getClazz());
+ input.setStreamName(def.getName());
+ processor.addInput(input);
+ }
+
+ for(StreamOutputDef def : nodeDef.getOutputs())
+ {
+ StreamOutput output = (StreamOutput)newInstance(def.getClazz());
+ output.setStreamName(def.getName());
+ processor.addOutput(output);
+ }
+
+ for(StatementDef def : nodeDef.getStatements())
+ {
+ if(def.getBoundTo()!=null)
+ {
+ processor.registerBoundStatement(
+ def.getBoundTo(), def.getName(), def.getEpl().getExpression()
+ );
+ }
+ else
+ {
+ processor.registerStatement(
+ def.getName(), def.getEpl().getExpression()
+ );
+ }
+ }
+ }
+
+ private Object newInstance(String clazz)
+ {
+ Object obj = null;
+ try
+ {
+ obj = loader.loadClass( clazz ).newInstance();
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException("Failed to load class: " + clazz, e);
+ }
+
+ return obj;
+ }
+}
Property changes on:
sam/trunk/modules/core/src/main/java/org/jboss/sam/config/Configurator.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Modified: sam/trunk/modules/core/src/main/java/org/jboss/sam/config/StatementDef.java
===================================================================
--- sam/trunk/modules/core/src/main/java/org/jboss/sam/config/StatementDef.java 2008-08-12
13:07:17 UTC (rev 233)
+++ sam/trunk/modules/core/src/main/java/org/jboss/sam/config/StatementDef.java 2008-08-12
14:16:39 UTC (rev 234)
@@ -80,5 +80,5 @@
public void setBoundTo(String boundTo)
{
this.boundTo = boundTo;
- }
+ }
}
Modified:
sam/trunk/modules/core/src/main/java/org/jboss/sam/config/internal/EPLValueAdapter.java
===================================================================
---
sam/trunk/modules/core/src/main/java/org/jboss/sam/config/internal/EPLValueAdapter.java 2008-08-12
13:07:17 UTC (rev 233)
+++
sam/trunk/modules/core/src/main/java/org/jboss/sam/config/internal/EPLValueAdapter.java 2008-08-12
14:16:39 UTC (rev 234)
@@ -31,12 +31,16 @@
public String unmarshal(String expr) throws Exception
{
- if(expr.indexOf("<![CDATA[") ==-1)
- throw new IllegalArgumentException("EPLDef.expression does not contain
CDATA section");
+ String result = expr;
- return expr.substring(
- expr.indexOf("<![CDATA[")+1, expr.indexOf("]]")
- );
+ if(expr.indexOf("<![CDATA[") !=-1) // seems that JAXB strips it by
default
+ {
+ result = expr.substring(
+ expr.indexOf("<![CDATA[")+1, expr.indexOf("]]")
+ );
+ }
+
+ return result;
}
public String marshal(String expr) throws Exception
Modified:
sam/trunk/modules/core/src/main/java/org/jboss/sam/internal/drools/DroolsEventProcessorImpl.java
===================================================================
---
sam/trunk/modules/core/src/main/java/org/jboss/sam/internal/drools/DroolsEventProcessorImpl.java 2008-08-12
13:07:17 UTC (rev 233)
+++
sam/trunk/modules/core/src/main/java/org/jboss/sam/internal/drools/DroolsEventProcessorImpl.java 2008-08-12
14:16:39 UTC (rev 234)
@@ -31,6 +31,8 @@
import org.drools.lang.descr.PackageDescr;
import org.drools.spi.GlobalResolver;
import org.jboss.sam.*;
+import org.jboss.sam.config.ConfigFactory;
+import org.jboss.sam.config.Configurator;
import java.io.*;
import java.net.URL;
@@ -66,17 +68,28 @@
try
{
+
this.ruleBase = RuleBaseFactory.newRuleBase( RuleBase.RETEOO, null );
this.session = ruleBase.newStatefulSession();
this.session.setGlobalResolver(this);
+ // parse config
+ if(config!=null) applyConfiguration(config);
+
} catch (Exception e)
{
throw new RuntimeException("Failed to initialize event processor",
e);
}
}
+ private void applyConfiguration(URL config) throws Exception
+ {
+ ConfigFactory cfgFactory = ConfigFactory.newInstance();
+ Configurator cfg = new Configurator( cfgFactory.unmarshall(config) );
+ cfg.configure(this);
+ }
+
public Object resolveGlobal(String key)
{
Object global = null;
@@ -217,11 +230,18 @@
clearStatement(stmtNames.next());
}
}
+
public String getName()
{
return this.name;
}
+
+ public void setName(String name)
+ {
+ this.name = name;
+ }
+
public RuleBase getUnderlying()
{
return this.ruleBase;
Modified:
sam/trunk/modules/core/src/main/java/org/jboss/sam/internal/stream/InVMStreamInput.java
===================================================================
---
sam/trunk/modules/core/src/main/java/org/jboss/sam/internal/stream/InVMStreamInput.java 2008-08-12
13:07:17 UTC (rev 233)
+++
sam/trunk/modules/core/src/main/java/org/jboss/sam/internal/stream/InVMStreamInput.java 2008-08-12
14:16:39 UTC (rev 234)
@@ -34,6 +34,11 @@
private String streamName;
InVMStreamRegistry channelRegistry;
+
+ public InVMStreamInput()
+ {
+ this.channelRegistry = InVMStreamRegistry.getInstance();
+ }
public InVMStreamInput(String name)
{
@@ -63,6 +68,12 @@
return this.streamName;
}
+
+ public void setStreamName(String name)
+ {
+ this.streamName = name;
+ }
+
public void registerCallback(StreamInputCallback listener)
{
channelRegistry.getChannelCallbacks(streamName).add(listener);
Modified:
sam/trunk/modules/core/src/test/java/org/jboss/test/sam/BufferedStreamOutput.java
===================================================================
---
sam/trunk/modules/core/src/test/java/org/jboss/test/sam/BufferedStreamOutput.java 2008-08-12
13:07:17 UTC (rev 233)
+++
sam/trunk/modules/core/src/test/java/org/jboss/test/sam/BufferedStreamOutput.java 2008-08-12
14:16:39 UTC (rev 234)
@@ -82,8 +82,14 @@
public String getStreamName()
{
return this.streamName;
- }
+ }
+
+ public void setStreamName(String name)
+ {
+ this.streamName = name;
+ }
+
public void write(EventMessage[] args)
{
update(args, new EventMessage[]{});
Modified:
sam/trunk/modules/core/src/test/java/org/jboss/test/sam/config/ConfigParserTestCase.java
===================================================================
---
sam/trunk/modules/core/src/test/java/org/jboss/test/sam/config/ConfigParserTestCase.java 2008-08-12
13:07:17 UTC (rev 233)
+++
sam/trunk/modules/core/src/test/java/org/jboss/test/sam/config/ConfigParserTestCase.java 2008-08-12
14:16:39 UTC (rev 234)
@@ -93,6 +93,7 @@
assertEquals(1, pnd.getStatements().size());
assertEquals(1, pnd.getInputs().size());
assertEquals(1, pnd.getOutputs().size());
- assertEquals(0, pnd.getProps().size());
+ assertEquals(0, pnd.getProps().size());
+ assertNotNull("EPL is null",
pnd.getStatements().get(0).getEpl().getExpression());
}
}
Modified:
sam/trunk/modules/core/src/test/java/org/jboss/test/sam/database/DatabaseTestCase.java
===================================================================
---
sam/trunk/modules/core/src/test/java/org/jboss/test/sam/database/DatabaseTestCase.java 2008-08-12
13:07:17 UTC (rev 233)
+++
sam/trunk/modules/core/src/test/java/org/jboss/test/sam/database/DatabaseTestCase.java 2008-08-12
14:16:39 UTC (rev 234)
@@ -37,7 +37,7 @@
public void setUp() throws Exception
{
// Runtime
- createProcessorWithDefaultIO("DataBaseTestCase",
"database/esper-cfg.xml");
+ createProcessorWithDefaultIO("DataBaseTestCase",
"database/sam-cfg.xml");
// InMemory Database
db = createDatabase();
@@ -75,7 +75,7 @@
public void testTriggeredSQLQuery() throws Exception
{
// setup
- String epl = "select symbol, price from Query.win:length(1) as query," +
+ /*String epl = "select symbol, price from Query.win:length(1) as query,"
+
" sql:SamTestDB ['SELECT symbol, price FROM quotes WHERE symbol like
(${query.criteria})']";
eventProcessor.registerBoundStatement("InVM","TriggerSQL",
epl);
@@ -91,7 +91,7 @@
for(EventMessage event : eventMessages)
{
System.out.println("symbol/price -> " +
event.getProperty("symbol") + "/" +
event.getProperty("price"));
- }
+ } */
}
public void testEsperPullAPI() throws Exception
Added: sam/trunk/modules/core/src/test/resources/database/sam-cfg.xml
===================================================================
--- sam/trunk/modules/core/src/test/resources/database/sam-cfg.xml
(rev 0)
+++ sam/trunk/modules/core/src/test/resources/database/sam-cfg.xml 2008-08-12 14:16:39 UTC
(rev 234)
@@ -0,0 +1,29 @@
+<?xml version="1.0" encoding="ISO-8859-1"
standalone="yes"?>
+<ns2:processingNode name="DatabaseTestCase"
+ domain="http://org.jboss.sam/test/database"
+ xmlns:ns2="http://org.jboss.sam/08/2008/">
+ <config/>
+ <inputs>
+ <input name="VMInput"
clazz="org.jboss.sam.internal.stream.InVMStreamInput"/>
+ </inputs>
+ <outputs>
+ <output name="VMOutput"
clazz="org.jboss.test.sam.BufferedStreamOutput"/>
+ </outputs>
+ <statements>
+ <statement name="ExampleStatement">
+ <epl lang="drl"><![CDATA[
+package org.jboss.test.sam.drools;
+
+import org.jboss.test.sam.drools.StockTick;
+global org.jboss.sam.internal.drools.DroolsAdapter Drools;
+
+rule "Check event"
+when
+ $st: StockTick(symbol == "ACME")
+then
+ Drools.getListener("InVM").update($st);
+end
+]]></epl>
+ </statement>
+ </statements>
+</ns2:processingNode>
\ No newline at end of file
Property changes on: sam/trunk/modules/core/src/test/resources/database/sam-cfg.xml
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Modified:
sam/trunk/modules/sim/src/main/java/org/jboss/sam/simulation/EventSinkEntity.java
===================================================================
---
sam/trunk/modules/sim/src/main/java/org/jboss/sam/simulation/EventSinkEntity.java 2008-08-12
13:07:17 UTC (rev 233)
+++
sam/trunk/modules/sim/src/main/java/org/jboss/sam/simulation/EventSinkEntity.java 2008-08-12
14:16:39 UTC (rev 234)
@@ -92,6 +92,12 @@
return this.streamName;
}
+
+ public void setStreamName(String name)
+ {
+ this.streamName = name;
+ }
+
public void write(EventMessage[] events)
{
if(null==this.context)