To facilitate long-running read, update, and delete use cases, QueryBatcher coordinates threads to process batches of uris matching a query or coming from an Iterator. Each batch of uris matching a query will come from a single forest. The host for that forest is the target of the DatabaseClient provided to the listener's processEvent method. The query is performed directly on each forest associated with the database for the DatabaseClient provided to DataMovementManager. The end goal of each job is determined by the listeners registered with onUrisReady. The data set from which batches are made and on which processing is performed is determined by the query or Iterator used to construct this instance.
Provided listeners include ApplyTransformListener, DeleteListener, ExportListener, and ExportToWriterListener. The provided listeners are used by adding an instance via onUrisReady like so:
    QueryBatcher qhb = dataMovementManager.newQueryBatcher(query)
        .withConsistentSnapshot()
        .onUrisReady(new DeleteListener())
        .onQueryFailure(exception -> exception.printStackTrace());
    JobTicket ticket = dataMovementManager.startJob(qhb);
    qhb.awaitCompletion();
    dataMovementManager.stopJob(ticket);
Custom listeners will generally use the MarkLogic Java Client API to manipulate the documents for the uris in each batch.
QueryBatcher is designed to be highly scalable and performant. To accommodate the largest result sets, QueryBatcher paginates through matches rather than loading matches into memory. To prevent queueing too many tasks when running a query, QueryBatcher only adds another task when one completes the query and is about to send the matching uris to the onUrisReady listeners.
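The batching mechanics described above can be pictured with a plain-Java sketch. This is not QueryBatcher's implementation -- the class, method, and variable names below are invented for illustration: uris are drawn from an Iterator into fixed-size batches, and each full (or final partial) batch is handed to a listener callback, much as onUrisReady listeners receive batches.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

public class BatchingSketch {
    // Drain the iterator into fixed-size batches, invoking the listener once per batch.
    static int drain(Iterator<String> uris, int batchSize, Consumer<List<String>> listener) {
        int batches = 0;
        List<String> current = new ArrayList<>();
        while (uris.hasNext()) {
            current.add(uris.next());
            if (current.size() == batchSize) {
                listener.accept(current);   // hand off a full batch
                current = new ArrayList<>();
                batches++;
            }
        }
        if (!current.isEmpty()) {           // final partial batch
            listener.accept(current);
            batches++;
        }
        return batches;
    }

    public static void main(String[] args) {
        List<String> seen = new ArrayList<>();
        int n = drain(Arrays.asList("a", "b", "c", "d", "e").iterator(), 2, seen::addAll);
        System.out.println(n + " batches, uris: " + seen);  // 3 batches, uris: [a, b, c, d, e]
    }
}
```

The real class adds concurrency and forest awareness on top of this idea, but the batch-then-dispatch loop is the core contract a listener sees.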
For pagination to succeed, you must not modify the result set during pagination. This means you must either use withConsistentSnapshot(), or use an Iterator instead of a query.

Sample usage using withConsistentSnapshot():
    QueryDefinition query = new StructuredQueryBuilder().collection("myCollection");
    QueryBatcher qhb = dataMovementManager.newQueryBatcher(query)
        .withBatchSize(1000)
        .withThreadCount(20)
        .withConsistentSnapshot()
        .onUrisReady(batch -> {
            for ( String uri : batch.getItems() ) {
                if ( uri.endsWith(".txt") ) {
                    batch.getClient().newDocumentManager().delete(uri);
                }
            }
        })
        .onQueryFailure(exception -> exception.printStackTrace());
    JobTicket ticket = dataMovementManager.startJob(qhb);
    qhb.awaitCompletion();
    dataMovementManager.stopJob(ticket);
Example of queueing uris in memory instead of using withConsistentSnapshot():

    List<String> uris = Collections.synchronizedList(new ArrayList<>());
    QueryBatcher getUris = dataMovementManager.newQueryBatcher(query)
        .withBatchSize(5000)
        .onUrisReady( batch -> uris.addAll(Arrays.asList(batch.getItems())) )
        .onQueryFailure(exception -> exception.printStackTrace());
    JobTicket getUrisTicket = dataMovementManager.startJob(getUris);
    getUris.awaitCompletion();
    dataMovementManager.stopJob(getUrisTicket);

    // now we have the uris, let's step through them
    QueryBatcher performDelete = dataMovementManager.newQueryBatcher(uris.iterator())
        .onUrisReady(new DeleteListener())
        .onQueryFailure(exception -> exception.printStackTrace());
    JobTicket ticket = dataMovementManager.startJob(performDelete);
    performDelete.awaitCompletion();
    dataMovementManager.stopJob(ticket);
To queue uris to disk (if not enough memory is available) see UrisToWriterListener.
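UrisToWriterListener streams each batch of uris to a Writer so a second job can read them back later. The pattern itself can be sketched with plain JDK I/O; the helper below is an invented stand-in, not the MarkLogic listener: one uri per line on the way out, and an Iterator over the lines on the way back to seed a second pass.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class DiskQueuedUris {
    // Pass one: append a batch of uris to the file, one per line.
    static void writeBatch(BufferedWriter writer, List<String> batch) throws IOException {
        for (String uri : batch) {
            writer.write(uri);
            writer.newLine();
        }
    }

    public static void main(String[] args) throws IOException {
        Path queue = Files.createTempFile("uris", ".txt");
        try (BufferedWriter writer = Files.newBufferedWriter(queue)) {
            writeBatch(writer, Arrays.asList("/doc/1.json", "/doc/2.json"));
            writeBatch(writer, Arrays.asList("/doc/3.json"));
        }
        // Pass two: stream the uris back; an Iterator like this could seed a second batcher.
        Iterator<String> uris = Files.readAllLines(queue).iterator();
        while (uris.hasNext()) {
            System.out.println(uris.next());
        }
        Files.delete(queue);
    }
}
```

Because the uris live on disk between the two passes, the working set stays small no matter how large the query result is.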
Method summary:

    boolean awaitCompletion()
    boolean awaitCompletion(long timeout, TimeUnit unit)
    Long getServerTimestamp()
    boolean isStopped()
    QueryBatcher onJobCompletion(QueryBatcherListener listener)
    QueryBatcher onQueryFailure(QueryFailureListener listener)
    QueryBatcher onUrisReady(QueryBatchListener listener)
    void retry(QueryEvent queryEvent)
    void retryListener(QueryBatch batch, QueryBatchListener queryBatchListener)
    void retryWithFailureListeners(QueryEvent queryEvent)
    void setMaxBatches()
    void setMaxBatches(long maxBatches)
    void setQueryFailureListeners(QueryFailureListener... listeners)
    void setQueryJobCompletionListeners(QueryBatcherListener... listeners)
    void setUrisReadyListeners(QueryBatchListener... listeners)
    QueryBatcher withBatchSize(int docBatchSize)
    QueryBatcher withBatchSize(int docBatchSize, int docToUriBatchRatio)
    QueryBatcher withForestConfig(ForestConfiguration forestConfig)
    QueryBatcher withJobName(String jobName)
    QueryBatcher withThreadCount(int threadCount)

Methods inherited from Batcher: getBatchSize, getForestConfig, getJobEndTime, getJobId, getJobName, getJobStartTime, getPrimaryClient, getThreadCount, isStarted

onUrisReady(QueryBatchListener listener)
listener - the action which has to be done when uris are ready

onQueryFailure(QueryFailureListener listener)
Add a listener to run each time there is an exception retrieving a batch of uris. These listeners will not run when an exception is thrown by a listener registered with onUrisReady. To learn more, please see Handling Exceptions in Listeners.
listener - the code to run when a failure occurs

onJobCompletion(QueryBatcherListener listener)
Add a listener to run when the query job is completed, i.e. when all the document URIs are retrieved and the associated listeners have completed.
listener - the code to run when the query job is completed

retry(QueryEvent queryEvent)
queryEvent - the information about the batch that failed

setUrisReadyListeners(QueryBatchListener... listeners)
listeners - the QueryBatchListener instances this batcher should use

setQueryFailureListeners(QueryFailureListener... listeners)
listeners - the QueryFailureListener instances this batcher should use

setQueryJobCompletionListeners(QueryBatcherListener... listeners)
listeners - the QueryBatcherListener instances this batcher should use
withConsistentSnapshot()
withConsistentSnapshot can only be used with a query, not with an Iterator. This is required when performing a delete of documents matching the query or any modification (including ApplyTransformListener) of matching documents which would cause them to no longer match the query (otherwise pagination through the result set would fail because pages shift as documents are deleted or modified to no longer match the query).

withForestConfig(ForestConfiguration forestConfig)
An updated ForestConfiguration can be retrieved via DataMovementManager.readForestConfig() then set via withForestConfig.
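The shifting-pages failure mode is easy to reproduce with plain Java collections. This sketch involves no MarkLogic code and all names in it are invented: it paginates a simulated result set by offset while deleting each page's matches, the way an unsnapshotted delete job would, and later documents slide into the deleted slots and are never visited.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ShiftingPages {
    // Paginate by offset while deleting each page's documents from the result set.
    static List<String> paginateAndDelete(List<String> docs, int pageSize) {
        List<String> processed = new ArrayList<>();
        int offset = 0;
        while (offset < docs.size()) {
            List<String> page = new ArrayList<>(
                docs.subList(offset, Math.min(offset + pageSize, docs.size())));
            processed.addAll(page);
            docs.removeAll(page);   // deletion shifts later documents toward the front...
            offset += pageSize;     // ...but the cursor still advances past them
        }
        return processed;
    }

    public static void main(String[] args) {
        List<String> docs = new ArrayList<>(Arrays.asList("d1", "d2", "d3", "d4"));
        // Only the first page is ever visited: d3 and d4 shift forward and are skipped.
        System.out.println(paginateAndDelete(docs, 2)); // [d1, d2]
    }
}
```

A consistent snapshot (or a pre-collected Iterator) avoids this because the set being paginated never changes underneath the cursor.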
withForestConfig(ForestConfiguration forestConfig)
Specified by: withForestConfig in interface Batcher
forestConfig - the updated ForestConfiguration

withJobName(String jobName)
Specified by: withJobName in interface Batcher
jobName - the name you would like to assign to this job

withBatchSize(int docBatchSize)
Specified by: withBatchSize in interface Batcher
docBatchSize - the number of documents processed in a batch

withBatchSize(int docBatchSize, int docToUriBatchRatio)
docBatchSize - the number of documents processed in a batch
docToUriBatchRatio - the ratio of the document processing batch to the document uri collection batch. The docToUriBatchRatio should ordinarily be larger than 1 because URIs are small relative to full documents and because collecting URIs from indexes is ordinarily faster than processing documents.

withThreadCount(int threadCount)
For an Iterator, the calling thread (the one that calls startJob) is used to queue all batches--so startJob will not return until all iteration is complete and all batches are queued. For Iterators this thread count is the number of threads used for processing the queued batches (running processEvent on the listeners registered with onUrisReady). As of the 6.2.0 release, this can be adjusted after the batcher has been started. The underlying Java ThreadPoolExecutor will have both its core and max pool sizes set to the given thread count. Use caution when reducing this to a value of 1 while the batcher is running; in some cases, the underlying ThreadPoolExecutor may halt execution of any tasks. Execution can be resumed by increasing the thread count to a value of 2 or higher.
Specified by: withThreadCount in interface Batcher
threadCount - the number of threads to use in this Batcher

awaitCompletion(long timeout, TimeUnit unit)
timeout - the maximum time to wait
unit - the time unit of the timeout argument
Throws: InterruptedException - if interrupted while waiting
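The core/max pool-size behavior described for withThreadCount can be observed directly on the JDK's ThreadPoolExecutor. This sketch is plain java.util.concurrent, not QueryBatcher internals; the newPool and resize helpers are invented for illustration, and resize orders the two setter calls so that core never exceeds max mid-adjustment.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ResizePool {
    static ThreadPoolExecutor newPool(int threads) {
        return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.SECONDS,
                                      new LinkedBlockingQueue<>());
    }

    // Adjust the "thread count": core and max pool sizes move together.
    static void resize(ThreadPoolExecutor pool, int threads) {
        if (threads < pool.getCorePoolSize()) {
            pool.setCorePoolSize(threads);      // shrink: lower core first...
            pool.setMaximumPoolSize(threads);   // ...so core <= max at every step
        } else {
            pool.setMaximumPoolSize(threads);   // grow: raise max first
            pool.setCorePoolSize(threads);
        }
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = newPool(20);
        resize(pool, 4);
        System.out.println(pool.getCorePoolSize() + "/" + pool.getMaximumPoolSize()); // 4/4
        resize(pool, 32);
        System.out.println(pool.getCorePoolSize() + "/" + pool.getMaximumPoolSize()); // 32/32
        pool.shutdown();
    }
}
```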
isStopped()
Returns true if the job has stopped (e.g. DataMovementManager.stopJob was called), false otherwise
Specified by: isStopped in interface Batcher

getJobTicket()
Specified by: getJobTicket in interface Batcher
Throws: IllegalStateException - if this job has not yet been started

retryListener(QueryBatch batch, QueryBatchListener queryBatchListener)
batch - the QueryBatch for which we need to process the listener
queryBatchListener - the QueryBatchListener which needs to be applied

retryWithFailureListeners(QueryEvent queryEvent)
queryEvent - the information about the batch that failed

setMaxBatches(long maxBatches)
maxBatches - the value of the limit

getServerTimestamp()
If withConsistentSnapshot was used before starting the job, will return the MarkLogic server timestamp associated with the snapshot. Returns null otherwise.

Copyright (c) 2010-2025 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved.