Data Hub Flows Fundamentals
Data Hub simplifies building a data hub application on MarkLogic by allowing users to define models representing the entities in their application, and then enabling users to process data within MarkLogic based on those models. This guide covers how Data Hub supports data processing within MarkLogic.
Steps, Flows, and Jobs Overview
Steps: The core construct for data processing in Data Hub is a "step". A step defines a batch process to be performed on data within MarkLogic, or on data being ingested into MarkLogic. The key parts of a step are: A MarkLogic query for selecting items to process, such as documents or values from an index; A MarkLogic module for processing each of the selected items; configuration for persisting documents returned by the MarkLogic module. Processing data in Data Hub is thus achieved by defining a step, and then running it via a "flow", as described next.
Flows: In a data hub scenario, it is common for a series of batch processes to be performed on data, either when it is ingested, or on a scheduled basis. In Data Hub, a "flow" defines a sequence of steps, with each step having its own query, step module, and configuration for persisting documents.
For Data Hub 4 users, it is important to note that the definition of a "flow" differs in Data Hub 5 and later. In Data Hub 4, a "flow" defined a batch process, while in Data Hub 5 and later, a "step" defines a batch process, and a "flow" defines a sequence of steps.
The primary value of a flow is that it allows for a step to be run; a step may not be run by itself. Thus, to run the batch process defined by a step, a user must run a flow that contains the step. Because a flow can contain many steps, a user can also choose to run all or a subset of the steps contained by the flow.
Jobs: When a user runs a flow, Data Hub creates a "job" to capture the execution of the flow and the steps within it. A job is saved by Data Hub as a JSON document in the jobs database.
When a step is run, Data Hub will create one or more "batches" based on the number of items selected for processing by the step. For example, if a step query selects 1 million documents to process, and a step is configured with a batch size of 100, then Data Hub will create 10,000 batches to be processed by the step. For each batch, a JSON document will be written to the jobs database capturing details about the execution of that batch against the given step. Together, job and batch documents provide information about the data processing activities occurring within a Data Hub application.
A step is defined via a JSON file in a user's project. Steps can be created and modified via Hub Central and Gradle, but ultimately, steps live as JSON files in a user's project so that they can be managed easily via a version control system. When a Data Hub project is deployed, the step JSON files are written to both the staging and final databases.
To simplify the definition of a step and avoid repeating some configuration, a step must be associated with a "step definition". A step definition defines a module - the "step module" - that processes selected items, and may optionally define other properties that the step module depends on.
Data Hub includes several out-of-the-box step definitions for common data hub use cases:
- A mapping step definition for mapping source documents to entity model instances.
- A matching step definition for matching entity instances based on configurable queries.
- A merging step definition for merging entity instances based on the results of a matching step.
- An ingestion step definition for processing data that is ingested via a tool like MarkLogic Content Pump (MLCP) or an external tool such as Spark or Kafka.
Users may also define their own step definitions, referred to as "custom step definitions". A custom step definition allows a user to write any step module they desire, which can perform any desired functionality on the items selected for processing. Like steps, custom step definitions are also defined via JSON files in a user's project.
Step inputs and outputs
A step module is required to be an SJS library module that exports a function named "main" (see the section on creating a custom step for more details). When the step module is invoked, Data Hub passes it one or more content objects to process, based on the value of the "acceptsBatch" step property. The step module can then return any number of content objects, with an array being returned if multiple content objects must be returned.
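A minimal sketch of such a module follows; the logic inside "main" is illustrative, not a real step implementation:

```javascript
// Illustrative custom step module sketch. "main" receives one content
// object per call (or an array of them when acceptsBatch is true),
// plus the combined options object.
function main(content, options) {
  // Return a content object; its uri and value determine the document
  // that Data Hub persists to the step's targetDatabase.
  return {
    uri: content.uri,
    value: content.value,
    context: {
      // Reading "collections" from options here is purely illustrative.
      collections: options.collections || []
    }
  };
}

// Step modules are SJS library modules, so "main" is exported.
module.exports = { main };
```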
The concept of a "content object" is critical to how steps work. A content object is a JSON object used as both the input to and the output from a step, but its properties are interpreted differently in each role.
Content objects as step inputs
When Data Hub runs a step, it uses the sourceQuery property to run a query that finds items to process. For each item found, Data Hub constructs a content object that has the following properties:
- uri = As an input to a step, the "uri" property identifies the item being processed. If the sourceQuery returned URIs, then this will be an actual URI. But if sourceQueryIsScript is true, then this can be any value.
- value = If "uri" is an actual URI, then this will be the document associated with the URI
- context = a JSON object with the following properties; this will only be populated if sourceQueryIsScript is not true:
- metadata = a JSON object that captures the document metadata keys and values associated with the URI
- permissions = an array of permissions: if the "permissions" step property is defined, this will contain permissions corresponding to the roles and capabilities in that property's value; otherwise, the permissions currently associated with the URI
- originalCollections = an array of collections currently associated with the URI
Content objects as step outputs
A step's main function can return zero to many content objects. In this context, a content object will have the following properties:
- uri = As an output of the step, this defines the URI of the document that will be inserted into the step's target database
- value = The document to be inserted into the step's target database
- context = an optional JSON object with the following properties (each is optional):
- metadata = a JSON object defining the metadata keys and values to associate with the URI
- permissions = array of permissions to associate with the URI
- collections = array of collections to associate with the URI
- quality = the quality of the document to be inserted
Common step properties
| Property | Type | Required | Description |
|---|---|---|---|
| acceptsBatch | boolean | No | Defaults to false; if true, the step module's main function is passed the array of content objects that comprises the entire batch; otherwise, the main function is called multiple times, with a single content object passed to it each time |
| additionalCollections | array of strings | No | Combined with the value of "collections" below to comprise the collections added to documents returned by a step module before the documents are inserted |
| batchSize | positive integer | No | Defines the number of items to be processed in a batch; defaults to 100 |
| collections | array of strings | No | Collections added to content objects returned by a step module before the documents within those content objects are inserted |
| constrainSourceQueryToJob | boolean | No | Defaults to false; if true, the query defined by the sourceQuery property is constrained to select only documents that have already been processed by the current job |
| customHook | object | No | See Custom Hooks. |
| description | string | No | For documentation purposes only; has no impact on any part of Data Hub |
| enableBatchOutput | string | No | If "never", a Batch document will never be created when the step is run; if "onFailure", a Batch document is created only if an error occurs for a batch when the step is run; otherwise, a Batch document is created for the step, unless "disableJobOutput" is set to "true" as a flow option or a runtime option |
| enableExcludeAlreadyProcessed | boolean | No | Defaults to false; if true, Batch documents written when this step is executed will contain data that supports usage of the excludeAlreadyProcessed property |
| excludeAlreadyProcessed | boolean | No | Defaults to false; if true, items returned by the step's sourceQuery that have already been processed by the step will not be processed again |
| interceptors | array of objects | No | See Step Interceptors. |
| name | string | Yes | Name of the step; used to construct stepId to generate a unique identifier for the step |
| permissions | string | No | Permissions added to each input content object (but not applied to the content objects returned by a step); comma-delimited with a format of "role1,capability1,role2,capability2,etc" |
| provenanceGranularityLevel | string | No | Supports values of "off" (no provenance is generated), "coarse" (only record-level provenance is generated), and "fine" (additional provenance may be generated based on the step type) |
| sourceDatabase | string | Yes * | The database from which items are found to be processed |
| stepDefinitionName | string | Yes | Name of the associated step definition |
| stepDefinitionType | string | Yes | Type of the associated step definition |
| stepId | string | Yes | Unique identifier for the step; value is always "(name)-(stepDefinitionType)" |
| stepUpdate | boolean | No | Defaults to false; if true, the step module is executed in a separate transaction in which it can insert, update, and delete data |
| targetCollectionsAdditivity | boolean | No | Defaults to false; if true, then for any content object returned by a step that was also an input content object, its original collections will be retained |
| targetDatabase | string | Yes | The database into which documents returned by a step module will be inserted |
| targetEntityType | string | No | If specified, Data Hub groups steps based on its value when displaying them in Hub Central; the value also affects how a step runs in a way that is specific to the step type |
| threadCount | positive integer | No | Defines the number of threads to use when running the step; defaults to 4 |
| writeStepOutput | boolean | No | Defaults to true; if false, the content objects output by a step will not be persisted; typically only useful when running multiple steps on ingest |
Step properties defined as options
Starting in Data Hub 5.4, steps can live outside of flows in their own JSON files in a Data Hub project. Prior to Data Hub 5.4, steps could only live inside flows in JSON files in the "flows" directory in a Data Hub project. In order to use Hub Central starting with Data Hub 5.4, it is necessary to migrate flows and steps to the new format where steps live outside of flow files.
If you do not migrate your project and continue to have steps live inside of flows, an important distinction with these "inline" steps is that a number of the common properties listed above are nested in an "options" JSON object within the inline step. The following list defines the properties above that are not defined within this "options" object:
Controlling what a step processes
Selecting any items to process
A source query will often invoke a cts query function to select documents, or select values from an index. Or you could import libraries and execute any code you wish that returns a sequence of items. As long as the value of "sourceQuery" is a valid argument to MarkLogic's "xdmp.eval" function, your input will work.
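For example, a step could select values from an index rather than documents; the snippet below is an illustrative sketch, not taken from a real project:

```json
{
  "sourceQuery": "cts.values(cts.jsonPropertyReference('customerId'))",
  "sourceQueryIsScript": true
}
```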
Selecting only items that have been processed by the current job
A common scenario in production is that a flow ingests new documents into a collection that already contains documents. The next step may have a source query constrained to that collection, but it has already processed all of the existing documents, and it would be inefficient to process them again. Thus, the user wants the next step to only process the documents that were ingested and/or processed by the previous step within the same flow execution - i.e., the same job.
In this scenario, the following property can be set, either in step options or in runtime options (most likely, it will be set in runtime options):
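That property is "constrainSourceQueryToJob", described in the common step properties above; for example, as a runtime option:

```json
{
  "constrainSourceQueryToJob": true
}
```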
At an implementation level, this will result in Data Hub combining the step's sourceQuery with a query on the Data Hub metadata field named "datahubCreatedByJob" where the value of that field equals the currently running job identifier.
Excluding items that have already been successfully processed
When a step runs and fails on one or more items, but not all items, it is often desirable to run the step again on any items that match the step's source query but have not yet been successfully processed. To enable this, a user must configure two options on a step.
First, before the step is run the first time, a step must have the following property in it:
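That property is "enableExcludeAlreadyProcessed", described in the common step properties above:

```json
{
  "enableExcludeAlreadyProcessed": true
}
```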
When a step is run with this property set to true, each Batch document written to the jobs database will contain additional data to capture the items that were successfully processed. Because this approach involves adding data, a user must set the above option so that users that do not need this feature do not incur any overhead from this data being captured and stored.
When the step is run again with a goal of excluding items that have already been processed successfully, the step must have the following property in it:
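That property is "excludeAlreadyProcessed", described in the common step properties above:

```json
{
  "excludeAlreadyProcessed": true
}
```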
With that property set, any items that have already been processed successfully by the step will not be processed again.
Customizing input and output
Data Hub includes several step definitions intended for reuse within a Data Hub application, such as for mapping and matching. Often, you can reuse a step definition in a step, but you may need to modify the content objects being processed by a step and/or those returned by a step.
Data Hub 5.4 introduced step interceptors to allow a user to modify content objects returned by a step before they are persisted, and Data Hub 5.5 and later builds on this by allowing a user to modify content objects before the step module's "main" function processes them. Step interceptors are intended to allow a user to reuse step definitions when some custom code is still required.
Interceptors are defined in a step as an array of JSON objects. A step can have zero to many interceptors. Each interceptor has the following properties:
| Property | Type | Description |
|---|---|---|
| path | string | Required; path of the module that will be invoked |
| when | string | Required; identifies when the interceptor will be invoked. Allowable values are "beforeMain" and "beforeContentPersisted". |
| vars | object | Optional; JSON object defining variable names and values to be passed to the interceptor |
The module identified by the "path" property will be invoked via MarkLogic's xdmp.invoke function https://docs.marklogic.com/xdmp.invoke with the default arguments for options. The module will be passed two arguments:
- contentArray = the array of content objects
- When the "when" property is "beforeMain", this will be the array of content objects that will be passed to the "main" function of the step module
- When the "when" property is "beforeContentPersisted", this will be the array of content objects returned by the "main" function of the step module
- options = the set of combined options from the step definition, flow, step, and runtime options
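As an illustrative sketch, a "beforeContentPersisted" interceptor might add a collection to each content object before it is written. The function name and the "targetCollection" option are hypothetical; in a real interceptor module, contentArray and options arrive as the two arguments described above:

```javascript
// Hypothetical interceptor logic: ensure a collection is present on
// every content object before persistence. This wraps the logic in a
// plain function so the sketch is self-contained and readable.
function addCollectionToContent(contentArray, options) {
  const collection = options.targetCollection || "intercepted";
  contentArray.forEach(content => {
    content.context = content.context || {};
    const existing = content.context.collections || [];
    if (!existing.includes(collection)) {
      content.context.collections = existing.concat([collection]);
    }
  });
  return contentArray;
}
```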
For a working example of step interceptors along with more details, please see https://github.com/marklogic/marklogic-data-hub/tree/master/examples/step-interceptors.
Custom hooks were introduced in Data Hub 5.0 as a technique for running custom code in an update transaction, either before or after a step's "main" function is invoked. However, custom hooks were not designed to allow for the custom code to reliably modify the content objects that a step had processed. Due to this, step interceptors should be preferred for modifying the array of content objects.
For a working example of a custom hook, please see https://github.com/marklogic/marklogic-data-hub/tree/master/examples/dhf5-custom-hook.
Creating a custom step
Data Hub includes several step definitions, but for many scenarios, these step definitions will not suffice for the requirements that need to be supported. A user can then create a custom step, which consists of two parts, each with its own JSON file:
- A custom step definition
- A custom step that references the custom step definition
The easiest way to create a custom step is via the Gradle task "hubCreateStep":
./gradlew hubCreateStep -PstepType=custom -PstepName=myStep
This will result in the following files being added to your project:
Note that if you want a different name for your step definition and step, you can do so by specifying an additional parameter:
./gradlew hubCreateStep -PstepType=custom -PstepName=myStep -PstepDefName=myStepDef
At this point, you can start customizing the code in the generated module. It is recommended to run the following task, so that as you change the module, it is loaded into your modules database:
Running the above task allows you to immediately test your changes.
The generated step module has numerous comments within it to explain how to customize its code. You are free to remove all of the comments and all of the code within the "main" function and start from scratch if you wish.
A flow is defined via a JSON file in a user's project in the "./flows" directory. A flow defines one or more steps that will be run when the flow itself is run.
| Property | Type | Required | Description |
|---|---|---|---|
| name | string | Yes | Unique name for the flow |
| description | string | No | Description of the flow |
| stopOnError | boolean | No | Defaults to false; if true, then when an error occurs within a step, the step is stopped and no more steps are run |
| batchSize | integer | No | Defaults to 100; defines the number of items to be processed by each step in a batch; can be overridden by the same property on a step |
| threadCount | integer | No | Defaults to 4; defines the number of threads used to process batches of items for a step; can be overridden by the same property on a step |
| options | object | No | JSON object that defines options both specific to a flow and that can be used to affect step behavior |
| steps | object | Yes | Defines one or many steps within the flow |
Flow step formats
Prior to Data Hub 5.4, the "steps" object in a flow always contained the step properties. In effect, each step belonged to the flow that contained it and could not be reused elsewhere. The steps object would thus look like this:
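For example (the step shown and its properties are illustrative):

```json
{
  "name": "myFlow",
  "steps": {
    "1": {
      "name": "myMappingStep",
      "stepDefinitionName": "entity-services-mapping",
      "stepDefinitionType": "mapping",
      "options": {
        "sourceQuery": "cts.collectionQuery('raw-customers')",
        "targetDatabase": "data-hub-FINAL"
      }
    }
  }
}
```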
Starting in Data Hub 5.4, steps can be managed outside the context of flows. This allows, for example, a Hub Central user to start working on a new mapping step without having to decide upfront which flow it will belong to. Additionally, a step can be referenced by multiple flows. These steps are considered "referenced steps", while the previous (and still supported) format uses "inline steps"; the section above on managing steps discusses how a step is defined in each format.
When using referenced steps, each step only requires a stepId to be provided:
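For example (step names are illustrative; the stepId values follow the format described in the step properties above):

```json
{
  "name": "myFlow",
  "steps": {
    "1": {
      "stepId": "myMappingStep-mapping"
    },
    "2": {
      "stepId": "myCustomStep-custom"
    }
  }
}
```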
Data Hub supports both flow formats.
As shown above, a flow may have an optional "options" JSON object. This options object has two purposes:
- To affect how a flow is run
- To be combined with step definition options, step options, and runtime options as the "options" argument passed to a step module
As of Data Hub 5.5, the only option value that affects how a flow is run is "disableJobOutput", which can also be specified via runtime options.
Running a flow
Once a flow has one or more steps, it can be run via a variety of Data Hub interfaces. First though, a user should consider how the steps in a flow should be run, which is typically based on the data processing scenario.
Prior to Data Hub 5.5, the primary scenario supported by Data Hub was a traditional batch process that involved loading data into MarkLogic - often from files, via MLCP - and then reprocessing it via a sequence of steps. The reprocessing of data in MarkLogic could occur immediately after new data was ingested, or it could be done on a scheduled basis, with new data being loaded throughout the day. Regardless of the scheduling, the following characterize how Data Hub supports this scenario:
- Each step is responsible for finding its items to process and then processing them and persisting the results to MarkLogic; there is no concept of the output of a step becoming the input for the next step
- A step does not begin until all of the items have been processed by the previous step
- A tool external to MarkLogic is required to manage the collection of items to process, break those items into small batches, and make a call to MarkLogic for each batch
The execution of a flow by Data Hub supports this traditional batch processing scenario well. But another important scenario for a data hub application to support involves new records that arrive throughout the day and must be immediately processed, ideally via one call to MarkLogic. A user does not want to wait for other records to arrive before processing a new record. Instead, the user wants to process the record - which may involve executing multiple steps - as quickly as possible via one call to MarkLogic. In addition, instead of each step having to find items to process, the user wants the input for the first step to be the new record, and the input for every other step should be the output of the previous step.
The above scenario is often referred to as "real time processing" or "event-based processing" - i.e. the record needs to be processed as soon as it is received without any concept of scheduling. Data Hub 5.5 and later supports this via the following:
- A new REST extension that can receive JSON or XML data and process it via multiple steps defined in a flow
- The Data Hub transform for MLCP now supports running multiple steps in a flow on each document created by MLCP
Thus, when selecting a Data Hub interface for running a flow, the user should first identify which scenario they must support - batch, or real time. Then, the user must choose one of the following interfaces in Data Hub:
- For batch scenarios: Data Hub Gradle plugin; Data Hub client JAR
- For real time scenarios: Data Hub REST extension; Data Hub transform for MLCP
The following table provides guidance on when to use each interface:
| Interface | Scenario | When to Use | When Not to Use |
|---|---|---|---|
| Data Hub Gradle plugin | Batch | Best used when developing a Data Hub application, particularly in a local development environment, where Gradle is likely already available and being used for deploying the Data Hub application to MarkLogic. Gradle can also be a good choice when using a tool like Jenkins to build and deploy a Data Hub application, as Jenkins is likely using Gradle to perform those tasks. | Gradle is often not available in production environments, and is thus not a good fit there. Most of the time, the client JAR will be a better choice than Gradle for running flows in a production environment. |
| Data Hub Client JAR | Batch | The client JAR is primarily intended for running flows in a production environment, or generally anywhere that Gradle is not available. | If Gradle is available while developing a Data Hub application, it will typically be easier to use Gradle than the client JAR. |
| Data Hub REST extension | Real time | Use when integrating external systems with a Data Hub application, where the external system will send data to MarkLogic via HTTP | When the data to be ingested and processed immediately is in files on disk |
| Data Hub transform for MLCP | Real time | Use when ingesting files on disk and processing them immediately via multiple steps | When the data to be ingested and processed immediately is not in files on disk |
Differences between batch and real-time flows
There are important differences to understand when choosing to run a real-time flow.
With a real-time flow, the user provides the input for the first step, whether that's via the REST extension or via MLCP. For every subsequent step, the input is the output from the previous step. The sourceQuery of every step is thus ignored, along with any properties that affect how the sourceQuery is executed.
Writing step outputs
Steps in a real time flow can choose whether to write their output to the database or not; in a batch flow, the output of a step is always written to the database. The "writeStepOutput" step property is used to control whether or not the content objects outputted by a step are written to the database identified by the step's "targetDatabase" property.
Writes to multiple databases
After all steps have been completed, Data Hub may write data to multiple databases, including the jobs database. For example, if one step in a flow writes to the staging database and another step writes to the final database, Data Hub will execute one transaction for each of the two databases in order to write data. Furthermore, any provenance and job data generated while running the flow will be written to the jobs database, with provenance and job data being written in separate transactions.
In a batch flow, a Batch document captures the results of running a single step against the batch of items. In a real time flow, where a single batch is processed against multiple steps, a Batch document will have the results for multiple steps. These are captured in a "stepResults" array within the Batch document.
When a flow is run, regardless of the interface, the user may provide runtime options that affect how the flow and its steps are run. The runtime options are defined as a JSON object and are combined with other sources of options based on the following order of precedence:
- Runtime options
- Step options
- Flow options
- Step definition options
When running multiple steps, a user may wish to define step-specific runtime options. A user can do this via the following JSON within the options JSON object:
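The shape this takes is sketched below, assuming a "stepOptions" object keyed by step number (the key name and the option values shown are assumptions for illustration):

```json
{
  "stepOptions": {
    "1": {
      "collections": ["ingested-today"]
    },
    "3": {
      "threadCount": 8
    }
  }
}
```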
In the example above, the user is defining step-specific options for the steps associated with step numbers 1 and 3.
Interfaces for running a flow
Data Hub Gradle plugin
Data Hub Client Jar
Data Hub REST extension
The Data Hub REST extension is a standard MarkLogic REST extension (see https://docs.marklogic.com/guide/rest-dev/extensions#id_59188 ) named "hubFlowRunner" and can be accessed at "/v1/resources/hubFlowRunner". It only accepts POST requests, where the inputs for defining the flow to run and the data to process are defined via the HTTP request body. The request body may be defined as either JSON or XML; typically, the choice is based on the format of data being submitted for processing.
For either JSON or XML, the following inputs can be provided:
| Input | Type | Required | Description |
|---|---|---|---|
| flowName | string | Yes | Name of the flow to run |
| steps | array of strings for JSON; comma-delimited string for XML | No | Step numbers to run, in case the user does not wish to run all of the steps in the flow |
| jobId | string | No | User-provided job identifier |
| options | JSON object for JSON; element for XML | No | Runtime options to affect flow and step behavior |
| content | array of objects for JSON; element for XML that can occur multiple times | Yes | The data to process; each content object/element must specify a "uri" value, and optionally a "value" object/element as well. The user may also specify a "context" object/element based on the content schema defined in the "Manage Steps" section above. |
The following shows an example JSON request body with all of the inputs included; this is based on the example reference-entity-model project:
"steps": ["1", "2"],
A similar XML representation of the request body above is shown below:
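The element names below mirror the JSON inputs and are a sketch; the exact schema may differ:

```xml
<input>
  <flowName>CurateCustomerXML</flowName>
  <steps>1,2</steps>
  <jobId>my-job-123</jobId>
  <options>
    <disableJobOutput>true</disableJobOutput>
  </options>
  <content>
    <uri>/customer1.xml</uri>
    <value>
      <customer>
        <customerId>1</customerId>
      </customer>
    </value>
  </content>
</input>
```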
Data Hub transform for MLCP
Prior to Data Hub 5.5, the Data Hub transform for MLCP only allowed a user to reference an ingestion step when ingesting data via MLCP. This behavior is documented at Ingest Using MLCP. This is technically not running a flow, but rather running the code in the ingestion step against each document produced by MLCP.
Starting in Data Hub 5.5, the Data Hub transform for MLCP can now run a real time flow simply by specifying multiple steps to run. Instead of specifying "step" as part of the "transform_param" value, the user will specify "steps" with a value of a semicolon-delimited list of step numbers. For example, using the reference-entity-model example project again, a user can provide the following input to MLCP to run a real time flow with multiple steps:
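An illustrative invocation is shown below; the host, paths, and flow name are assumptions, and the transform module path follows the usual Data Hub convention:

```shell
# Run steps 1 and 2 of an assumed "CurateCustomerJSON" flow on each
# document as MLCP ingests it; note "steps" (semicolon-delimited)
# instead of the pre-5.5 "step" transform parameter.
mlcp.sh import -host localhost -port 8010 \
  -username admin -password admin \
  -input_file_path /data/customers \
  -transform_module "/data-hub/5/transforms/mlcp-flow-transform.sjs" \
  -transform_param "flow-name=CurateCustomerJSON,steps=1;2"
```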
Document metadata added by steps
For every content object outputted by a Data Hub step, regardless of the step type, Data Hub will add the following document metadata keys and values to the document wrapped by the content object:
- datahubCreatedOn = the date and time at which the document is written
- datahubCreatedBy = the MarkLogic user used to run the step
- datahubCreatedInFlow = the name of the flow containing the step being run
- datahubCreatedByStep = the name of the step being run
- datahubCreatedByJob = the ID of the job being run; this will contain the job ID of every flow run on the step, with multiple values being space-delimited
Trace events for debugging
Data Hub provides the following trace events to assist with debugging flows and steps:
Trace events can be enabled via the Group page in the MarkLogic Admin interface.
Transforming data on ingest
Data Hub ingestion steps can be referenced when ingesting data via the MarkLogic REST API or via MLCP. Technically, the step - and the flow it belongs to - is not "run" here, but rather it is "referenced" so that its code can be invoked as part of the REST or MLCP ingestion process.
The execution of a flow in Data Hub is referred to as a "job". Unless disabled, Data Hub will create the following data in its job database as it runs a flow:
- A Job document that captures a summary of running the flow and each of its steps
- A Batch document for every batch of items processed by each step
The details of each document are captured at https://github.com/marklogic/marklogic-data-hub/blob/master/specs/models/Job.schema.json and https://github.com/marklogic/marklogic-data-hub/blob/master/specs/models/Batch.schema.json.
Disabling jobs data
Job data can be very helpful for understanding when flows have been run and what items have been processed successfully by steps. However, if this data is not useful for a Data Hub application, it can be disabled either as a flow option or as a runtime option:
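For example, as a flow option (the value may also be expressed as the string "true", matching the usage quoted in the enableBatchOutput description above):

```json
{
  "options": {
    "disableJobOutput": true
  }
}
```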
In some scenarios, it may be useful for a Job document to be created when a flow is run, but not for the Batch documents to be created. By default, if Job documents are created, then Batch documents will be created as well. In Data Hub 5.5 and later, this behavior can now be customized via the new step-level "enableBatchOutput" property. This property supports the following values:
| Value | Behavior |
|---|---|
| never | Batch documents will never be created |
| onFailure | If at least one item in a batch fails processing, a Batch document will be created for the batch |
If enableBatchOutput has any other value, then Batch documents will always be created.
Error handling in jobs data
When one or more items fail processing by a step, then the first error that occurs will be saved to the Batch document.
Within the Job document, each step response has a "stepOutput" property. This will contain up to 10 error messages that occurred across all batches processed by the step.
What "coarse" does vs “fine”:
- "coarse" = generic record-level provenance, regardless of the step type, which is based on the output of the step
- "fine" = step-specific provenance