Saturday, April 12, 2008

OSWorkflow meets JMS

The title of this post is probably unfair, since the authors of OSWorkflow provide a built-in JMSMessage action to do Fire-and-Forget style calls to a JMS Queue or Topic. However, what I describe here takes that integration one step further, by marrying event-based OSWorkflow processing with Asynchronous with Callback style JMS calls using Apache ActiveMQ. My application needs to fire long running batch job which are dependent on each other. Rather than have a human operator fire them off in the right sequence, the idea is to build a workflow that captures these dependencies, then submit them asynchronously to a JMS queue. As each job complete, the JMS listener which executes these jobs on the other end sends callbacks to the workflow, which allow it to fire the next job in the dependency graph.

The workflow is fairly complex, it contains two splits and two corresponding joins, as shown below. See my previous post for a more thorough discussion of the workflow itself. I have reproduced the graph below for completeness below:

There are three main components in here. First the WorkflowRunner, which loads the OSWorkflow configuration for the workflow into memory. It is called once from an external client (probably a web user) to kick off the workflow. Each workflow step that is responsible for executing a batch job is tied to a custom JmsAction which writes a message to the request.topic Topic. At the far end of the Topics is a JmsServer component, which basically delegates off to Java processes representing each individual batch job. Once a job is complete, it writes a message to the response.topic Topic. This message is picked up by the correct JmsAction, which then publishes an event into the Spring ApplicationContext. The WorkflowRunner then picks this event up, and issues calls to process() recursively until the workflow is complete. The diagram below illustrates this flow.

I describe below each of the individual components. First, the OSWorkflow workflow definition file. Each of the steps are mapped to a JmsAction instance. This is the only difference from the previous week, when everything was mapped to a MockAction instance.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE workflow PUBLIC 
  "-//OpenSymphony Group//DTD OSWorkflow 2.8//EN"
  "http://www.opensymphony.com/osworkflow/workflow_2_8.dtd">
