The MarkLogic Data Movement SDK supports long-running write, read, delete, or transform jobs. Long-running write jobs are enabled by WriteBatcher. Long-running read, delete, or transform jobs are enabled by QueryBatcher, which can perform actions on all uris matching a query or on all uris provided by an Iterator<String>.
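For instance, a QueryBatcher can be constructed either way. A minimal sketch, assuming an existing DataMovementManager named dmm, a query definition named query, and a List<String> named uris (all illustrative names):

    // from a query: the batcher fetches all matching uris itself
    QueryBatcher fromQuery = dmm.newQueryBatcher(query);

    // from an Iterator<String>: the batcher processes the uris you supply
    QueryBatcher fromIterator = dmm.newQueryBatcher(uris.iterator());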
When using QueryBatcher, the custom listeners you provide to onUrisReady can do anything with each batch of uris, and will usually use the MarkLogic Java Client API to do it. However, to simplify common use cases, the following listeners are also provided (a usage sketch follows the list):
- ApplyTransformListener - Modifies documents in-place in the database by applying a server-side transform
- ExportListener - Downloads each document for further processing in Java
- ExportToWriterListener - Downloads each document and writes it to a Writer (could be a file, HTTP response, in-memory Writer, etc.)
- DeleteListener - Deletes each batch of documents from the server
- UrisToWriterListener - Writes each uri to a Writer (could be a file, HTTP response, etc.)
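A provided listener is registered with onUrisReady just like a custom one. A minimal sketch that deletes every matching document with the provided DeleteListener, again assuming a DataMovementManager named dmm and a query definition named query:

    QueryBatcher deleteBatcher = dmm.newQueryBatcher(query)
        .withConsistentSnapshot() // recommended so matches don't change mid-delete
        .onUrisReady(new DeleteListener())
        .onQueryFailure(exception -> exception.printStackTrace());
    JobTicket ticket = dmm.startJob(deleteBatcher);
    deleteBatcher.awaitCompletion();
    dmm.stopJob(ticket);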
When you need to perform actions on server documents beyond what can be done with the provided listeners, register your custom code with onUrisReady and your code will be run for each batch of uris.
For example:
    QueryBatcher qhb = dataMovementManager.newQueryBatcher(query)
        .withBatchSize(1000)
        .withThreadCount(20)
        .withConsistentSnapshot()
        .onUrisReady(batch -> {
            for (String uri : batch.getItems()) {
                if (uri.endsWith(".txt")) {
                    client.newDocumentManager().delete(uri);
                }
            }
        })
        .onQueryFailure(queryBatchException -> queryBatchException.printStackTrace());
    JobTicket ticket = dataMovementManager.startJob(qhb);
    qhb.awaitCompletion();
    dataMovementManager.stopJob(ticket);
When you need to write a very large volume of documents and mlcp cannot meet your requirements, use WriteBatcher.
For example:
    WriteBatcher whb = dataMovementManager.newWriteBatcher()
        .withBatchSize(100)
        .withThreadCount(20)
        .onBatchSuccess(batch -> {
            logger.debug("batch # {}, so far: {}", batch.getJobBatchNumber(), batch.getJobResultsSoFar());
        })
        .onBatchFailure((batch, throwable) -> throwable.printStackTrace());
    JobTicket ticket = dataMovementManager.startJob(whb);
    // the add or addAs methods could be called in separate threads on the
    // single whb instance
    whb.add("doc1.txt", new StringHandle("doc1 contents"));
    whb.addAs("doc2.txt", "doc2 contents");
    whb.flushAndWait(); // send the two docs even though they're not a full batch
    dataMovementManager.stopJob(ticket);
As demonstrated above, listeners should be added to each instance of QueryBatcher or WriteBatcher. Ad-hoc listeners can be written as Java 8 lambda expressions. More sophisticated custom listeners can implement the appropriate listener interface or extend one of the provided listeners listed above.
QueryBatchListener (onUrisReady) instances are necessary to do something with the uris fetched by QueryBatcher. What a custom QueryBatchListener does is completely up to it: it can apply any operation the Java Client API offers to the uris, and it can read from or write to any external system. QueryFailureListener (onQueryFailure) instances handle any exceptions encountered while fetching the uris. WriteBatchListener (onBatchSuccess) instances handle any custom tracking requirements during a WriteBatcher job. WriteFailureListener (onBatchFailure) instances handle any exceptions encountered while writing the batches formed from documents sent to the WriteBatcher instance. See the javadocs for each provided listener for an explanation of the listeners that can be registered for it to call. For more listener implementation ideas, see the javadocs, the Java Application Developer's Guide, the source code of the provided listeners, the cookbook examples, and the unit tests.
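For example, a custom listener can implement QueryBatchListener directly rather than being written as a lambda. A minimal sketch (the class and its names are illustrative, not part of the SDK):

    import java.util.concurrent.atomic.AtomicLong;

    import com.marklogic.client.datamovement.QueryBatch;
    import com.marklogic.client.datamovement.QueryBatchListener;

    public class UriCountingListener implements QueryBatchListener {
        private final AtomicLong uriCount = new AtomicLong();

        @Override
        public void processEvent(QueryBatch batch) {
            // called concurrently by the batcher's worker threads,
            // so shared state must be updated thread-safely
            uriCount.addAndGet(batch.getItems().length);
        }

        public long getUriCount() {
            return uriCount.get();
        }
    }

It could then be registered with queryBatcher.onUrisReady(new UriCountingListener()).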
Since listeners are called asynchronously by all threads in the pool inside the QueryBatcher or WriteBatcher instance, they must only perform thread-safe operations. For example, accumulating into a collection should only be done with collections wrapped as synchronized collections, rather than directly using un-synchronized collections such as HashMap or ArrayList, which are not thread-safe. Similarly, accumulating into a string should use StringBuffer instead of StringBuilder, since StringBuffer is synchronized (and thus thread-safe). We also recommend the java.util.concurrent.atomic classes.
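For instance, a sketch of safely accumulating uris and a running count across the batcher's threads, assuming an existing QueryBatcher named queryBatcher (the variable names are illustrative):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;
    import java.util.concurrent.atomic.AtomicLong;

    List<String> collectedUris = Collections.synchronizedList(new ArrayList<>());
    AtomicLong totalUris = new AtomicLong();

    queryBatcher.onUrisReady(batch -> {
        // both operations are safe to call from many threads at once
        collectedUris.addAll(Arrays.asList(batch.getItems()));
        totalUris.addAndGet(batch.getItems().length);
    });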
Listeners should handle their own exceptions. Any exception thrown by a listener is caught by the batcher and logged along these lines:

    logger.error("Exception thrown by an onBatchSuccess listener", throwable);

This achieves logging of exceptions without allowing them to prevent the job from continuing.
A QueryFailureListener or WriteFailureListener will not be notified of exceptions thrown by other listeners. Instead, these failure listeners are notified exclusively of exceptions in the operation of QueryBatcher or WriteBatcher.
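So a listener that needs to do more than log (for example, collect failures for later inspection) should catch its own exceptions. A sketch, assuming an existing QueryBatcher named queryBatcher and DatabaseClient named client (the failure list is illustrative):

    List<String> failedUris = Collections.synchronizedList(new ArrayList<>());

    queryBatcher.onUrisReady(batch -> {
        for (String uri : batch.getItems()) {
            try {
                client.newDocumentManager().delete(uri);
            } catch (Exception e) {
                // trapped here so one bad uri neither aborts the rest of the
                // batch nor disappears into the batcher's generic logging
                failedUris.add(uri);
            }
        }
    });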
If you wish a custom QueryBatchListener or WriteBatchListener to trap its own exceptions and pass them along to callbacks registered with it for exception handling, it can of course do that in a custom way. Examples of this pattern can be seen in the interface of ApplyTransformListener.
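A hypothetical sketch of that pattern (all names here are illustrative, not part of the SDK, and ApplyTransformListener's real callbacks differ in their event types):

    import java.util.List;
    import java.util.concurrent.CopyOnWriteArrayList;
    import java.util.function.Consumer;

    import com.marklogic.client.datamovement.QueryBatch;
    import com.marklogic.client.datamovement.QueryBatchListener;

    public class CallbackForwardingListener implements QueryBatchListener {
        // thread-safe, since processEvent runs on many threads
        private final List<Consumer<Throwable>> failureCallbacks = new CopyOnWriteArrayList<>();

        // callers register their own exception-handling callbacks
        public CallbackForwardingListener onFailure(Consumer<Throwable> callback) {
            failureCallbacks.add(callback);
            return this;
        }

        @Override
        public void processEvent(QueryBatch batch) {
            try {
                // ... the real per-batch work goes here ...
            } catch (Throwable t) {
                // trap the exception and forward it to every registered callback
                failureCallbacks.forEach(callback -> callback.accept(t));
            }
        }
    }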
Every time you create a new QueryBatcher or WriteBatcher, it comes with some pre-installed listeners, such as HostAvailabilityListener and a listener that tracks counts for JobReport. If you wish to remove these listeners and their associated functionality, call one of the following: setUrisReadyListeners, setQueryFailureListeners, setBatchSuccessListeners, or setBatchFailureListeners. Obviously, removing the functionality of HostAvailabilityListener means it will no longer do its job of blacklisting unavailable hosts or retrying batches that fail when a host is unavailable. And removing the functionality of the listeners that track counts for JobReport means JobReport should no longer be used. If you would just like to change the settings on HostAvailabilityListener or NoResponseListener, you can do something like the following:
    HostAvailabilityListener.getInstance(batcher)
        .withSuspendTimeForHostUnavailable(Duration.ofMinutes(60))
        .withMinHosts(2);
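And a sketch of replacing the pre-installed failure listeners outright, which (as noted above) also removes HostAvailabilityListener's retry handling; this assumes the setter accepts the listeners to keep, here a single logging listener:

    batcher.setQueryFailureListeners(
        queryBatchException -> logger.error("Query batch failed", queryBatchException)
    );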
We have made efforts to provide helpful logging as you use QueryBatcher and WriteBatcher. Please make sure to enable your slf4j-compliant logging framework.
Interface | Description |
---|---|
Batch<T> | A group of items (generally documents or uris) and context representing a completed action in a data movement job. |
Batcher | The base class (shared methods) for QueryBatcher and WriteBatcher. |
BatchEvent | A completed action in a data movement job. |
BatchFailureListener<T extends BatchEvent> | A generic interface for listeners which process failures on batches. |
BatchListener<T extends BatchEvent> | Runs processEvent on each batch as it is ready during a QueryBatcher or WriteBatcher job. |
DataMovementManager | DataMovementManager is the starting point for getting new instances of QueryBatcher, WriteBatcher and RowBatcher, configured with a DatabaseClient and ForestConfiguration. |
ExportToWriterListener.OutputListener | The listener interface required by onGenerateOutput. |
FailureListener<T extends java.lang.Throwable> | A generic base interface for listeners implemented by QueryFailureListener for processing a Throwable that caused a failure. |
Forest | Some details about a MarkLogic forest. |
ForestConfiguration | A reflection of the forest configuration associated with the specified database (or the default database for the specified port) in the MarkLogic cluster. |
JacksonCSVSplitter.UriMaker | UriMaker which generates a URI for each split file. |
JobReport | A JobReport is used to report status on a WriteBatcher or QueryBatcher job at any point after it is started, providing a snapshot of the job's status at that time. |
JobTicket | JobTicket is used to uniquely identify a job. |
JSONSplitter.UriMaker | UriMaker which generates a URI for each split file. |
LineSplitter.UriMaker | UriMaker which generates a URI for each split file. |
ProgressListener.ProgressUpdate | Captures data of interest for a progress update. |
QueryBatch | A group of uris retrieved from the Iterator or matches to the QueryDefinition for this QueryBatcher job. |
QueryBatcher | To facilitate long-running read, update, and delete use cases, coordinates threads to process batches of uris matching a query or coming from an Iterator. |
QueryBatcherListener | A generic listener which runs processEvent on the QueryBatcher itself and can be registered and run at the batcher level. |
QueryBatchListener | Runs processEvent on each batch as it is ready during a QueryBatcher job. |
QueryEvent | The context for a QueryBatch passed to QueryBatchListener or an exception passed to QueryFailureListener, indicating the state at the time this event occurred. |
QueryFailureListener | A listener which can process an exception which occurred when attempting to retrieve a batch of matches to a query. |
RowBatcher<T> | Coordinates threads to export all of the rows from a view in batches. |
RowBatchFailureListener | Provides a callback (typically as a lambda) to process an exception when trying to retrieve a batch of rows for a view. |
RowBatchFailureListener.RowBatchFailureEvent | An exception which occurred when attempting to retrieve a batch of rows for a view. |
RowBatchSuccessListener<T> | Provides a callback (typically as a lambda) to process a batch of rows retrieved for a view. |
RowBatchSuccessListener.RowBatchResponseEvent<T> | A batch of rows retrieved for a view. |
Splitter<T extends AbstractWriteHandle> | Splitter splits an input stream into a Java stream of write handles. |
Splitter.UriMaker | UriMaker generates a URI for each split file. |
UnarySplitter.UriMaker | UriMaker which generates a URI for each split file. |
UrisToWriterListener.OutputListener | |
WriteBatch | A batch of documents written successfully. |
WriteBatcher | To facilitate long-running write jobs, batches documents added by many external threads and coordinates internal threads to send the batches round-robin to all appropriate hosts in the cluster. |
WriteBatchListener | Runs processEvent on each batch as it is ready during a WriteBatcher job. |
WriteEvent | Each WriteBatch is composed of many WriteEvents, each of which represents all the information about a single document which was written to the server. |
WriteFailureListener | The listener interface for handling exceptions occurring within WriteBatcher. |
XMLSplitter.StartElementReader | The StartElementReader is used in the visitor to check if the current element is the one to split. |
XMLSplitter.UriMaker | UriMaker which generates a URI for each split file. |
ZipSplitter.UriMaker | UriMaker which generates a URI for each split file. |
Class | Description |
---|---|
ApplyTransformListener | Modifies documents in-place in the database by applying a server-side transform. |
DeleteListener | Sends a Java API bulk delete request for all the documents from each batch. |
ExportListener | Reads document contents (and optionally metadata) for each batch, then sends each document to any listeners registered with onDocumentReady for further processing or writing to any target supported by Java. |
ExportToWriterListener | An extension of ExportListener which facilitates writing all documents to a single Writer output stream. |
ExtractRowsViaTemplateListener | This QueryBatchListener takes in one or more uris for templates as defined by MarkLogic TDE (Template Driven Extraction) and applies them to each batch of documents. |
FilteredForestConfiguration | A utility class for wrapping a ForestConfiguration retrieved from DataMovementManager.readForestConfig(). |
HostAvailabilityListener | HostAvailabilityListener is automatically registered with all QueryBatcher and WriteBatcher instances to monitor for failover scenarios. |
JacksonCSVSplitter | The JacksonCSVSplitter class uses the Jackson CSV parser without attempting to abstract its capabilities. |
JSONSplitter<T extends JSONWriteHandle> | The JSONSplitter is used to split a large JSON file into separate payloads for writing to the database. |
JSONSplitter.ArrayVisitor | The basic visitor only splits objects or arrays under the top array. |
JSONSplitter.Visitor<T extends AbstractWriteHandle> | The Visitor class is used to accumulate and inspect state during the depth-first traversal of the JSON tree and decide how to split the JSON file. |
LineSplitter | The LineSplitter class is used to separate lines in line-delimited JSON, XML or TEXT files. |
NoResponseListener | NoResponseListener is a default listener, like HostAvailabilityListener, that is automatically registered with QueryBatcher and WriteBatcher instances. |
PathSplitter | The PathSplitter utility class splits a Stream of paths into a Stream of AbstractWriteHandles or DocumentWriteOperations suitable for writing in batches. |
PeekingIterator<T> | |
ProgressListener | Reports on progress as batches are processed by sending an instance of the nested ProgressUpdate interface to instances of java.util.function.Consumer. |
ProgressListener.SimpleProgressUpdate | Simple implementation of ProgressUpdate; the only real thing of interest here is how it generates the progress as a string for display purposes. |
TypedRow | |
UnarySplitter | The UnarySplitter utility class makes it possible to add entire files when splitting paths, either as the default splitter or for specific extensions. |
UrisToWriterListener | Facilitates writing uris to a file when setting a merge timestamp and withConsistentSnapshot is not an option, but you need to run DeleteListener or ApplyTransformListener. |
XMLSplitter<T extends XMLWriteHandle> | The XMLSplitter is used to split a large XML file into separate payloads for writing to the database. |
XMLSplitter.BasicElementVisitor | The basic visitor only splits elements with a matching element namespace URI and local name. |
XMLSplitter.Visitor<T extends XMLWriteHandle> | The Visitor class is used to check if the current element is the target to split. |
ZipSplitter | The ZipSplitter class is used to split compressed files. |
Enum | Description |
---|---|
ApplyTransformListener.ApplyResult | Either ApplyTransformListener.ApplyResult.REPLACE each document with the result of the transform, or run the transform with each document as input but ApplyTransformListener.ApplyResult.IGNORE the result. |
Forest.HostType | Enum containing the list of host types a forest can have. |
JobTicket.JobType | |
NodeOperation | Different operations to traverse the tree: DESCENT tells the application to go down the tree, SKIP tells it to skip the current branch, and PROCESS tells it to process the current branch. |
RowBatchFailureListener.BatchFailureDisposition | Specifies how the RowBatcher should respond to the failure to retrieve a batch of rows. |
Exception | Description |
---|---|
DataMovementException | The generic base exception used throughout the Data Movement SDK. |
QueryBatchException | An exception which occurred when attempting to retrieve a batch of matches to a query. |
Copyright © 2022 MarkLogic Corporation