Amazon Web Services ブログ

Deequ で大規模なデータ品質をテスト

一般的に、コード用のユニットテストを書くと思いますが、お使いのデータもテストしているのでしょうか? 不正確または不正なデータは、本番システムに大きな影響を与える可能性があります。データ品質問題の例は次のとおりです。

  • 値がない場合は、本番システムで null 以外の値を必要とするエラー (NullPointerException) が発生する可能性があります。
  • データ分布が変化すると、機械学習モデルで予期しない出力につながることがあります。
  • データの集計を誤ると、ビジネスでの判断を下す際に誤った意思決定につながる可能性があります。

このブログ記事では、Amazon で開発し、使用されているオープンソースツールである Deequ を紹介したいと思います。Deequ では、データセットのデータ品質メトリクスを計算したり、データ品質の制約を定義および確認したり、データ分布の変化について通知を受け取ったりすることができます。確認や検証のアルゴリズムを直接実装する代わりに、データの外観を記述することに集中できます。Deequ は確認を提案することでお客様をサポートします。Deequ は Apache Spark に実装されています。通常は分散型ファイルシステムまたはデータウェアハウスに存在する大規模なデータセット (数十億行の規模と考えられる) に合わせて拡張するように設計されています。

Amazon での Deequ

Deequ は、多くの大規模な本番データセットの品質を検証するために Amazon で内部的に使用されています。データセットの製作者は、データ品質の制約を追加および編集できます。このシステムは、定期的に (データセットの新しいバージョンごとに) データ品質メトリクスを計算し、データセットの製作者によって定義された制約を検証し、成功した場合にはデータセットをコンシューマーに公開します。エラーが発生した場合、データセットの公開は中止され、製作者はアクションをとるように通知を受けます。データの品質問題はコンシューマーのデータパイプラインに伝播しないため、問題が発生したときの影響範囲が減少します。

Deequ の概要

Deequ を使用するために、その主要コンポーネントを見てみましょう (図 1 も参照)。

  • メトリクス計算 — Deequ はデータ品質メトリクス、つまり完全性、最大値、相関関係などの統計情報を計算します。Deequ は、Spark を使用して Amazon S3 などのソースから読み取り、最適化された一連の集計クエリを通じてメトリクスを計算します。データに基づいて計算された未加工メトリクスに直接アクセスできます。
  • 制約の検証 — ユーザーは、一連のデータ品質において検証する制約を定義することに集中できます。Deequ は、データに対して計算される、必要な一連のメトリクスを導き出します。Deequ は、制約の検証結果を含むデータ品質レポートを生成します。
  • 制約の提案 — 独自のカスタムデータ品質の制約を定義するか、有用な制約を推測するためにデータをプロファイルする自動制約提案方法を使用することを選択できます。

図 1: Deequ コンポーネントの概要

設定: Spark クラスターを起動する

このセクションでは、お使いのデータに Deequ を使用する手順について説明します。まず、Amazon EMR クラスターに Spark と Deequ を設定します。次に、AWS から提供されているサンプルデータセットを読み込み、分析を実行してからデータテストを実行します。

Deequ は Apache Spark の上に構築されており、大規模なデータセットで高速かつ分散された計算が行えるようにサポートします。Deequ は Spark バージョン 2.2.0 以降によって異なります。最初の手順として、Amazon EMR で Spark を使用してクラスターを作成します。 Amazon EMR が Spark の設定を管理します。また、EMR ファイルシステム (EMRFS) を使用して Amazon S3 内のデータに直接アクセスすることもできます。テスト用に、スタンドアロンモードで Spark を単一のマシンにインストールすることもできます。

SSH を使用して Amazon EMR マスターノードに接続しますMaven リポジトリから最新の Deequ JAR を読み込みます。バージョン 1.0.1 の JAR をロードするには、次の手順に従ってください。

wget http://repo1.maven.org/maven2/com/amazon/deequ/deequ/1.0.1/deequ-1.0.1.jar

Spark シェルを起動し、Deequ JAR ファイルを参照するために spark.jars 引数を使用します。

