Redshift Spectrumのパフォーマンスチューニングが必要なら統計情報を確認する

一番下の参考情報にも載せている通り、Redshift Spectrumのチューニングには、パーティショニングやファイルフォーマット等色々なポイントがありますが、ここでは特に見落としやすい、かつ効果の高い統計情報について書いていきます。

利用するデータ

適切なパフォーマンス検証用のデータを探していたのですが、わかりやすいように公式のRedshiftのチューニングのチュートリアルのデータを利用することにします。

このチュートリアル自体はSpectrumではなくRedshiftそのもののチューニングなのですが、Redshift側のソートキーやキー分散のチューニングは、この記事では扱いません。

docs.aws.amazon.com

提供されているデータは、構造としては大きなファクトテーブル(LINEORDER)とその属性情報が格納されたディメンションテーブルで構成されるスタースキーマ構造です。

f:id:yomon8:20190627135803p:plain

S3上にあるデータファイルはこのくらいのサイズです。

$ aws s3 ls --human-readable s3://awssampledbuswest2/ssbgz/
2015-04-24 11:34:23    0 Bytes
2015-05-05 03:48:30  100.5 MiB customer0002_part_00.gz
2015-05-07 03:41:02   24.6 KiB dwdate.tbl.gz
2015-05-02 02:37:30    3.1 GiB lineorder0000_part_00.gz
2015-05-02 02:51:24    3.1 GiB lineorder0001_part_00.gz
2015-05-02 03:05:06    3.1 GiB lineorder0002_part_00.gz
2015-05-02 03:20:07    3.1 GiB lineorder0003_part_00.gz
2015-05-02 03:31:59    3.1 GiB lineorder0004_part_00.gz
2015-05-02 03:42:31    3.1 GiB lineorder0005_part_00.gz
2015-05-05 03:11:50    3.1 GiB lineorder0006_part_00.gz
2015-05-02 04:02:53    3.1 GiB lineorder0007_part_00.gz
2015-05-02 04:53:13    8.3 MiB part0000_part_00.gz
2015-05-02 04:53:15    8.3 MiB part0001_part_00.gz
2015-05-02 04:53:18    8.3 MiB part0002_part_00.gz
2015-05-02 04:53:20    8.3 MiB part0003_part_00.gz
2015-06-03 03:55:40   32.5 MiB supplier.tbl_0000_part_00.gz
2015-05-02 04:53:28   20 Bytes supplier0001_part_00.gz
2015-05-02 04:53:28   20 Bytes supplier0002_part_00.gz
2015-05-02 04:53:28   20 Bytes supplier0003_part_00.gz

行数でいうとこのくらい。

Table Name Rows
LINEORDER 600,037,902
PART 1,400,000
CUSTOMER 3,000,000
SUPPLIER 1,000,000
DWDATE 2,556

環境準備

Redshiftにデータをロード

普通のRedshiftとも比べたいので、手順通りデータをRedshiftにロードします。ロードの手順は以下に記載があるので省略します。

ステップ 1: テストデータセットを作成する - Amazon Redshift

GlueでRedshfit Spectrumで読むParquetファイルを準備

Spectrumで読み込むためのデータをS3上に準備します。ORCやParquetが推奨されてますが、今回はParquetにします。

このParquet化の手順も詳しくは書きませんが、ざっくり以下のような手順です。

  1. Redshiftをクローリング
  2. スクリプトでParquet変換しS3保存
  3. S3上のParquetをクローリングしてGlueのカタログを生成

参考までに2の手順で利用するGlueで利用したPySparkの変換スクリプトを載せておきます。

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

