Interface QueryBatcher

All Superinterfaces:
Batcher

public interface QueryBatcher extends Batcher

To facilitate long-running read, update, and delete use cases, QueryBatcher coordinates threads to process batches of uris matching a query or coming from an Iterator. Each batch of uris matching a query comes from a single forest, and the host for that forest is the target of the DatabaseClient provided to the listener's processEvent method. The query is performed directly on each forest associated with the database for the DatabaseClient provided to DataMovementManager. What each job does is determined by the listeners registered with onUrisReady. The data set from which batches are made, and on which processing is performed, is determined by the query or Iterator used to construct this instance.

While the most complex use cases are best served by custom listeners, the common use cases are addressed by the provided listeners, including ApplyTransformListener, DeleteListener, ExportListener, and ExportToWriterListener. A provided listener is registered by adding an instance via onUrisReady like so:

     QueryBatcher qhb = dataMovementManager.newQueryBatcher(query)
         .withConsistentSnapshot()
         .onUrisReady( new DeleteListener() )
         .onQueryFailure(exception -> exception.printStackTrace());
     JobTicket ticket = dataMovementManager.startJob(qhb);
     qhb.awaitCompletion();
     dataMovementManager.stopJob(ticket);

Custom listeners will generally use the MarkLogic Java Client API to manipulate the documents for the uris in each batch.

QueryBatcher is designed to be highly scalable and performant. To accommodate the largest result sets, QueryBatcher paginates through matches rather than loading all matches into memory. To avoid queueing too many tasks, QueryBatcher adds a new task only when a running task has completed its query and is about to send the matching uris to the onUrisReady listeners.
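The throttling described above can be modeled in plain Java. This is a simplified sketch of the general technique, not QueryBatcher's actual internals: a semaphore sized to the thread count blocks submission of a new page-fetch task until a prior one completes.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ThrottledSubmit {
    // Process `pages` units of work on `threadCount` threads, never queueing
    // more tasks than can run at once; returns the number of pages processed.
    static int run(int pages, int threadCount) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(threadCount);
        Semaphore inFlight = new Semaphore(threadCount);
        AtomicInteger processed = new AtomicInteger();
        for (int page = 0; page < pages; page++) {
            inFlight.acquire(); // block until a prior task finishes
            pool.submit(() -> {
                try {
                    processed.incrementAndGet(); // a real batcher would fetch one page of uris here
                } finally {
                    inFlight.release(); // completing frees a submission slot
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return processed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(20, 4)); // prints 20
    }
}
```

The semaphore keeps the task queue bounded regardless of how many pages the query matches, which is the property that lets very large result sets be processed in constant memory.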

For pagination to succeed, you must not modify the result set during pagination. This means you must

  1. perform a read-only operation, or
  2. make sure your modifications do not change the result set (that is, do not delete matches or modify them so they no longer match), or
  3. set a merge timestamp and use withConsistentSnapshot(), or
  4. use Iterator instead of a query.
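To see why these rules matter, consider what happens when matches are deleted mid-pagination without a consistent snapshot. The following self-contained simulation (hypothetical names, plain java.util, not the MarkLogic API) paginates a "live" result set in pages of two while deleting each page's matches, and silently skips results:

```java
import java.util.ArrayList;
import java.util.List;

public class PaginationSkipDemo {
    // Paginate through a live result set, deleting each page's matches as it
    // is processed; returns the uris that were actually seen.
    static List<String> processWithDeletes(List<String> results, int pageSize) {
        List<String> seen = new ArrayList<>();
        int page = 0;
        while (page * pageSize < results.size()) {
            int start = page * pageSize;
            int end = Math.min(start + pageSize, results.size());
            List<String> batch = new ArrayList<>(results.subList(start, end));
            seen.addAll(batch);
            results.removeAll(batch); // matches deleted mid-pagination
            page++;                   // "next page" of a now-smaller result set
        }
        return seen;
    }

    public static void main(String[] args) {
        List<String> results = new ArrayList<>(
            List.of("a.txt", "b.txt", "c.txt", "d.txt", "e.txt", "f.txt"));
        // c.txt and d.txt are never seen: deleting page 1 shifted them into
        // page 1's position, but pagination had already moved past it.
        System.out.println(processWithDeletes(results, 2));
        // prints [a.txt, b.txt, e.txt, f.txt]
    }
}
```

A merge timestamp with withConsistentSnapshot() avoids this by freezing the result set for the duration of the job; an in-memory Iterator avoids it because the uris are captured before any modification begins.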

Sample usage using withConsistentSnapshot():

     QueryDefinition query = new StructuredQueryBuilder().collection("myCollection");
     QueryBatcher qhb = dataMovementManager.newQueryBatcher(query)
         .withBatchSize(1000)
         .withThreadCount(20)
         .withConsistentSnapshot()
         .onUrisReady(batch -> {
             for ( String uri : batch.getItems() ) {
                 if ( uri.endsWith(".txt") ) {
                     client.newDocumentManager().delete(uri);
                 }
             }
         })
         .onQueryFailure(exception -> exception.printStackTrace());
     JobTicket ticket = dataMovementManager.startJob(qhb);
     qhb.awaitCompletion();
     dataMovementManager.stopJob(ticket);

Example of queueing uris in memory instead of using withConsistentSnapshot():

     List<String> uris = Collections.synchronizedList(new ArrayList<>());
     QueryBatcher getUris = dataMovementManager.newQueryBatcher(query)
       .withBatchSize(5000)
       .onUrisReady( batch -> uris.addAll(Arrays.asList(batch.getItems())) )
       .onQueryFailure(exception -> exception.printStackTrace());
     JobTicket getUrisTicket = dataMovementManager.startJob(getUris);
     getUris.awaitCompletion();
     dataMovementManager.stopJob(getUrisTicket);

     // now we have the uris, let's step through them
     QueryBatcher performDelete = dataMovementManager.newQueryBatcher(uris.iterator())
       .onUrisReady(new DeleteListener())
       .onQueryFailure(exception -> exception.printStackTrace());
     JobTicket ticket = dataMovementManager.startJob(performDelete);
     performDelete.awaitCompletion();
     dataMovementManager.stopJob(ticket);

To queue uris to disk (if not enough memory is available), see UrisToWriterListener.
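The disk-backed variant of the pattern above can be sketched with plain java.nio (UrisToWriterListener handles the writing side when you supply it a Writer; the file paths and batch contents here are illustrative): uris are appended to a file one per line, then read back and replayed through the Iterator-based newQueryBatcher constructor.

```java
import java.io.IOException;
import java.io.Writer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

public class UrisToDisk {
    // Append one batch of uris to the Writer, one uri per line.
    static void writeBatch(Writer out, String[] uris) throws IOException {
        for (String uri : uris) {
            out.write(uri);
            out.write('\n');
        }
    }

    // Write two batches to a temp file, then read the uris back; the returned
    // list's iterator could feed dataMovementManager.newQueryBatcher(iterator).
    static List<String> roundTrip() throws IOException {
        Path file = Files.createTempFile("uris", ".txt");
        try (Writer out = Files.newBufferedWriter(file)) {
            writeBatch(out, new String[] { "/doc/1.json", "/doc/2.json" });
            writeBatch(out, new String[] { "/doc/3.json" });
        }
        List<String> uris = Files.readAllLines(file);
        Files.delete(file);
        return uris;
    }

    public static void main(String[] args) throws IOException {
        System.out.println(roundTrip());
        // prints [/doc/1.json, /doc/2.json, /doc/3.json]
    }
}
```

Because the uris live on disk between the two jobs, this works for result sets too large to hold in a synchronized in-memory list.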