spark-shell --conf spark.jars=deequ-1.0.1.jar

Spark 設定方法の詳細については、「Spark クイックスタートガイド」および「Spark 設定」オプションの概要を参照してください。

データを読み込む

実行例では、Amazon S3 で Amazon が提供している顧客レビューのデータセットを使用します。Spark のカテゴリ「電子機器製品」に対するレビューを含むデータセットを読み込んでみましょう。Spark シェルにコードを入力したかを確認してください。

val dataset = spark.read.parquet("s3://amazon-reviews-pds/parquet/product_category=Electronics/")

Spark シェルで dataset.printSchema() を実行すると、以下の選択された属性を確認することができます。

root
|-- marketplace: string (nullable = true)
|-- customer_id: string (nullable = true)
|-- review_id: string (nullable = true)
|-- product_title: string (nullable = true)
|-- star_rating: integer (nullable = true)
|-- helpful_votes: integer (nullable = true)
|-- total_votes: integer (nullable = true)
|-- vine: string (nullable = true)
|-- year: integer (nullable = true)

データ分析

データの確認を定義する前に、データセットの統計を計算します。これらをメトリクスと呼びます。Deequ は以下のメトリクスをサポートしています (これらの内容はこの Deequ パッケージで定義されています)。

メトリクス

説明

使用例

ApproxCountDistinct HyperLogLogPlusPlus スケッチを使用して計算された、別個値の概数。 ApproxCountDistinct("review_id")
ApproxQuantile 分布の近似分位数。 ApproxQuantile("star_rating", quantile = 0.5)
ApproxQuantiles 分布の近似分位数。 ApproxQuantiles("star_rating", quantiles = Seq(0.1, 0.5, 0.9))
Completeness 列にある NULL 以外の値の割合。 Completeness("review_id")
Compliance 指定された列の制約に準拠する行の割合。 Compliance("top star_rating", "star_rating >= 4.0")
Correlation ピアソンの相関係数は、2 つの列の間を線形相関で測定します。結果は [-1, 1] の範囲内にあり、1 は正の線形相関、-1 は負の線形相関、0 は相関がないことを意味します。 Correlation("total_votes", "star_rating")
CountDistinct 個別値の数です。 CountDistinct("review_id")
DataType ブール値、小数、整数、文字列など、データ型の分布です。結果として得られるヒストグラムで、相対位置または絶対位置によるフィルタリングを行えるようになります。 DataType("year")
Distinctness すべての列の値に対する列の個別値の割合。個別値は少なくとも 1 回は出現します。例: [a, a, b] は 2 つの個別値 a と b を含んでいるため、その区別は 2/3 になります。 Distinctness("review_id")
Entropy エントロピーは、考えられるすべてのイベント (列の値) を考慮したときの、イベント (列の値) に含まれる情報レベルの尺度です。これは NATS (情報量の自然単位) で測定されます。観測値を (value_count/total_count) * log(value_count/total_count) の負の合計として、エントロピーを推定します。例: [a, b, b, c, c] は、カウント [1, 2, 2] の 3 つの異なる値を持ちます。エントロピーは (-1/5*log(1/5)-2/5*log(2/5)-2/5*log(2/5)) = 1.055 になります。 Entropy("star_rating")
Maximum 最大値です。 Maximum("star_rating")
Mean 平均値。NULL 値は除外されます。 Mean("star_rating")
Minimum 最小値です。 Minimum("star_rating")
MutualInformation 相互情報量は、ある列 (1 つの確率変数) に関する情報について、別の列 (別の確率変数) から推測できる量を表します。2 つの列が独立している場合、相互情報量はゼロです。一方の列が他方の列の関数である場合、相互情報量はその列のエントロピーになります。相互情報量は対称的で負でない数です。 MutualInformation(Seq("total_votes", "star_rating"))
PatternMatch 特定の正規表現に準拠する行の割合。 PatternMatch("marketplace", pattern = raw"\w{2}".r)
Size DataFrame 内の行数。 Size()
Sum 列のすべての値の合計。 Sum("total_votes")
UniqueValueRatio 列のすべての個別値に対する固有値の割合。固有値は 1 回だけ出現します。個別値は少なくとも 1 回は出現します。 例: [a, a, b] は 1 つの固有値 b と 2 つの個別値 a と b を含んでいるため、固有値の割合は 1/2 になります。 UniqueValueRatio("star_rating")
Uniqueness 列のすべての値に対する固有値の割合。 固有値は 1 回だけ出現します。 例: [a, a, b] は 1 つの固有値 b を含んでいるため、一意性は 1/3 です。 Uniqueness("star_rating")