args = getResolvedOptions(sys.argv, ['TempDir','JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

catalog_db_name = "ssb"
catalog_table_names = ["otomodb_public_customer","otomodb_public_lineorder","otomodb_public_dwdate","otomodb_public_part","otomodb_public_supplier"]

for tbl in catalog_table_names:
    s3path = "s3://otomo-test/otomodb/%s" % tbl
    datasource0 = glueContext.create_dynamic_frame.from_catalog(database = catalog_db_name, table_name = tbl, redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasource0")
    resolvechoice2 = ResolveChoice.apply(frame = datasource0, choice = "make_struct", transformation_ctx = "resolvechoice2")
    dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")
    datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": s3path }, format = "parquet", transformation_ctx = "datasink4")
job.commit()

Redshiftに外部スキーマ定義

Parquetのファイルをクローリングして集めたテーブル情報は ssb_parquet というDBに入れています。

Spectrumで利用できるようにRedshiftで以下のクエリを実行します。

create external schema ssb_parquet
from data catalog 
database 'ssb_parquet' 
 iam_role 'arn:aws:iam::123456789023:role/YourRedshiftRoleName' 
create external database if not exists;

検証

キャッシュを切る

まず、パフォーマンス検証の前にはResultキャッシュ切っておきます。

set enable_result_cache_for_session to off;

検証用クエリの準備

Redshift向けクエリ

検証で利用するクエリはこちらに記載のある以下のクエリです。

-- Query 3
-- Drill down in time to just one month 

select c_city, s_city, d_year, sum(lo_revenue) as revenue 
from 
customer, 
lineorder, 
supplier, 
dwdate
where lo_custkey = c_custkey
and lo_suppkey = s_suppkey
and lo_orderdate = d_datekey
and (c_city='UNITED KI1' or
c_city='UNITED KI5')
and (s_city='UNITED KI1' or
s_city='UNITED KI5')
and d_yearmonth = 'Dec1997'
group by c_city, s_city, d_year
order by d_year asc, revenue desc;

Redshift Spectrum向けクエリ

上記クエリを修正して、Redshift Spectrum経由でParquet上のデータを読むクエリを作ります。

スキーマ名やテーブル名は適宜読み替えてください

from句の部分だけが変わっています。結果は上記のRedshift向けのクエリと同じになります。

select c_city, s_city, d_year, sum(lo_revenue) as revenue 
from 
ssb_parquet.otomodb_public_customer, 
ssb_parquet.otomodb_public_lineorder, 
ssb_parquet.otomodb_public_supplier, 
ssb_parquet.otomodb_public_dwdate
where lo_custkey = c_custkey
and lo_suppkey = s_suppkey
and lo_orderdate = d_datekey
and (c_city='UNITED KI1' or
c_city='UNITED KI5')
and (s_city='UNITED KI1' or
s_city='UNITED KI5')
and d_yearmonth = 'Dec1997'
group by c_city, s_city, d_year
order by d_year asc, revenue desc;

参考までに両クエリとも以下のような結果になります。

+------------+------------+----------+-----------+
| c_city     | s_city     | d_year   | revenue   |
|------------+------------+----------+-----------|
| UNITED KI5 | UNITED KI5 | 1997     | 373974482 |
| UNITED KI1 | UNITED KI5 | 1997     | 365124652 |
| UNITED KI1 | UNITED KI1 | 1997     | 358824801 |
| UNITED KI5 | UNITED KI1 | 1997     | 355652352 |
+------------+------------+----------+-----------+

パフォーマンス計測

dc2.largeの4ノードでクラスタ組みました。

早速上記のクエリのパフォーマンス計測してみます。

パッと見でSpectrumが圧倒的に遅いのがわかります。50倍以上の開き。。。

構成 1回目 2回目 3回目
4Nodes 3.518s 3.477s 3.476s
4Nodes-Spectrum 180.960s 187.457s 187.209s

調査

S3にデータがあるとは言え、この遅さから、かなり効率データを読んでいるのではないかと想像できます。早速調査開始します。

Explain基礎知識

RedshfitのExplain文理解するには、以下の2ページを読むことをおすすめします。

docs.aws.amazon.com docs.aws.amazon.com

Hash Joinなど一般的な処理の個別の動きを絵で見てイメージしたい場合は以下のページがわかりやすいです。

www.techscore.com

原因追求

では、順を追って原因追求してみます。

クエリ分析画面

Redshiftにはとても見やすい分析画面があるので、そちら使ってクエリの実行結果を見てみます。

パッと見てすぐわかるところから言うと、S3 Query Scan で凄い時間かかっていることが見て取れます。これは、一番大きなファクトテーブル lineorder をS3から持ってくるところです。

f:id:yomon8:20190627155052p:plain

クリックすると詳しい情報が見えるのですが、9.60GBのかなり大きなデータを持ってきているのがわかります。(このデータ、S3上の全データサイズは19GB程度なのですが、Parquetが列指向なので、必要な列だけ全件持ってきているものになります。)

f:id:yomon8:20190627154227p:plain

実行プラン

何でこんなことになっているのか、実行プランを見てみます。

良く見るといくつか変なところがあります。

XN Merge (cost=275000005312500155880713452781568.00..275000005312500155880713452781568.00 rows=8000000 width=1040)
    -> XN Network (cost=275000005312500155880713452781568.00..275000005312500155880713452781568.00 rows=8000000 width=1040)
      -> XN Sort (cost=275000005312500155880713452781568.00..275000005312500155880713452781568.00 rows=8000000 width=1040)
        -> XN HashAggregate (cost=275000005312500155880713452781568.00..275000005312500155880713452781568.00 rows=8000000 width=1040)
          -> XN Hash Join DS_BCAST_INNER (cost=750000000.00..262500005312500170972541873029120.00 rows=1250000000000000076084226809659392 width=1040)
            -> XN Hash Join DS_BCAST_INNER (cost=475000000.00..5250000110250001011048448.00 rows=25000000000000001191182336 width=528)
              -> XN Hash Join DS_BCAST_INNER (cost=225000000.00..107400000500000000.00 rows=500000000000000000 width=528)
                -> XN S3 Query Scan otomodb_public_customer (cost=0.00..250000000.00 rows=10000000000 width=520)
                -> XN Hash (cost=200000000.00..200000000.00 rows=10000000000 width=16)
                  -> XN S3 Query Scan otomodb_public_lineorder (cost=0.00..200000000.00 rows=10000000000 width=16)
              -> XN Hash (cost=225000000.00..225000000.00 rows=10000000000 width=8)
                -> XN S3 Query Scan otomodb_public_dwdate (cost=0.00..225000000.00 rows=10000000000 width=8)
            -> XN Hash (cost=250000000.00..250000000.00 rows=10000000000 width=520)
              -> XN S3 Query Scan otomodb_public_supplier (cost=0.00..250000000.00 rows=10000000000 width=520)

まずは該当のlineorderの部分から。

こんな大きなテーブルを一番最初に全件SCANして、そこからHashテーブル作っているっぽいです。これは遅い。

                -> XN Hash (cost=200000000.00..200000000.00 rows=10000000000 width=16)
                  -> XN S3 Query Scan otomodb_public_lineorder (cost=0.00..200000000.00 rows=10000000000 width=16)

何でこんなことになるのか、、次の変なところがわかれば判明します。

そうです。よく見ると「 rows=10000000000 」という文字列が沢山並んでいるのがわかると思います。 オプティマイザがS3上のデータを全て同一のレコード数だと認識しているのです。

小さなテーブルも大きなテーブルも、10000000000件。それは変な実行プランになってしまうのもしょうがない。

対応(統計情報の設定)

この原因に関しては、オプティマイザに以下表にあるような、実際のテーブルのレコード数を教えることができれば改善しそうです。

Table Name Rows
LINEORDER 600,037,902
PART 1,400,000
CUSTOMER 3,000,000
SUPPLIER 1,000,000
DWDATE 2,556

Redshiftのクエリで設定

numRows というテーブルのプロパティを設定すると、オプティマイザにレコード数を知らせることができます。

ALTER TABLEコマンドで設定できます。

ALTER TABLE ssb_parquet."otomodb_public_lineorder" SET TABLE PROPERTIES ('numRows'='600037902');
ALTER TABLE ssb_parquet."otomodb_public_part" SET TABLE PROPERTIES ('numRows'='1400000');
ALTER TABLE ssb_parquet."otomodb_public_customer" SET TABLE PROPERTIES ('numRows'='3000000');
ALTER TABLE ssb_parquet."otomodb_public_supplier" SET TABLE PROPERTIES ('numRows'='1000000');
ALTER TABLE ssb_parquet."otomodb_public_dwdate" SET TABLE PROPERTIES ('numRows'='2556');

Glueのカタログを使わずに、テーブルを自分で作っている場合は、CREATE EXTERNAL TABLE文の中でも指定可能です。

CREATE EXTERNAL TABLE - Amazon Redshift

設定の実態はGlueのカタログの中に

上記ですが、Redshiftで設定していますが、実は設定の実態はGlueのカタログの中にあります。

以下のようにテーブルのプロパティに numRows が設定されているのがわかります。Redshfit Spectrumの実行計画はこの値を見ています。

Glueの設定なので、Glueの画面やAPIからも設定可能です。

f:id:yomon8:20190627185727p:plain

対応後の結果

実行プラン

統計情報として numRows に正しい値の設定後の実行プランです。

オプティマイザが正しいレコード数を把握できたことから、一番小さい dwdate を読み込んだ内容を元に、lineorder を読み込んでいます。

XN Merge (cost=126560262215193504.00..126560262215213504.00 rows=8000000 width=1040)
    -> XN Network (cost=126560262215193504.00..126560262215213504.00 rows=8000000 width=1040)
      -> XN Sort (cost=126560262215193504.00..126560262215213504.00 rows=8000000 width=1040)
        -> XN HashAggregate (cost=126559262214256240.00..126559262214276240.00 rows=8000000 width=1040)
          -> XN Hash Join DS_BCAST_INNER (cost=110063.90..120807898923256240.00 rows=575136329100000000 width=1040)
            -> XN Hash Join DS_BCAST_INNER (cost=27563.90..13333963048900.66 rows=38342421937800 width=528)
              -> XN Hash Join DS_BCAST_INNER (cost=63.90..2035242789.70 rows=7668484388 width=16)
                -> XN S3 Query Scan otomodb_public_lineorder (cost=0.00..12000758.04 rows=600037902 width=16)
                -> XN Hash (cost=57.51..57.51 rows=2556 width=8)
                  -> XN S3 Query Scan otomodb_public_dwdate (cost=0.00..57.51 rows=2556 width=8)
              -> XN Hash (cost=25000.00..25000.00 rows=1000000 width=520)
                -> XN S3 Query Scan otomodb_public_supplier (cost=0.00..25000.00 rows=1000000 width=520)
            -> XN Hash (cost=75000.00..75000.00 rows=3000000 width=520)
              -> XN S3 Query Scan otomodb_public_customer (cost=0.00..75000.00 rows=3000000 width=520)

実際に lineorder から読み込まれた容量が、先程の9.6GBから、123MBに大幅に小さくなっているのがわかります。行数も1%程度まで絞り込めています。

f:id:yomon8:20190627190550p:plain

実行結果

先程の実行結果に赤字で追加しています。かなりスピードアップしていることがわかります。

構成 1回目 2回目 3回目
4Nodes 3.518s 3.477s 3.476s
4Nodes-Spectrum 180.960s 187.457s 187.209s
4Nodes-Spectrum(Tuned) 5.967s 4.849s 5.066s

その他

ディメンションテーブルをローカルに持つだけでもかなり早くなる

今回のような大きな少数のファクトテーブルと、小さいディメンションテーブルのスタースキーマ構造で、S3上のファクトテーブルの読み込みがパフォーマンス上の支配的なボトルネックとなっている場合、小さいディメンションテーブルをローカルに持ってくるだけでも、比較的健全な実行プランになります。(デフォルトで設定されている rows=10000000000 が十分に大きめの値なので)

技術者的に気持ち悪い気もするので、オススメはしないですが。

例えば、上記のクエリでディメンションテーブルだけローカルを使った場合。

select c_city, s_city, d_year, sum(lo_revenue) as revenue 
from 
customer, 
ssb_parquet.otomodb_public_lineorder, 
supplier, 
dwdate
where lo_custkey = c_custkey
and lo_suppkey = s_suppkey
and lo_orderdate = d_datekey
and (c_city='UNITED KI1' or
c_city='UNITED KI5')
and (s_city='UNITED KI1' or
s_city='UNITED KI5')
and d_yearmonth = 'Dec1997'
group by c_city, s_city, d_year
order by d_year asc, revenue desc;

以下のような実行プランになり、lineorder の読み込み量は絞られます。

XN Merge (cost=1009144884276.02..1009144884276.02 rows=1 width=36)
    -> XN Network (cost=1009144884276.02..1009144884276.02 rows=1 width=36)
      -> XN Sort (cost=1009144884276.02..1009144884276.02 rows=1 width=36)
        -> XN HashAggregate (cost=9144884276.00..9144884276.01 rows=1 width=36)
          -> XN Hash Join DS_DIST_ALL_NONE (cost=60110.76..9144884200.85 rows=7515 width=36)
            -> XN Hash Join DS_BCAST_INNER (cost=60078.73..9144876349.26 rows=619553 width=36)
              -> XN Hash Join DS_BCAST_INNER (cost=15019.67..2528561719.67 rows=78670000 width=26)
                -> XN S3 Query Scan otomodb_public_lineorder (cost=0.00..200000000.00 rows=10000000000 width=16)
                -> XN Hash (cost=15000.00..15000.00 rows=7867 width=18)
                  -> XN Seq Scan on supplier (cost=0.00..15000.00 rows=7867 width=18)
              -> XN Hash (cost=45000.00..45000.00 rows=23626 width=18)
                -> XN Seq Scan on customer (cost=0.00..45000.00 rows=23626 width=18)
            -> XN Hash (cost=31.95..31.95 rows=31 width=8)
              -> XN Seq Scan on dwdate (cost=0.00..31.95 rows=31 width=8)

小さいクラスタではSpectrumの方が早い場合も

先程の例ではチューニング後も、ディスクローカルにデータを持っている場合より、Spectrumの方が遅いままでした。

しかし、場合によってはローカルにデータを持つより、Spectrumの方が早いシナリオもあります。上記の検証は4ノードのクラスタで行っていましたが、同様の検証を1ノード構成で行った場合の結果を赤字で追記しました。

このように、Redshift層の1ノードでは処理しきれないものでも、Spectrum層がスケールすることで高速に処理できるケースもあります。当然、この場合も正しい統計情報は必須です。

構成 1回目 2回目 3回目
4Nodes 3.518s 3.477s 3.476s
4Nodes-Spectrum 180.960s 187.457s 187.209s
4Nodes-Spectrum(Tuned) 5.967s 4.849s 5.066s
1Node 105.027s 102.859s 103.316s
1Node-Spectrum 153.268s 152.031s 151.818s
1Node-Spectrum(Tuned) 11.995s 11.660s 12.031s

numRowsを自動更新するには

Analyze文はまだS3上のデータをサポートしていないようです。

> analyze ssb_parquet.otomodb_public_lineorder;                                                                                                                                              ANALYZE of s3 tables is not yet supported.

データ取り込みのパイプラインの最後にselect countして、numRows を更新することが考えられます。

Glueを使っていて、かつParquet形式なら、Glueのカタログの recordCount の項目がParquetメタデータから読み込んだものなので、これを numRows にコピーしても良さそうです。

参考

統計情報について特に見逃しやすいので書きましたが、この辺りも纏まっています。

docs.aws.amazon.com

AWSのブログです。統計情報の件も記載ありました。

aws.amazon.com

上記のブログの英語版です。こちらの方が12個ということで、加筆があったようです。

aws.amazon.com