public interface WriteBatcher extends Batcher
To facilitate long-running write jobs, batches documents added
by many external threads and coordinates internal threads to send
the batches round-robin to all appropriate hosts in the cluster.
Appropriate hosts are those containing a forest associated with the
database for the DatabaseClient provided to DataMovementManager.
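The round-robin distribution described above can be pictured with a small JDK-only sketch. `RoundRobinHosts` and the host names are hypothetical and exist only for illustration; this is not how WriteBatcher is implemented internally, just one common way to rotate through a fixed host list from many threads.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical helper: hands out hosts in rotation, safe to call from many threads. */
final class RoundRobinHosts {
    private final List<String> hosts;
    private final AtomicLong counter = new AtomicLong();

    RoundRobinHosts(List<String> hosts) {
        if (hosts.isEmpty()) {
            throw new IllegalArgumentException("at least one host is required");
        }
        this.hosts = new ArrayList<>(hosts); // defensive copy
    }

    /** Returns the next host in rotation; each call advances the shared counter. */
    String next() {
        long n = counter.getAndIncrement();
        return hosts.get((int) Math.floorMod(n, (long) hosts.size()));
    }
}
```

Each batch would take the host returned by `next()`, so consecutive batches land on different hosts.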
Many external threads (threads not managed by WriteBatcher) can
concurrently add documents by calling WriteBatcher add or addAs.
Each time enough documents are added to make a batch, the batch is
added to an internal queue where the first available internal thread
will pick it up and write it to the server. Since batches are not
written until they are full, you should always call flushAsync() or
flushAndWait() when no more documents will be written to ensure that
any partial batch is written.
    WriteBatcher whb = dataMovementManager.newWriteBatcher()
        .withBatchSize(100)
        .withThreadCount(20)
        .onBatchSuccess(batch -> {
            logger.debug("batch # {}, so far: {}", batch.getJobBatchNumber(), batch.getJobWritesSoFar());
        })
        .onBatchFailure((batch, throwable) -> throwable.printStackTrace());
    JobTicket ticket = dataMovementManager.startJob(whb);
    whb.add("doc1.txt", new StringHandle("doc1 contents"));
    whb.addAs("doc2.txt", "doc2 contents");
    whb.flushAndWait(); // send the two docs even though they're not a full batch
    dataMovementManager.stopJob(ticket);
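To make the fill-then-flush rule concrete, here is a minimal JDK-only sketch. `SimpleBatcher` is a hypothetical stand-in that mimics only the rule "nothing is written until a batch is full or a flush is requested"; it has no threads, hosts, or server calls and is not the WriteBatcher implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical stand-in that models only the fill-then-flush rule. */
final class SimpleBatcher {
    private final int batchSize;
    private final Consumer<List<String>> writer; // receives each completed batch
    private List<String> pending = new ArrayList<>();

    SimpleBatcher(int batchSize, Consumer<List<String>> writer) {
        if (batchSize < 1) {
            throw new IllegalArgumentException("batchSize must be 1 or greater");
        }
        this.batchSize = batchSize;
        this.writer = writer;
    }

    /** Queues one document URI; hands off a batch only once it is full. */
    synchronized SimpleBatcher add(String uri) {
        pending.add(uri);
        if (pending.size() == batchSize) {
            handOff();
        }
        return this;
    }

    /** Hands off whatever is pending, even if it is not a full batch. */
    synchronized void flush() {
        if (!pending.isEmpty()) {
            handOff();
        }
    }

    private void handOff() {
        List<String> batch = pending;
        pending = new ArrayList<>();
        writer.accept(batch);
    }
}
```

With a batch size of 2, adding five URIs hands off two full batches; the fifth URI stays pending until `flush()` is called, which is why the final flush matters.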
Note: All Closeable content or metadata handles passed to add
methods will be closed as soon as possible (after the batch is
written). This is to avoid IO resource leakage. This differs from
the normal usage of the Java Client API: because WriteBatcher is
asynchronous, there is no easy way to know which handles have
finished writing and can therefore be closed, so to avoid confusion
all handles are closed for you. If you have a resource that must be
closed after a batch is written, but is not closed by your handle,
override the close method of any Closeable handle and close your
resource there.
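The override pattern mentioned in the note can be sketched with JDK types only. `StreamHandleStandIn` is a hypothetical placeholder for a Closeable handle from the Java Client API, and `handleWithExtraResource` is an illustrative helper name; the point is only the shape of the `close()` override.

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;

final class CloseHookSketch {
    /** Hypothetical stand-in for a Closeable content handle that wraps a stream. */
    static class StreamHandleStandIn implements Closeable {
        private final InputStream content;

        StreamHandleStandIn(InputStream content) {
            this.content = content;
        }

        @Override
        public void close() throws IOException {
            content.close();
        }
    }

    /** Builds a handle whose close() also releases an extra resource. */
    static Closeable handleWithExtraResource(InputStream content, Closeable extra) {
        return new StreamHandleStandIn(content) {
            @Override
            public void close() throws IOException {
                try {
                    super.close();  // let the handle release its own stream first
                } finally {
                    extra.close();  // then release the resource the handle does not own
                }
            }
        };
    }
}
```

Because the handle is closed after its batch is written, the extra resource is released at the same moment without the caller tracking batch completion.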
Modifier and Type | Method and Description |
---|---|
WriteBatcher | add(DocumentWriteOperation writeOperation) - Add a document, by passing in a DocumentWriteOperation, to be batched and then written to the server when a batch is full or flushAsync() or flushAndWait() is called. |
WriteBatcher | add(java.lang.String uri, AbstractWriteHandle contentHandle) - Add a document to be batched then written to the server when a batch is full or flushAsync() or flushAndWait() is called. |
WriteBatcher | add(java.lang.String uri, DocumentMetadataWriteHandle metadataHandle, AbstractWriteHandle contentHandle) - Add a document to be batched then written to the server when a batch is full or flushAsync() or flushAndWait() is called. |
WriteBatcher | add(WriteEvent... docs) - Add docs in the form of WriteEvents. |
void | addAll(java.util.stream.Stream<? extends DocumentWriteOperation> operations) - Writes a document stream to the database. |
WriteBatcher | addAs(java.lang.String uri, DocumentMetadataWriteHandle metadataHandle, java.lang.Object content) - Add a document to be batched then written to the server when a batch is full or flushAsync() or flushAndWait() is called. |
WriteBatcher | addAs(java.lang.String uri, java.lang.Object content) - Add a document to be batched then written to the server when a batch is full or flushAsync() or flushAndWait() is called. |
boolean | awaitCompletion() - Blocks until the job has finished or cancelled all queued tasks. |
boolean | awaitCompletion(long timeout, java.util.concurrent.TimeUnit unit) - Blocks until the job has finished or cancelled all queued tasks. |
void | flushAndWait() - Create a batch from any unbatched documents and write that batch, then wait for all batches to complete (the same as awaitCompletion()). |
void | flushAsync() - Create a batch from any unbatched documents and write that batch asynchronously. |
WriteFailureListener[] | getBatchFailureListeners() - Get the array of WriteFailureListener instances registered via onBatchFailure including the HostAvailabilityListener registered by default. |
WriteBatchListener[] | getBatchSuccessListeners() - Get the array of WriteBatchListener instances registered via onBatchSuccess. |
DocumentMetadataHandle | getDocumentMetadata() |
JobTicket | getJobTicket() - After the job has been started, returns the JobTicket generated when the job was started. |
java.lang.String | getTemporalCollection() - The temporal collection configured for temporal document inserts. |
ServerTransform | getTransform() |
WriteBatcher | onBatchFailure(WriteFailureListener listener) - Add a listener to run each time there is an exception writing a batch. |
WriteBatcher | onBatchSuccess(WriteBatchListener listener) - Add a listener to run each time a batch is successfully written. |
void | retry(WriteBatch queryEvent) - Retry in the same thread to send a batch that failed. |
void | retryWithFailureListeners(WriteBatch writeBatch) - Retry in the same thread to send a batch that failed. |
void | setBatchFailureListeners(WriteFailureListener... listeners) - Remove any existing WriteFailureListener instances registered via onBatchFailure including the HostAvailabilityListener registered by default and replace them with the provided listeners. |
void | setBatchSuccessListeners(WriteBatchListener... listeners) - Remove any existing WriteBatchListener instances registered via onBatchSuccess and replace them with the provided listeners. |
WriteBatcher | withBatchSize(int batchSize) - Sets the number of documents to send per batch. |
WriteBatcher | withDefaultMetadata(DocumentMetadataHandle handle) - Sets the DocumentMetadataHandle for write operations. |
WriteBatcher | withForestConfig(ForestConfiguration forestConfig) - If the server forest configuration changes mid-job, it can be re-fetched with DataMovementManager.readForestConfig() then set via withForestConfig. |
WriteBatcher | withJobId(java.lang.String jobId) - Sets the unique id of the job to help with managing multiple concurrent jobs and start the job with the specified job id. |
WriteBatcher | withJobName(java.lang.String jobName) - Sets the job name. |
WriteBatcher | withTemporalCollection(java.lang.String collection) - The temporal collection to use for a temporal document insert. |
WriteBatcher | withThreadCount(int threadCount) - Sets the number of threads added to the internal thread pool for this instance to use for writing or reporting on batches of uris. |
WriteBatcher | withTransform(ServerTransform transform) - The ServerTransform to modify each document from each batch before it is written to the database. |
Methods inherited from interface Batcher: getBatchSize, getForestConfig, getJobEndTime, getJobId, getJobName, getJobStartTime, getPrimaryClient, getThreadCount, isStarted, isStopped
WriteBatcher withDefaultMetadata(DocumentMetadataHandle handle)
Parameters:
handle - the passed in DocumentMetadataHandle

void addAll(java.util.stream.Stream<? extends DocumentWriteOperation> operations)
Parameters:
operations - the DocumentWriteOperation stream passed in

DocumentMetadataHandle getDocumentMetadata()

WriteBatcher add(java.lang.String uri, AbstractWriteHandle contentHandle)
Add a document to be batched then written to the server when a
batch is full or flushAsync() or flushAndWait() is called.
Parameters:
uri - the document URI
contentHandle - the document contents

WriteBatcher addAs(java.lang.String uri, java.lang.Object content)
Add a document to be batched then written to the server when a
batch is full or flushAsync() or flushAndWait() is called.
Parameters:
uri - the document URI
content - the document contents

WriteBatcher add(java.lang.String uri, DocumentMetadataWriteHandle metadataHandle, AbstractWriteHandle contentHandle)
Add a document to be batched then written to the server when a
batch is full or flushAsync() or flushAndWait() is called.
Parameters:
uri - the document URI
metadataHandle - the metadata (collection, permissions, metadata values, properties, quality)
contentHandle - the document contents

WriteBatcher addAs(java.lang.String uri, DocumentMetadataWriteHandle metadataHandle, java.lang.Object content)
Add a document to be batched then written to the server when a
batch is full or flushAsync() or flushAndWait() is called.
Parameters:
uri - the document URI
metadataHandle - the metadata (collection, permissions, metadata values, properties, quality)
content - the document contents

WriteBatcher add(WriteEvent... docs)
Parameters:
docs - the batch of WriteEvents where each WriteEvent represents one document

WriteBatcher add(DocumentWriteOperation writeOperation)
Add a document, by passing in a DocumentWriteOperation, to be
batched and then written to the server when a batch is full or
flushAsync() or flushAndWait() is called.
Parameters:
writeOperation - the DocumentWriteOperation object containing the document's details to be written to the server

WriteBatcher onBatchSuccess(WriteBatchListener listener)
Parameters:
listener - the action to run when the batch is written successfully

WriteBatcher onBatchFailure(WriteFailureListener listener)
Add a listener to run each time there is an exception writing a batch.
These listeners will not run when an exception is thrown by a listener registered with onBatchSuccess. To learn more, please see Handling Exceptions in Listeners.
Parameters:
listener - the code to run when a failure occurs

void retry(WriteBatch queryEvent)
Parameters:
queryEvent - the information about the batch that failed

WriteBatchListener[] getBatchSuccessListeners()

WriteFailureListener[] getBatchFailureListeners()

void setBatchSuccessListeners(WriteBatchListener... listeners)
Parameters:
listeners - the WriteBatchListener instances this batcher should use

void setBatchFailureListeners(WriteFailureListener... listeners)
Parameters:
listeners - the WriteFailureListener instances this batcher should use

WriteBatcher withTemporalCollection(java.lang.String collection)
Parameters:
collection - the temporal collection to use for a temporal document insert

java.lang.String getTemporalCollection()

WriteBatcher withTransform(ServerTransform transform)
Parameters:
transform - the ServerTransform to run on each document from each batch

ServerTransform getTransform()

WriteBatcher withForestConfig(ForestConfiguration forestConfig)
If the server forest configuration changes mid-job, it can be
re-fetched with DataMovementManager.readForestConfig() then set
via withForestConfig.
Specified by:
withForestConfig in interface Batcher
Parameters:
forestConfig - the updated ForestConfiguration

WriteBatcher withJobName(java.lang.String jobName)
Specified by:
withJobName in interface Batcher
Parameters:
jobName - the name you would like to assign to this job

WriteBatcher withJobId(java.lang.String jobId)

WriteBatcher withBatchSize(int batchSize)
Specified by:
withBatchSize in interface Batcher
Parameters:
batchSize - the batch size; must be 1 or greater

WriteBatcher withThreadCount(int threadCount)
Specified by:
withThreadCount in interface Batcher
Parameters:
threadCount - the number of threads to use in this Batcher

void flushAsync()

void flushAndWait()

boolean awaitCompletion()

boolean awaitCompletion(long timeout, java.util.concurrent.TimeUnit unit) throws java.lang.InterruptedException
Parameters:
timeout - the maximum time to wait
unit - the time unit of the timeout argument
Throws:
java.lang.InterruptedException - if interrupted while waiting

JobTicket getJobTicket()
Specified by:
getJobTicket in interface Batcher
Throws:
java.lang.IllegalStateException - if this job has not yet been started

void retryWithFailureListeners(WriteBatch writeBatch)
Retry in the same thread to send a batch that failed. If it fails again, all the failure listeners registered with this batcher via onBatchFailure are run.
Note: Use this method with caution because it can loop indefinitely. If a failure listener calls this method and the retried batch fails again, the failure listeners run again, and the cycle repeats until the batch succeeds.
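One way to guard against the loop described in the note is to cap the number of retries per batch. The sketch below is JDK-only and rests on assumptions: `RetryBudget` and `tryAcquire` are hypothetical names, and keying the counter by the batch number (for example the value of `batch.getJobBatchNumber()`) is an illustrative choice, not something this API prescribes.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical helper: allows at most maxRetries retries per batch number. */
final class RetryBudget {
    private final int maxRetries;
    private final ConcurrentHashMap<Long, AtomicInteger> attempts = new ConcurrentHashMap<>();

    RetryBudget(int maxRetries) {
        this.maxRetries = maxRetries;
    }

    /** Returns true if the batch with this number may be retried once more. */
    boolean tryAcquire(long jobBatchNumber) {
        int used = attempts
            .computeIfAbsent(jobBatchNumber, k -> new AtomicInteger())
            .incrementAndGet();
        return used <= maxRetries;
    }
}
```

A failure listener could then call `retryWithFailureListeners` only when `tryAcquire` returns true and log or record the batch otherwise, so a batch that keeps failing stops after a bounded number of attempts.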
Parameters:
writeBatch - the information about the batch that failed

Copyright © 2024 MarkLogic Corporation. All Rights Reserved.