public interface QueryBatcher extends Batcher
To facilitate long-running read, update, and delete use cases, QueryBatcher coordinates threads to process batches of uris matching a query or coming from an Iterator. Each batch of uris matching a query comes from a single forest, and the host for that forest is the target of the DatabaseClient provided to the listener's processEvent method. The query is performed directly on each forest associated with the database of the DatabaseClient provided to DataMovementManager. The end goal of each job is determined by the listeners registered with onUrisReady. The data set from which batches are made, and on which processing is performed, is determined by the query or Iterator used to construct this instance.
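The following is a rough, single-threaded illustration of what "batches of uris coming from an Iterator" means. It groups URIs into fixed-size batches, which are the unit an onUrisReady listener receives, and this sketch emits a final partial batch for any remainder. `UriBatchingSketch` and `toBatches` are hypothetical names, not Data Movement SDK APIs, and the sketch ignores threads and forests entirely.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical helper, not part of the Data Movement SDK: cuts a stream of URIs
// into fixed-size batches, the unit each onUrisReady listener would receive.
class UriBatchingSketch {

    static List<List<String>> toBatches(Iterator<String> uris, int batchSize) {
        if (batchSize < 1) {
            throw new IllegalArgumentException("batchSize must be at least 1");
        }
        List<List<String>> batches = new ArrayList<>();
        List<String> current = new ArrayList<>(batchSize);
        while (uris.hasNext()) {
            current.add(uris.next());
            if (current.size() == batchSize) {
                batches.add(current);
                current = new ArrayList<>(batchSize);
            }
        }
        // In this sketch, leftover URIs form one final, partially filled batch.
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }

    public static void main(String[] args) {
        List<String> uris = List.of("/a.json", "/b.json", "/c.json", "/d.json", "/e.json");
        for (List<String> batch : toBatches(uris.iterator(), 2)) {
            System.out.println(batch);
        }
    }
}
```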
The following listeners are provided: ApplyTransformListener, DeleteListener, ExportListener, and ExportToWriterListener.
The provided listeners are used by adding an instance via
onUrisReady like so:
```java
QueryBatcher qhb = dataMovementManager.newQueryBatcher(query)
    .withConsistentSnapshot()
    .onUrisReady(new DeleteListener())
    .onQueryFailure(exception -> exception.printStackTrace());
JobTicket ticket = dataMovementManager.startJob(qhb);
qhb.awaitCompletion();
dataMovementManager.stopJob(ticket);
```
Custom listeners will generally use the MarkLogic Java Client API to manipulate the documents for the uris in each batch.
QueryBatcher is designed to be highly scalable and performant. To accommodate the largest result sets, QueryBatcher paginates through matches rather than loading matches into memory. To prevent queueing too many tasks when running a query, QueryBatcher only adds another task when one completes the query and is about to send the matching uris to the onUrisReady listeners.
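The scheduling described above can be modeled with plain `java.util.concurrent` classes. In the hypothetical sketch below, `PaginationSketch`, `fetchPage`, and `pageFrom` are illustrative names, not SDK APIs, and an in-memory list stands in for one forest's matches. Each page is fetched by offset rather than loading every match. The task for the next page is queued only after the current page's query has returned, just before that page goes to the listener. This is a model of the idea, not QueryBatcher's implementation.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

// Hypothetical model, not SDK code: an in-memory list stands in for one forest's matches.
class PaginationSketch {

    // "Query" one page of matches by offset instead of loading every match at once.
    static List<String> fetchPage(List<String> matches, int start, int pageSize) {
        int end = Math.min(start + pageSize, matches.size());
        if (start >= end) {
            return List.of();
        }
        return new ArrayList<>(matches.subList(start, end));
    }

    // Fetch the page at 'start'. Only after that query returns is the task for the
    // next page queued; the current page is then handed to the listener.
    static CompletableFuture<Void> pageFrom(List<String> matches, int start, int pageSize,
                                            Consumer<List<String>> listener, Executor pool) {
        return CompletableFuture
                .supplyAsync(() -> fetchPage(matches, start, pageSize), pool)
                .thenCompose(page -> {
                    if (page.isEmpty()) {
                        return CompletableFuture.<Void>completedFuture(null);
                    }
                    CompletableFuture<Void> next =
                            pageFrom(matches, start + pageSize, pageSize, listener, pool);
                    CompletableFuture<Void> handled =
                            CompletableFuture.runAsync(() -> listener.accept(page), pool);
                    return CompletableFuture.allOf(next, handled);
                });
    }

    public static void main(String[] args) {
        List<String> matches = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            matches.add("/doc/" + i + ".json");
        }
        // Listener calls run concurrently, so collect into a synchronized list.
        List<String> seen = Collections.synchronizedList(new ArrayList<>());
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            pageFrom(matches, 0, 3, seen::addAll, pool).join();
        } finally {
            pool.shutdown();
        }
        System.out.println(seen.size() + " uris delivered");
    }
}
```

In this model, at most one query task for the result set is pending at any time. Listener work, however, is still queued once per page.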
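For pagination to succeed, you must not modify the result set during pagination. In practice, if your listeners delete matching documents or modify them so that they no longer match, you must either use withConsistentSnapshot() or construct the batcher from an Iterator instead of a query.

Sample usage using withConsistentSnapshot():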
```java
StructuredQueryDefinition query = new StructuredQueryBuilder().collection("myCollection");
QueryBatcher qhb = dataMovementManager.newQueryBatcher(query)
    .withBatchSize(1000)
    .withThreadCount(20)
    .withConsistentSnapshot()
    .onUrisReady(batch -> {
        // batch.getClient() targets the host of the forest this batch came from
        for (String uri : batch.getItems()) {
            if (uri.endsWith(".txt")) {
                batch.getClient().newDocumentManager().delete(uri);
            }
        }
    })
    .onQueryFailure(exception -> exception.printStackTrace());
JobTicket ticket = dataMovementManager.startJob(qhb);
qhb.awaitCompletion();
dataMovementManager.stopJob(ticket);
```
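The hypothetical simulation below shows why modifying the result set breaks pagination. It is plain Java, not SDK code. It pages through an in-memory list of matches by offset and deletes each page right after reading it, which is the problem that withConsistentSnapshot() or an Iterator avoids. It does not model how the server actually paginates.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation, not SDK code: offset-based paging over a live result set.
class PageShiftSketch {

    // Page through 'matches' by offset, deleting every URI on a page right after reading it,
    // as a delete listener would if the result set were not pinned to a snapshot.
    static List<String> deleteWhilePaging(List<String> matches, int pageSize) {
        List<String> deleted = new ArrayList<>();
        int start = 0;
        while (start < matches.size()) {
            int end = Math.min(start + pageSize, matches.size());
            List<String> page = new ArrayList<>(matches.subList(start, end));
            matches.removeAll(page);   // deleted documents no longer match the query
            deleted.addAll(page);
            start += pageSize;         // the next offset now skips documents that shifted forward
        }
        return deleted;
    }

    public static void main(String[] args) {
        List<String> matches = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            matches.add("/doc/" + i + ".json");
        }
        List<String> deleted = deleteWhilePaging(matches, 3);
        System.out.println("deleted " + deleted.size() + ", still matching " + matches.size());
    }
}
```

With 10 matches and a page size of 3, only 6 URIs are deleted. After each deletion, the remaining matches shift toward the front, so the next offset jumps past /doc/3.json, /doc/4.json, /doc/5.json, and /doc/9.json.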
Example of queueing uris in memory instead of using
withConsistentSnapshot():
```java
// Phase 1: collect the matching uris. Listeners run on many threads, so use a synchronized list.
List<String> uris = Collections.synchronizedList(new ArrayList<>());
QueryBatcher getUris = dataMovementManager.newQueryBatcher(query)
    .withBatchSize(5000)
    .onUrisReady(batch -> uris.addAll(Arrays.asList(batch.getItems())))
    .onQueryFailure(exception -> exception.printStackTrace());
JobTicket getUrisTicket = dataMovementManager.startJob(getUris);
getUris.awaitCompletion();
dataMovementManager.stopJob(getUrisTicket);

// Phase 2: now that we have the uris, step through them with an Iterator
QueryBatcher performDelete = dataMovementManager.newQueryBatcher(uris.iterator())
    .onUrisReady(new DeleteListener())
    .onQueryFailure(exception -> exception.printStackTrace());
JobTicket ticket = dataMovementManager.startJob(performDelete);
performDelete.awaitCompletion();
dataMovementManager.stopJob(ticket);
```
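When the collected URIs may not fit in memory, the same two-phase pattern can spool them to a file and stream them back. The hypothetical sketch below uses plain Java NIO and is not SDK code. It writes each batch one URI per line, then exposes the file as a lazily read `Iterator<String>`, which is the type passed to newQueryBatcher in the example above. `UriSpoolSketch`, `spool`, and `readBack` are illustrative names.

```java
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical sketch, not SDK code: spool URIs to a file, then read them back lazily.
class UriSpoolSketch {

    // Phase 1: write every URI from every batch, one per line.
    static void spool(Path file, List<String[]> batches) throws IOException {
        try (BufferedWriter out = Files.newBufferedWriter(file, StandardCharsets.UTF_8)) {
            for (String[] batch : batches) {
                for (String uri : batch) {
                    out.write(uri);
                    out.newLine();
                }
            }
        }
    }

    // Phase 2: an Iterator<String> that reads one line at a time instead of loading the file.
    // The caller owns 'in' and must close it after iteration finishes.
    static Iterator<String> readBack(BufferedReader in) {
        return in.lines().iterator();
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("uris", ".txt");
        try {
            spool(file, List.of(new String[] {"/a.json", "/b.json"}, new String[] {"/c.json"}));
            List<String> readBackUris = new ArrayList<>();
            try (BufferedReader in = Files.newBufferedReader(file, StandardCharsets.UTF_8)) {
                readBack(in).forEachRemaining(readBackUris::add);
            }
            System.out.println(readBackUris);
        } finally {
            Files.deleteIfExists(file);
        }
    }
}
```

In a real job, the reader would need to stay open until the Iterator-based batcher has finished, for example until awaitCompletion() returns. The sketch assumes URIs contain no line breaks.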
To queue uris to disk (if not enough memory is available), see UrisToWriterListener.

| Modifier and Type | Method | Description |
|---|---|---|
| boolean | awaitCompletion() | Blocks until the job is complete. |
| boolean | awaitCompletion(long timeout, java.util.concurrent.TimeUnit unit) | Blocks until the job is complete or the timeout elapses. |
| int | getDefaultDocBatchSize() | Returns defaultDocBatchSize, which is calculated according to server status. |
| int | getDocToUriBatchRatio() | Returns the docToUriBatchRatio set on this QueryBatcher. |
| JobTicket | getJobTicket() | After the job has been started, returns the JobTicket generated when the job was started. |
| long | getMaxBatches() | Returns the maximum number of batches for the current job. |
| int | getMaxDocToUriBatchRatio() | Returns maxDocToUriBatchRatio, which is calculated according to server status. |
| int | getMaxUriBatchSize() | Returns maxUriBatchSize, which is calculated according to server status. |
| QueryFailureListener[] | getQueryFailureListeners() | Get the array of QueryFailureListener instances registered via onQueryFailure, including the HostAvailabilityListener registered by default. |
| QueryBatcherListener[] | getQueryJobCompletionListeners() | Get the array of QueryBatcherListener instances registered via onJobCompletion. |
| java.lang.Long | getServerTimestamp() | If withConsistentSnapshot was used before starting the job, returns the MarkLogic server timestamp associated with the snapshot. |
| QueryBatchListener[] | getUrisReadyListeners() | Get the array of QueryBatchListener instances registered via onUrisReady. |
| boolean | isStopped() | true if the job is terminated (last batch was finished or DataMovementManager.stopJob was called), false otherwise. |
| QueryBatcher | onJobCompletion(QueryBatcherListener listener) | Add a listener to run when the Query job is completed, i.e. when all the document URIs are retrieved and the associated listeners are completed. |
| QueryBatcher | onQueryFailure(QueryFailureListener listener) | Add a listener to run each time there is an exception retrieving a batch of uris. |
| QueryBatcher | onUrisReady(QueryBatchListener listener) | Add a listener to run each time a batch of uris is ready. |
| void | retry(QueryEvent queryEvent) | Retry in the same thread to query a batch that failed. |
| void | retryListener(QueryBatch batch, QueryBatchListener queryBatchListener) | Retries processing the listener on the batch of URIs, when the batch has been successfully retrieved from the server but applying the listener to the batch failed. |
| void | retryWithFailureListeners(QueryEvent queryEvent) | Retry in the same thread to query a batch that failed. |
| void | setMaxBatches() | Caps the query at the current batch. |
| void | setMaxBatches(long maxBatches) | Sets the limit for the maximum number of batches that can be collected. |
| void | setQueryFailureListeners(QueryFailureListener... listeners) | Remove any existing QueryFailureListener instances registered via onQueryFailure, including the HostAvailabilityListener registered by default, and replace them with the provided listeners. |
| void | setQueryJobCompletionListeners(QueryBatcherListener... listeners) | Remove any existing QueryBatcherListener instances registered via onJobCompletion and replace them with the provided listeners. |
| void | setUrisReadyListeners(QueryBatchListener... listeners) | Remove any existing QueryBatchListener instances registered via onUrisReady and replace them with the provided listeners. |
| QueryBatcher | withBatchSize(int docBatchSize) | Sets the number of documents processed in a batch. |
| QueryBatcher | withBatchSize(int docBatchSize, int docToUriBatchRatio) | Sets the number of documents processed in a batch and the ratio of the document processing batch to the document uri collection batch. |
| QueryBatcher | withConsistentSnapshot() | Specifies that matching uris should be retrieved as they were when this QueryBatcher job started. |
| QueryBatcher | withForestConfig(ForestConfiguration forestConfig) | If the server forest configuration changes mid-job, it can be re-fetched with DataMovementManager.readForestConfig() and then set via withForestConfig. |
| QueryBatcher | withJobId(java.lang.String jobId) | Sets the unique id of the job, to help with managing multiple concurrent jobs; the job is started with the specified job id. |
| QueryBatcher | withJobName(java.lang.String jobName) | Sets the job name. |
| QueryBatcher | withThreadCount(int threadCount) | Sets the number of threads added to the internal thread pool for this instance to use for retrieving or processing batches of uris. |
Methods inherited from interface Batcher: getBatchSize, getForestConfig, getJobEndTime, getJobId, getJobName, getJobStartTime, getPrimaryClient, getThreadCount, isStarted
QueryBatcher onUrisReady(QueryBatchListener listener)
Add a listener to run each time a batch of uris is ready.
Parameters:
listener - the action to perform when uris are ready

QueryBatcher onQueryFailure(QueryFailureListener listener)
Add a listener to run each time there is an exception retrieving a batch of uris.
These listeners will not run when an exception is thrown by a listener registered with onUrisReady. To learn more, see Handling Exceptions in Listeners.
Parameters:
listener - the code to run when a failure occurs

QueryBatcher onJobCompletion(QueryBatcherListener listener)
Add a listener to run when the Query job is completed, i.e. when all the document URIs are retrieved and the associated listeners are completed.
Parameters:
listener - the code to run when the Query job is completed

void retry(QueryEvent queryEvent)
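Retry in the same thread to query a batch that failed.
Parameters:
queryEvent - the information about the batch that failed

QueryBatchListener[] getUrisReadyListeners()
Get the array of QueryBatchListener instances registered via onUrisReady.

QueryBatcherListener[] getQueryJobCompletionListeners()
Get the array of QueryBatcherListener instances registered via onJobCompletion.

QueryFailureListener[] getQueryFailureListeners()
Get the array of QueryFailureListener instances registered via onQueryFailure, including the HostAvailabilityListener registered by default.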
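
void setUrisReadyListeners(QueryBatchListener... listeners)
Remove any existing QueryBatchListener instances registered via onUrisReady and replace them with the provided listeners.
Parameters:
listeners - the QueryBatchListener instances this batcher should use

void setQueryFailureListeners(QueryFailureListener... listeners)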
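Remove any existing QueryFailureListener instances registered via onQueryFailure, including the HostAvailabilityListener registered by default, and replace them with the provided listeners.
Parameters:
listeners - the QueryFailureListener instances this batcher should use

void setQueryJobCompletionListeners(QueryBatcherListener... listeners)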
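Remove any existing QueryBatcherListener instances registered via onJobCompletion and replace them with the provided listeners.
Parameters:
listeners - the QueryBatcherListener instances this batcher should use

QueryBatcher withConsistentSnapshot()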
Specifies that matching uris should be retrieved as they were when this QueryBatcher job started. This should only be used when the QueryBatcher is constructed with a query, not with an Iterator. It is required when deleting documents that match the query, or when making any modification of matching documents (including via ApplyTransformListener) that would cause them to no longer match the query. Otherwise, pagination through the result set would fail, because pages shift as documents are deleted or modified so that they no longer match the query.

QueryBatcher withForestConfig(ForestConfiguration forestConfig)
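If the server forest configuration changes mid-job, it can be re-fetched with DataMovementManager.readForestConfig() and then set via withForestConfig.
Specified by: withForestConfig in interface Batcher
Parameters:
forestConfig - the updated ForestConfiguration

QueryBatcher withJobName(java.lang.String jobName)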
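Sets the job name.
Specified by: withJobName in interface Batcher
Parameters:
jobName - the name you would like to assign to this job

QueryBatcher withJobId(java.lang.String jobId)
Sets the unique id of the job, to help with managing multiple concurrent jobs; the job is started with the specified job id.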

QueryBatcher withBatchSize(int docBatchSize)
Sets the number of documents processed in a batch.
Specified by: withBatchSize in interface Batcher
Parameters:
docBatchSize - the number of documents processed in a batch

QueryBatcher withBatchSize(int docBatchSize, int docToUriBatchRatio)
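Sets the number of documents processed in a batch and the ratio of the document processing batch to the document uri collection batch.
Parameters:
docBatchSize - the number of documents processed in a batch
docToUriBatchRatio - the ratio of the document processing batch to the document uri collection batch. The docToUriBatchRatio should ordinarily be larger than 1 because URIs are small relative to full documents and because collecting URIs from indexes is ordinarily faster than processing documents.

int getDocToUriBatchRatio()
Returns the docToUriBatchRatio set on this QueryBatcher.

int getDefaultDocBatchSize()
Returns defaultDocBatchSize, which is calculated according to server status.

int getMaxUriBatchSize()
Returns maxUriBatchSize, which is calculated according to server status.

int getMaxDocToUriBatchRatio()
Returns maxDocToUriBatchRatio, which is calculated according to server status.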

QueryBatcher withThreadCount(int threadCount)
Sets the number of threads added to the internal thread pool for this instance to use for retrieving or processing batches of uris. When the batcher is constructed with an Iterator, the calling thread (the one that calls startJob) is used to queue all batches, so startJob will not return until all iteration is complete and all batches are queued. For Iterators, this thread count is the number of threads used to process the queued batches (running processEvent on the listeners registered with onUrisReady). As of the 6.2.0 release, the thread count can be adjusted after the batcher has been started. The underlying Java ThreadPoolExecutor will have both its core and max pool sizes set to the given thread count. Use caution when reducing this to a value of 1 while the batcher is running; in some cases, the underlying ThreadPoolExecutor may halt execution of any tasks. Execution can be resumed by increasing the thread count to a value of 2 or higher.
Specified by: withThreadCount in interface Batcher
Parameters:
threadCount - the number of threads to use in this Batcher

boolean awaitCompletion()
Blocks until the job is complete.
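The withThreadCount description notes that changing the thread count sets both the core and the maximum pool size of the underlying ThreadPoolExecutor to the new value. On a live executor, these two calls must be ordered correctly. Since Java 9, setCorePoolSize rejects a core size above the current maximum, and setMaximumPoolSize rejects a maximum below the current core size. The hypothetical helper below shows one safe ordering. `PoolResizeSketch` is not an SDK class, and the helper does not reproduce the SDK's code.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not SDK code: set core and max pool size to the same value
// on a running ThreadPoolExecutor without violating core <= max at any step.
class PoolResizeSketch {

    static void resize(ThreadPoolExecutor pool, int threadCount) {
        if (threadCount < 1) {
            throw new IllegalArgumentException("threadCount must be at least 1");
        }
        if (threadCount > pool.getMaximumPoolSize()) {
            // Growing: raise the max first so core never exceeds max.
            pool.setMaximumPoolSize(threadCount);
            pool.setCorePoolSize(threadCount);
        } else {
            // Shrinking or unchanged: lower core first so max never drops below core.
            pool.setCorePoolSize(threadCount);
            pool.setMaximumPoolSize(threadCount);
        }
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                4, 4, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        resize(pool, 8);
        System.out.println(pool.getCorePoolSize() + "/" + pool.getMaximumPoolSize());
        resize(pool, 2);
        System.out.println(pool.getCorePoolSize() + "/" + pool.getMaximumPoolSize());
        pool.shutdown();
    }
}
```

Keeping core and maximum equal matters with an unbounded work queue such as LinkedBlockingQueue. With such a queue, ThreadPoolExecutor queues new tasks once the core threads are busy and never starts more than the core number of threads.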

boolean awaitCompletion(long timeout, java.util.concurrent.TimeUnit unit) throws java.lang.InterruptedException
Blocks until the job is complete or the timeout elapses.
Parameters:
timeout - the maximum time to wait
unit - the time unit of the timeout argument
Throws:
java.lang.InterruptedException - if interrupted while waiting

boolean isStopped()
Specified by: isStopped in interface Batcher
Returns: true if the job is terminated (last batch was finished or DataMovementManager.stopJob was called), false otherwise

JobTicket getJobTicket()
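After the job has been started, returns the JobTicket generated when the job was started.
Specified by: getJobTicket in interface Batcher
Throws:
java.lang.IllegalStateException - if this job has not yet been started

void retryListener(QueryBatch batch, QueryBatchListener queryBatchListener)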
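Retries processing the listener on the batch of URIs, when the batch has been successfully retrieved from the server but applying the listener to the batch failed.
Parameters:
batch - the QueryBatch for which the listener needs to be processed
queryBatchListener - the QueryBatchListener that needs to be applied

void retryWithFailureListeners(QueryEvent queryEvent)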
Retry in the same thread to query a batch that failed.
Parameters:
queryEvent - the information about the batch that failed

void setMaxBatches(long maxBatches)
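Sets the limit for the maximum number of batches that can be collected.
Parameters:
maxBatches - the value of the limit

void setMaxBatches()
Caps the query at the current batch.

long getMaxBatches()
Returns the maximum number of batches for the current job.

java.lang.Long getServerTimestamp()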
If withConsistentSnapshot was used before starting the job, returns the MarkLogic server timestamp associated with the snapshot. Returns null otherwise.

Copyright © 2024 MarkLogic Corporation. All Rights Reserved.