<workflow>
  <initial-actions>
    <action id="0" name="start">
      <pre-functions>
        <function type="class">
          <arg name="class.name">com.opensymphony.workflow.util.Caller</arg>
        </function>
        <function type="spring">
          <arg name="bean.name">jmsAction</arg>
          <arg name="action.name">t0</arg>
        </function>
      </pre-functions>
      <results>
        <unconditional-result old-status="Finished" status="Queued" owner="${caller}" 
          step="1"/>
      </results>
    </action>
  </initial-actions>
  <steps>
    <step id="1" name="p1">
      <actions>
        <action id="1" name="t1">
          <results>
            <unconditional-result old-status="Finished" status="Queued" owner="${caller}" 
              step="26"/>
          </results>
          <post-functions>
            <function type="spring">
              <arg name="bean.name">jmsAction</arg>
              <arg name="action.name">t1</arg>
            </function>
          </post-functions>
        </action>
      </actions>
    </step>
    <step id="26" name="s26">
      <actions>
        <action id="26" name="split26">
          <results>
            <unconditional-result old-status="Finished" owner="${caller}" split="26"/>
          </results>
        </action>
      </actions>
    </step>
    <step id="2" name="p2">
      <actions>
        <action id="2" name="t2">
          <results>
            <unconditional-result old-status="Finished" status="Queued" owner="${caller}" 
              step="34"/>
          </results>
          <post-functions>
            <function type="spring">
              <arg name="bean.name">jmsAction</arg>
              <arg name="action.name">t2</arg>
            </function>
          </post-functions>
        </action>
      </actions>
    </step>
    <step id="34" name="s34">
      <actions>
        <action id="34" name="split34">
          <results>
            <unconditional-result old-status="Finished" owner="${caller}" split="34"/>
          </results>
        </action>
      </actions>
    </step>
    <step id="3" name="p3">
      <actions>
        <action id="3" name="t3">
          <results>
            <unconditional-result old-status="Finished" owner="${caller}" join="43"/>
          </results>
          <post-functions>
            <function type="spring">
              <arg name="bean.name">jmsAction</arg>
              <arg name="action.name">t3</arg>
            </function>
          </post-functions>
        </action>
      </actions>
    </step>
    <step id="4" name="p4">
      <actions>
        <action id="4" name="t4">
          <results>
            <unconditional-result old-status="Finished" owner="${caller}" join="43"/>
          </results>
          <post-functions>
            <function type="spring">
              <arg name="bean.name">jmsAction</arg>
              <arg name="action.name">t4</arg>
            </function>
          </post-functions>
        </action>
      </actions>
    </step>
    <step id="5" name="p5">
      <actions>
        <action id="5" name="t5">
          <results>
            <unconditional-result old-status="Finished" owner="${caller}" join="75"/>
          </results>
          <post-functions>
            <function type="spring">
              <arg name="bean.name">jmsAction</arg>
              <arg name="action.name">t5</arg>
            </function>
          </post-functions>
        </action>
      </actions>
    </step>
    <step id="6" name="p6">
      <actions>
        <action id="6" name="t6">
          <results>
            <unconditional-result old-status="Finished" status="Queued" owner="${caller}" 
              step="7"/>
          </results>
          <post-functions>
            <function type="spring">
              <arg name="bean.name">jmsAction</arg>
              <arg name="action.name">t6</arg>
            </function>
          </post-functions>
        </action>
      </actions>
    </step>
    <step id="7" name="p7">
      <actions>
        <action id="7" name="t7">
          <results>
            <unconditional-result old-status="Finished" owner="${caller}" join="75"/>
          </results>
          <post-functions>
            <function type="spring">
              <arg name="bean.name">jmsAction</arg>
              <arg name="action.name">t7</arg>
            </function>
          </post-functions>
        </action>
      </actions>
    </step>
    <step id="8" name="p8">
      <actions>
        <action id="8" name="t8">
          <results>
            <unconditional-result old-status="Finished" status="Queued" owner="${caller}" 
              step="9"/>
          </results>
          <post-functions>
            <function type="spring">
              <arg name="bean.name">jmsAction</arg>
              <arg name="action.name">t8</arg>
            </function>
          </post-functions>
        </action>
      </actions>
    </step>
    <step id="9" name="stop">
      <actions>
        <action id="9" name="t9" finish="true">
          <results>
            <unconditional-result old-status="Finished" status="Complete" 
              owner="${caller}"/>
          </results>
        </action>
      </actions>
    </step>
  </steps>
  <splits>
    <split id="26">
      <unconditional-result old-status="Finished" status="Queued" owner="${caller}" 
        step="2"/>
      <unconditional-result old-status="Finished" status="Queued" owner="${caller}" 
        step="6"/>
    </split>
    <split id="34">
      <unconditional-result old-status="Finished" status="Queued" owner="${caller}" 
        step="3"/>
      <unconditional-result old-status="Finished" status="Queued" owner="${caller}" 
        step="4"/>
    </split>
  </splits>
  <joins>
    <join id="43">
      <conditions type="AND">
        <condition type="beanshell">
          <arg name="script"><![CDATA[
            "Finished".equals(jn.getStep(3).getStatus()) &&
            "Finished".equals(jn.getStep(4).getStatus())
          ]]>
          </arg>
        </condition>
      </conditions>
      <unconditional-result old-status="Finished" status="Queued" owner="${caller}" 
        step="5"/>
    </join>
    <join id="75">
      <conditions type="AND">
        <condition type="beanshell">
          <arg name="script"><![CDATA[
          "Finished".equals(jn.getStep(5).getStatus()) &&
          "Finished".equals(jn.getStep(7).getStatus())
          ]]>
          </arg>
        </condition>
      </conditions>
      <unconditional-result old-status="Finished" status="Queued" owner="${caller}" 
        step="8"/>
    </join>
  </joins>
</workflow>

