[JBoss jBPM] - Re: Token.signal() ---> 2 tokens?
by pojomonkey
Process definition:
<?xml version="1.0" encoding="UTF-8"?>
|
| <process-definition xmlns="" name="ForkTest">
|
| <start-state name="startstate">
| <task name="NewProcess">
| <controller>
| <variable access="read,write,required" name="choice"></variable>
| </controller>
| </task>
| <transition to="decisonnode" name="ok"></transition>
| </start-state>
|
|
| <decision name="decisonnode">
| <transition to="optiona" name="choose a">
| <condition>#{choice == "OptionA"}</condition>
| </transition>
| <transition to="optionb" name="choose b">
| <condition>#{choice == "OptionB"}</condition>
| </transition>
| </decision>
|
| <fork name="fork1">
| <transition to="track" name="t10"></transition>
| <transition to="proceed" name="t20"></transition>
| </fork>
|
| <fork name="fork2" async="true">
| <transition to="monitor" name="t1"></transition>
| <transition to="step1" name="t2"></transition>
| </fork>
|
| <join name="join1">
| <transition to="join2"></transition>
| </join>
|
| <join name="join2">
| <transition to="complete"></transition>
| </join>
|
| <state name="optiona">
| <transition to="nextstate" name="ok"></transition>
| </state>
|
| <state name="nextstate">
| <transition to="fork1" name="ok"></transition>
| </state>
|
| <state name="optionb">
| <transition to="nextstate" name="ok"></transition>
| </state>
|
| <state name="proceed" async="true">
| <transition to="fork2" name="ok"></transition>
| </state>
|
| <state name="track" async="true">
| <transition to="join2" name="ok"></transition>
| </state>
|
| <state name="monitor" async="true">
| <transition to="join1" name="ok"></transition>
| </state>
|
| <state name="step1" async="true">
| <transition to="step2" name="ok"></transition>
| </state>
|
| <state name="step2" async="true">
| <transition to="step3" name="ok"></transition>
| </state>
|
| <state name="complete">
| <transition to="end" name="ok"></transition>
| </state>
|
| <state async="true" name="step3">
| <transition to="join1" name="ok"></transition>
| </state>
|
|
| <end-state name="end"></end-state>
|
|
| <action name="EnterNode" class="taskmanager.EnterStateHandler"></action>
|
| <action name="LeaveNode" class="taskmanager.ExitStateHandler"></action>
|
|
| <event type="node-enter">
| <action class="taskmanager.EnterStateHandler" ref-name="EnterNode"></action>
| </event>
|
| <event type="node-leave">
| <action ref-name="LeaveNode"></action>
| </event>
|
|
| </process-definition>
Unit test:
package forktest;
|
| import java.util.Map;
|
| import junit.framework.TestCase;
|
| import org.jbpm.JbpmConfiguration;
| import org.jbpm.JbpmContext;
| import org.jbpm.context.exe.ContextInstance;
| import org.jbpm.graph.def.Node;
| import org.jbpm.graph.def.ProcessDefinition;
| import org.jbpm.graph.exe.ProcessInstance;
| import org.jbpm.graph.exe.Token;
| import org.jbpm.persistence.db.DbPersistenceServiceFactory;
| import org.jbpm.svc.Services;
| import org.jbpm.taskmgmt.exe.TaskInstance;
|
| public class ForkUnitTest extends TestCase
| {
| /**
| * Some helpers
| */
| JbpmConfiguration jbpmConfiguration = JbpmConfiguration.getInstance();
| DbPersistenceServiceFactory dbPersistenceServiceFactory = (DbPersistenceServiceFactory) jbpmConfiguration.getServiceFactory(Services.SERVICENAME_PERSISTENCE);
|
| JbpmContext jbpmContext;
|
| private ProcessInstance processInstance;
|
| private long processInstanceId;
| private ContextInstance contextInstance;
| // private TaskMgmtInstance taskMgmtInstance;
|
| public ForkUnitTest()
| {
| }
|
| /**
| * Set up the basic in-memory DB, deploy the process description and
| * set up a jBPM context.
| * @see jBPM docs wrt jbpm configuration file
| */
| protected void setUp() throws Exception
| {
| super.setUp();
| dbPersistenceServiceFactory.createSchema();
| deployProcess();
| jbpmConfiguration.startJobExecutor();
| jbpmContext = jbpmConfiguration.createJbpmContext();
| }
|
| /**
| * Tidy up.
| */
| protected void tearDown() throws Exception
| {
| super.tearDown();
| jbpmContext.close();
| dbPersistenceServiceFactory.dropSchema();
| jbpmContext = null;
| }
|
| /**
| *
| */
| public void newTransaction()
| {
| jbpmContext.close();
| jbpmContext = jbpmConfiguration.createJbpmContext();
| processInstance = jbpmContext.loadProcessInstanceForUpdate(processInstanceId);
| // processInstanceId = processInstance.getId();
| }
|
| /**
| * Get the process description file, parse it and deploy the
| * process definition.
| */
| private void deployProcess()
| {
| JbpmContext jbpmContext = jbpmConfiguration.createJbpmContext();
| try
| {
| ProcessDefinition processDefinition =
| ProcessDefinition.parseXmlResource("ForkProcess/processdefinition.xml");
| jbpmContext.deployProcessDefinition(processDefinition);
| } finally
| {
| jbpmContext.close();
| }
| }
|
| /**
| * Each subsequent test will need a new process instance
| * @return a TaskInstance, or null if there is no start task defined.
| */
| private TaskInstance createNewProcessInstance()
| {
| processInstance = jbpmContext.newProcessInstanceForUpdate("ForkTest");
| processInstanceId = processInstance.getId();
| contextInstance = processInstance.getContextInstance();
| // taskMgmtInstance = processInstance.getTaskMgmtInstance();
| return processInstance.getTaskMgmtInstance().createStartTaskInstance();
| }
|
| public void testFork()
| {
| TaskInstance taskInstance = createNewProcessInstance();
| contextInstance.setVariable("choice", "OptionB");
| assertEquals("startstate", processInstance.getRootToken().getNode().getName());
| processInstance.signal();
| jbpmContext.save(processInstance);
|
| newTransaction();
|
| assertEquals("optionb", processInstance.getRootToken().getNode().getName());
| processInstance.signal("ok");
| jbpmContext.save(processInstance);
|
| newTransaction();
|
| assertEquals("nextstate", processInstance.getRootToken().getNode().getName());
| processInstance.signal("ok");
| jbpmContext.save(processInstance);
|
| newTransaction();
|
| Token root = processInstance.getRootToken();
| root.signal();
| Map map = root.getChildren();
| assertEquals(2, map.size());
|
| Token tk_t10 = (Token) map.get("t10");
| assertEquals("t10", tk_t10.getName());
|
| ProcessInstance pi = tk_t10.getProcessInstance();
| assertEquals(processInstanceId, pi.getId());
| Node node = tk_t10.getNode();
| assertEquals("track", node.getName());
| tk_t10.signal("ok");
| jbpmContext.save(processInstance);
|
| newTransaction();
|
| root = processInstance.getRootToken();
| map = root.getChildren();
| assertEquals(2, map.size());
|
| tk_t10 = (Token) map.get("t10");
| assertEquals("t10", tk_t10.getName());
|
| Token tk_t20 = (Token) map.get("t20");
| assertEquals("t20", tk_t20.getName());
|
| pi = tk_t20.getProcessInstance();
| assertEquals(processInstanceId, pi.getId());
| node = tk_t20.getNode();
| assertEquals("proceed", node.getName());
|
| tk_t20.signal();
| assertEquals("fork2", processInstance.getRootToken().getNode().getName());
| }
|
| }
|
And the two handlers:
| /**
| *
| */
| package forktest;
|
| import org.jbpm.graph.def.ActionHandler;
| import org.jbpm.graph.def.Node;
| import org.jbpm.graph.exe.ExecutionContext;
| import org.jbpm.graph.exe.Token;
| import org.jbpm.graph.node.State;
|
| /**
| * @author pauls
| *
| */
| public class EnterStateHandler implements ActionHandler
| {
|
| /**
| *
| */
| private static final long serialVersionUID = 7209848070013962714L;
|
| /* (non-Javadoc)
| * @see org.jbpm.graph.def.ActionHandler#execute(org.jbpm.graph.exe.ExecutionContext)
| */
| public void execute(ExecutionContext executionContext) throws Exception
| {
| Node node = executionContext.getNode();
| Token token = executionContext.getToken();
| String tok_name = (token.getName() == null ? "root" : token.getName());
|
| if (token.isLocked())
| {
| System.out.println("Token: " + tok_name + " is locked");
| }
|
| if (token.isSuspended())
| {
| System.out.println("Token: " + tok_name + " is suspended");
| }
|
| if (node instanceof State)
| {
| System.out.println("Token: " + tok_name + " entering node: " + node.getName());
| }
| else
| {
| System.out.println("Node is: " + node.getName());
| }
| }
|
| }
|
|
| /**
| *
| */
| package forktest;
|
| import org.jbpm.graph.def.ActionHandler;
| import org.jbpm.graph.def.Node;
| import org.jbpm.graph.exe.ExecutionContext;
| import org.jbpm.graph.exe.Token;
| import org.jbpm.graph.node.State;
|
| /**
| * @author pauls
| *
| */
| public class ExitStateHandler implements ActionHandler
| {
|
| /**
| *
| */
| private static final long serialVersionUID = 7209848070013962714L;
|
| /* (non-Javadoc)
| * @see org.jbpm.graph.def.ActionHandler#execute(org.jbpm.graph.exe.ExecutionContext)
| */
| public void execute(ExecutionContext executionContext) throws Exception
| {
| Node node = executionContext.getNode();
| Token token = executionContext.getToken();
| String tok_name = (token.getName() == null ? "root" : token.getName());
|
| if (node instanceof State)
| {
| System.out.println("Token: " + tok_name + " leaving node: " + node.getName());
| }
| else
| {
| System.out.println("Node was: " + node.getName());
| }
| }
|
| }
|
| [/CODE]
|
| And on the console the output from the handlers:
| Node was: startstate
| | Token: root is locked
| | Node is: decisonnode
| | Node was: decisonnode
| | Token: root is locked
| | Token: root entering node: optionb
| | Token: root leaving node: optionb
| | Token: root is locked
| | Token: root entering node: nextstate
| | Token: root leaving node: nextstate
| | Token: root is locked
| | Node is: fork1
| | Node was: fork1
| | Token: t10 is locked
| | Token: t10 entering node: track
| | Node was: fork1
| | Token: t20 is locked
| | Token: t20 entering node: proceed
| | Node was: fork1
| | Token: root is locked
| | Token: root entering node: track
| |
And it's that last line that's bugging me.
View the original post : http://www.jboss.com/index.html?module=bb&op=viewtopic&p=4131031#4131031
Reply to the post : http://www.jboss.com/index.html?module=bb&op=posting&mode=reply&p=4131031
18 years, 2 months
[JBoss Messaging] - Re: Message Pulling
by timfox
"sams" wrote : Thanks for the tip about trying things outside of an MDB. The simple consumer test worked and proved that the queues would pull messages around as needed to get the jobs done as quickly as possible. When doing the same type of test with an MDB it would never do this. I spent hours tweaking little config files and I was about to give up and go to bed, but finally a long shot hit me, and it worked...
|
| It seems that even though I'm adding messages using the ClusteredConnectionFactory, this only works with a standard consumer.
|
The connection factory you use for sending messages has no bearing on the connection factory used for consuming messages.
anonymous wrote :
| When you start using an MDB, it seems to step in the middle of things and redirect it all to the old ConnectionFactory instead of the clustered one.
|
There is no redirection occurring.
anonymous wrote :
| I even have the @Clustered annotation in my MDB and that doesn't make it work correctly.
|
| The solution was to simply add the following attributes to the standard ConnectionFactory in the connections-factory-service.xml file:
|
| | <attribute name="SupportsFailover">true</attribute>
| | <attribute name="SupportsLoadBalancing">true</attribute>
| | <attribute name="PrefetchSize">5</attribute>
| |
|
You shouldn't just change the SupportsFailover or SupportsLoadBalancing attributes - MDBs should always consume from the local node.
As mentioned before, prefetchSize is the parameter you want to change if you don't want to buffer so many messages. Since you have now reduced that to 5, that is why you are seeing the difference in behaviour.
anonymous wrote :
| Had to lower the PrefetchSize down to a small number instead of the large 150, or else they will fetch 150 at a time and not let them go. I'm considering setting this to 1 to ensure even distribution of messages. I have no idea what sort of penalty will be applied for having such a low prefetch, but I can't imagine it would be too bad. If someone knows, please enlighten us.
|
Consumer flow control works a bit like TCP flow control. The server has a certain number of tokens and continues sending messages as long as it has tokens; when it depletes its tokens it won't send any more. As messages are consumed, more tokens are sent to the server (in chunks) so the server can send more. This prevents the consumer from having to go to the server every time to get a message, which would involve a network round trip (RTT) and be slow. This is a standard technique that all messaging systems I know of (apart from JBoss MQ) employ. Setting prefetchSize to 1 effectively means the consumer will go to the server each time to get a message.
So there is a trade-off. Depending on how fast your MDBs consume messages you may not notice a difference. You can only tell this by experimentation.
Thanks
View the original post : http://www.jboss.com/index.html?module=bb&op=viewtopic&p=4131028#4131028
Reply to the post : http://www.jboss.com/index.html?module=bb&op=posting&mode=reply&p=4131028
18 years, 2 months