次の例では、AnalysisRunner を使用して興味のあるメトリクスを定義する方法を示します。Spark シェルで次のコードを実行するには、単にシェルに貼り付けるか、またはマスターノードのローカルファイルに保存して、次のコマンドで Spark シェルに読み込んでください。

:load PATH_TO_FILE

import com.amazon.deequ.analyzers.runners.{AnalysisRunner, AnalyzerContext}
import com.amazon.deequ.analyzers.runners.AnalyzerContext.successMetricsAsDataFrame
import com.amazon.deequ.analyzers.{Compliance, Correlation, Size, Completeness, Mean, ApproxCountDistinct}

val analysisResult: AnalyzerContext = { AnalysisRunner
  // data to run the analysis on
  .onData(dataset)
  // define analyzers that compute metrics
  .addAnalyzer(Size())
  .addAnalyzer(Completeness("review_id"))
  .addAnalyzer(ApproxCountDistinct("review_id"))
  .addAnalyzer(Mean("star_rating"))
  .addAnalyzer(Compliance("top star_rating", "star_rating >= 4.0"))
  .addAnalyzer(Correlation("total_votes", "star_rating"))
  .addAnalyzer(Correlation("total_votes", "helpful_votes"))
  // compute metrics
  .run()
}

// retrieve successfully computed metrics as a Spark data frame
val metrics = successMetricsAsDataFrame(spark, analysisResult)

結果のデータフレームには、計算されたメトリクス (Spark シェルの metrics.show() を呼び出します) が含まれています。

名前 インスタンス
ApproxCountDistinct review_id 3010972
Completeness review_id 1
Compliance top star_rating 0.74941
Correlation helpful_votes,total_votes 0.99365
Correlation total_votes,star_rating -0.03451
Mean star_rating 4.03614
Size * 3120938

得られた情報:

  • review_id には欠損値はなく、約 3,010,972 の固有値があります。
  • レビューの 74.9% が 4 以上の star_rating です。
  • total_votesstar_rating には相関関係がありません。
  • helpful_votestotal_votes には強い相関関係があります。
  • star_rating の平均値は 4.0 です。
  • データセットには 3,120,938 件のレビューが含まれています。

データのテストを定義して実行する

データを分析して理解した後で、導出したプロパティがデータセットの新しいバージョンにも当てはまることを確認します。データ配布に関するアサーションをデータパイプラインの一部として定義することで、処理されたすべてのデータセットが高品質であり、データを使用するアプリケーションが依存できるようになります。

データのテストを書き込むために、VerificationSuite から始めて、データ属性の確認を追加します。この例では、データの次のプロパティについてテストを行います。

  • 少なくとも合計 300 万行あります。
  • review_id が NULL になることはありません。
  • review_id は一意です。
  • star_rating の最小値は 1.0、最大値は 5.0 です。
  • marketplace には、「US」、「UK」、「DE」、「JP」、「FR」のみが含まれます。
  • year は負の値を含みません。

これは以前のステートメントを反映したコードです。利用可能なすべての項目の確認については、この GitHub リポジトリを参照してください。前述のように、これを Spark シェルで直接実行できます。

import com.amazon.deequ.{VerificationSuite, VerificationResult}
import com.amazon.deequ.VerificationResult.checkResultsAsDataFrame
import com.amazon.deequ.checks.{Check, CheckLevel}