Next up is the Spring applicationContext.xml file which ties this all together. It also contains some definitions for the JmsAction and JmsServer beans. This is shown below:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:util="http://www.springframework.org/schema/util"
       xsi:schemaLocation="
       http://www.springframework.org/schema/beans 
       http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
       http://www.springframework.org/schema/util 
       http://www.springframework.org/schema/util/spring-util-2.0.xsd">

  <bean class="org.springframework.beans.factory.annotation.RequiredAnnotationBeanPostProcessor"/>

  <!-- OSWorkflow -->
  <bean id="workflowStore" 
    class="com.opensymphony.workflow.spi.memory.MemoryWorkflowStore"/>
  
  <bean id="workflowFactory" 
      class="com.opensymphony.workflow.spi.hibernate.SpringWorkflowFactory" 
      init-method="init">
    <property name="resource" value="workflow-defs.xml"/>
    <property name="reload" value="true"/>    
  </bean>

  <bean id="workflowConfiguration" 
      class="com.opensymphony.workflow.config.SpringConfiguration">
    <property name="factory" ref="workflowFactory"/>
    <property name="store" ref="workflowStore"/>
  </bean>
  
  <bean id="workflowTypeResolver" class="com.opensymphony.workflow.util.SpringTypeResolver">
    <property name="functions">
      <map>
        <entry key="jmsAction"><ref bean="jmsAction"/></entry>
      </map>
    </property>
  </bean>

  <bean id="workflow" class="com.opensymphony.workflow.basic.BasicWorkflow" 
      scope="prototype">
    <constructor-arg><value>testuser</value></constructor-arg>
    <property name="configuration" ref="workflowConfiguration"/>
    <property name="resolver" ref="workflowTypeResolver"/>
  </bean>
    
  <bean id="workflowRunner" class="com.mycompany.myapp.workflow.WorkflowRunner" 
      scope="prototype">
    <property name="workflow" ref="workflow"/>
  </bean>
  
  <!-- JMS -->
  <bean id="jmsConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" 
      destroy-method="stop">
    <property name="connectionFactory">
      <bean class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL">
          <value>tcp://localhost:61616</value>
        </property>
      </bean>
    </property>
  </bean>

  <bean id="jmsAction" class="com.mycompany.myapp.workflow.JmsAction" init-method="init" 
      destroy-method="destroy" scope="prototype">
    <property name="jmsConnectionFactory" ref="jmsConnectionFactory"/>
  </bean>

  <bean id="jmsServer" class="com.mycompany.myapp.workflow.JmsServer">
    <property name="jmsConnectionFactory" ref="jmsConnectionFactory"/>
  </bean>
    
</beans>

As mentioned above, on the OSWorkflow side, we have a WorkflowRunner component. It implements ApplicationListener so it can respond to callbacks from the JmsAction components. Moving through the workflow takes place in the process() method. The onApplicationEvent() recursively calls the process() method based on events received from the JmsAction instances.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
// WorkflowRunner.java
package com.mycompany.myapp.workflow;

import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Required;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationListener;

import com.opensymphony.workflow.Workflow;
import com.opensymphony.workflow.WorkflowException;
import com.opensymphony.workflow.spi.Step;

/**
 * This class is called once by the client code, then the events that are
 * sent back from the Actions in the Workflow will move the workflow forward
 * until it ends.
 */
public class WorkflowRunner implements ApplicationListener {

  private final Log log = LogFactory.getLog(getClass());
  
  private String workflowName;
  private Workflow workflow;
  private Map<String,Object> inputs;
  
  private Set<Integer> alreadyExecuted = new HashSet<Integer>();
  
  @Required
  public void setWorkflow(Workflow workflow) {
    this.workflow = workflow;
  }
  
  public void setWorkflowName(String workflowName) {
    this.workflowName = workflowName;
  }
  
  public void setInputs(Map<String,Object> inputs) {
    this.inputs = inputs;
  }
  
  public long init(int initialActionId) throws Exception {
    long workflowId = workflow.initialize(workflowName, initialActionId, inputs);
    return workflowId;
  }
  
