Package com.marklogic.client.datamovement

The MarkLogic Data Movement SDK supports long-running write, read, delete, or transform jobs. Long-running write jobs are enabled by WriteBatcher. Long-running read, delete, or transform jobs are enabled by QueryBatcher, which can perform actions on all uris matching a query or on all uris provided by an Iterator<String>.

Features:

Using Provided Listeners

When using QueryBatcher, your custom listeners provided to onUrisReady can do anything with each batch of uris, and will usually call the MarkLogic Java Client API to do so. However, to simplify common use cases, the following listeners are also provided:

   ApplyTransformListener  - Modifies documents in-place in the database by applying a server-side transform
   ExportListener          - Downloads each document for further processing in Java
   ExportToWriterListener  - Downloads each document and writes it to a Writer (could be a file, HTTP response, in-memory Writer, etc.).
   DeleteListener          - Deletes each batch of documents from the server
   UrisToWriterListener    - Writes each uri to a Writer (could be a file, HTTP response, etc.).
 

Using QueryBatcher

When you need to perform actions on server documents beyond what can be done with the provided listeners, register your custom code with onUrisReady and your code will be run for each batch of uris.

For Example:

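     QueryBatcher qhb = dataMovementManager.newQueryBatcher(query)
         .withBatchSize(1000)       // number of uris in each batch passed to onUrisReady
         .withThreadCount(20)       // worker threads processing batches in parallel
         .withConsistentSnapshot()  // read every batch as of one point in time, so deletes don't shift later pages
         .onUrisReady(batch -> {
             // runs once per batch, possibly on several threads at once
             for ( String uri : batch.getItems() ) {
                 if ( uri.endsWith(".txt") ) {
                     client.newDocumentManager().delete(uri);
                 }
             }
         })
         .onQueryFailure(queryBatchException -> queryBatchException.printStackTrace());
     JobTicket ticket = dataMovementManager.startJob(qhb);
     qhb.awaitCompletion();               // block until every batch has been processed
     dataMovementManager.stopJob(ticket); // stop the job and shut down its threads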
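As a rough mental model of withBatchSize and withThreadCount (not the SDK's actual implementation), the plain-Java sketch below groups uris from an Iterator<String> into fixed-size batches and calls a listener for each batch on a fixed-size thread pool. The class IteratorBatchingSketch and its run method are hypothetical names; a real QueryBatcher also queries the server, handles failover, and tracks the job, none of which is modeled here.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical model of batch size and thread count; NOT how QueryBatcher is implemented.
final class IteratorBatchingSketch {

    // Reads uris lazily, submits each full batch (and a final partial one) to the pool,
    // and returns true if every submitted batch finished within one minute.
    static boolean run(Iterator<String> uris, int batchSize, int threadCount,
                       Consumer<List<String>> listener) {
        ExecutorService pool = Executors.newFixedThreadPool(threadCount);
        try {
            List<String> current = new ArrayList<>(batchSize);
            while (uris.hasNext()) {
                current.add(uris.next());
                if (current.size() == batchSize) {
                    List<String> full = current;          // hand the filled batch to a worker
                    pool.submit(() -> listener.accept(full));
                    current = new ArrayList<>(batchSize); // start collecting the next batch
                }
            }
            if (!current.isEmpty()) {
                List<String> last = current;              // the final batch may be smaller
                pool.submit(() -> listener.accept(last));
            }
        } finally {
            pool.shutdown(); // accept no new work; already-submitted batches still run
        }
        try {
            return pool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }
}
```

Because several pool threads may call the listener at the same time, the listener passed to run must be thread-safe, just like a real onUrisReady listener.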

Using WriteBatcher

When you need to write a very large volume of documents and mlcp cannot meet your requirements, use WriteBatcher.

For Example:

     WriteBatcher whb = dataMovementManager.newWriteBatcher()
         .withBatchSize(100)
         .withThreadCount(20)
         .onBatchSuccess(batch -> {
             logger.debug("batch # {}, so far: {}", batch.getJobBatchNumber(), batch.getJobResultsSoFar());
         })
         .onBatchFailure((batch,throwable) -> throwable.printStackTrace() );
     JobTicket ticket = dataMovementManager.startJob(whb);
     // the add or addAs methods could be called in separate threads on the
     // single whb instance
     whb.add  ("doc1.txt", new StringHandle("doc1 contents"));
     whb.addAs("doc2.txt", "doc2 contents");

     whb.flushAndWait(); // send the two docs even though they're not a full batch
     dataMovementManager.stopJob(ticket);
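The final flushAndWait() call matters because, as the comment in the example notes, the two documents do not fill a batch on their own. The hypothetical BufferingWriterSketch class below is a simplified single-process model of that behavior, not WriteBatcher itself: add() hands off a batch as soon as it is full, and flush() sends whatever partial batch remains. The Consumer sink is an assumption standing in for writing a batch to the server.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical, simplified model of add()/flush semantics; NOT the SDK's WriteBatcher.
final class BufferingWriterSketch {
    private final int batchSize;
    private final Consumer<Map<String, String>> sink; // stands in for "write this batch to the server"
    private Map<String, String> buffer = new LinkedHashMap<>(); // uri -> content, guarded by "this"

    BufferingWriterSketch(int batchSize, Consumer<Map<String, String>> sink) {
        this.batchSize = batchSize;
        this.sink = sink;
    }

    // May be called from several threads; the buffer is only touched while holding the lock.
    // In this model, a repeated uri within one batch replaces the earlier content.
    void add(String uri, String content) {
        Map<String, String> full = null;
        synchronized (this) {
            buffer.put(uri, content);
            if (buffer.size() >= batchSize) {
                full = buffer;                  // detach the full batch
                buffer = new LinkedHashMap<>();
            }
        }
        if (full != null) {
            sink.accept(full);                  // sent right away, outside the lock
        }
    }

    // Sends whatever is buffered, even if it is less than a full batch.
    void flush() {
        Map<String, String> partial;
        synchronized (this) {
            if (buffer.isEmpty()) {
                return;
            }
            partial = buffer;
            buffer = new LinkedHashMap<>();
        }
        sink.accept(partial);
    }
}
```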

Writing Custom Listeners

As demonstrated above, listeners should be added to each instance of QueryBatcher or WriteBatcher. Ad-hoc listeners can be written as Java 8 lambda expressions. More sophisticated custom listeners can implement the appropriate listener interface or extend one of the provided listeners listed above.

QueryBatchListener (onUrisReady) instances are necessary to do something with the uris fetched by QueryBatcher. A custom QueryBatchListener can do whatever it needs with each batch: it can call any part of the Java Client API that operates on uris, or read from or write to an external system. QueryFailureListener (onQueryFailure) instances handle any exceptions encountered while fetching the uris. WriteBatchListener (onBatchSuccess) instances handle any custom tracking requirements during a WriteBatcher job. WriteFailureListener (onBatchFailure) instances handle any exceptions encountered while writing the batches formed from docs sent to the WriteBatcher instance. See the javadocs for each provided listener for an explanation of the listeners that can be registered for it to call. For more listener implementation ideas, see the javadocs, the Java Application Developer's Guide, the source code for the provided listeners, the cookbook examples, and the unit tests.
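For a listener that carries configuration or state, a small named class can be clearer than a lambda. The sketch below uses a hypothetical UriBatchListener interface as a stand-in for the SDK's listener interfaces, so it compiles without the MarkLogic client. The hypothetical SuffixFilterListener turns the ".txt" filter from the QueryBatcher example into a reusable class and counts matches with an AtomicLong, since the same instance may be called from many threads.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

// Hypothetical stand-in for a batch listener interface, so the sketch runs without the SDK.
@FunctionalInterface
interface UriBatchListener {
    void processEvent(List<String> uris);
}

// A reusable listener: passes only uris with the given suffix to an action and counts them.
final class SuffixFilterListener implements UriBatchListener {
    private final String suffix;
    private final Consumer<String> action;               // e.g. "delete this uri"
    private final AtomicLong matched = new AtomicLong(); // thread-safe counter

    SuffixFilterListener(String suffix, Consumer<String> action) {
        this.suffix = suffix;
        this.action = action;
    }

    @Override
    public void processEvent(List<String> uris) {
        for (String uri : uris) {
            if (uri.endsWith(suffix)) {
                action.accept(uri);
                matched.incrementAndGet();
            }
        }
    }

    long getMatchedCount() {
        return matched.get();
    }
}
```

An ad-hoc listener for the same interface is just a lambda, for example `UriBatchListener log = uris -> System.out.println(uris);`. In real code you would implement the SDK's own listener interface instead; check its javadoc for the exact method to implement.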

Listeners Must Be Thread-Safe

Since listeners are called asynchronously by all threads in the pool inside the QueryBatcher or WriteBatcher instance, they must only perform thread-safe operations. For example, accumulate into a collection only if it is wrapped as a synchronized collection (for instance with Collections.synchronizedList or Collections.synchronizedMap), rather than directly using unsynchronized collections such as HashMap or ArrayList, which are not thread-safe. Similarly, accumulate strings with StringBuffer instead of StringBuilder, since StringBuffer is synchronized (and thus thread-safe). We also recommend the java.util.concurrent.atomic classes.
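The recommendations above can be tried out in a plain-Java simulation. In the sketch below, one listener instance is invoked concurrently from a fixed thread pool, roughly the way a batcher's threads call an onUrisReady or onBatchSuccess listener; it accumulates into a Collections.synchronizedList and an AtomicLong, so no updates are lost. ThreadSafeAccumulatorSketch and simulate are hypothetical names, and the thread pool only imitates the batcher's.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

// Plain-Java simulation: one listener instance called concurrently by many pool threads.
final class ThreadSafeAccumulatorSketch {
    // Synchronized wrapper: each add/addAll call is atomic with respect to other threads.
    final List<String> collected = Collections.synchronizedList(new ArrayList<>());
    // Atomic counter: concurrent increments are never lost.
    final AtomicLong batchCount = new AtomicLong();

    // The listener body; safe to run on several threads at once.
    final Consumer<List<String>> listener = batch -> {
        collected.addAll(batch);
        batchCount.incrementAndGet();
    };

    // Calls the same listener for `batches` batches from `threadCount` threads.
    // Returns true if all calls finished within one minute.
    boolean simulate(int batches, int batchSize, int threadCount) {
        ExecutorService pool = Executors.newFixedThreadPool(threadCount);
        for (int b = 0; b < batches; b++) {
            List<String> batch = new ArrayList<>(batchSize);
            for (int i = 0; i < batchSize; i++) {
                batch.add("/doc/" + b + "/" + i + ".json");
            }
            pool.submit(() -> listener.accept(batch));
        }
        pool.shutdown();
        try {
            return pool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Individual calls on a synchronized list are safe, but iterating over it (for example, to report results after the job) must be done inside a `synchronized (collected) { ... }` block, as the Collections.synchronizedList javadoc requires.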

Listeners should handle their own exceptions as described below in Handling Exceptions in Listeners.

Handling Exceptions in Listeners

Since listeners are called asynchronously, code outside a listener cannot wrap the call in a try-catch block. Instead, a listener can and should handle its own exceptions by wrapping its body in a try-catch block. When a listener does not handle its own exceptions and throws any exception (Throwable), the exception is logged at error level with a call like:

     logger.error("Exception thrown by an onBatchSuccess listener", throwable);

This ensures that such exceptions are logged without preventing the job from continuing.
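The following sketch contrasts the two cases just described. ListenerExceptionSketch.selfHandling builds a listener that catches exceptions per uri and records which uris failed, which is the recommended approach. dispatch is a simplified, hypothetical model of a batcher that catches and logs any Throwable a listener lets escape and then carries on with the next listener. Both names are illustrative, and java.util.logging stands in for the slf4j logger so the example has no dependencies.

```java
import java.util.List;
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical sketch; java.util.logging stands in for the slf4j logger the SDK uses.
final class ListenerExceptionSketch {
    private static final Logger logger = Logger.getLogger(ListenerExceptionSketch.class.getName());

    // Recommended: the listener traps its own exceptions per uri and records what failed.
    // failedUris should be thread-safe because several threads may run this listener.
    static Consumer<List<String>> selfHandling(Consumer<String> perUri, List<String> failedUris) {
        return batch -> {
            for (String uri : batch) {
                try {
                    perUri.accept(uri);
                } catch (RuntimeException e) {
                    failedUris.add(uri);
                }
            }
        };
    }

    // Simplified model of the fallback: an escaping Throwable is logged at SEVERE (error)
    // level and, in this model, the remaining listeners still run. Returns how many threw.
    static int dispatch(List<Consumer<List<String>>> listeners, List<String> batch) {
        int escaped = 0;
        for (Consumer<List<String>> listener : listeners) {
            try {
                listener.accept(batch);
            } catch (Throwable t) {
                escaped++;
                logger.log(Level.SEVERE, "Exception thrown by an onBatchSuccess listener", t);
            }
        }
        return escaped;
    }
}
```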

A QueryFailureListener or WriteFailureListener will not be notified of exceptions thrown by other listeners. Instead, these failure listeners are notified exclusively of exceptions in the operation of QueryBatcher or WriteBatcher.

If you want a custom QueryBatchListener or WriteBatchListener to trap its own exceptions and pass them along to callbacks registered with it for exception handling, it can implement that in its own way. The interface of ApplyTransformListener shows an example of this pattern.
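A minimal sketch of that pattern, under stated assumptions: the hypothetical ForwardingListenerSketch wraps its work in a try-catch and forwards any Throwable to callbacks registered through an onFailure method. Callbacks are kept in a CopyOnWriteArrayList so registration stays thread-safe, and a callback that itself throws does not stop the others. The class and method names are illustrative only and are not ApplyTransformListener's API.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

// Hypothetical listener that traps its own exceptions and forwards them to registered callbacks.
final class ForwardingListenerSketch {
    private final Consumer<List<String>> work;
    // CopyOnWriteArrayList: safe to register callbacks while other threads iterate over them.
    private final List<BiConsumer<List<String>, Throwable>> failureListeners = new CopyOnWriteArrayList<>();

    ForwardingListenerSketch(Consumer<List<String>> work) {
        this.work = work;
    }

    // Fluent registration, in the builder style used by the batchers.
    ForwardingListenerSketch onFailure(BiConsumer<List<String>, Throwable> listener) {
        failureListeners.add(listener);
        return this;
    }

    void processEvent(List<String> batch) {
        try {
            work.accept(batch);
        } catch (Throwable t) {
            for (BiConsumer<List<String>, Throwable> listener : failureListeners) {
                try {
                    listener.accept(batch, t);
                } catch (Throwable nested) {
                    // A misbehaving callback must not prevent the others from being notified.
                }
            }
        }
    }
}
```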

Pre-installed Listeners

Every time you create a new QueryBatcher or WriteBatcher, it comes with some pre-installed listeners, such as HostAvailabilityListener and a listener that tracks counts for JobReport. To remove these listeners and their associated functionality, call one of the following: setUrisReadyListeners, setQueryFailureListeners, setBatchSuccessListeners, or setBatchFailureListeners. Removing HostAvailabilityListener means its work is no longer done: unavailable hosts are not black-listed, and batches that fail because a host is unavailable are not retried. Likewise, removing the listeners that track counts for JobReport means JobReport should no longer be used. If you only want to change the settings on HostAvailabilityListener or NoResponseListener, you can do something like the following:


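     // batcher is the QueryBatcher or WriteBatcher whose pre-installed listener you want to adjust
     // (Duration is java.time.Duration)
     HostAvailabilityListener.getInstance(batcher)
       .withSuspendTimeForHostUnavailable(Duration.ofMinutes(60))
       .withMinHosts(2);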

Enable Logging

We have made efforts to provide helpful logging as you use QueryBatcher and WriteBatcher. Please make sure to enable your slf4j-compliant logging framework.

Copyright © 2022 MarkLogic Corporation