val verificationResult: VerificationResult = { VerificationSuite()
  // data to run the verification on
  .onData(dataset)
  // define a data quality check
  .addCheck(
    Check(CheckLevel.Error, "Review Check") 
      .hasSize(_ >= 3000000) // at least 3 million rows
      .hasMin("star_rating", _ == 1.0) // min is 1.0
      .hasMax("star_rating", _ == 5.0) // max is 5.0
      .isComplete("review_id") // should never be NULL
      .isUnique("review_id") // should not contain duplicates
      .isComplete("marketplace") // should never be NULL
      // contains only the listed values
      .isContainedIn("marketplace", Array("US", "UK", "DE", "JP", "FR"))
      .isNonNegative("year")) // should not contain negative values
  // compute metrics and verify check conditions
  .run()
}

// convert check results to a Spark data frame
val resultDataFrame = checkResultsAsDataFrame(spark, verificationResult)

run を呼び出した後、Deequ はテストの説明を一連の Spark ジョブに変換します。これらのジョブはデータのメトリクスを計算するために実行されます。その後、これらのメトリクスに対してアサーション関数 (たとえば、star-rating の最小確認のための _ == 1.0) を呼び出して、データ制約が保持されているかどうかを確かめます。

Spark シェルで resultDataFrame.show(truncate=false) を呼び出して結果を調べます。結果テーブルには、各テストの検証結果が表示されます。次に例を示します。

制約 constraint_status constraint_message
SizeConstraint(Size(None)) 成功
MinimumConstraint(Minimum(star_rating,None)) 成功
MaximumConstraint(Maximum(star_rating,None)) 成功
CompletenessConstraint(Completeness(review_id,None)) 成功
UniquenessConstraint(Uniqueness(List(review_id))) 失敗 値: 0.9926566948782706 が制約要件を満たしていません!
CompletenessConstraint(Completeness(marketplace,None)) 成功
ComplianceConstraint(Compliance(marketplace contained in US,UK,DE,JP,FR,marketplace IS NULL OR marketplace IN (‘US’,’UK’,’DE’,’JP’,’FR’),None)) 成功
ComplianceConstraint(Compliance(year is non-negative,COALESCE(year, 0.0) >= 0,None)) 成功

興味深いことに、review_id 列は一意ではないため、一意性の確認に失敗しました。

この確認のために Deequ が計算したすべてのメトリクスを確認することもできます。

VerificationResult.successMetricsAsDataFrame(spark, verificationResult).show(truncate=False)

結果:

名前 インスタンス
Completeness review_id 1
Completeness marketplace 1
Compliance marketplace contained in US,UK,DE,JP,FR 1
Compliance year is non-negative 1
Maximum star_rating 5
Minimum star_rating 1
Size * 3120938
Uniqueness review_id 0.99266

自動化された制約の提案

多数のデータセットを所有している場合、またはデータセットに多数の列が含まれている場合は、適切な制約を手動で定義するのが難しい場合があります。Deequ はデータ分布に基づいて有用な制約を自動的に提案します。Deequ は最初にデータプロファイリングメソッドを実行し、次に結果に一連のルールを適用します。データプロファイリングメソッドの実行方法については、この GitHub リポジトリを参照してください。

import com.amazon.deequ.suggestions.{ConstraintSuggestionRunner, Rules}
import spark.implicits._ // for toDS method

// We ask deequ to compute constraint suggestions for us on the data
val suggestionResult = { ConstraintSuggestionRunner()
  // data to suggest constraints for
  .onData(dataset)
  // default set of rules for constraint suggestion
  .addConstraintRules(Rules.DEFAULT)
  // run data profiling and constraint suggestion
  .run()
}

// We can now investigate the constraints that Deequ suggested.
val suggestionDataFrame = suggestionResult.constraintSuggestions.flatMap { 
  case (column, suggestions) => 
    suggestions.map { constraint =>
      (column, constraint.description, constraint.codeForConstraint)
    } 
}.toSeq.toDS()