  @SuppressWarnings({"unused","unchecked"})
  public void process(long workflowId) {
    List<Step> currentSteps = workflow.getCurrentSteps(workflowId);
    for (Step currentStep : currentSteps) {
      int[] availableActions = workflow.getAvailableActions(workflowId, inputs);
      for (int availableAction : availableActions) {
        if (alreadyExecuted.contains(availableAction)) {
          continue;
        }
        try {
          alreadyExecuted.add(availableAction);
          System.out.println("Sending action.id=" + availableAction + " to JmsServer");
          workflow.doAction(workflowId, availableAction, inputs);
        } catch (WorkflowException e) {
          log.error("Exception in (workflow,action.id)=(" + workflowName + "," + 
            availableAction + "). Workflow stopped", e);
        }
      }
    }
  }
  
  public void onApplicationEvent(ApplicationEvent event) {
    if (event.getSource() instanceof Long) {
      process((Long) event.getSource());
    }
  }
}

Not too many surprises so far for people who have read my previous blog post. The JmsAction is our bridge between OSWorkflow and JMS, so it has to implement multiple interfaces. The FunctionProvider interface is so it can be injected as a function into the workflow XML configuration. ApplicationContextAware is so it can publish events back into Spring's context where it can be picked up by WorkflowRunner, and MessageListener is so it can listen on JMS events. The code is shown below and is fairly self-explanatory, comments are inlined where I felt more explanation may be helpful.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
// JmsAction.java
package com.mycompany.myapp.workflow;

import java.util.Map;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Required;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ApplicationEvent;

import com.opensymphony.module.propertyset.PropertySet;
import com.opensymphony.workflow.FunctionProvider;
import com.opensymphony.workflow.WorkflowException;
import com.opensymphony.workflow.spi.WorkflowEntry;

/**
 * Publishes a job request to a JMS queue. Listens for a callback from the
 * JMS consumer module and passes the event back to the WorkflowRunner event
 * listener to move the workflow forward.
 */
public class JmsAction implements FunctionProvider, ApplicationContextAware, MessageListener {

  private final Log log = LogFactory.getLog(getClass());
  
  private ConnectionFactory jmsConnectionFactory;
  private String actionName;
  private ApplicationContext applicationContext;
  
  private Connection connection;
  private Session session;
  private MessageProducer messageProducer;
  private MessageConsumer messageConsumer;

  @Required
  public void setJmsConnectionFactory(ConnectionFactory jmsConnectionFactory) {
    this.jmsConnectionFactory = jmsConnectionFactory; 
  }

  public void setActionName(String actionName) {
    this.actionName = actionName;
  }

  /**
   * @see org.springframework.context.ApplicationContextAware#setApplicationContext(org.springframework.context.ApplicationContext)
   */
  public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
    this.applicationContext = applicationContext;
  }

  /**
   * @see com.opensymphony.workflow.FunctionProvider#execute(java.util.Map, java.util.Map, com.opensymphony.module.propertyset.PropertySet)
   */
  @SuppressWarnings("unchecked")
  public void execute(Map transientVars, Map args, PropertySet ps) throws WorkflowException {
    setActionName((String) args.get("action.name"));
    WorkflowEntry workflowEntry = (WorkflowEntry) transientVars.get("entry");
    final long workflowId = workflowEntry.getId();
    try {
      MapMessage message = session.createMapMessage();
      message.setString("action.name", actionName);
      message.setString("workflow.id", String.valueOf(workflowId));
      message.setString("topic.name", "request.topic");
      messageProducer.send(message);
    } catch (JMSException e) {
      throw new RuntimeException(e);
    }
  }

  /**
   * Listens for a text message back from the JmsSubscriber module about job
   * completion. The text is formatted, contains Success or Failure, followed
   * by workflowId as a comma-separated list.
   * @see javax.jms.MessageListener#onMessage(javax.jms.Message)
   */
  @SuppressWarnings("serial")
  public void onMessage(Message message) {
    if (MapMessage.class.isInstance(message)) {
      MapMessage mapMessage = MapMessage.class.cast(message);
      try {
        String actionName = mapMessage.getString("action.name");
        if (actionName.equals(this.actionName)) {
          // only respond to events meant for this Action
          String status = mapMessage.getString("status");
          String workflowId = mapMessage.getString("workflow.id");
          if (status.equals("Success")) {
            applicationContext.publishEvent(new ApplicationEvent(new Long(workflowId)) {});
          } else {
            log.error("Action " + actionName + " failed, see server error log for details");
          }
        }
      } catch (JMSException e) {
        throw new RuntimeException(e);
      }
    }
  }
  
  protected void init() throws Exception {
    connection = jmsConnectionFactory.createConnection();
    session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    Topic requestTopic = session.createTopic("request.topic");
    Topic responseTopic = session.createTopic("response.topic");
    messageProducer = session.createProducer(requestTopic);
    messageConsumer = session.createConsumer(responseTopic);
    messageConsumer.setMessageListener(this);
    connection.start();
  }
  
  protected void destroy() throws Exception {
    session.close();
    connection.close();
  }
}

I started off using Spring's JmsTemplate because I wanted to learn how to use it, but gave up when I could not find a clean way of registering a JmsAction as a listener. If any of you have used JmsTemplate for beans which are both publishers and subscribers, please let me know. Example code (or links to example code) would be greatly appreciated.

One more thing to note is that unlike the WorkflowRunner, which will have only a single instance per workflow, there will be many instances of JmsAction for a given workflow. We specify that its scope is prototype (built every time it is accessed from the Spring context), and that its lifecycle methods are init() and destroy(), all in the Spring configuration above.

Our final component is the JmsServer. All this does currently is to print that it is "executing" something to stdout. In a real application, it would start another Java batch process and wait for it to complete before sending the callback. The code is shown below:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
// JmsServer.java
package com.mycompany.myapp.workflow;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;

/**
 * Listens for JMS requests, services them, and returns a callback
 * when the job is completed.
 */
public class JmsServer implements MessageListener {

  private ConnectionFactory jmsConnectionFactory;
  
  private Connection connection;
  private Session session;
  private MessageProducer messageProducer;
  private MessageConsumer messageConsumer;

  public void setJmsConnectionFactory(ConnectionFactory jmsConnectionFactory) {
    this.jmsConnectionFactory = jmsConnectionFactory;
  }
  
  /**
   * @see javax.jms.MessageListener#onMessage(javax.jms.Message)
   */
  public void onMessage(Message message) {
    if (! MapMessage.class.isInstance(message)) {
      return;
    }
    try {
      MapMessage mapMessage = MapMessage.class.cast(message);
      final String actionName = mapMessage.getString("action.name");
      final String workflowId = mapMessage.getString("workflow.id");
      // this is where we will delegate to some kind of executor in a real app
      System.out.println("Executing job:" + actionName);
      // send the callback after the job is done
      MapMessage responseMessage = session.createMapMessage();
      responseMessage.setString("action.name", actionName);
      responseMessage.setString("workflow.id", workflowId);
      responseMessage.setString("topic.name", "response.topic");
      responseMessage.setString("status", "Success");
      System.out.println("Sending callback from server");
      messageProducer.send(responseMessage);
    } catch (JMSException e) {
      throw new RuntimeException(e);
    }
  }
  
  protected void init() throws Exception {
    System.out.println("Initializing server");
    connection = jmsConnectionFactory.createConnection();
    session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    Topic responseTopic = session.createTopic("request.topic");
    Topic requestTopic = session.createTopic("response.topic");
    messageProducer = session.createProducer(requestTopic);
    messageConsumer = session.createConsumer(responseTopic);
    messageConsumer.setMessageListener(this);
    System.out.println("Server started");
    connection.start();
  }
  
  protected void destroy() throws Exception {
    System.out.println("Shutting down server");
    session.close();
    connection.close();
    System.out.println("Done");
  }
  
