カスタムフックモジュールの作成

カスタムのフックモジュール

カスタムフックを使うと、 データハブの外部にあるタスクを実行できます。カスタムフックモジュールは、ステップのコアプロセスの直前(「プレステップフック」)あるいは直後(「ポストステップフック」)に実行できます。

ステップは通常、以下のように処理されます。

  1. 選択されたデータがソースデータベースから読み込まれる。
  2. プレステップフックモジュールが実行される(存在する場合)。
  3. メインのステップモジュールが実行される(デフォルトのデータハブステップの機能あるいはカスタムステップのモジュール)。
  4. ポストステップフックモジュールが実行される(存在する場合)。
  5. 処理されたデータがターゲットデータベースに書き込まれる。

カスタムフックは、フロー定義内のステップにJSONノードとして追加されます。

カスタムフックモジュールの処理がステップのコアモジュールや機能と競合する場合、コアモジュール/機能がフックモジュールの処理よりも優先されます。

各カスタムフックモジュールは、それ自身の環境内で実行されます(データハブの処理やその他のモジュールとは分離されています)。

カスタムフックは、あらゆるステップ(読み込み、マッピング、マッチング、マージング、マスタリング、カスタム)に追加できます。

カスタムモジュールはすべて今回のプロジェクトルート/src/main/ml-modules/root/custom-modulesに格納します。GradleタスクmlDeployは、このディレクトリのコンテンツをMODULESデータベースにデプロイします。

必要なインプットおよびアウトプット

カスタムフックモジュールでは、特定のインプットやアウトプットは不要です。しかし、コード内で以下の変数を宣言・使用することで、いくつかの情報にアクセス可能です。

   // A custom hook module receives values for the following parameters via データハブ.You can declare only the ones you need and ignore the rest.
  var uris;         // An array of one or more URIs being processed.
  var content;      // An array of objects that represent each document being processed.
  var options;      // The Options object passed to the step by データハブ.
  var flowName;     // The name of the flow being processed.
  var stepNumber;   // The index of the step within the flow being processed.The stepNumber of the first step is 1.
  var step;         // The step definition object.
  var database;     // The target database.
ヒント: カスタムタスクにデータベースからの読み取り/データベースへの書き込みが含まれている場合、別個にカスタムステップをフローに埋め込むのではなく、カスタムステップモジュールを作成します。

以下のカスタムフックモジュールの例では、URIが同じレコードを読み込む前にこれまでのレコードをアーカイブ化します。これにより、新しいレコードの履歴は刷新されたものとなります(GitHubで確認する)。

   /**
   * This is a simple example of a custom hook that determines if the incoming order is a duplicate of an existing order
   * in the STAGING database.If so, the existing order is archived.
   * An update transaction in a custom hook has less impact than an update in the main module.
   */
  declareUpdate();

  // A custom hook module receives values for the following parameters via データハブ.You can declare only the ones you need and ignore the rest.
  var uris;         // An array of one or more URIs being processed.
  var content;      // An array of objects that represent each document being processed.
  var options;      // The Options object passed to the step by データハブ.
  var flowName;     // The name of the flow being processed.
  var stepNumber;   // The index of the step within the flow being processed.The stepNumber of the first step is 1.
  var step;         // The step definition object.
  var database;     // The target database.

  // Custom hooks can define zero or more properties in the step definition that declares them.
  var archiveCollection;

  for (const contentObject of content) {
    const order = contentObject.value;

    /**
     * If a hook is configured with runBefore = true, then the content value will be the "raw" data, not yet wrapped in
     * an envelope.If it's configured with runBefore = false, as in this example, then the content value
     * will be an envelope.
     */
    const instance = order.envelope.instance;

    /**
     * Note that for better performance, a single query should be done based on all of the objects in the content
     * array.This works fine for the small set of data being ingested in this example.
     */
    const existingDuplicateOrders = cts.search(cts.andQuery([
      cts.collectionQuery("IngestOrders"),
      cts.jsonPropertyValueQuery("id", instance.id),
      cts.jsonPropertyValueQuery("customer", instance.customer),
      cts.jsonPropertyValueQuery("order_date", instance.order_date),
      cts.jsonPropertyValueQuery("product_id", instance.product_id)
    ]));

    for (const duplicateOrder of existingDuplicateOrders) {
      const duplicateUri = xdmp.nodeUri(duplicateOrder);
      // Generate a random URI so that previously archived documents are never overwritten
      const archiveUri = "/archive/" + sem.uuidString() + duplicateUri;
      xdmp.documentInsert(archiveUri, duplicateOrder, xdmp.documentGetPermissions(duplicateUri), archiveCollection);
      xdmp.documentDelete(duplicateUri);
    }
  }

次のステップ

カスタムフックモジュールを作成したならば、フロー内のステップにカスタムフックを追加し、この新しいカスタムフックモジュールへのパスを指定します。