
MarkLogic 9 Product Documentation
MarkLogic Connector for Hadoop Developer's Guide
— Chapter 4

Using MarkLogic Server for Input

When using MarkLogic Server as an input source, the input key-value pairs passed to the map function of a map task are constructed by transforming database fragments or lexicon data in the input split into key-value pairs using the configured InputFormat subclass and input properties. This section covers the following topics:

Basic Steps

To configure the map phase of a MapReduce job to use MarkLogic Server for input, perform the following steps:

Identifying the Input MarkLogic Server Instance

The MarkLogic Server input instance is identified by setting job configuration properties. For general information on setting configuration properties, see Configuring a MapReduce Job.

Set the following properties to identify the input MarkLogic Server instance:

mapreduce.marklogic.input.host
    Hostname or IP address of the server hosting your input XDBC App Server. The host must be resolvable by the nodes in your Hadoop cluster, so you should usually not use localhost.
mapreduce.marklogic.input.port
    The port configured for the target XDBC App Server on the input host.
mapreduce.marklogic.input.username
    Username privileged to read from the database attached to your XDBC App Server.
mapreduce.marklogic.input.password
    Cleartext password for the input.username user.

If you want to use a database other than the one attached to your XDBC App Server, set the following additional property to the name of your database:

mapreduce.marklogic.input.databasename

When you use MarkLogic Server in a cluster, all MarkLogic Server hosts containing a forest in the input database must be accessible through an XDBC server on the same port. The host identified in the configuration properties may be any qualifying host in the cluster. For details, see Deploying the Connector with a MarkLogic Server Cluster.

You can configure a job to connect to the App Server through SSL by setting the mapreduce.marklogic.input.usessl property. For details, see Making a Secure Connection to MarkLogic Server with SSL. For an example, see ContentReader.

For more information on the properties, see Input Configuration Properties.
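
For example, the following sketch sets these connection properties programmatically using the org.apache.hadoop.conf.Configuration API; the host, port, credentials, and database name are placeholder values, not defaults:

import org.apache.hadoop.conf.Configuration;
...
Configuration conf = new Configuration();
// Placeholder connection values; substitute your own XDBC App Server details.
conf.set("mapreduce.marklogic.input.host", "ml-input-host.example.com");
conf.set("mapreduce.marklogic.input.port", "8006");
conf.set("mapreduce.marklogic.input.username", "hadoop-reader");
conf.set("mapreduce.marklogic.input.password", "secret");
// Optional: read from a database other than the one attached to the App Server.
conf.set("mapreduce.marklogic.input.databasename", "my-database");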

Specifying the Input Mode

The MarkLogic Connector for Hadoop input mode determines how much responsibility your job has for creating input splits and input key-value pairs. The MarkLogic Connector for Hadoop supports basic and advanced input modes. Basic mode is the default. Set the input mode using the mapreduce.marklogic.input.mode configuration property.

When MarkLogic Server is the input source, each map task runs an input query against the task input split to select the fragments or records from which to create map input key-value pairs. An input split query divides the input content into input splits by forest, and an input query selects content within the split. For a general discussion of input splits in MapReduce, see How Hadoop Partitions Map Input Data.

In basic mode, the MarkLogic Connector for Hadoop uses a built-in input split query and builds the input query based on data selection properties defined by your job. This enables the connector to optimize the interaction between the Hadoop MapReduce framework and MarkLogic Server. If your input selection needs can be met by the basic mode query construction properties, you should use basic mode as it offers the best performance. For details, see Basic Input Mode.

Basic mode supports selecting input data from documents or a lexicon. You can only use one of these methods in a job. If configuration properties are set for both methods, the connector uses a lexicon for input. If none of the input selection properties are set, the connector uses the default document selectors.

In advanced mode, your application provides the input split query and input query. Using advanced mode gives you complete control over map input key-value pair creation, but adds complexity. For details, see Advanced Input Mode.
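
For example, a job that supplies its own split query and input query might select advanced mode as follows (a minimal sketch; basic mode requires no setting because it is the default):

Configuration conf = new Configuration();
// The default is "basic"; set "advanced" to supply your own split query and input query.
conf.set("mapreduce.marklogic.input.mode", "advanced");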

Specifying the Input Key and Value Types

As discussed in How Hadoop Partitions Map Input Data, the org.apache.hadoop.mapreduce.InputFormat subclass configured for the job determines the types of the input keys and values and how the content selected by the input query is transformed into key-value pairs.

To specify the InputFormat subclass programmatically, use the org.apache.hadoop.mapreduce.Job API. The following example configures the job to use NodeInputFormat, which creates (NodePath, MarkLogicNode) input key-value pairs:

import com.marklogic.mapreduce.NodeInputFormat;
import org.apache.hadoop.mapreduce.Job;
...
public class LinkCountInDoc {
    ...
    public static void main(String[] args) throws Exception {
        Job job = new Job(conf);
        job.setInputFormatClass(NodeInputFormat.class);
        ...
    }
}

The MarkLogic Connector for Hadoop API includes several InputFormat subclasses for MarkLogic Server data types, such as (document URI, node) key-value pairs. For details, see InputFormat Subclasses.

You can also use Hadoop MapReduce key-value types through the KeyValueInputFormat and ValueInputFormat subclasses. These classes define type conversions between MarkLogic Server types and standard Hadoop MapReduce types; see Using KeyValueInputFormat and ValueInputFormat.

Defining the Map Function

A MapReduce application must include a subclass of org.apache.hadoop.mapreduce.Mapper and implement a Mapper.map() method. The map method transforms the map input key-value pairs into output key-value pairs that can be used as input for the reduce step.

The application-specific Mapper subclass and the signature of the Mapper.map method must match the configured map phase input and output key-value pair types. For example, if the map phase uses NodeInputFormat for input and produces (Text, IntWritable) output key-value pairs, then the Mapper subclass should be similar to the following because NodeInputFormat creates (NodePath, MarkLogicNode) input key-value pairs:

import java.io.IOException;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
import com.marklogic.mapreduce.MarkLogicNode;
import com.marklogic.mapreduce.NodePath;


public class LinkCountInDoc {
  ...
  public static class RefMapper 
    extends Mapper<NodePath, MarkLogicNode, Text, IntWritable> {
    public void map(NodePath key, MarkLogicNode value, Context context)
      throws IOException, InterruptedException {
      // ... derive output key(s) and value(s) ...
      context.write(output_key, output_value);
    }
  }

The Mapper.map() method constructs result key-value pairs corresponding to the expected output type, and then writes pairs to the org.apache.hadoop.mapreduce.Context parameter for subsequent handling by the MapReduce framework. In the example above, the map output key-value pairs must be (Text, IntWritable) pairs.

Configure the Mapper into the job using the org.apache.hadoop.mapreduce.Job API. For example:

public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    ...
    Job job = new Job(conf);
    job.setMapperClass(RefMapper.class);
    ...
}

For a list of InputFormat subclasses provided by the MarkLogic Connector for Hadoop API and the associated key and value types, see InputFormat Subclasses.

For more details, see Example: Counting Href Links and the sample code provided in the connector package.

Basic Input Mode

The MarkLogic Connector for Hadoop supports basic and advanced input modes through the mapreduce.marklogic.input.mode configuration property. The default mode, basic, offers the best performance. In basic input mode, the connector handles all aspects of split creation. The job configuration properties control which fragments in a split to transform into input key-value pairs.

This section covers the following topics:

For details on advanced input mode, see Advanced Input Mode.

Creating Input Splits

Basic mode does not give you control over split creation, other than setting the maximum split size using the mapreduce.marklogic.input.maxsplitsize property. Basic mode does give the job control over the content passed to the map function, as described in Specifying the Input Mode.

When using document fragments for input data, the connector divides the fragments in the database into input splits by forest, such that each split contains at most the number of fragments specified by mapreduce.marklogic.input.maxsplitsize.

Multiple splits might be required to cover a single forest, but a split never spans more than one forest. When the maximum split size is smaller than the total number of fragments in a forest, the MarkLogic Connector for Hadoop can adjust the split size downward so that the splits covering that forest are of similar size.

For example, consider a database containing 2 forests, forest1 and forest2. Forest1 contains 15 documents. Forest2 contains 18 documents. If input.maxsplitsize is 10, then the connector creates 2 splits from each forest, with each split covering roughly half the documents in that forest: two splits of about 7-8 documents for forest1, and two splits of 9 documents each for forest2.

A similar distribution of data occurs when using a lexicon for input. Splits are constructed by querying the lexicon in each forest and assigning at most input.maxsplitsize entries to each split.

Use advanced mode if you require control over split creation. See Advanced Input Mode.
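
If the default limit does not suit your data distribution, you can cap the split size when configuring the job. The following sketch uses an arbitrary limit of 5000 fragments per split:

Configuration conf = new Configuration();
// Cap each split at 5000 fragments; the connector may adjust the actual
// size downward to keep the splits within a forest balanced.
conf.setLong("mapreduce.marklogic.input.maxsplitsize", 5000);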

Using a Lexicon to Generate Key-Value Pairs

This section describes how to use a MarkLogic Server lexicon to create map input key-value pairs. Using a lexicon precludes using an XPath document selector for input. For general information about lexicons, see Browsing With Lexicons in the Search Developer's Guide.

To use a lexicon function:

  1. Implement a Lexicon Function Wrapper Subclass corresponding to the XQuery lexicon function you wish to use.
  2. Override Lexicon Function Parameter Wrapper Methods to specify the parameter values for the lexicon function call.
  3. Choose an InputFormat subclass.
  4. Configure the Job to use the lexicon by setting the configuration property mapreduce.marklogic.input.lexiconfunctionclass.
  5. Additional De-duplication of Results Might Be Required if your application requires uniqueness in the map output values.

Implement a Lexicon Function Wrapper Subclass

The MarkLogic Connector for Hadoop API package com.marklogic.mapreduce.functions includes a lexicon function abstract class corresponding to each supported XQuery or JavaScript API lexicon function. Range and geospatial lexicons are not supported at this time. For a mapping between lexicon wrapper classes and the lexicon functions, see Lexicon Function Subclasses.

The subclass you create encapsulates a call to a lexicon function. The lexicon function is implicit in the parent class, and the function call parameters are implemented by the methods. For example, to use cts:element-attribute-value-co-occurrences, implement a subclass of com.marklogic.mapreduce.functions.ElemAttrValueCooccurrences:

import com.marklogic.mapreduce.functions.ElemAttrValueCooccurrences;

public class LinkCountCooccurrences {
    static class HrefTitleMap extends ElemAttrValueCooccurrences {...}
}

In your subclass, override the inherited methods for the required parameters and for the optional parameters to be included in the call, as described in Override Lexicon Function Parameter Wrapper Methods.

Override Lexicon Function Parameter Wrapper Methods

Each lexicon function wrapper class includes methods for defining the supported function parameter values, as strings. The connector uses this information to construct a call to the wrapped lexicon function in the input query.

The parameter related methods vary by the function interface. For details on a particular wrapper class, see the MarkLogic Connector for Hadoop javadoc.

You may not specify values corresponding to the $quality-weight or $forest-ids parameters of any lexicon function. Quality weight is not used in a MapReduce context, and the connector manages forest constraints for you in basic mode.

If you include options settings in the lexicon call, do not set the skip or truncate options. The MarkLogic Connector for Hadoop reserves the skip and truncate lexicon function options for internal use.

The frequency-order option supported by some lexicon calls is not honored in MapReduce results.

For example, the XQuery lexicon function cts:element-values is exposed through the com.marklogic.mapreduce.functions.ElementValues class. The prototype for cts:element-values is:

cts:element-values(
	  $element-names as xs:QName*,
	  [$start as xs:anyAtomicType?],
	  [$options as xs:string*],
	  [$query as cts:query?],
	  [$quality-weight as xs:double?],
	  [$forest-ids as xs:unsignedLong*]
) 

The com.marklogic.mapreduce.functions.ElementValues wrapper class and its ancestors include methods corresponding to the $element-names, $start, $options, and $query parameters:

package com.marklogic.mapreduce.functions;

public abstract class LexiconFunction {
    
    public String getLexiconQuery() {...}
    public String[] getUserDefinedOptions() {...}
}

public abstract class ValuesOrWordsFunction extends LexiconFunction {
    public String getStart() {...}
}

public abstract class ElementValues extends ValuesOrWordsFunction {
    public abstract String[] getElementNames();
}

The parameter method overrides must return the string representation of an XQuery expression that evaluates to the expected parameter type. For example, if your job needs to make a lexicon function call equivalent to the following, then it must override the methods related to the $element-names and $options parameters to cts:element-values:

cts:element-values(xs:QName("wp:a"), (), "ascending")

The default start value from the inherited ValuesOrWordsFunction.getStart method already matches the desired parameter value. Override ElementValues.getElementNames and LexiconFunction.getUserDefinedOptions to specify the other parameter values. The resulting subclass is:

import com.marklogic.mapreduce.functions.ElementValues;
...
public class myLexiconFunction extends ElementValues {
    public String[] getElementNames() {
        String[] elementNames = {"xs:QName(\"wp:a\")"};
        return elementNames;
    }

    public String[] getUserDefinedOptions() {
        String[] options = {"ascending"};
        return options;
    }
}

If you override ValuesOrWordsFunction.getStart to specify a start value of type xs:string, you must include escaped double-quotes in the returned string. For example, to specify a start value of "aardvark", the getStart override must return "\"aardvark\"".

For a complete example, see com.marklogic.mapreduce.examples.LinkCountCooccurrences, included with the MarkLogic Connector for Hadoop.

Choose an InputFormat

Lexicon functions return data from a lexicon, rather than document content, so the document oriented InputFormat classes such as DocumentInputFormat and NodeInputFormat are not applicable with lexicon input. Instead, use com.marklogic.mapreduce.ValueInputFormat or com.marklogic.mapreduce.KeyValueInputFormat.

KeyValueInputFormat is only usable with co-occurrences lexicon functions. ValueInputFormat may be used with any lexicon function.

When using ValueInputFormat with a lexicon function, choose an input value type that either matches the type returned by the lexicon function or can be converted from the lexicon return type. For details on KeyValueInputFormat and ValueInputFormat and the supported type conversions, see Using KeyValueInputFormat and ValueInputFormat.

For example, cts:element-values returns xs:anyAtomicType*. If the element range index queried by your ElementValues subclass is an index with scalar type int, then you might configure the job to use IntWritable as the map input value type.
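
A minimal sketch of that configuration, using the Configuration and Job APIs shown elsewhere in this chapter (the lexicon function class itself is configured separately, as described under Configure the Job):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import com.marklogic.mapreduce.ValueInputFormat;
...
Configuration conf = new Configuration();
// Receive each lexicon value as an IntWritable.
conf.setClass("mapreduce.marklogic.input.valueclass",
              IntWritable.class, Writable.class);
Job job = new Job(conf);
job.setInputFormatClass(ValueInputFormat.class);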

The co-occurrences lexicon function classes generate key-value pairs corresponding to the cts:value elements in each cts:co-occurrence returned by the underlying lexicon function. The key is the first cts:value in each cts:co-occurrence, and the value is the second. For example, if the underlying co-occurrences lexicon function returns the following XML:

<cts:co-occurrence xmlns:cts="http://marklogic.com/cts" 
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
      xmlns:xs="http://www.w3.org/2001/XMLSchema">
    <cts:value xsi:type="xs:string">MARCELLUS</cts:value>
    <cts:value xsi:type="xs:string">BERNARDO</cts:value>
  </cts:co-occurrence>
  <cts:co-occurrence xmlns:cts="http://marklogic.com/cts"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xmlns:xs="http://www.w3.org/2001/XMLSchema">
    <cts:value xsi:type="xs:string">ROSENCRANTZ</cts:value>
    <cts:value xsi:type="xs:string">GUILDENSTERN</cts:value>
  </cts:co-occurrence>
  <cts:co-occurrence xmlns:cts="http://marklogic.com/cts"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xmlns:xs="http://www.w3.org/2001/XMLSchema">
    <cts:value xsi:type="xs:string">HORATIO</cts:value>
    <cts:value xsi:type="xs:string">MARCELLUS</cts:value>
  </cts:co-occurrence>

Then the generated map input key-value pairs are:

(MARCELLUS, BERNARDO)
(ROSENCRANTZ, GUILDENSTERN)
(HORATIO, MARCELLUS)

As with ValueInputFormat, choose key and value types corresponding to the lexicon type or convertible from the lexicon type. For a complete example, see com.marklogic.mapreduce.examples.LinkCountCooccurrences.

Configure the Job

Set the mapreduce.marklogic.input.lexiconfunctionclass job configuration property to specify which lexicon call to use. This property setting takes precedence over mapreduce.marklogic.input.documentselector and mapreduce.marklogic.input.subdocumentexpr.

You can set mapreduce.marklogic.input.lexiconfunctionclass either in a configuration file or programmatically. To set the property in a configuration file, set the value to your lexicon subclass name with .class appended to it. For example:

<property>
  <name>mapreduce.marklogic.input.lexiconfunctionclass</name>
  <value>my.package.LexiconFunction.class</value>
</property>

To set the property programmatically, use the org.apache.hadoop.conf.Configuration API. For example:

import org.apache.hadoop.conf.Configuration;
import com.marklogic.mapreduce.MarkLogicConstants;
import com.marklogic.mapreduce.functions.ElemAttrValueCooccurrences;

public class LinkCountCooccurrences {
    static class HrefTitleMap extends ElemAttrValueCooccurrences {...}

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        ...

        conf.setClass(MarkLogicConstants.INPUT_LEXICON_FUNCTION_CLASS,
            HrefTitleMap.class, ElemAttrValueCooccurrences.class);
        ...
    }
}

De-duplication of Results Might Be Required

Your job might need to de-duplicate map results if the analysis performed by your job depends upon unique values in the map output key-value pairs. Only a URI lexicon is guaranteed to have unique values across all forests.

MarkLogic Server maintains lexicon data per forest. Each forest includes a lexicon for the fragments in that forest. Normally, when you directly call an XQuery lexicon function, MarkLogic Server gathers results from each forest level lexicon and de-duplicates them. As discussed in Creating Input Splits, the MarkLogic Connector for Hadoop runs input queries directly against each forest. The in-forest evaluation circumvents the de-duplication of results from lexicon functions.

For example, consider a database with 2 forests, each containing one document. If the database is configured with an element word lexicon for the <data> element, then the table below reflects the contents of the word lexicon in each forest:

forest      document content                words in per-forest lexicon
sample-1    <data> hello world </data>      hello, world
sample-2    <data> goodbye world </data>    goodbye, world

Calling cts:words (the lexicon function underlying com.marklogic.mapreduce.functions.Words) against the database returns (goodbye, hello, world). Therefore, if you use the corresponding lexicon function class in a MapReduce job, you might expect 3 map output key-value pairs from this input data.

However, the in-forest evaluation of cts:words used by the MarkLogic Connector for Hadoop results in the two word lists (hello, world) and (goodbye, world), generating 4 map output key-value pairs. If the purpose of the job is to count the number of unique words in the content, simply counting the number of reduce input key-value pairs gives an incorrect answer of 4.

If you require this kind of uniqueness, you must de-duplicate the results in your application. For example, your Reducer might use a hash map to detect and discard duplicate values.
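
For example, a hypothetical reducer for the unique-word-count scenario above might collect its input values into a HashSet so that the duplicates produced by per-forest evaluation collapse before counting. This is a sketch, not the connector sample code; it assumes the mapper emits a single constant key with each word as the value:

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public static class UniqueWordCountReducer
    extends Reducer<Text, Text, Text, IntWritable> {
    public void reduce(Text key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {
        Set<String> unique = new HashSet<String>();
        for (Text word : values) {
            unique.add(word.toString());    // duplicate words collapse here
        }
        // For the (hello, world) and (goodbye, world) lists above, this
        // emits 3 rather than 4.
        context.write(key, new IntWritable(unique.size()));
    }
}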

Using XPath to Generate Key-Value Pairs

This section describes how to use XPath components to generate map input key-value pairs from document fragments. Using an XPath document selector precludes using a lexicon for input.

Use the following configuration properties to select the input fragments in a split based on an XPath expression:

  • mapreduce.marklogic.input.documentselector
  • mapreduce.marklogic.input.subdocumentexpr

The MarkLogic Connector for Hadoop constructs an input query that includes a path expression equivalent to concatenating the document selector and sub-document expression. For example, if the document selector is fn:collection() and the sub-document expression is /*:data, then the constructed input query uses an input XPath expression equivalent to fn:collection()/*:data.

The document selector determines which documents within a forest are included in each split. The sub-document expression determines which fragments of the selected documents contribute input key-value pairs. The MarkLogic Connector for Hadoop API uses separate document selector and sub-document expression components to maximize internal input query optimization.

The document selector must be a fully searchable XPath expression. For details, see Fully Searchable Paths and cts:search Operations in the Query Performance and Tuning Guide.

The default document selector is fn:collection(), which selects all documents in the split. The default sub-document expression returns all nodes selected by the document selector. If you do not specify either property, all document nodes in the split are used.

The table below shows the results from several example document selector and subdocument expression combinations.

document selector             subdocument expression    result
(none)                        (none)                    All document nodes in the split.
fn:collection("wikipedia")    (none)                    All document nodes in the split that are in the wikipedia collection.
fn:collection("wikipedia")    //wp:nominee              All wp:nominee elements in documents in the split that are in the wikipedia collection.

If your document selector or subdocument expression uses namespace qualifiers, define the namespaces using the mapreduce.marklogic.input.namespace configuration property. The property value is a comma-separated list of alias-URI pairs. The following example defines a wp namespace and then uses it in the subdocument expression:

<property>
  <name>mapreduce.marklogic.input.namespace</name>
  <value>wp, http://www.marklogic.com/wikipedia</value>
</property>
<property>
  <name>mapreduce.marklogic.input.subdocumentexpr</name>
  <value>wp:nominee</value>
</property>

For details, see Example: Counting Href Links and the sample code included in the connector package.

Example: Counting Href Links

Consider the LinkCountInDoc example, described in LinkCountInDoc. This example counts hyperlinks within the input collection. The map function for this sample expects (node path, node) input pairs and produces (href title, href count) output pairs.

Suppose the database contains an input document in the wikipedia collection, with the URI /oscars/drama.xml and the following content (elided for brevity):

<wp:nominee>...
  <wp:film>...
    <wp:p>...
      <wp:a href="http://en.wikipedia.org/wiki/Drama_film"
            title="Drama film">
        This is a dramatic film
      </wp:a>
    </wp:p>
  </wp:film>
</wp:nominee>

Notice the content includes embedded hrefs with titles. The following job configuration property settings extract hrefs with titles that reference other Wikipedia articles:

<property>
  <name>mapreduce.marklogic.input.namespace</name>
  <value>wp, http://www.marklogic.com/wikipedia</value>
</property>
<property>
  <name>mapreduce.marklogic.input.documentselector</name>
  <value>fn:collection("wikipedia")</value>
</property>
<property>
  <name>mapreduce.marklogic.input.subdocumentexpr</name>
  <value>//wp:a[@href and @title and fn:starts-with(@href, "http://en.wikipedia.org")]/@title</value>
</property>

Using the above document selector and subdocument expression, the connector constructs an input query that first selects documents in the split that are in the wikipedia collection. Then, from those documents, it selects all title attribute nodes on wp:a elements whose hrefs refer to Wikipedia articles. Here is a breakdown of the path expression properties that drive the input query:

documentselector: fn:collection("wikipedia")
subdocumentexpr: 
  //wp:a[                    (: all anchors :)
  @href and                  (: with href attributes :)
  @title and                 (: and title attributes :)
  fn:starts-with(@href, "http://en.wikipedia.org")]
                             (: referring to Wikipedia articles :)
  /@title                    (: return the title attribute node :)

The configured InputFormat subclass controls transformation of the fragments selected by the input query into input key-value pairs. For example, the input query for the sample document shown above finds a matching node with these characteristics:

document URI: /oscars/drama.xml

node path: /wp:nominee/wp:film/wp:p[1]/wp:a[1]/@title

node: attribute node for title="Drama film" from this anchor element:
  <wp:a href="http://en.wikipedia.org/wiki/Drama_film" 
          title="Drama film">
    This is a dramatic film
  </wp:a>

If you configure the job to use NodeInputFormat, the input key-value pairs are (NodePath, MarkLogicNode) pairs. That is, the key is the node path and the value is the node. The sample data shown above results in this input pair:

key: /wp:nominee/wp:film/wp:p[1]/wp:a[1]/@title
value: attribute node for title="Drama film"

If you configure the job to use DocumentInputFormat instead, the input key-value pairs have type (DocumentURI, DatabaseDocument). The sample data results in the following input pair, differing from the previous case only in the key:

key: /oscars/drama.xml
value: attribute node for title="Drama film"

For a list of InputFormat subclasses provided by the connector, including the types of keys and values generated by each, see InputFormat Subclasses.

Advanced Input Mode

The connector supports basic and advanced input modes through the mapreduce.marklogic.input.mode configuration property. Advanced mode gives your application complete control over input split creation and fragment selection, at the cost of added complexity. In advanced mode, you must supply an input split query and an input query, which the connector uses to generate the map phase input.

This section covers the following topics:

For details on basic mode, see Basic Input Mode. For a general discussion of input splits and key-value pair creation, see How Hadoop Partitions Map Input Data.

Creating Input Splits

This section covers the following topics:

Overview

In advanced mode, your application controls input split creation by supplying an input split query in the mapreduce.marklogic.input.splitquery configuration property. Your input split query must generate (forest-id, record-count, host-name) tuples.

You can express your split query using either XQuery or Server-Side JavaScript. Use the property mapreduce.marklogic.input.querylanguage to specify the query language; XQuery is the default. In XQuery, build a split query using the XQuery library function hadoop:get-splits, in combination with your own XPath and cts:query. In JavaScript, build a split query using the JavaScript function hadoop.getSplits with your own XPath and cts:query.

The split query returns a host name and forest id because the MarkLogic Connector for Hadoop interacts with MarkLogic Server at the forest level. When a split is assigned to a map task, the connector running in the task submits the input query directly against the forest identified in the split tuple. The host-name and forest-id in the split tuple identify the target forest.

The record-count in the split tuple only needs to be a rough estimate of the number of input key-value pairs in the split; it need not be exact. What constitutes a record is job-dependent. For example, a record can be a document fragment, a node, or a value from a lexicon.

For example, basic input mode uses a simple estimate of the total number of documents in each forest. When the input query runs against a split, it can generate more or fewer input key-value pairs than estimated. The more accurate the record count estimate is, the more accurately Hadoop balances workload across tasks.

An input split never spans multiple forests, but the content in a single forest may span multiple splits. The maximum number of records in a split is determined by the mapreduce.marklogic.input.maxsplitsize configuration property. The connector, rather than your split query, handles bounding splits by this maximum.

For example, if the split query returns a count of 1000 fragments for a forest and the max split size is 600, the connector generates two splits for that forest, one for the first 500 fragments and the other for the next 500 fragments. The connector adjusts the actual split size downward as needed to generate splits of similar size, so you do not get one split of 600 fragments and another of 400 fragments. This rebalancing keeps the workload even across tasks.

Creating a Split Query with hadoop:get-splits

Use the XQuery library function hadoop:get-splits or the Server-Side JavaScript function hadoop.getSplits to create split tuples using a searchable expression and cts:query. The parameters to hadoop:get-splits and hadoop.getSplits determine the documents under consideration in each forest, equivalent to the $expression and $query parameters of cts:search. The function returns an estimate rather than a true count to improve performance.

The following split query example returns one tuple per forest, where the count is an estimate of the total number of documents in the forest.

XQuery:
<property>
  <name>mapreduce.marklogic.input.splitquery</name>
  <value><![CDATA[
    declare namespace
      wp="http://www.mediawiki.org/xml/export-0.4/";
    import module namespace hadoop = 
        "http://marklogic.com/xdmp/hadoop" at
        "/MarkLogic/hadoop.xqy";
    hadoop:get-splits('', 'fn:doc()', 'cts:and-query(())')
  ]]></value>
</property>

JavaScript:
<property>
  <name>mapreduce.marklogic.input.querylanguage</name>
  <value>Javascript</value>
</property>
<property>
  <name>mapreduce.marklogic.input.splitquery</name>
  <value><![CDATA[
    var hadoop = require("/MarkLogic/hadoop.xqy");
    hadoop.getSplits("", "fn:doc()", "cts:and-query(())")
  ]]></value>
</property>

You can write an input split query without hadoop:get-splits (or hadoop.getSplits), but your job may require additional privileges if you do so. The example below is equivalent to the previous example, but it does not use hadoop:get-splits. Unlike the previous example, however, admin privileges are required to execute this sample query.

<property>
  <name>mapreduce.marklogic.input.splitquery</name>
  <value><![CDATA[
    declare namespace wp="http://marklogic.com/wikipedia";
    import module namespace admin = "http://marklogic.com/xdmp/admin" 
      at "/MarkLogic/admin.xqy";
    let $conf := admin:get-configuration()
    for $forest in xdmp:database-forests(xdmp:database())
      let $host_id := admin:forest-get-host($conf, $forest)
      let $host_name := admin:host-get-name($conf, $host_id)
      let $cnt := 
        xdmp:estimate(
          cts:search(fn:doc(), cts:and-query(()), (), 0.0, $forest))
      return if ($cnt > 0) then ($forest, $cnt, $host_name)
             else()
  ]]></value>
</property>

If you create a Server-Side JavaScript split query that does not use hadoop.getSplits, your script must return a Sequence in which the entries are tuples of the form (forest-id, record-count, host-name). That is, the Sequence contains values in the order:

forest1, count1, host1, forest2, count2, host2,...

You can create a Sequence from a JavaScript array using xdmp.arrayValues or Sequence.from.

When you create a split query that does not use hadoop:get-splits or hadoop.getSplits, do not return results for forests whose count is less than or equal to zero. This is why the previous example tests $cnt before returning the split data:

if ($cnt > 0) then ($forest, $cnt, $host_name)
else()

Creating Input Key-Value Pairs

As described in Understanding the MapReduce Job Life Cycle, map input key-value pairs are generated by each map task from the input split assigned to the task. In most cases, the MarkLogic Server content in the split is selected by the input query in the mapreduce.marklogic.input.query configuration property. ForestInputFormat selects documents differently; for details, see Direct Access Using ForestInputFormat.

The input query must return a sequence, but the nature of the items in the sequence is dependent on the configured InputFormat subclass. For example, if the job uses NodeInputFormat, the input key-value pairs are (NodePath, MarkLogicNode) pairs. Therefore, an input query for NodeInputFormat must return a sequence of nodes. The MarkLogic Connector for Hadoop then converts each node into a (NodePath, MarkLogicNode) pair for passage to the map function.

The table below summarizes the input query results expected by each InputFormat subclass provided by the MarkLogic Connector for Hadoop. For more information about the classes, see InputFormat Subclasses.

InputFormat subclass    Input query result
DocumentInputFormat     document-node()*
NodeInputFormat         node()*
KeyValueInputFormat     A sequence of alternating keys and values: (key1, value1, ..., keyN, valueN). The key and value types depend on the job configuration. See Using KeyValueInputFormat and ValueInputFormat.
ValueInputFormat        A sequence of values: (value1, ..., valueN). The value type depends on the job configuration. See Using KeyValueInputFormat and ValueInputFormat.
ForestInputFormat       Not applicable. For details, see Direct Access Using ForestInputFormat.

KeyValueInputFormat and ValueInputFormat enable you to configure arbitrary key and/or value types, within certain constraints. For details, see Using KeyValueInputFormat and ValueInputFormat. When using these types, your input query must return keys and/or values compatible with the type conversions defined in Supported Type Transformations.

The following example uses ValueInputFormat with org.apache.hadoop.io.Text as the value type. (The key type with ValueInputFormat is always org.apache.hadoop.io.LongWritable and is supplied automatically by the MarkLogic Connector for Hadoop.) The input query returns attribute nodes, and the implicit type conversion built into ValueInputFormat converts each result into the attribute text for passage to the map function.

<property>
  <name>mapreduce.marklogic.input.query</name>
  <value><![CDATA[
    declare namespace wp="http://marklogic.com/wikipedia";
    for $t in fn:collection()/wp:nominee//wp:a[@title and @href]/@title
      return $t
  ]]></value>
</property>
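
The job-side configuration that pairs with a query like this might look as follows. This is a minimal sketch following the property and class names used in this chapter, not the complete sample code:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import com.marklogic.mapreduce.ValueInputFormat;
...
Configuration conf = new Configuration();
conf.set("mapreduce.marklogic.input.mode", "advanced");
// The input query returns attribute nodes; ValueInputFormat converts each
// result to Text before passing it to the map function.
conf.setClass("mapreduce.marklogic.input.valueclass", Text.class, Writable.class);
Job job = new Job(conf);
job.setInputFormatClass(ValueInputFormat.class);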

Optimizing Your Input Query

In basic mode, the MarkLogic Connector for Hadoop optimizes the input query built from the configuration properties. In advanced input mode, you must optimize the input query yourself. Use the configuration property mapreduce.marklogic.input.bindsplitrange to optimize your input query by limiting the work your query does per split.

Usually, each input split covers a set of records (or fragments) in a specific forest; see the illustration in Creating Input Splits. The input split might cover only a subset of the content in a forest, but the input query runs against the entire forest. Constraining the content the input query manipulates to just the records under consideration for the split can significantly improve job performance.

For example, imagine an input query built around the following XPath expression:

fn:collection()//wp:a[@title]

In the worst case, evaluating the above expression examines every document in the forest for each split. If the forest contains 100 documents, covered by 4 splits of 25 fragments each, then the job might examine 4 * 100 documents to cover the content in the forest, and then discard all but 25 of the results in each split:

fn:collection()//wp:a[@title][1 to 25]
fn:collection()//wp:a[@title][26 to 50]
fn:collection()//wp:a[@title][51 to 75]
fn:collection()//wp:a[@title][76 to 100]

The range on each expression above, such as [1 to 25], is the split range. Constraining the set of documents by the split range earlier can be more efficient:

(fn:collection()[1 to 25])//wp:a[@title]
...

In practice, MarkLogic Server might internally optimize such a simple XPath expression, but a more complex FLWOR expression can require hand tuning.

The exact optimization is query dependent, but the split start and end values are made available to your input query through external variables if you set the configuration property mapreduce.marklogic.input.bindsplitrange to true.

To use this feature, set the property to true and do the following in your input query:

  1. Declare the pre-defined namespace http://marklogic.com/hadoop. The split start and end variables are in this namespace. For example:
    declare namespace mlmr="http://marklogic.com/hadoop";
  2. Declare splitstart and splitend as external variables in the namespace declared in Step 1. You must use only these names. For example:
    declare variable $mlmr:splitstart as xs:integer external;
    declare variable $mlmr:splitend as xs:integer external;
  3. Use the variables to constrain your query.

For example, the following input query returns the first 1K bytes of each document in a collection of binary documents. Notice the query uses splitstart and splitend to construct a sub-binary node for just the documents relevant to each split:

xquery version "1.0-ml"; 
declare namespace mlmr="http://marklogic.com/hadoop";        (: 1 :)

declare variable $mlmr:splitstart as xs:integer external;    (: 2 :)
declare variable $mlmr:splitend as xs:integer external;

for $doc in fn:doc()[$mlmr:splitstart to $mlmr:splitend]     (: 3 :)
return xdmp:subbinary($doc/binary(), 0, 1000)

For a complete example, see BinaryReader.
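
Note that splitstart and splitend are only bound when the job enables the feature. A minimal sketch of the corresponding job configuration:

Configuration conf = new Configuration();
// Bind $mlmr:splitstart and $mlmr:splitend as external variables
// available to the input query.
conf.setBoolean("mapreduce.marklogic.input.bindsplitrange", true);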

Example: Counting Hrefs Using Advanced Mode

This example counts hyperlinks with titles using advanced input mode. The full code is available in the LinkCount example included in the MarkLogic Connector for Hadoop package. See LinkCount.

Suppose the input data has the following structure:

<wp:page>
  <title>The Topic</title>
  ...
  <text>
    <p>...
      <a href="Drama film" title="Drama film">
        Drama film
      </a>
    </p>
  </text>
</wp:page>

Notice the content includes embedded hrefs with titles. The input split query shown below estimates the number of documents in each forest by calling xdmp:estimate and returns (forest-id, count, host-name) tuples. The query is wrapped in a CDATA section so the contents are not interpreted by the Hadoop MapReduce configuration parser.

<property>
  <name>mapreduce.marklogic.input.splitquery</name>
  <value><![CDATA[
    declare namespace wp="http://www.mediawiki.org/xml/export-0.4/";
    import module namespace admin = 
      "http://marklogic.com/xdmp/admin" at "/MarkLogic/admin.xqy"; 
    let $conf := admin:get-configuration() 
    for $forest in xdmp:database-forests(xdmp:database()) 
      let $host_id := admin:forest-get-host($conf, $forest) 
      let $host_name := admin:host-get-name($conf, $host_id) 
      let $cnt := 
        xdmp:estimate(
          cts:search(fn:doc(), cts:and-query(()), (), 0.0, $forest))
      return ($forest, $cnt, $host_name)
  ]]></value>
</property>

The split query generates at least one split per forest. More than one split can be generated per forest, depending on the number of documents in the forest and the value of the configuration property mapreduce.marklogic.input.maxsplitsize. For details, see Creating Input Splits.

The example input query selects qualifying hrefs from each document in the split. A qualifying href in this example is one that has a title and refers to another Wikipedia topic, rather than to an image or an external file. The input query below uses an XPath expression to find qualifying hrefs and extract the text from the title attribute:

<property>
  <name>mapreduce.marklogic.input.query</name>
  <value><![CDATA[
    xquery version "1.0-ml"; 
    declare namespace wp="http://www.mediawiki.org/xml/export-0.4/";
    //wp:a[@title and @href and not
       (fn:starts-with(@href, "#")
        or fn:starts-with(@href, "http://") 
        or fn:starts-with(@href, "File:") 
        or fn:starts-with(@href, "Image:"))]/@title
  ]]></value>
</property>

The LinkCount sample uses the above split query and input query with ValueInputFormat configured to generate (LongWritable, Text) map input pairs, where the key is a unique (but uninteresting) number and the value is the text from the title attribute of qualifying hrefs. For example, one input to the map function might be:

(42, Drama film)

For more information about ValueInputFormat, see Using KeyValueInputFormat and ValueInputFormat.

The map function of LinkCount discards the input key and generates a (Text, IntWritable) key-value pair where the key is the text from the title attribute (the value on the input pair), and the value is always one, to indicate a single reference to the topic:

public static class RefMapper 
extends Mapper<LongWritable, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text refURI = new Text();


    public void map(LongWritable key, Text value, Context context) 
      throws IOException, InterruptedException 
    {
        refURI.set(value);
        context.write(refURI, one);
    }
    ...

For example, if RefMapper.map receives (42, Drama film) as input, then it produces the output pair (Drama film, 1).

Using KeyValueInputFormat and ValueInputFormat

The KeyValueInputFormat and ValueInputFormat classes enable you to create input key-value pairs with types other than the connector specific types MarkLogicNode, NodePath, and DocumentURI.

This section covers the following topics about using KeyValueInputFormat and ValueInputFormat.

Overview

Use KeyValueInputFormat or ValueInputFormat to specify your own key-value type combinations, or when extracting input data from a MarkLogic Server lexicon. The connector performs a best-effort type conversion between the key and/or value types from the input query or lexicon function and the target type. For details, see Supported Type Transformations.

Use KeyValueInputFormat type when your application depends upon both the key and value type. Use ValueInputFormat when only the value is interesting. In ValueInputFormat the key is simply a unique number with no significance.

Most lexicon functions require use of ValueInputFormat. You may use KeyValueInputFormat only with lexicon co-occurrence functions. For details, see Using a Lexicon to Generate Key-Value Pairs.

For an example, see Example: Using KeyValueInputFormat.

Job Configuration

When using KeyValueInputFormat, specify the map phase input key and value type by setting the configuration properties mapreduce.marklogic.input.keyclass and mapreduce.marklogic.input.valueclass. For example, to set the properties in the configuration file:

<property>
  <name>mapreduce.marklogic.input.keyclass</name>
  <value>org.apache.hadoop.io.Text.class</value>
</property>
<property>
  <name>mapreduce.marklogic.input.valueclass</name>
  <value>com.marklogic.mapreduce.NodePath.class</value>
</property>

You can also use the org.apache.hadoop.mapreduce.Job API to set the property in code:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
...
public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    ...
    conf.setClass("mapreduce.marklogic.input.valueClass", 
                  Text.class, Writable.class);

When using ValueInputFormat, specify the map phase value type by setting the configuration property mapreduce.marklogic.input.valueclass. The key type is built in and is always org.apache.hadoop.io.LongWritable.

Supported Type Transformations

The set of supported type transformations are listed in the table below. The list applies to both the key and value types when using KeyValueInputFormat, and to the value type when using ValueInputFormat.

If the target key or value type is...       Then the query result type must be...
org.apache.hadoop.io.Text                   Any XQuery type with a String representation. See com.marklogic.xcc.types.XdmValue.asString().
org.apache.hadoop.io.IntWritable,
org.apache.hadoop.io.VIntWritable,
org.apache.hadoop.io.LongWritable,
org.apache.hadoop.io.VLongWritable          xs:integer
org.apache.hadoop.io.BooleanWritable        xs:boolean
org.apache.hadoop.io.FloatWritable          xs:float
org.apache.hadoop.io.DoubleWritable         xs:double
org.apache.hadoop.io.BytesWritable          xs:hexBinary or xs:base64Binary
com.marklogic.mapreduce.MarkLogicNode       node()

If the query result type and target type do not correspond to one of the supported conversions, or the target type is Text but the result type does not have a String representation, java.lang.UnsupportedOperationException is raised.

Example: Using KeyValueInputFormat

Consider a job that uses KeyValueInputFormat to count the number of occurrences of each href in the input document set and saves the results to HDFS. This is similar to the LinkCount sample included with the connector; see com.marklogic.mapreduce.examples.LinkCount.java and the conf/marklogic-advanced.xml configuration file.

This example uses advanced input mode with an input query that generates (xs:string, xs:integer) pairs containing the href title and a count from cts:frequency. The input query is shown below:

declare namespace wp="http://www.mediawiki.org/xml/export-0.4/";
declare variable $M := cts:element-attribute-value-co-occurrences(
    xs:QName("wp:a"),
    xs:QName("href"),
    xs:QName("wp:a"),
    xs:QName("title"),
    ("proximity=0", "map", 
     "collation=http://marklogic.com/collation/codepoint"),
    cts:directory-query("/space/wikipedia/enwiki/", "infinity")) ;
for $k in map:keys($M)[
    not(starts-with(., "#") or starts-with(., "http://") or
        starts-with(., "File:") or starts-with(., "Image:")) ]
let $v := map:get($M, $k)
where $v != ""
return 
    for $each in $v
    return ($each, cts:frequency($each))

KeyValueInputFormat generates (Text, IntWritable) map phase input key-value pairs from the above query. The key is the title from the href anchor and the value is the count. The following input configuration properties define the key and value types for KeyValueInputFormat:

<property>
    <name>mapreduce.marklogic.input.keyclass</name>
    <value>org.apache.hadoop.io.Text</value>
</property>
<property>
    <name>mapreduce.marklogic.input.valueclass</name>
    <value>org.apache.hadoop.io.IntWritable</value>
</property>

The job programmatically sets the InputFormat class to KeyValueInputFormat, and the Mapper.map method expects the corresponding input types, as shown below. KeyValueInputFormat handles converting the input query results for each pair from (xs:string, xs:integer) into (Text, IntWritable).

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import com.marklogic.mapreduce.KeyValueInputFormat;

public class LinkCount {
    public static class RefMapper 
    extends Mapper<Text, IntWritable, Text, IntWritable> {
        public void map(Text key, IntWritable value, Context context) {...}
        ...
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        ...
        Job job = new Job(conf);
        job.setInputFormatClass(KeyValueInputFormat.class);
        ...
    }

}

The same type conversions apply when using ValueInputFormat, but you do not need to configure mapreduce.marklogic.input.keyclass because the key class is always LongWritable. The LinkCount sample code uses ValueInputFormat and relies on the default key and value types to achieve the same effect as shown above.

Configuring a Map-Only Job

Some jobs do not require a reduce step. For example, a job which ingests documents in HDFS into MarkLogic Server may complete the ingestion during the map step. For an example of a map-only job, see ContentLoader.

To stop a job after the map completes, set the number of reduce tasks to zero. You may set the number of reduce tasks in a configuration file or programmatically. For example, to set the number of reduce tasks to 0 in a configuration, include the following setting:

<property>
  <name>mapred.reduce.tasks</name>
  <value>0</value>
</property>

To set the number of reduce tasks to 0 programmatically, use the Hadoop API method org.apache.hadoop.mapreduce.Job.setNumReduceTasks.
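
For example (a minimal sketch; the mapper class name is hypothetical):

Configuration conf = new Configuration();
Job job = new Job(conf);
job.setMapperClass(MyIngestMapper.class);   // hypothetical map-only mapper
// Stop the job after the map phase completes.
job.setNumReduceTasks(0);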

If your map-only job uses MarkLogic Server for output, you must disable the Hadoop MapReduce speculative execution feature. For details, see Disabling Speculative Execution.

Direct Access Using ForestInputFormat

Direct Access enables you to bypass MarkLogic Server and extract documents from a database by reading them directly from the on-disk representation of a forest. Use ForestInputFormat for Direct Access in a MapReduce job.

This advanced feature circumvents MarkLogic Server document management and is intended primarily for accessing offline and read-only forests as part of a tiered storage strategy. Most applications should use the other MarkLogicInputFormat implementations, such as DocumentInputFormat or NodeInputFormat.

This section covers the following Direct Access topics:

When to Consider ForestInputFormat

A forest is the internal representation of a collection of documents in a MarkLogic database; for details, see Understanding Forests in the Administrator's Guide. A database can contain many forests, and forests can have various states, including detached and readonly.

Direct Access enables you to extract documents directly from a detached or read-only forest without going through MarkLogic Server. Direct Access and ForestInputFormat are primarily intended for accessing archived data that is part of a tiered storage deployment; for details, see Tiered Storage in the Administrator's Guide. You should only use Direct Access on a forest that is offline or read-only; for details, see Limitations of Direct Access.

For example, if you have data that ages out over time such that you need to retain it, but you do not need to have it available for real time queries through MarkLogic Server, you can archive the data by taking the containing forests offline. Such data is still accessible using Direct Access.

You can store archived forests on HDFS or another filesystem, and access the documents stored in them from your MapReduce job, even if you do not have an active MarkLogic instance available. For example, ForestInputFormat can be used to support large scale batch data analytics without requiring all the data to be active in MarkLogic Server.

Since Direct Access bypasses the active data management performed by MarkLogic Server, you should not use it on forests receiving document updates. For details, see Limitations of Direct Access.

Limitations of Direct Access

You should only use ForestInputFormat on forests that meet one of the following criteria:

  • The forest is offline, such as a forest detached from its database as part of a tiered storage strategy.
  • The forest is read-only, so it is not receiving document updates.

The following additional limitations apply to using Direct Access:

  • Accessing documents with Direct Access bypasses security roles and privileges. The content is protected only by the filesystem permissions on the forest data.
  • Direct Access cannot take advantage of indexing or caching when accessing documents. Every document in each participating forest is read, even when you use filtering criteria. Filtering can only be applied after reading a document off disk.
  • Direct Access skips property fragments.
  • Direct Access skips documents partitioned into multiple fragments. For details, see Fragments in the Administrator's Guide.

ForestInputFormat skips any forest (or a stand within a forest) that is receiving updates or is in an error state, as well as property fragments and fragmented documents. Processing continues even when some documents are skipped.

Your forest data must be reachable from the hosts in your Hadoop cluster. If your job accesses the contents of large or external binary documents retrieved with ForestInputFormat, the following additional reachability requirements apply:

  • If your job accesses the contents of a large binary, the large data directory must be reachable using the same path as when the forest was online.
  • If your job accesses the contents of an external binary, the directory containing the external content should be reachable using the same path that xdmp:external-binary-path returns when the containing forest is online.

If your job does not access the contents of any large or external binary documents, then the large data directory and/or external binary directory need not be reachable.

For example, consider a forest configured to use hdfs://my/large/data as a large data directory when it was live. If your map function is called with a LargeBinaryDocument from this forest, you can safely call LargeBinaryDocument.getContentSize even if the large data directory is not reachable. However, you can only successfully call LargeBinaryDocument.getContentAsMarkLogicNode if hdfs://my/large/data is reachable.

Similarly, consider a forest that contains an external binary document inserted into the database with /my/external-images/huge.jpg as the path to the external data. If your map function is called with a LargeBinaryDocument corresponding to this binary, you can safely call LargeBinaryDocument.getPath even if /my/external-images is not reachable. However, you can only successfully call LargeBinaryDocument.getContentAsByteArray if /my/external-images/huge.jpg is reachable.
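
The following sketch illustrates the distinction using the accessors named above; the reachability flag is a hypothetical application-level setting, not part of the connector API:

// Fragment of a map method that receives ForestDocument values.
if (value instanceof LargeBinaryDocument) {
    LargeBinaryDocument lbd = (LargeBinaryDocument) value;
    long size = lbd.getContentSize();     // metadata only: safe even if the
    String path = lbd.getPath();          // large/external data is unreachable
    if (largeDataReachable) {             // hypothetical flag supplied by the job
        // Content access requires the original large data or external
        // binary directory to be reachable from the Hadoop task.
        byte[] content = lbd.getContentAsByteArray();
        // ... process content ...
    }
}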

To learn more about how MarkLogic Server stores content, see Where to Find More Information.

Controlling Input Document Selection

You cannot specify an input query or document selector with ForestInputFormat. Use the following configuration properties to filter the input documents instead.

Even with filtering, all documents in the forest(s) are accessed because indexing does not apply. Filtering only limits which documents are passed to your map or reduce function.

Filter          Description
directory       Include only documents in specific database directories. To specify a directory filter, use the configuration property mapreduce.marklogic.input.filter.directory.
collection      Include only documents in specific collections. To specify a collection filter, use the configuration property mapreduce.marklogic.input.filter.collection.
document type   Include only documents of the specified types. You can select one or more of XML, TEXT, and BINARY. To specify a document type filter, use the configuration property mapreduce.marklogic.input.filter.type.

For more details, see Input Configuration Properties.

You can specify multiple directories, collections, or document types by using a comma-separated list of values. You can also combine directory, collection, and document type filters. The following example configuration settings select XML and text documents in the database directory /documents/ that are in either the invoices or receipts collection.

<property>
  <name>mapreduce.marklogic.input.filter.directory</name>
  <value>/documents/</value>
</property>
<property>
  <name>mapreduce.marklogic.input.filter.collection</name>
  <value>invoices,receipts</value>
</property>
<property>
  <name>mapreduce.marklogic.input.filter.type</name>
  <value>XML,TEXT</value>
</property>

Specifying the Input Forest Directories

When you use ForestInputFormat, your job configuration must include the filesystem or HDFS directories containing the input forest data. ForestInputFormat is a subclass of the Apache Hadoop FileInputFormat class, so you can configure the forest directories using FileInputFormat.setInputPaths.

For example, the following code assumes the forest input directories are passed in as a command line argument on the hadoop command line that executes the job:

public static void main(final String[] args) throws Exception {
    ...
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf);
    job.setJarByClass(MapTreeReduceTree.class);

    // Map related configuration
    job.setInputFormatClass(ForestInputFormat.class);
    job.setMapperClass(MyMapper.class);
    job.setMapOutputKeyClass(DocumentURI.class);
    job.setMapOutputValueClass(DOMDocument.class);
    FileInputFormat.setInputPaths(job, new Path(args[0]));
    ...
}

Each directory passed to setInputPaths must be the top-level directory of a forest, that is, the directory that contains the entire forest. For example, the default location of a forest named my-forest on Linux is /var/opt/MarkLogic/Forests/my-forest, and that is the path you would pass to use my-forest as an input source for your job. Similarly, if you configure your data directory to be hdfs://MarkLogic, then the path to the forest might be hdfs://MarkLogic/Forests/my-forest.
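A job can also read from more than one forest. Because FileInputFormat.setInputPaths accepts multiple paths, a call along the following lines could be used; the forest names shown are placeholders.

// Illustrative: pass more than one forest directory to the job.
FileInputFormat.setInputPaths(job,
    new Path("/var/opt/MarkLogic/Forests/my-forest-1"),
    new Path("/var/opt/MarkLogic/Forests/my-forest-2"));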

Determining Input Document Type in Your Code

When you use ForestInputFormat, your map or reduce function receives (DocumentURI, ForestDocument) input key-value pairs. ForestDocument is an abstract class. The concrete document object type depends on the content type of the data in the document. Use MarkLogicDocument.getContentType to determine the appropriate concrete type.

The following table shows the correspondence between the com.marklogic.mapreduce.ContentType returned by MarkLogicDocument.getContentType and the concrete type of the document in a key-value pair. Binary documents are further specialized into RegularBinaryDocument and LargeBinaryDocument; for details, see Working With Binary Documents in the Application Developer's Guide.

Content Type ForestDocument Subclass
XML, TEXT com.marklogic.mapreduce.DOMDocument
BINARY com.marklogic.mapreduce.BinaryDocument, further specialized as com.marklogic.mapreduce.RegularBinaryDocument or com.marklogic.mapreduce.LargeBinaryDocument

DOMDocument is a read-only representation of a document as it is stored in the database. Accessors allow you to convert the content into other forms for further manipulation. For example, you can convert the contents to String, MarkLogicNode, or org.w3c.dom.Document. For details, see the JavaDoc for com.marklogic.mapreduce.DOMDocument.

The following example demonstrates coercing a ForestDocument into an appropriate concrete type based on the content type.

public static class MyMapper 
extends Mapper<DocumentURI, ForestDocument, DocumentURI, DOMDocument> {
    public static final Log LOG = LogFactory.getLog(MyMapper.class);
        
    public void map(
        DocumentURI key, ForestDocument value, Context context) 
    throws IOException, InterruptedException {
        if (value != null && 
            value.getContentType() != ContentType.BINARY) {
            DOMDocument domdoc = (DOMDocument)value;
            // work with the document...
        } else if (value != null) {
            if (value instanceof LargeBinaryDocument) {
                LargeBinaryDocument lbd = (LargeBinaryDocument)value;
                // work with the document...
            } else if (value instanceof RegularBinaryDocument) {
                RegularBinaryDocument rbd =
                    (RegularBinaryDocument)value;
                // work with the document...
            }
        }
    }
}

Input Configuration Properties

The table below summarizes connector configuration properties for using MarkLogic Server as an input source. Some properties can only be used with basic input mode and others can only be used with advanced input mode. For more information, see com.marklogic.mapreduce.MarkLogicConstants in the MarkLogic Hadoop MapReduce Connector API.

Property Description
mapreduce.marklogic.input.username Username privileged to read from the database attached to your XDBC App Server.
mapreduce.marklogic.input.password Cleartext password for the input.username user.
mapreduce.marklogic.input.host Hostname of the server hosting your input XDBC App Server.
mapreduce.marklogic.input.port The port configured for your XDBC App Server on the input host.
mapreduce.marklogic.input.usessl Whether or not to use an SSL connection to the server. Set to true or false.
mapreduce.marklogic.input.ssloptionsclass The name of a class implementing SslConfigOptions, used to configure the SSL connection when input.usessl is true.
mapreduce.marklogic.input.documentselector An XQuery path expression step specifying a sequence of document nodes to use for input. Basic mode only. Not usable with ForestInputFormat. Default: fn:collection(). See Basic Input Mode.
mapreduce.marklogic.input.subdocumentexpr An XQuery path expression used with input.documentselector to select sub-document items to use in input key-value pairs. Basic mode only. Not usable with ForestInputFormat. See Basic Input Mode.
mapreduce.marklogic.input.namespace A comma-separated list of namespace name-URI pairs to use when evaluating the path expression constructed from input.documentselector and input.subdocumentexpr. Basic mode only. See Basic Input Mode.
mapreduce.marklogic.input.lexiconfunctionclass The class type of a lexicon function subclass. Use with KeyValueInputFormat and ValueInputFormat. Only usable in basic input mode. See Using a Lexicon to Generate Key-Value Pairs.
mapreduce.marklogic.input.splitquery In advanced input mode, the query used to generate input splits. See Creating Input Splits.
mapreduce.marklogic.input.query In advanced input mode, the query used to select input fragments from MarkLogic Server. See Creating Input Key-Value Pairs.
mapreduce.marklogic.input.queryLanguage The implementation language of mapreduce.marklogic.input.query. Allowed values: xquery, javascript. Default: xquery.
mapreduce.marklogic.input.bindsplitrange Indicates whether or not your input query requires access to the external split range variables, splitstart and splitend. Only usable in advanced input mode. Default: false. See Optimizing Your Input Query.
mapreduce.marklogic.input.maxsplitsize The maximum number of records (fragments) per input split. Default: 50,000.
mapreduce.marklogic.input.keyclass The class type of the map phase input keys. Use with KeyValueInputFormat. See Using KeyValueInputFormat and ValueInputFormat.
mapreduce.marklogic.input.valueclass The class type of the map phase input values. Use with KeyValueInputFormat and ValueInputFormat. See Using KeyValueInputFormat and ValueInputFormat.
mapreduce.marklogic.input.recordtofragmentratio Defines the ratio of the number of retrieved input records to fragments. Used only for MapReduce's progress reporting. Default: 1.0.
mapreduce.marklogic.input.indented Whether or not to indent (prettyprint) XML extracted from MarkLogic Server. Set to true, false, or serverdefault. Default: false.
mapreduce.marklogic.input.filter.collection A comma-separated list of collection names, specifying which document collections to use for input with ForestInputFormat. Optional. Default: Do not filter input by collection.
mapreduce.marklogic.input.filter.directory A comma-separated list of database directory paths. Only documents in these directories are included in the input document set with ForestInputFormat. Optional. Default: Do not filter input by database directory.
mapreduce.marklogic.input.filter.type Filter input by content type when using ForestInputFormat. Allowed values: XML, TEXT, BINARY. You can specify more than one value. Optional. Default: Include all content types.
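For illustration, the following configuration file fragment (in the same format as the filter example earlier) sets a smaller split size and asks the server to indent extracted XML; the values shown are arbitrary examples, not recommendations.

<property>
  <name>mapreduce.marklogic.input.maxsplitsize</name>
  <value>10000</value>
</property>
<property>
  <name>mapreduce.marklogic.input.indented</name>
  <value>true</value>
</property>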

InputFormat Subclasses

The MarkLogic Connector for Hadoop API provides subclasses of InputFormat for defining your map phase input splits and key-value pairs when using MarkLogic Server as an input source. Specify the InputFormat subclass appropriate for your job using the org.apache.hadoop.mapreduce.Job.setInputFormatClass method. For example:

import com.marklogic.mapreduce.NodeInputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
...
public class LinkCountInDoc {
    ...
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setInputFormatClass(NodeInputFormat.class);
        ...
    }
}

The following table summarizes the InputFormat subclasses provided by the connector and the key and value types produced by each class. All classes referenced below are in the package com.marklogic.mapreduce. For more details, see the MarkLogic Hadoop MapReduce Connector API.

Class Key Type Value Type Description
MarkLogicInputFormat any any Abstract superclass for all connector-specific InputFormat types. Generates input splits and key-value pairs from MarkLogic Server.
DocumentInputFormat DocumentURI DatabaseDocument Generates key-value pairs using each node selected by the input query as a value and the URI of the document containing the node as a key.
NodeInputFormat NodePath MarkLogicNode Generates key-value pairs using each node selected by the input query as a value and the node path of the node as a key.
ForestInputFormat DocumentURI ForestDocument Generates key-value pairs using each document selected by the input filters as a value and the URI of the document as a key. For details, see Direct Access Using ForestInputFormat.
KeyValueInputFormat any any Uses input from MarkLogic Server to generate input key-value pairs with user-defined types. Use mapreduce.marklogic.input.keyclass and mapreduce.marklogic.input.valueclass to specify the key and value types. See Using KeyValueInputFormat and ValueInputFormat.
ValueInputFormat Int any Generates input key-value pairs where only the value is of interest. The key is simply a count of the number of items seen when the pair is generated. Use mapreduce.marklogic.input.valueclass to specify the value type. See Using KeyValueInputFormat and ValueInputFormat.
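For example, when using ValueInputFormat you declare the value type through the mapreduce.marklogic.input.valueclass property. The following sketch assumes the values should be delivered as org.apache.hadoop.io.Text; the surrounding driver code is illustrative.

// Illustrative sketch: configure ValueInputFormat to deliver values as Text.
Configuration conf = new Configuration();
conf.set("mapreduce.marklogic.input.valueclass",
    "org.apache.hadoop.io.Text");
Job job = Job.getInstance(conf);
job.setInputFormatClass(ValueInputFormat.class);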

Lexicon Function Subclasses

The following table shows the correspondence between MarkLogic Server lexicon built-in functions and MarkLogic Connector for Hadoop lexicon function wrapper classes. Locate the lexicon function you wish to use and include a subclass of the corresponding wrapper class in your job. For details, see Using a Lexicon to Generate Key-Value Pairs.

Lexicon Wrapper Superclass XQuery Lexicon Function
ElemAttrValueCooccurrences cts:element-attribute-value-co-occurrences
ElemValueCooccurrences cts:element-value-co-occurrences
FieldValueCooccurrences cts:field-value-co-occurrences
ValueCooccurrences cts:value-co-occurrences
CollectionMatch cts:collection-match
ElementAttributeValueMatch cts:element-attribute-value-match
ElementAttributeWordMatch cts:element-attribute-word-match
ElementValueMatch cts:element-value-match
ElementWordMatch cts:element-word-match
FieldValueMatch cts:field-value-match
FieldWordMatch cts:field-word-match
UriMatch cts:uri-match
ValueMatch cts:value-match
WordMatch cts:word-match
Collections cts:collections
ElementAttributeValues cts:element-attribute-values
ElementAttributeWords cts:element-attribute-words
ElementValues cts:element-values
ElementWords cts:element-words
FieldValues cts:field-values
FieldWords cts:field-words
Uris cts:uris
Values cts:values
Words cts:words
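For illustration only, the fragment below sketches how a lexicon wrapper subclass might be wired into a job with ValueInputFormat. The MyWords class is hypothetical, the com.marklogic.mapreduce.functions package name is an assumption, and the methods to override in the subclass depend on the wrapper you choose; see Using a Lexicon to Generate Key-Value Pairs for the supported details.

// Illustrative sketch, not a complete sample.
public static class MyWords extends com.marklogic.mapreduce.functions.Words {
    // Override the wrapper's accessor methods here to supply lexicon
    // arguments, as described in Using a Lexicon to Generate Key-Value Pairs.
}

// In the job driver:
Configuration conf = new Configuration();
conf.set("mapreduce.marklogic.input.lexiconfunctionclass",
    MyWords.class.getName());
conf.set("mapreduce.marklogic.input.valueclass",
    "org.apache.hadoop.io.Text");
Job job = Job.getInstance(conf);
job.setInputFormatClass(ValueInputFormat.class);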