  public void run() throws Exception {
    init();
    try {
      for (;;) {
        Thread.sleep(500L);
      }
    } finally {
      destroy();
    }
  }
}

To test this, we start off the following two unit tests which run forever until they are manually stopped by a Ctrl-C at the command line. The reason I had to do this instead of packaging the whole thing into a single JUnit test as I had done before, was because I noticed that the unit test completed before the workflow had a chance to complete. Since the whole thing is event driven after the first call, there is no way to keep the test running until the workflow is complete. In real-life, this is not a problem because these are likely to be long-lived server processes anyway.

This JUnit test starts the JmsServer end of the setup and loops forever, sleeping for 0.5s between loops. I run this in one console window.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// JmsServerTest.java
package com.mycompany.myapp.workflow;

import org.junit.Before;
import org.junit.Test;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class JmsServerTest {

  private JmsServer jmsServer;
  
  @Before
  public void setUp() throws Exception {
     ApplicationContext context = new ClassPathXmlApplicationContext("applicationContext.xml");
     jmsServer = (JmsServer) context.getBean("jmsServer");
  }
  
  @Test
  public void runJmsServer() throws Exception {
    jmsServer.run();
  }
}

This JUnit test starts the OSWorkflow end of the setup and after the first call to process(), also loops forever. I run this in a second console window.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// WorkflowRunnerWithJmsActionsTest.java
package com.mycompany.myapp.workflow;

import java.util.HashMap;
import java.util.Map;

import org.junit.Before;
import org.junit.Test;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class WorkflowRunnerWithJmsActionsTest {

  private WorkflowRunner runner;
  private Map<String,Object> inputs = new HashMap<String,Object>();
  private long workflowId;

  @Before
  public void setUp() throws Exception {
    ApplicationContext context = new ClassPathXmlApplicationContext("applicationContext.xml"); 
    runner = (WorkflowRunner) context.getBean("workflowRunner");
    runner.setWorkflowName("workflow-def-1");
    runner.setInputs(inputs);
    workflowId = runner.init(0);
  }
  
  @Test
  public void runWorkflow() throws Exception {
    runner.process(workflowId);
    for (;;) {
      Thread.sleep(500L);
    }
  }
}

Running these produces the following output on the client end, and a similar output on the server end. Looking at the output, it is clear that the setup works, ie the output is what you would expect given the dependencies designed into the workflow.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
Sending action.id=1 to JmsServer
Sending action.id=26 to JmsServer
Sending action.id=2 to JmsServer
Sending action.id=6 to JmsServer
Sending action.id=34 to JmsServer
Sending action.id=7 to JmsServer
Sending action.id=3 to JmsServer
Sending action.id=4 to JmsServer
Sending action.id=5 to JmsServer
Sending action.id=8 to JmsServer
Sending action.id=9 to JmsServer

A nice tool that helped me debug my code is ActiveMQ's web-based administrator interface. Here you can see how many messages were received and sent on each of the topics. You can delete messages off the queue, which is very helpful after unsuccessful runs. It is available at 0.0.0.0:8161 of the machine where ApacheMQ is started.

So there you have it folks. A simple, asynchronous, event-driven way to manage dependencies among multiple batch jobs.

2 comments (moderated to prevent spam):

Anonymous said...

Hi, very nice article!

I would like to try to run your example, but I am new to all this. Is it possible for you to make the whole code available (with ant, or maven script) so that it can be built and tested?

Thanks a lot

Sujit Pal said...

Hi, thanks for the compliment.

To answer your question, sorry, no I cannot, because I don't have the code anymore. However, to run the examples, just do the following.
1) Create a maven project with archetype:create.
2) Create src/main/resources and copy over the applicationContext.xml and the osworkflow.xml file into it.
3) Copy over the Java source classes into src/main/java, creating any necessary packages (from the package declaration).
4) Copy over the Java test files into src/test/java, as above wrt package names.
5) Create a pom.xml (or ant build.xml) if that makes more sense) with necessary library dependencies.
6) To run the test, use mvn test -Dtest=JunitTestShortClassName.