カスタムフックモジュールの作成
カスタムのフックモジュール
カスタムフックを使うと、 データハブの外部にあるタスクを実行できます。カスタムフックモジュールは、ステップのコアプロセスの直前(「プレステップフック」)あるいは直後(「ポストステップフック」)に実行できます。
ステップは通常、以下のように処理されます。
- 選択されたデータがソースデータベースから読み込まれる。
- プレステップフックモジュールが実行される(存在する場合)。
- メインのステップモジュールが実行される(デフォルトのデータハブステップの機能あるいはカスタムステップのモジュール)。
- ポストステップフックモジュールが実行される(存在する場合)。
- 処理されたデータがターゲットデータベースに書き込まれる。
カスタムフックは、フロー定義内のステップにJSONノードとして追加されます。
カスタムフックモジュールの処理がステップのコアモジュールや機能と競合する場合、コアモジュール/機能がフックモジュールの処理よりも優先されます。
各カスタムフックモジュールは、それ自身の環境内で実行されます(データハブの処理やその他のモジュールとは分離されています)。
カスタムフックは、あらゆるステップ(読み込み、マッピング、マッチング、マージング、マスタリング、カスタム)に追加できます。
カスタムモジュールはすべて今回のプロジェクトルート/src/main/ml-modules/root/custom-modulesに格納します。GradleタスクmlDeployは、このディレクトリのコンテンツをMODULESデータベースにデプロイします。
必要なインプットおよびアウトプット
カスタムフックモジュールでは、特定のインプットやアウトプットは不要です。しかし、コード内で以下の変数を宣言・使用することで、いくつかの情報にアクセス可能です。
// A custom hook module receives values for the following parameters via データハブ.You can declare only the ones you need and ignore the rest.
var uris; // An array of one or more URIs being processed.
var content; // An array of objects that represent each document being processed.
var options; // The Options object passed to the step by データハブ.
var flowName; // The name of the flow being processed.
var stepNumber; // The index of the step within the flow being processed.The stepNumber of the first step is 1.
var step; // The step definition object.
var database; // The target database.
例
以下のカスタムフックモジュールの例では、URIが同じレコードを読み込む前にこれまでのレコードをアーカイブ化します。これにより、新しいレコードの履歴は刷新されたものとなります(GitHubで確認する)。
/**
* This is a simple example of a custom hook that determines if the incoming order is a duplicate of an existing order
* in the STAGING database.If so, the existing order is archived.
* An update transaction in a custom hook has less impact than an update in the main module.
*/
declareUpdate();
// A custom hook module receives values for the following parameters via データハブ.You can declare only the ones you need and ignore the rest.
var uris; // An array of one or more URIs being processed.
var content; // An array of objects that represent each document being processed.
var options; // The Options object passed to the step by データハブ.
var flowName; // The name of the flow being processed.
var stepNumber; // The index of the step within the flow being processed.The stepNumber of the first step is 1.
var step; // The step definition object.
var database; // The target database.
// Custom hooks can define zero or more properties in the step definition that declares them.
var archiveCollection;
for (const contentObject of content) {
const order = contentObject.value;
/**
* If a hook is configured with runBefore = true, then the content value will be the "raw" data, not yet wrapped in
* an envelope.If it's configured with runBefore = false, as in this example, then the content value
* will be an envelope.
*/
const instance = order.envelope.instance;
/**
* Note that for better performance, a single query should be done based on all of the objects in the content
* array.This works fine for the small set of data being ingested in this example.
*/
const existingDuplicateOrders = cts.search(cts.andQuery([
cts.collectionQuery("IngestOrders"),
cts.jsonPropertyValueQuery("id", instance.id),
cts.jsonPropertyValueQuery("customer", instance.customer),
cts.jsonPropertyValueQuery("order_date", instance.order_date),
cts.jsonPropertyValueQuery("product_id", instance.product_id)
]));
for (const duplicateOrder of existingDuplicateOrders) {
const duplicateUri = xdmp.nodeUri(duplicateOrder);
// Generate a random URI so that previously archived documents are never overwritten
const archiveUri = "/archive/" + sem.uuidString() + duplicateUri;
xdmp.documentInsert(archiveUri, duplicateOrder, xdmp.documentGetPermissions(duplicateUri), archiveCollection);
xdmp.documentDelete(duplicateUri);
}
}
次のステップ
カスタムフックモジュールを作成したならば、フロー内のステップにカスタムフックを追加し、この新しいカスタムフックモジュールへのパスを指定します。