The MarkLogic Data Movement SDK supports long-running write, read, delete, or transform jobs. Long-running write jobs are enabled by WriteBatcher. Long-running read, delete, or transform jobs are enabled by QueryBatcher, which can perform actions on all uris matching a query or on all uris provided by an Iterator<String>.
When using QueryBatcher, the custom listeners you provide to onUrisReady can do anything with each batch of uris, and will usually use the MarkLogic Java Client API to act on them. However, to simplify common use cases, the following listeners are also provided:
ApplyTransformListener - Modifies documents in-place in the database by applying a server-side transform
ExportListener - Downloads each document for further processing in Java
ExportToWriterListener - Downloads each document and writes it to a Writer (could be a file, HTTP response, in-memory Writer, etc.)
DeleteListener - Deletes each batch of documents from the server
UrisToWriterListener - Writes each uri to a Writer (could be a file, HTTP response, etc.)
When you need to perform actions on server documents beyond what can be done with the provided listeners, register your custom code with onUrisReady and your code will be run for each batch of uris.
For example:
QueryBatcher qhb = dataMovementManager.newQueryBatcher(query)
.withBatchSize(1000)
.withThreadCount(20)
.withConsistentSnapshot()
.onUrisReady(batch -> {
for ( String uri : batch.getItems() ) {
if ( uri.endsWith(".txt") ) {
client.newDocumentManager().delete(uri);
}
}
})
.onQueryFailure(queryBatchException -> queryBatchException.printStackTrace());
JobTicket ticket = dataMovementManager.startJob(qhb);
qhb.awaitCompletion();
dataMovementManager.stopJob(ticket);
When you need to write a very large volume of documents and mlcp (the MarkLogic Content Pump command-line tool) cannot meet your requirements, use WriteBatcher.
For example:
WriteBatcher whb = dataMovementManager.newWriteBatcher()
.withBatchSize(100)
.withThreadCount(20)
.onBatchSuccess(batch -> {
logger.debug("batch # {}, so far: {}", batch.getJobBatchNumber(), batch.getJobResultsSoFar());
})
.onBatchFailure((batch,throwable) -> throwable.printStackTrace() );
JobTicket ticket = dataMovementManager.startJob(whb);
// the add or addAs methods could be called in separate threads on the
// single whb instance
whb.add("doc1.txt", new StringHandle("doc1 contents"));
whb.addAs("doc2.txt", "doc2 contents");
whb.flushAndWait(); // send the two docs even though they're not a full batch
dataMovementManager.stopJob(ticket);
As demonstrated above, listeners should be added to each instance of QueryBatcher or WriteBatcher. Ad-hoc listeners can be written as Java 8 lambda expressions. More sophisticated custom listeners can implement the appropriate listener interface or extend one of the provided listeners listed above.
QueryBatchListener (onUrisReady) instances are necessary to do something with the uris fetched by QueryBatcher. What a custom QueryBatchListener does is entirely up to it: it can perform any operation on the uris offered by any part of the Java Client API, as well as any read from or write to an external system. QueryFailureListener (onQueryFailure) instances handle any exceptions encountered while fetching the uris. WriteBatchListener (onBatchSuccess) instances handle any custom tracking requirements during a WriteBatcher job. WriteFailureListener (onBatchFailure) instances handle any exceptions encountered while writing the batches formed from the docs sent to the WriteBatcher instance. See the javadocs for each provided listener for an explanation of the listeners that can be registered for it to call. See the javadocs, the Java Application Developer's Guide, the source code for the provided listeners, the cookbook examples, and the unit tests for more listener implementation ideas.
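The two registration styles mentioned above (ad-hoc lambdas and dedicated listener classes) can be sketched without a running MarkLogic server. The interface below is a simplified stand-in, not the real QueryBatchListener (whose processEvent receives a QueryBatch rather than a bare List<String>); it only illustrates the pattern of a reusable, stateful class-based listener alongside a lambda:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

public class ListenerStyles {
    // Simplified stand-in for QueryBatchListener: the real interface's
    // processEvent receives a QueryBatch object, not a List<String>.
    interface UrisReadyListener {
        void processEvent(List<String> uris);
    }

    // A reusable, class-based listener that keeps state across batches:
    // counts uris ending with a given suffix.
    static class SuffixCounter implements UrisReadyListener {
        private final String suffix;
        private final AtomicLong count = new AtomicLong();
        SuffixCounter(String suffix) { this.suffix = suffix; }
        public void processEvent(List<String> uris) {
            for (String uri : uris) {
                if (uri.endsWith(suffix)) count.incrementAndGet();
            }
        }
        long getCount() { return count.get(); }
    }

    public static void main(String[] args) {
        List<UrisReadyListener> listeners = new ArrayList<>();
        // Ad-hoc lambda listener, like those passed to onUrisReady
        listeners.add(batch -> System.out.println("batch of " + batch.size()));
        // Class-based listener instance registered alongside it
        SuffixCounter txtCounter = new SuffixCounter(".txt");
        listeners.add(txtCounter);

        // Simulate two batches of uris being dispatched to all listeners
        List<List<String>> batches = List.of(
            List.of("doc1.txt", "doc2.json"),
            List.of("doc3.txt", "doc4.txt"));
        for (List<String> batch : batches) {
            for (UrisReadyListener l : listeners) l.processEvent(batch);
        }
        System.out.println("txt uris seen: " + txtCounter.getCount());
    }
}
```

With the real SDK, the class-based form would implement QueryBatchListener (or extend one of the provided listeners) and be passed to onUrisReady exactly like a lambda.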
Since listeners are called asynchronously by all threads in the
pool inside the QueryBatcher or WriteBatcher instance, they must
only perform thread-safe operations. For example, accumulation into a collection should use a collection wrapped as a synchronized collection rather than directly using an unsynchronized collection such as HashMap or ArrayList, which are not thread-safe. Similarly, accumulation into a string should use StringBuffer instead of StringBuilder, since StringBuffer is synchronized (and thus thread-safe). The java.util.concurrent.atomic classes are also recommended.
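The same requirement can be demonstrated outside the SDK with a plain thread pool standing in for the batcher's internal threads (a self-contained sketch; no MarkLogic classes are involved, and the pool merely simulates concurrent listener callbacks):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class ThreadSafeAccumulation {
    // Simulate listener callbacks firing on many pool threads at once,
    // as they do inside a QueryBatcher or WriteBatcher job.
    static long[] run(int tasks, int threads) throws InterruptedException {
        List<String> uris = Collections.synchronizedList(new ArrayList<>());
        AtomicLong count = new AtomicLong();
        StringBuffer log = new StringBuffer(); // synchronized, unlike StringBuilder

        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < tasks; i++) {
            final int n = i;
            pool.submit(() -> {
                uris.add("/doc" + n + ".xml"); // safe: synchronized wrapper
                count.incrementAndGet();       // safe: atomic
                log.append('.');               // safe: StringBuffer is synchronized
            });
        }
        pool.shutdown();
        pool.awaitTermination(30, TimeUnit.SECONDS);
        return new long[]{ uris.size(), count.get(), log.length() };
    }

    public static void main(String[] args) throws Exception {
        long[] totals = run(1000, 20);
        // With an unsynchronized ArrayList or StringBuilder these totals
        // could come out short (or the run could throw), because the pool
        // threads race on the shared instances.
        System.out.println(totals[0] + " " + totals[1] + " " + totals[2]);
    }
}
```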
Listeners should handle their own exceptions as described below in Handling Exceptions in Listeners. An exception that nevertheless escapes a listener is logged, for example:
logger.error("Exception thrown by an onBatchSuccess listener", throwable);
This logs the exception without allowing it to prevent the job from continuing.
A QueryFailureListener or WriteFailureListener will not be notified of exceptions thrown by other listeners. Instead, these failure listeners are notified exclusively of exceptions in the operation of QueryBatcher or WriteBatcher.
If you wish a custom QueryBatchListener or WriteBatchListener to trap its own exceptions and pass them along to callbacks registered with it for exception handling, it is free to do so in its own custom way. The interface of ApplyTransformListener demonstrates this pattern.
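That pattern can be sketched generically: the listener catches exceptions from the per-item work it performs and routes them to failure callbacks registered on the listener itself, so nothing escapes to the batcher. The class and method names below are simplified, hypothetical stand-ins, not the real SDK types:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

public class SelfHandlingListener {
    // Failure callbacks registered on the listener itself, in the spirit
    // of ApplyTransformListener's own onFailure registration (names here
    // are illustrative, not the real API).
    private final List<BiConsumer<String, Throwable>> failureListeners = new ArrayList<>();
    private final List<String> succeeded = new ArrayList<>();

    SelfHandlingListener onFailure(BiConsumer<String, Throwable> listener) {
        failureListeners.add(listener);
        return this;
    }

    // The per-batch callback: traps its own exceptions instead of
    // letting them propagate out of the listener.
    void processEvent(List<String> uris) {
        for (String uri : uris) {
            try {
                doWork(uri);
                succeeded.add(uri);
            } catch (RuntimeException e) {
                for (BiConsumer<String, Throwable> l : failureListeners) {
                    try {
                        l.accept(uri, e);
                    } catch (RuntimeException inner) {
                        // a failure callback must not kill the job either
                    }
                }
            }
        }
    }

    // Hypothetical per-uri work that sometimes fails
    private void doWork(String uri) {
        if (uri.contains("bad")) throw new IllegalStateException("cannot process " + uri);
    }

    List<String> getSucceeded() { return succeeded; }
}
```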
Every time you create a new QueryBatcher or WriteBatcher it
comes with some pre-installed listeners such as HostAvailabilityListener
and a listener to track counts for JobReport. If you wish to remove these listeners and their associated functionality, call one of the following: setUrisReadyListeners, setQueryFailureListeners, setBatchSuccessListeners, or setBatchFailureListeners. Obviously, removing the functionality of HostAvailabilityListener means it will no longer do its job of black-listing unavailable hosts or retrying the batches that fail when a host is unavailable. And removing the functionality of the listeners that track counts for JobReport means JobReport should no longer be used. If you would just like to change the settings on HostAvailabilityListener or NoResponseListener, you can do something like the following:
HostAvailabilityListener.getInstance(batcher)
.withSuspendTimeForHostUnavailable(Duration.ofMinutes(60))
.withMinHosts(2);
We have made efforts to provide helpful logging as you use QueryBatcher and WriteBatcher. Please make sure to enable your slf4j-compliant logging framework.
Copyright (c) 2010-2025 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved.