MarkLogic 9 Product Documentation
MarkLogic Connector for Hadoop Developer's Guide
— Chapter 5


Using MarkLogic Server for Output

This chapter covers the following topics related to storing MapReduce job results in a MarkLogic Server instance:

Basic Steps

The MarkLogic Connector for Hadoop API supports storing MapReduce results in MarkLogic Server as documents, nodes, and properties. When using MarkLogic Server to store the results of the reduce phase, configuring the reduce step of a MapReduce job includes at least the following major tasks: identifying the output MarkLogic Server instance, configuring the output key and value types, and defining the reduce function. Each task is covered in the sections that follow.

Identifying the Output MarkLogic Server Instance

The MarkLogic Server output instance is identified by setting configuration properties. For general information on setting configuration properties, see Configuring a MapReduce Job.

Specify the following properties to identify the output MarkLogic Server instance:

Property Description
mapreduce.marklogic.output.host
Hostname or IP address of the server hosting your output XDBC App Server. The host must be resolvable by the nodes in your Hadoop cluster, so you should usually not use localhost.
mapreduce.marklogic.output.port
The port configured for the target XDBC App Server on the output host.
mapreduce.marklogic.output.username
Username privileged to update the database attached to the XDBC App Server.
mapreduce.marklogic.output.password
Cleartext password for the output.username user.

If you want to use a database other than the one attached to your XDBC App Server, set the following additional property to the name of your database:

mapreduce.marklogic.output.databasename
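
These properties can be set in a configuration file or programmatically. For example, a minimal sketch using the org.apache.hadoop.conf.Configuration API (the host, port, credentials, and database name shown are placeholders):

import org.apache.hadoop.conf.Configuration;
...
Configuration conf = new Configuration();
// Placeholder connection details for the output XDBC App Server
conf.set("mapreduce.marklogic.output.host", "ml-node-1.example.com");
conf.set("mapreduce.marklogic.output.port", "8010");
conf.set("mapreduce.marklogic.output.username", "hadoop-writer");
conf.set("mapreduce.marklogic.output.password", "secret");
// Optional: target a database other than the one attached to the App Server
conf.set("mapreduce.marklogic.output.databasename", "output-db");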

When you use MarkLogic Server in a cluster, all MarkLogic Server hosts containing a forest in the output database must be accessible through an XDBC server on the same port. The host identified in the configuration properties may be any qualifying host in the cluster. For details, see Deploying the Connector with a MarkLogic Server Cluster.

The target database must be configured for manual directory creation by setting the directory creation database configuration setting to manual. See Database Settings in the Administrator's Guide.

You can configure a job to connect to the App Server through SSL by setting the mapreduce.marklogic.output.usessl property. For details, see Making a Secure Connection to MarkLogic Server with SSL. For an example, see ContentReader.

For more information on the properties, see Output Configuration Properties.

Configuring the Output Key and Value Types

As discussed in Reduce Task, the org.apache.hadoop.mapreduce.OutputFormat subclass configured for the job determines the types of the output keys and values, and how results are stored. Use the org.apache.hadoop.mapreduce.Job API to configure the OutputFormat and the output key and value types for a job.

The Hadoop MapReduce framework includes OutputFormat subclasses for saving results to files in HDFS. The MarkLogic Connector for Hadoop API includes OutputFormat subclasses for storing results as documents, nodes, or properties in a MarkLogic Server database. The output key and value types you configure must match the OutputFormat subclass. For example, PropertyOutputFormat expects (DocumentURI, MarkLogicNode) key-value pairs, so configure the job with DocumentURI as the key type and MarkLogicNode as the value type.

For a summary of the OutputFormat subclasses provided by the MarkLogic Connector for Hadoop, including the expected key and value types, see OutputFormat Subclasses.

The example below configures a job to use NodeOutputFormat, which expects (NodePath, MarkLogicNode) key-value pairs.

import com.marklogic.mapreduce.NodeOutputFormat;
import com.marklogic.mapreduce.NodePath;
import com.marklogic.mapreduce.MarkLogicNode;
import org.apache.hadoop.mapreduce.Job;
...
public class LinkCountInDoc {
    ...
    public static class IntSumReducer
      extends Reducer<Text, IntWritable, NodePath, MarkLogicNode> {...}
    public static void main(String[] args) throws Exception {
        Job job = new Job(conf);
        job.setOutputFormatClass(NodeOutputFormat.class);
        job.setOutputKeyClass(NodePath.class);
        job.setOutputValueClass(MarkLogicNode.class);
        job.setReducerClass(IntSumReducer.class);
        ...
    }
}

Defining the Reduce Function

To create a reducer, define a subclass of org.apache.hadoop.mapreduce.Reducer and override at least the Reducer.reduce method. Your reduce method should generate key-value pairs of the expected type and write them to the method's Context parameter. The MapReduce framework subsequently stores the results written to the Context parameter in the file system or database using the configured OutputFormat subclass.

The output key and value types produced by the reduce method must match the output key and value types expected by the OutputFormat subclass. For example, if the job is configured to use NodeOutputFormat then the reduce method must generate (NodePath, MarkLogicNode) key-value pairs. The following example uses (Text,IntWritable) reduce input pairs and produces (NodePath, MarkLogicNode) output pairs using NodeOutputFormat:

import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
import com.marklogic.mapreduce.MarkLogicNode;
import com.marklogic.mapreduce.NodePath;


public class LinkCountInDoc {
  ...
  public static class IntSumReducer
    extends Reducer<Text, IntWritable, NodePath, MarkLogicNode> {

    public void reduce(Text key, Iterable<IntWritable> values, 
        Context context) throws IOException, InterruptedException {
      ...
      context.write(output_node_path, output_node);
    }
    ...
  }
}

Notice that the reduce method receives a key and a list of all values produced by the map phase with the same key, as discussed in MapReduce Overview.

For a complete list of the OutputFormat subclasses provided by the connector and how they use the output keys and values to create database content, see OutputFormat Subclasses.

Disabling Speculative Execution

Disable Hadoop MapReduce speculative execution when using MarkLogic Server for output.

Hadoop MapReduce uses speculative execution to prevent jobs from stalling on slow tasks or worker nodes. For example, if most of the tasks for a job complete but a few tasks are still running, Hadoop can schedule speculative redundant tasks on free worker nodes. If the original task completes before the redundant task, Hadoop cancels the redundant task. If the redundant task completes first, Hadoop cancels the original task.

Speculative execution is not safe when using MarkLogic Server for output because the cancelled tasks do not clean up their state. Uncommitted changes might be left dangling and eventually lead to XDMP-FORESTTIM errors.

Disable speculative execution by setting mapred.map.tasks.speculative.execution and/or mapred.reduce.tasks.speculative.execution to false in your job configuration file or using the org.apache.hadoop.conf.Configuration API.

The nature of your job determines which properties to set. Set mapred.map.tasks.speculative.execution to false when using MarkLogic Server for output during map. Set mapred.reduce.tasks.speculative.execution to false when using MarkLogic Server for output during reduce. Set both properties when using MarkLogic Server for both map and reduce output.

The following example shows how to set these properties to false in a configuration file:

<property>
  <name>mapred.map.tasks.speculative.execution</name>
  <value>false</value>
</property>
<property>
  <name>mapred.reduce.tasks.speculative.execution</name>
  <value>false</value>
</property>
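
Alternatively, a sketch of the equivalent settings applied through the org.apache.hadoop.conf.Configuration API before submitting the job:

import org.apache.hadoop.conf.Configuration;
...
Configuration conf = new Configuration();
// Disable speculative execution for both map and reduce tasks
conf.setBoolean("mapred.map.tasks.speculative.execution", false);
conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);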

Example: Storing MapReduce Results as Nodes

This example demonstrates storing the results of a MapReduce job as child elements of documents in the database. The code samples shown here are small slices of the full code. For the complete code, see the LinkCountInDoc example in the com.marklogic.mapreduce.examples package.

The map step in this example creates a key-value pair for each href to a document in the collection. The reduce step sums up the number of references to each document and stores the total as a new <ref-count> element on the referenced document.

The output from the map phase is (Text, IntWritable) pairs, where the text is the title attribute text of an href and the integer is always 1, indicating one reference to the title. The sample content is such that the title in an href matches the leaf name of the containing document URI.

For example, if the sample data includes a document with the URI /space/wikipedia/enwiki/Drama film and three occurrences of the following href:

<a href="http://www.mediawiki.org/xml/export-0.4/" title="Drama film">
  drama film
</a>

Then the map output includes three key-value pairs of the form:

(Drama film, 1)

This results in an input key-value pair for reduce of the form:

(Drama film, (1, 1, 1))

The reduce phase computes the reference count (3, in this case) and attaches the count as a child node of the referenced document:

<wp:page>
  ...
  <ref-count>3</ref-count>
</wp:page>

To produce this result, the job uses NodeOutputFormat as the OutputFormat subclass. NodeOutputFormat expects the reducer to produce (NodePath, MarkLogicNode) output pairs. NodeOutputFormat uses the config property mapreduce.marklogic.output.node.optype to determine where to insert the node relative to the path. In this example, node.optype is set to INSERT_CHILD, so the ref-count node is inserted as a child of the node in the node path:

<property>
    <name>mapreduce.marklogic.output.node.optype</name>
    <value>INSERT_CHILD</value>
</property>

To avoid the overhead of creating a new MarkLogicNode for every invocation of the reduce method, the sample overrides Reducer.setup to create a skeleton node which has its value replaced by the real reference count in reduce. The following code snippet demonstrates the initialization of the result and element variables used in reduce. For the full code, see the sample code for com.marklogic.mapreduce.examples.LinkCountInDoc.

import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import com.marklogic.mapreduce.MarkLogicNode;
import com.marklogic.mapreduce.NodeInputFormat;
import com.marklogic.mapreduce.NodeOutputFormat;
import com.marklogic.mapreduce.NodePath;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Element;
import java.io.IOException;
import java.io.InputStream;
import java.io.StringBufferInputStream;

public static class IntSumReducer
extends Reducer<Text, IntWritable, NodePath, MarkLogicNode> {
  private final static String TEMPLATE = "<ref-count>0</ref-count>";
 
  private Element element;
  private MarkLogicNode result;

  protected void setup(Context context) 
      throws IOException, InterruptedException {
    try {
      DocumentBuilder docBuilder =
          DocumentBuilderFactory.newInstance().newDocumentBuilder();
      InputStream sbis = new StringBufferInputStream(TEMPLATE);
      element = docBuilder.parse(sbis).getDocumentElement();
      result = new MarkLogicNode(element);
    }
    ...
  }
}

The following code snippet shows how the reducer sums up the input values, sets up the ref-count node, and builds the document node path from a known base URI and the href title passed in as key:

public static class IntSumReducer
extends Reducer<Text, IntWritable, NodePath, MarkLogicNode> {

    private final static String ROOT_ELEMENT_NAME = "//wp:page";
    private final static String BASE_URI =
        "/space/wikipedia/enwiki/";
    private NodePath nodePath = new NodePath();

public void reduce(Text key, Iterable<IntWritable> values, 
        Context context
        ) throws IOException, InterruptedException {        
    ...

    // Compute reference count and store it in result node
    int sum = 0;
    for (IntWritable val : values) {
        sum += val.get();
    }
    element.setTextContent(Integer.toString(sum));
    
    // Build node path to referenced document
    StringBuilder buf = new StringBuilder();
    buf.append(BASE_URI).append(key);
    nodePath.setDocumentUri(buf.toString());
    nodePath.setRelativePath(ROOT_ELEMENT_NAME);

    // Store the final results for insertion into the database
    context.write(nodePath, result);
    }
}

The above code, with an input pair of (Drama film, (1,1,1)), produces the following (node path, node) output pair:

(
  /space/wikipedia/enwiki/Drama film//wp:page,
  <ref-count>3</ref-count>
)

Since the mapreduce.marklogic.output.node.optype property is INSERT_CHILD, the new <ref-count> node is inserted as a child of the wp:page element addressed by the node path.

With very little code modification, the sample can store the reference count as a property of the referenced document instead of as a child node. To do so, use PropertyOutputFormat, construct the document URI instead of the node path, and set the config property mapreduce.marklogic.output.property.optype. For an example, see LinkCountInProperty.
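
For illustration, a minimal sketch of the pieces that change, assuming the same result node built in setup (the DocumentURI string constructor used here is an assumption):

// Job setup: use PropertyOutputFormat with (DocumentURI, MarkLogicNode) pairs
job.setOutputFormatClass(PropertyOutputFormat.class);
job.setOutputKeyClass(DocumentURI.class);
job.setOutputValueClass(MarkLogicNode.class);

// In reduce: emit the referenced document URI instead of a node path
DocumentURI docUri = new DocumentURI(BASE_URI + key);  // assumed constructor
context.write(docUri, result);

In the configuration, set mapreduce.marklogic.output.property.optype to SET_PROPERTY or ADD_PROPERTY to control how the property is written.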

Creating a Custom Output Query with KeyValueOutputFormat

Use KeyValueOutputFormat to create a custom output query that can manipulate arbitrary key and value data types and perform operations beyond updating documents, nodes, and properties. This section covers output query requirements, implementing the query in XQuery or JavaScript, job configuration, and supported type transformations.

Output Query Requirements

A custom output query is a fully formed XQuery or Server-Side JavaScript module, specified as the value of the configuration property mapreduce.marklogic.output.query. The MarkLogic Connector for Hadoop invokes the query for each output key-value pair produced by the map or reduce function configured to use KeyValueOutputFormat. Your query may perform any operations. Output returned by the query is discarded.

The key and value types are determined by the configuration properties mapreduce.marklogic.output.keytype and mapreduce.marklogic.output.valuetype. The key and value XQuery types are constrained to those XML schema types with a meaningful org.apache.hadoop.io.Writable type transformation, such as between xs:string and org.apache.hadoop.io.Text or between element() and com.marklogic.mapreduce.MarkLogicNode. For details, see Supported Type Transformations.

Configure your job to use KeyValueOutputFormat and key and value Java types corresponding to the XQuery types expected by your output query. For details, see Job Configuration.

Implementing an XQuery Output Query

The output key-value pair is available to your output query through external variables with the names key and value, in the namespace http://marklogic.com/hadoop.

The prolog of your output query must include:

  1. A namespace declaration for the namespace containing the key and value variables (http://marklogic.com/hadoop). You may use any namespace prefix.
  2. An external variable declaration for key in the namespace from Step 1. Choose one of the types listed in Supported Type Transformations.
  3. An external variable declaration for value in the namespace from Step 1. Choose one of the types listed in Supported Type Transformations.

For example, the following output query prolog assumes a key type of xs:string and a value type of element():

<property>
  <name>mapreduce.marklogic.output.query</name>
  <value><![CDATA[
    xquery version '1.0-ml';
    declare namespace mlmr = "http://marklogic.com/hadoop";
    declare variable $mlmr:key as xs:string external;
    declare variable $mlmr:value as element() external;
    (: use the key and value... :)
  ]]></value>
</property>

Implementing a JavaScript Output Query

When you use a JavaScript output query, you must also set the property mapreduce.marklogic.output.queryLanguage to javascript as the default query language is XQuery.

In a JavaScript output query, the key and value are available to your query in global variables named key and value, respectively. For example:

<property>
  <name>mapreduce.marklogic.output.query</name>
  <value><![CDATA[
    var key;
    var value;
    // use the key and value...
  ]]></value>
</property>

Job Configuration

Configure the following job properties to use a custom output query:

  • Set mapreduce.marklogic.output.query to your output query. This must be a fully formed XQuery or Server-Side JavaScript module, suitable for evaluation with xdmp:eval (or xdmp.eval). For details, see Implementing an XQuery Output Query or Implementing a JavaScript Output Query.
  • Set mapreduce.marklogic.output.queryLanguage to javascript if your output query is implemented in Server-Side JavaScript.
  • Set mapreduce.marklogic.output.keytype to the type of the key. The default type is xs:string.
  • Set mapreduce.marklogic.output.valuetype to the type of the value. The default type is xs:string.
  • Set the map or reduce output format to KeyValueOutputFormat.
  • Set the map or reduce output key Java type to an org.apache.hadoop.io.Writable subclass that is convertible to the XQuery type in mapreduce.marklogic.output.keytype.
  • Set the map or reduce output value Java type to an org.apache.hadoop.io.Writable subclass that is convertible to the XQuery type in mapreduce.marklogic.output.valuetype.

For details on the available key and value types and supported conversions between the XQuery types and the Hadoop Java classes, see Supported Type Transformations.

The configuration properties may be set either in a configuration file or using the org.apache.hadoop.mapreduce.Job API. The following example configures the MarkLogic Connector for Hadoop output query, XQuery key type, and XQuery value type in a configuration file:

<property>
  <name>mapreduce.marklogic.output.keytype</name>
  <value>xs:string</value>
</property>
<property>
  <name>mapreduce.marklogic.output.valuetype</name>
  <value>element()</value>
</property>
<property>
  <name>mapreduce.marklogic.output.query</name>
  <value><![CDATA[
    xquery version '1.0-ml';
    declare namespace mlmr = "http://marklogic.com/hadoop";
    declare variable $mlmr:key as xs:string external;
    declare variable $mlmr:value as element() external;
    (: use the key and value... :)
  ]]></value>
</property>

The following example configures the job output format class, key class, and value class corresponding to the custom output query settings above:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import com.marklogic.mapreduce.KeyValueOutputFormat;
import com.marklogic.mapreduce.MarkLogicNode;

class myClass {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = new Job(conf);
    job.setOutputFormatClass(KeyValueOutputFormat.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(MarkLogicNode.class);
    ...
  }
}

For a complete example, see RevisionGrouper and the sample code for com.marklogic.mapreduce.examples.RevisionGrouper.

Supported Type Transformations

When you use KeyValueOutputFormat, the MarkLogic Connector for Hadoop converts the key and value produced by the job map or reduce function from an org.apache.hadoop.io.Writable object to values of the configured XQuery types. The table below summarizes the supported type conversions.

When using org.apache.hadoop.io.Text with any XQuery type other than xs:string, the string value format must correspond to the serialized representation of the XQuery type. For example, if the XQuery type is cts:box, then the Text string representation must be a serialized cts:box value, such as [-122, 78, 30, 45]. Similarly, if the XQuery type is xs:duration, then the Text string representation must be a serialized xs:duration, such as -P92M.
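
For example, if mapreduce.marklogic.output.valuetype is cts:box and the job's output value class is org.apache.hadoop.io.Text, a reduce function might emit the serialized box shown above (a sketch; the key string is a placeholder):

// The Text value must be the serialized form of the configured XQuery type (cts:box here)
context.write(new Text("region-1"), new Text("[-122, 78, 30, 45]"));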

The following list shows, for each supported key or value XQuery type, the Java types that may be used:

  • xs:string: org.apache.hadoop.io.Text
  • xs:boolean: org.apache.hadoop.io.BooleanWritable, org.apache.hadoop.io.Text
  • xs:integer: org.apache.hadoop.io.IntWritable, org.apache.hadoop.io.VIntWritable, org.apache.hadoop.io.LongWritable, org.apache.hadoop.io.VLongWritable, org.apache.hadoop.io.Text
  • xs:decimal: org.apache.hadoop.io.IntWritable, org.apache.hadoop.io.VIntWritable, org.apache.hadoop.io.LongWritable, org.apache.hadoop.io.VLongWritable, org.apache.hadoop.io.FloatWritable, org.apache.hadoop.io.DoubleWritable, org.apache.hadoop.io.Text
  • xs:float, xs:double: org.apache.hadoop.io.IntWritable, org.apache.hadoop.io.VIntWritable, org.apache.hadoop.io.LongWritable, org.apache.hadoop.io.VLongWritable, org.apache.hadoop.io.FloatWritable, org.apache.hadoop.io.DoubleWritable, org.apache.hadoop.io.Text
  • xs:duration, xs:dayTimeDuration, xs:yearMonthDuration, xs:dateTime, xs:time, xs:date: org.apache.hadoop.io.Text
  • xs:hexBinary: org.apache.hadoop.io.BytesWritable, org.apache.hadoop.io.Text
  • xs:base64Binary: org.apache.hadoop.io.Text
  • xs:gDay, xs:gMonth, xs:gYear, xs:gYearMonth: org.apache.hadoop.io.Text
  • cts:box, cts:circle, cts:point, cts:polygon: org.apache.hadoop.io.Text
  • node(), element(), binary(): com.marklogic.mapreduce.MarkLogicNode, org.apache.hadoop.io.Text

Controlling Transaction Boundaries

When you use one of the OutputFormat subclasses provided by the MarkLogic Connector for Hadoop, such as ContentOutputFormat, the MarkLogic Connector for Hadoop manages the output MarkLogic Server XDBC session and the requests for storing results in the database.

Each output key-value pair represents an update to the database. Multiple updates are grouped into a single transaction. For all OutputFormat subclasses except ContentOutputFormat:

  • Each output key-value pair generates an update request to MarkLogic Server.
  • Every 1000 requests constitutes a transaction.

Use the mapreduce.marklogic.output.transactionsize configuration property to adjust the number of requests per transaction. For example, if you set the transaction size to 5, then the connector commits a transaction for every 5 requests. For OutputFormat classes other than ContentOutputFormat, this means the connector bundles every 5 updates into a transaction.

<property>
  <name>mapreduce.marklogic.output.transactionsize</name>
  <value>5</value>
</property>

For ContentOutputFormat, the interactions between the number of updates, requests, and transactions are more complicated. For details, see Time vs. Space: Configuring Batch and Transaction Size.

Streaming Content Into the Database

When you use ContentOutputFormat, you can stream text, XML, and binary documents to MarkLogic Server.

Using streaming can significantly decrease the memory requirements on the task nodes initiating document insertion, but it can also significantly decrease overall throughput and changes the behavior of your job in the face of errors; for details, see Reducing Memory Consumption With Streaming.

To use the stream feature, configure your job as you normally would to insert content into the database, but also do the following (a combined sketch follows the list):

  • Set the map or reduce output format to ContentOutputFormat.
    job.setOutputFormatClass(ContentOutputFormat.class);
  • Set the map or reduce output key Java type to com.marklogic.mapreduce.DocumentURI.
    job.setOutputKeyClass(DocumentURI.class);
  • Set the map or reduce output value Java type to com.marklogic.mapreduce.StreamLocator.
    job.setOutputValueClass(StreamLocator.class);
  • In your map or reduce function, set the value in each key-value pair to a StreamLocator object associated with the content you want to stream.
    context.write(yourDocURI, 
                  new StreamLocator(yourPath, CompressionCodec.NONE));
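
Putting these pieces together, the following sketch shows a map function that streams files into the database. The mapper shape, input types, and URI construction here are illustrative assumptions, not part of the connector API:

import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import com.marklogic.mapreduce.CompressionCodec;
import com.marklogic.mapreduce.DocumentURI;
import com.marklogic.mapreduce.StreamLocator;

public static class StreamingContentMapper
    extends Mapper<LongWritable, Text, DocumentURI, StreamLocator> {
  public void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    // Each input record is assumed to hold an HDFS path to a file to insert
    Path path = new Path(value.toString());
    DocumentURI uri = new DocumentURI(path.getName());  // assumed constructor
    // Associate the URI with a StreamLocator so content is streamed, not buffered
    context.write(uri, new StreamLocator(path, CompressionCodec.NONE));
  }
}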

Performance Considerations for ContentOutputFormat

When using ContentOutputFormat, be aware of the performance tradeoffs discussed in the following sections.

Time vs. Space: Configuring Batch and Transaction Size

When using ContentOutputFormat, you can tune the insertion throughput and memory requirements of your job by configuring the batch size and transaction size of the job:

  • mapreduce.marklogic.output.batchsize controls the number of output records (updates) per request to the server.
  • mapreduce.marklogic.output.transactionsize controls the number of requests to the server per transaction.

Selecting a batch size is a speed vs. memory tradeoff. Each request to the server introduces overhead. However, unless you use streaming, all the updates in a batch stay in memory until the request is sent, so larger batches consume more memory.

Transactions introduce overhead on MarkLogic Server, so performing multiple updates per transaction can improve insertion throughput. However, an open transaction holds locks on fragments with pending updates, potentially increasing lock contention and affecting overall application performance.

The default batch size is 100. If you do not explicitly set the transaction size, it varies with the batch size, adjusting to keep the maximum number of updates per transaction at 2000.

Consider the following example of inserting 10000 documents. Assume batch size is controlled by setting mapreduce.marklogic.output.batchsize and that transaction size is not explicitly set. The last row represents the default behavior of ContentOutputFormat.

batch size | total requests | txn size | total txns | updates/txn
1          | 10000          | 2000     | 5          | 2000
10         | 1000           | 200      | 5          | 2000
100        | 100            | 20       | 5          | 2000

If you explicitly set mapreduce.marklogic.output.transactionsize, then transaction size does not vary based on batch size. Given the example above of inserting 10000 documents, if you explicitly set batch size to 100 and transaction size to 50, then the job requires 2 transactions and each transaction performs 5000 updates.
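
For example, a sketch of that batch size 100 / transaction size 50 configuration using the org.apache.hadoop.conf.Configuration API:

// 100 updates per request, 50 requests per transaction (5000 updates/transaction)
conf.setInt("mapreduce.marklogic.output.batchsize", 100);
conf.setInt("mapreduce.marklogic.output.transactionsize", 50);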

Batch size is not configurable for other output formats, such as NodeOutputFormat, PropertyOutputFormat, and KeyValueOutputFormat. For these classes, batch size is always 1 and the default transaction size is 1000. See Controlling Transaction Boundaries.

Time vs. Correctness: Using Direct Forest Updates

ContentOutputFormat performs best when the updating tasks interact directly with the forests in which content is inserted. However, direct forest updates can create duplicate document URIs under the following circumstances:

  • Content with the same URI already exists in the database, and
  • The content was inserted using forest placement or the number of forests changed after initial document creation.

Forest placement occurs when you use the $forest-ids parameter of xdmp:document-insert to instruct MarkLogic Server to insert a document in a specific forest or to choose from a specific set of forests. See Specifying a Forest in Which to Load a Document in the Loading Content Into MarkLogic Server Guide.

To prevent duplicate URIs, the MarkLogic Connector for Hadoop defaults to a slower protocol for ContentOutputFormat when it detects the potential for updates to existing content. In this case, MarkLogic Server manages the forest selection, rather than the MarkLogic Connector for Hadoop. This behavior guarantees unique URIs at the cost of performance.

You may override this behavior and use direct forest updates by doing the following (a configuration sketch follows the list):

  • Set mapreduce.marklogic.output.content.directory. This guarantees all inserts will be new documents. If the output directory already exists, it will either be removed or cause an error, depending on the value of mapreduce.marklogic.output.content.cleandir.
  • Set mapreduce.marklogic.output.content.fastload to true. When fastload is true, the MarkLogic Connector for Hadoop always optimizes for performance, even if duplicate URIs are possible.
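
For example, a sketch of either setting using the org.apache.hadoop.conf.Configuration API (the directory value is a placeholder):

// Option 1: guarantee new documents by specifying an output directory
conf.set("mapreduce.marklogic.output.content.directory", "/mapreduce-output/");
// Option 2: explicitly force direct forest updates
conf.setBoolean("mapreduce.marklogic.output.content.fastload", true);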

You can safely set mapreduce.marklogic.output.content.fastload to true if the number of forests in the database will not change while the job runs, and at least one of the following is true:

  • Your job only creates new documents. That is, you are certain that the URIs are not in use by any document or property fragments already in the database.
  • The URIs output with ContentOutputFormat may already be in use, but both these conditions are true:
    • The in-use URIs were not originally inserted using forest placement.
    • The number of forests in the database has not changed since initial insertion.
  • You set mapreduce.marklogic.output.content.directory.

Reducing Memory Consumption With Streaming

The streaming protocol allows you to insert a large document into the database without holding the entire document in memory. Streaming uploads documents to MarkLogic Server in 128k chunks.

Streaming content into the database usually requires less memory on the task node, but ingestion can be slower because it introduces additional network overhead. Streaming also does not take advantage of the connector's built-in retry mechanism. If an error occurs that is normally retryable, the job will fail.

Streaming is only available with ContentOutputFormat.

To enable streaming, set the property mapreduce.marklogic.output.content.streaming to true and use a StreamLocator. For details, see Streaming Content Into the Database.

Output Configuration Properties

The table below summarizes connector configuration properties for using MarkLogic Server as an output destination. For details, see com.marklogic.mapreduce.MarkLogicConstants in the MarkLogic Hadoop MapReduce Connector API.

Property Description
mapreduce.marklogic.output.username
Username for a user privileged to write to the database attached to your XDBC App Server.
mapreduce.marklogic.output.password
Cleartext password for the output.username user.
mapreduce.marklogic.output.host
Hostname of the server hosting your output XDBC App Server.
mapreduce.marklogic.output.port
The port configured for the XDBC App Server on the output host.
mapreduce.marklogic.output.usessl
Whether or not to use an SSL connection to the server. Default: false.
mapreduce.marklogic.output.ssloptionsclass
The name of a class implementing com.marklogic.mapreduce.SslConfigOptions. Used to configure the SSL connection when output.usessl is true.
mapreduce.marklogic.output.node.namespace
A comma-separated list of namespace name-URI pairs to use when evaluating element names in the node path with NodeOutputFormat.
mapreduce.marklogic.output.batchsize
The number of output key-value pairs (updates) to send to MarkLogic Server in a single request. Only honored by ContentOutputFormat. Default: 100. Other output formats have an implicit, unconfigurable batch size of 1.
mapreduce.marklogic.output.content.cleandir
Whether or not to remove the database directory specified by output.content.directory before storing the reduce phase results. If false, an error occurs if the directory already exists when the job runs. Default: false.
mapreduce.marklogic.output.content.collection
A comma separated list of collections to which output documents are added when using ContentOutputFormat.
mapreduce.marklogic.output.content.directory
The database directory in which to create output documents when using ContentOutputFormat. If content.cleandir is false (the default), then the directory must not already exist. If content.cleandir is true and the directory already exists, it is deleted as part of job submission.
mapreduce.marklogic.output.content.encoding
The encoding to use when loading content into the database. Content is translated from this encoding to UTF-8. For details, see Character Encoding in the Search Developer's Guide. Default: UTF-8.
mapreduce.marklogic.output.content.fastload
Whether or not to force optimal performance when using ContentOutputFormat. Setting this property to true can result in duplicate URIs. Default: false. See Performance Considerations for ContentOutputFormat.
mapreduce.marklogic.output.content.language
For XML content, the language to specify in the xml:lang attribute on the root element node if the attribute does not already exist. If not set, no xml:lang is added to the root node, and the language configured for the database is assumed.
mapreduce.marklogic.output.content.namespace
For XML content, specifies a namespace URI to use if there is no namespace at the root node of the document. Default: No namespace.
mapreduce.marklogic.output.content.permission
A comma separated list of role-capability pairs to associate with output documents when using ContentOutputFormat.
mapreduce.marklogic.output.content.quality
The document quality to use when creating output documents with ContentOutputFormat. Default: 0.
mapreduce.marklogic.output.content.repairlevel
The level of repair to perform on XML content inserted into the database. Set to either none or full. Default: Behavior depends on the default XQuery version configured for the App Server; none for XQuery 1.0 or 1.0-ml, full for XQuery 0.9-ml.
mapreduce.marklogic.output.content.streaming
Whether or not to use streaming to insert content. Default: false.
mapreduce.marklogic.output.content.type

When using ContentOutputFormat, specifies the content type of output documents. Set to one of XML, TEXT, BINARY, MIXED, or UNKNOWN. Default: XML.

UNKNOWN uses the value type of the first value seen in each split to determine the content type.

MIXED uses the Document URI suffix and MarkLogic Server MIME type mappings to determine the content type for each document.

mapreduce.marklogic.output.content.tolerateerrors
When this option is true and batch size is greater than 1, if an error occurs for one or more documents being inserted into the database, only the erroneous documents are skipped; all other documents are inserted. When this option is false and batch size is greater than 1, an error during insertion can cause all the inserts in the current batch to be rolled back. Default: false.
mapreduce.marklogic.output.keytype
The XQuery type of the output key available to the query defined by mapreduce.marklogic.output.query. Default: xs:string.
mapreduce.marklogic.output.node.optype
When using NodeOutputFormat, the node operation to perform. Set to one of: REPLACE, INSERT_BEFORE, INSERT_AFTER, INSERT_CHILD.
mapreduce.marklogic.output.property.optype
When using PropertyOutputFormat, the property operation to perform. Set to one of SET_PROPERTY or ADD_PROPERTY.
mapreduce.marklogic.output.property.alwayscreate
When using PropertyOutputFormat, whether or not to create a property even if no document exists with the output document URI. Default: false.
mapreduce.marklogic.output.query
A custom output query to be used by KeyValueOutputFormat. See Creating a Custom Output Query with KeyValueOutputFormat.
mapreduce.marklogic.output.queryLanguage
The implementation language of mapreduce.marklogic.output.query. Allowed values: xquery, javascript. Default: xquery.
mapreduce.marklogic.output.transactionsize
The number of requests to MarkLogic Server per transaction. Default: For ContentOutputFormat, this varies with mapreduce.marklogic.output.batchsize to maintain 2000 updates per transaction; for other output formats, 1000.
mapreduce.marklogic.output.valuetype
The XQuery type of the output value available to the query defined by mapreduce.marklogic.output.query. Default: xs:string.

OutputFormat Subclasses

The MarkLogic Connector for Hadoop API provides subclasses of OutputFormat for defining your reduce output key-value pairs and storing the results in a MarkLogic Server database. Specify the OutputFormat subclass appropriate for your job using the org.apache.hadoop.mapreduce.Job.setOutputFormatClass method. For example:

import com.marklogic.mapreduce.NodeOutputFormat;
import org.apache.hadoop.mapreduce.Job;
...
public class LinkCountInDoc {
    ...
    public static void main(String[] args) throws Exception {
        Job job = new Job(conf);
        job.setOutputFormatClass(NodeOutputFormat.class);
        ...
    }
}

The following table summarizes the OutputFormat subclasses provided by the MarkLogic Connector for Hadoop and the key and value types produced by each class. All classes referenced below are in the package com.marklogic.mapreduce. All referenced properties are covered in Output Configuration Properties. For more details on these classes and properties, see the MarkLogic Hadoop MapReduce Connector API.

Class Key Type Value Type Description
MarkLogicOutputFormat any any Superclass for all connector-specific OutputFormat classes. Stores output in a MarkLogic Server database.
ContentOutputFormat DocumentURI any (text, XML, JSON, binary)

Stores output in a MarkLogic Server database, using the key as the document URI and the value as the content. The content type is determined by the mapreduce.marklogic.output.content.type config property. Related configuration properties:

  • content.type
  • content.directory
  • content.collection
  • content.permission
  • content.quality

If mapreduce.marklogic.output.content.type is UNKNOWN, the value type must be an instance of Text, MarkLogicNode, BytesWritable, or MarkLogicDocument.

NodeOutputFormat NodePath MarkLogicNode Stores output in a MarkLogic Server database, using the key as the node path and the value as the node to insert. The mapreduce.marklogic.output.node.optype config property controls where the node is inserted relative to the node path in the key.
PropertyOutputFormat DocumentURI MarkLogicNode Stores output in a MarkLogic Server database by inserting the value node as a property of the document in the key URI. The mapreduce.marklogic.output.property.optype config property controls how the property is set.
KeyValueOutputFormat any any Run the query defined by mapreduce.marklogic.output.query with each output key-value pair. The result is determined by the output query. The key and value types are determined by mapreduce.marklogic.output.keytype and mapreduce.marklogic.output.valuetype. See Creating a Custom Output Query with KeyValueOutputFormat.