結果には、記述と Scala コードを含む制約一覧が含まれているため、データの品質確認に直接適用できます。Spark シェルで suggestionDataFrame.show(truncate=false) を呼び出して、提案された制約を調べます。ここでサブセットを示します。

制約 Scala コード
customer_id 「customer_id」が null ではありません .isComplete("customer_id")
customer_id 「customer_id」が整数タイプです .hasDataType("customer_id", ConstrainableDataTypes.Integral)
customer_id 「customer_id」に負の値はありません .isNonNegative("customer_id")
helpful_votes 「helpful_votes」が null ではありません .isComplete("helpful_votes")
helpful_votes 「helpful_votes」に負の値はありません .isNonNegative("helpful_votes")
marketplace 「marketplace」の値の範囲は、「US」、「UK」、「DE」、「JP」、「FR」です .isContainedIn("marketplace", Array("US", "UK", "DE", "JP", "FR"))
product_title 「product_title」が null ではありません .isComplete("product_title")
star_rating 「star_rating」が null ではありません .isComplete("star_rating")
star_rating 「star_rating」に負の値はありません .isNonNegative("star_rating")
vine 「vine」の値の範囲は「N」、「Y」です。 .isContainedIn("vine", Array("N", "Y"))

制約の提案は試行錯誤的なルールに基づいており、表示されているデータが正しいと想定しているため、必ず当てはまるとは限りません。提案を本稼働用に適用する前に確認を行うことをお勧めします。

GitHub に関するその他の例

より高度な機能に関する例は、Deequ の GitHub ページにあります。

  • Deequ は、固定されたしきい値でデータ品質の確認を行うだけではありません。データ品質メトリクスに異常検出を使用して、時間の経過とともに変化するメトリクスに対してテストを適用する方法を学びます。
  • Deequ はメトリクスの保存と読み込みをサポートしています。このユースケースで MetricsRepository を使用する方法について学びましょう。
  • データセットが時間とともに成長または分割されている場合は、Deequ の増分メトリクス計算機能を使用できます。パーティションごとに、Deequ は計算された各メトリクスの状態を保存します。パーティションの結合に関するメトリクスを計算するために、Deequ はこれらの状態を使用して、データを再度読み込むことなく、全体的なメトリクスを効率的に導き出すことができます。

その他のリソース

Deequ の内部動作については、VLDB 2018 の論文「大規模なデータ品質検証の自動化」をご覧ください。

結論

このブログ記事では、データ品質メトリクスの計算、データ品質メトリクスの検証、およびデータ品質確認の設定を自動化するためのデータプロファイリングに Deequ を使用する方法について説明しました。Deequ は、現在、お客様の独自のデータ品質管理パイプラインを構築するためにご利用いただけます。

 


著者について

Dustin Lange は、ベルリンの Amazon Search に勤める応用科学のマネージャーです。Dustin の率いるチームは、機械学習とデータ品質追跡を通じて検索に対するお客様の体験を向上させるためのアルゴリズムを開発しています。彼は 2013 年にデータベースでの類似性検索の博士号を取得し、同じ年に予測分野の応用科学者として Amazon でのキャリアを始めました。

 

 

Sebastian Schelter は、Amazon Search のシニア応用科学者で、データ管理と機械学習の共通点から問題に取り組んでいます。彼はベルリン工科大学で、大規模なデータ処理に関する博士号を取得しています。また、Apache Software Foundation の選出メンバーでもあり、現在は Apache Incubator のメンターを務めています。

 

 

Philipp Schmidt は、Amazon Search の ML エンジニアです。ベルリン工科大学を卒業した後、彼はポツダム大学とベルリンのスタートアップ企業で働いていました。Amazon では、大規模なデータセットのデータ品質追跡を可能にし、機械学習を通じてお客様のショッピング体験を向上させることに取り組んでいます。

 

 

Tammo Rukat は、ベルリンの Amazon Search で応用科学者として勤めています。オックスフォード大学で統計的機械学習の博士号を取得しています。Amazon では、企業の大規模なデータセットの豊富さと複雑さを利用して、お客様により知的な体験を提供できるように貢献しています。