一番下の参考情報にも載せている通り、Redshift Spectrumのチューニングには、パーティショニングやファイルフォーマット等色々なポイントがありますが、ここでは特に見落としやすい、かつ効果の高い統計情報について書いていきます。
利用するデータ
適切なパフォーマンス検証用のデータを探していたのですが、わかりやすいように公式のRedshiftのチューニングのチュートリアルのデータを利用することにします。
このチュートリアル自体はSpectrumではなくRedshiftそのもののチューニングなのですが、Redshift側のソートキーやキー分散のチューニングは、この記事では扱いません。
提供されているデータは、構造としては大きなファクトテーブル(LINEORDER)とその属性情報が格納されたディメンションテーブルで構成されるスタースキーマ構造です。
S3上にあるデータファイルはこのくらいのサイズです。
$ aws s3 ls --human-readable s3://awssampledbuswest2/ssbgz/ 2015-04-24 11:34:23 0 Bytes 2015-05-05 03:48:30 100.5 MiB customer0002_part_00.gz 2015-05-07 03:41:02 24.6 KiB dwdate.tbl.gz 2015-05-02 02:37:30 3.1 GiB lineorder0000_part_00.gz 2015-05-02 02:51:24 3.1 GiB lineorder0001_part_00.gz 2015-05-02 03:05:06 3.1 GiB lineorder0002_part_00.gz 2015-05-02 03:20:07 3.1 GiB lineorder0003_part_00.gz 2015-05-02 03:31:59 3.1 GiB lineorder0004_part_00.gz 2015-05-02 03:42:31 3.1 GiB lineorder0005_part_00.gz 2015-05-05 03:11:50 3.1 GiB lineorder0006_part_00.gz 2015-05-02 04:02:53 3.1 GiB lineorder0007_part_00.gz 2015-05-02 04:53:13 8.3 MiB part0000_part_00.gz 2015-05-02 04:53:15 8.3 MiB part0001_part_00.gz 2015-05-02 04:53:18 8.3 MiB part0002_part_00.gz 2015-05-02 04:53:20 8.3 MiB part0003_part_00.gz 2015-06-03 03:55:40 32.5 MiB supplier.tbl_0000_part_00.gz 2015-05-02 04:53:28 20 Bytes supplier0001_part_00.gz 2015-05-02 04:53:28 20 Bytes supplier0002_part_00.gz 2015-05-02 04:53:28 20 Bytes supplier0003_part_00.gz
行数でいうとこのくらい。
Table Name | Rows |
---|---|
LINEORDER | 600,037,902 |
PART | 1,400,000 |
CUSTOMER | 3,000,000 |
SUPPLIER | 1,000,000 |
DWDATE | 2,556 |
環境準備
Redshiftにデータをロード
普通のRedshiftとも比べたいので、手順通りデータをRedshiftにロードします。ロードの手順は以下に記載があるので省略します。
ステップ 1: テストデータセットを作成する - Amazon Redshift
GlueでRedshfit Spectrumで読むParquetファイルを準備
Spectrumで読み込むためのデータをS3上に準備します。ORCやParquetが推奨されてますが、今回はParquetにします。
このParquet化の手順も詳しくは書きませんが、ざっくり以下のような手順です。
- Redshiftをクローリング
- スクリプトでParquet変換しS3保存
- S3上のParquetをクローリングしてGlueのカタログを生成
参考までに2の手順で利用するGlueで利用したPySparkの変換スクリプトを載せておきます。
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job args = getResolvedOptions(sys.argv, ['TempDir','JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) catalog_db_name = "ssb" catalog_table_names = ["otomodb_public_customer","otomodb_public_lineorder","otomodb_public_dwdate","otomodb_public_part","otomodb_public_supplier"] for tbl in catalog_table_names: s3path = "s3://otomo-test/otomodb/%s" % tbl datasource0 = glueContext.create_dynamic_frame.from_catalog(database = catalog_db_name, table_name = tbl, redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasource0") resolvechoice2 = ResolveChoice.apply(frame = datasource0, choice = "make_struct", transformation_ctx = "resolvechoice2") dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3") datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": s3path }, format = "parquet", transformation_ctx = "datasink4") job.commit()
Redshiftに外部スキーマ定義
Parquetのファイルをクローリングして集めたテーブル情報は ssb_parquet
というDBに入れています。
Spectrumで利用できるようにRedshiftで以下のクエリを実行します。
create external schema ssb_parquet from data catalog database 'ssb_parquet' iam_role 'arn:aws:iam::123456789023:role/YourRedshiftRoleName' create external database if not exists;
検証
キャッシュを切る
まず、パフォーマンス検証の前にはResultキャッシュ切っておきます。
set enable_result_cache_for_session to off;
検証用クエリの準備
Redshift向けクエリ
検証で利用するクエリはこちらに記載のある以下のクエリです。
-- Query 3 -- Drill down in time to just one month select c_city, s_city, d_year, sum(lo_revenue) as revenue from customer, lineorder, supplier, dwdate where lo_custkey = c_custkey and lo_suppkey = s_suppkey and lo_orderdate = d_datekey and (c_city='UNITED KI1' or c_city='UNITED KI5') and (s_city='UNITED KI1' or s_city='UNITED KI5') and d_yearmonth = 'Dec1997' group by c_city, s_city, d_year order by d_year asc, revenue desc;
Redshift Spectrum向けクエリ
上記クエリを修正して、Redshift Spectrum経由でParquet上のデータを読むクエリを作ります。
※スキーマ名やテーブル名は適宜読み替えてください
from句の部分だけが変わっています。結果は上記のRedshift向けのクエリと同じになります。
select c_city, s_city, d_year, sum(lo_revenue) as revenue from ssb_parquet.otomodb_public_customer, ssb_parquet.otomodb_public_lineorder, ssb_parquet.otomodb_public_supplier, ssb_parquet.otomodb_public_dwdate where lo_custkey = c_custkey and lo_suppkey = s_suppkey and lo_orderdate = d_datekey and (c_city='UNITED KI1' or c_city='UNITED KI5') and (s_city='UNITED KI1' or s_city='UNITED KI5') and d_yearmonth = 'Dec1997' group by c_city, s_city, d_year order by d_year asc, revenue desc;
参考までに両クエリとも以下のような結果になります。
+------------+------------+----------+-----------+ | c_city | s_city | d_year | revenue | |------------+------------+----------+-----------| | UNITED KI5 | UNITED KI5 | 1997 | 373974482 | | UNITED KI1 | UNITED KI5 | 1997 | 365124652 | | UNITED KI1 | UNITED KI1 | 1997 | 358824801 | | UNITED KI5 | UNITED KI1 | 1997 | 355652352 | +------------+------------+----------+-----------+
パフォーマンス計測
dc2.largeの4ノードでクラスタ組みました。
早速上記のクエリのパフォーマンス計測してみます。
パッと見でSpectrumが圧倒的に遅いのがわかります。50倍以上の開き。。。
構成 | 1回目 | 2回目 | 3回目 |
---|---|---|---|
4Nodes | 3.518s | 3.477s | 3.476s |
4Nodes-Spectrum | 180.960s | 187.457s | 187.209s |
調査
S3にデータがあるとは言え、この遅さから、かなり効率データを読んでいるのではないかと想像できます。早速調査開始します。
Explain基礎知識
RedshfitのExplain文理解するには、以下の2ページを読むことをおすすめします。
docs.aws.amazon.com docs.aws.amazon.com
Hash Joinなど一般的な処理の個別の動きを絵で見てイメージしたい場合は以下のページがわかりやすいです。
原因追求
では、順を追って原因追求してみます。
クエリ分析画面
Redshiftにはとても見やすい分析画面があるので、そちら使ってクエリの実行結果を見てみます。
パッと見てすぐわかるところから言うと、S3 Query Scan
で凄い時間かかっていることが見て取れます。これは、一番大きなファクトテーブル lineorder
をS3から持ってくるところです。
クリックすると詳しい情報が見えるのですが、9.60GBのかなり大きなデータを持ってきているのがわかります。(このデータ、S3上の全データサイズは19GB程度なのですが、Parquetが列指向なので、必要な列だけ全件持ってきているものになります。)
実行プラン
何でこんなことになっているのか、実行プランを見てみます。
良く見るといくつか変なところがあります。
XN Merge (cost=275000005312500155880713452781568.00..275000005312500155880713452781568.00 rows=8000000 width=1040) -> XN Network (cost=275000005312500155880713452781568.00..275000005312500155880713452781568.00 rows=8000000 width=1040) -> XN Sort (cost=275000005312500155880713452781568.00..275000005312500155880713452781568.00 rows=8000000 width=1040) -> XN HashAggregate (cost=275000005312500155880713452781568.00..275000005312500155880713452781568.00 rows=8000000 width=1040) -> XN Hash Join DS_BCAST_INNER (cost=750000000.00..262500005312500170972541873029120.00 rows=1250000000000000076084226809659392 width=1040) -> XN Hash Join DS_BCAST_INNER (cost=475000000.00..5250000110250001011048448.00 rows=25000000000000001191182336 width=528) -> XN Hash Join DS_BCAST_INNER (cost=225000000.00..107400000500000000.00 rows=500000000000000000 width=528) -> XN S3 Query Scan otomodb_public_customer (cost=0.00..250000000.00 rows=10000000000 width=520) -> XN Hash (cost=200000000.00..200000000.00 rows=10000000000 width=16) -> XN S3 Query Scan otomodb_public_lineorder (cost=0.00..200000000.00 rows=10000000000 width=16) -> XN Hash (cost=225000000.00..225000000.00 rows=10000000000 width=8) -> XN S3 Query Scan otomodb_public_dwdate (cost=0.00..225000000.00 rows=10000000000 width=8) -> XN Hash (cost=250000000.00..250000000.00 rows=10000000000 width=520) -> XN S3 Query Scan otomodb_public_supplier (cost=0.00..250000000.00 rows=10000000000 width=520)
まずは該当のlineorderの部分から。
こんな大きなテーブルを一番最初に全件SCANして、そこからHashテーブル作っているっぽいです。これは遅い。
-> XN Hash (cost=200000000.00..200000000.00 rows=10000000000 width=16) -> XN S3 Query Scan otomodb_public_lineorder (cost=0.00..200000000.00 rows=10000000000 width=16)
何でこんなことになるのか、、次の変なところがわかれば判明します。
そうです。よく見ると「 rows=10000000000
」という文字列が沢山並んでいるのがわかると思います。 オプティマイザがS3上のデータを全て同一のレコード数だと認識しているのです。
小さなテーブルも大きなテーブルも、10000000000件。それは変な実行プランになってしまうのもしょうがない。
対応(統計情報の設定)
この原因に関しては、オプティマイザに以下表にあるような、実際のテーブルのレコード数を教えることができれば改善しそうです。
Table Name | Rows |
---|---|
LINEORDER | 600,037,902 |
PART | 1,400,000 |
CUSTOMER | 3,000,000 |
SUPPLIER | 1,000,000 |
DWDATE | 2,556 |
Redshiftのクエリで設定
numRows
というテーブルのプロパティを設定すると、オプティマイザにレコード数を知らせることができます。
ALTER TABLEコマンドで設定できます。
ALTER TABLE ssb_parquet."otomodb_public_lineorder" SET TABLE PROPERTIES ('numRows'='600037902'); ALTER TABLE ssb_parquet."otomodb_public_part" SET TABLE PROPERTIES ('numRows'='1400000'); ALTER TABLE ssb_parquet."otomodb_public_customer" SET TABLE PROPERTIES ('numRows'='3000000'); ALTER TABLE ssb_parquet."otomodb_public_supplier" SET TABLE PROPERTIES ('numRows'='1000000'); ALTER TABLE ssb_parquet."otomodb_public_dwdate" SET TABLE PROPERTIES ('numRows'='2556');
Glueのカタログを使わずに、テーブルを自分で作っている場合は、CREATE EXTERNAL TABLE文の中でも指定可能です。
CREATE EXTERNAL TABLE - Amazon Redshift
設定の実態はGlueのカタログの中に
上記ですが、Redshiftで設定していますが、実は設定の実態はGlueのカタログの中にあります。
以下のようにテーブルのプロパティに numRows
が設定されているのがわかります。Redshfit Spectrumの実行計画はこの値を見ています。
Glueの設定なので、Glueの画面やAPIからも設定可能です。
対応後の結果
実行プラン
統計情報として numRows
に正しい値の設定後の実行プランです。
オプティマイザが正しいレコード数を把握できたことから、一番小さい dwdate
を読み込んだ内容を元に、lineorder
を読み込んでいます。
XN Merge (cost=126560262215193504.00..126560262215213504.00 rows=8000000 width=1040) -> XN Network (cost=126560262215193504.00..126560262215213504.00 rows=8000000 width=1040) -> XN Sort (cost=126560262215193504.00..126560262215213504.00 rows=8000000 width=1040) -> XN HashAggregate (cost=126559262214256240.00..126559262214276240.00 rows=8000000 width=1040) -> XN Hash Join DS_BCAST_INNER (cost=110063.90..120807898923256240.00 rows=575136329100000000 width=1040) -> XN Hash Join DS_BCAST_INNER (cost=27563.90..13333963048900.66 rows=38342421937800 width=528) -> XN Hash Join DS_BCAST_INNER (cost=63.90..2035242789.70 rows=7668484388 width=16) -> XN S3 Query Scan otomodb_public_lineorder (cost=0.00..12000758.04 rows=600037902 width=16) -> XN Hash (cost=57.51..57.51 rows=2556 width=8) -> XN S3 Query Scan otomodb_public_dwdate (cost=0.00..57.51 rows=2556 width=8) -> XN Hash (cost=25000.00..25000.00 rows=1000000 width=520) -> XN S3 Query Scan otomodb_public_supplier (cost=0.00..25000.00 rows=1000000 width=520) -> XN Hash (cost=75000.00..75000.00 rows=3000000 width=520) -> XN S3 Query Scan otomodb_public_customer (cost=0.00..75000.00 rows=3000000 width=520)
実際に lineorder
から読み込まれた容量が、先程の9.6GBから、123MBに大幅に小さくなっているのがわかります。行数も1%程度まで絞り込めています。
実行結果
先程の実行結果に赤字で追加しています。かなりスピードアップしていることがわかります。
構成 | 1回目 | 2回目 | 3回目 |
---|---|---|---|
4Nodes | 3.518s | 3.477s | 3.476s |
4Nodes-Spectrum | 180.960s | 187.457s | 187.209s |
4Nodes-Spectrum(Tuned) | 5.967s | 4.849s | 5.066s |
その他
ディメンションテーブルをローカルに持つだけでもかなり早くなる
今回のような大きな少数のファクトテーブルと、小さいディメンションテーブルのスタースキーマ構造で、S3上のファクトテーブルの読み込みがパフォーマンス上の支配的なボトルネックとなっている場合、小さいディメンションテーブルをローカルに持ってくるだけでも、比較的健全な実行プランになります。(デフォルトで設定されている rows=10000000000
が十分に大きめの値なので)
技術者的に気持ち悪い気もするので、オススメはしないですが。
例えば、上記のクエリでディメンションテーブルだけローカルを使った場合。
select c_city, s_city, d_year, sum(lo_revenue) as revenue from customer, ssb_parquet.otomodb_public_lineorder, supplier, dwdate where lo_custkey = c_custkey and lo_suppkey = s_suppkey and lo_orderdate = d_datekey and (c_city='UNITED KI1' or c_city='UNITED KI5') and (s_city='UNITED KI1' or s_city='UNITED KI5') and d_yearmonth = 'Dec1997' group by c_city, s_city, d_year order by d_year asc, revenue desc;
以下のような実行プランになり、lineorder
の読み込み量は絞られます。
XN Merge (cost=1009144884276.02..1009144884276.02 rows=1 width=36) -> XN Network (cost=1009144884276.02..1009144884276.02 rows=1 width=36) -> XN Sort (cost=1009144884276.02..1009144884276.02 rows=1 width=36) -> XN HashAggregate (cost=9144884276.00..9144884276.01 rows=1 width=36) -> XN Hash Join DS_DIST_ALL_NONE (cost=60110.76..9144884200.85 rows=7515 width=36) -> XN Hash Join DS_BCAST_INNER (cost=60078.73..9144876349.26 rows=619553 width=36) -> XN Hash Join DS_BCAST_INNER (cost=15019.67..2528561719.67 rows=78670000 width=26) -> XN S3 Query Scan otomodb_public_lineorder (cost=0.00..200000000.00 rows=10000000000 width=16) -> XN Hash (cost=15000.00..15000.00 rows=7867 width=18) -> XN Seq Scan on supplier (cost=0.00..15000.00 rows=7867 width=18) -> XN Hash (cost=45000.00..45000.00 rows=23626 width=18) -> XN Seq Scan on customer (cost=0.00..45000.00 rows=23626 width=18) -> XN Hash (cost=31.95..31.95 rows=31 width=8) -> XN Seq Scan on dwdate (cost=0.00..31.95 rows=31 width=8)
小さいクラスタではSpectrumの方が早い場合も
先程の例ではチューニング後も、ディスクローカルにデータを持っている場合より、Spectrumの方が遅いままでした。
しかし、場合によってはローカルにデータを持つより、Spectrumの方が早いシナリオもあります。上記の検証は4ノードのクラスタで行っていましたが、同様の検証を1ノード構成で行った場合の結果を赤字で追記しました。
このように、Redshift層の1ノードでは処理しきれないものでも、Spectrum層がスケールすることで高速に処理できるケースもあります。当然、この場合も正しい統計情報は必須です。
構成 | 1回目 | 2回目 | 3回目 |
---|---|---|---|
4Nodes | 3.518s | 3.477s | 3.476s |
4Nodes-Spectrum | 180.960s | 187.457s | 187.209s |
4Nodes-Spectrum(Tuned) | 5.967s | 4.849s | 5.066s |
1Node | 105.027s | 102.859s | 103.316s |
1Node-Spectrum | 153.268s | 152.031s | 151.818s |
1Node-Spectrum(Tuned) | 11.995s | 11.660s | 12.031s |
numRowsを自動更新するには
Analyze文はまだS3上のデータをサポートしていないようです。
> analyze ssb_parquet.otomodb_public_lineorder; ANALYZE of s3 tables is not yet supported.
データ取り込みのパイプラインの最後にselect countして、numRows
を更新することが考えられます。
Glueを使っていて、かつParquet形式なら、Glueのカタログの recordCount
の項目がParquetメタデータから読み込んだものなので、これを numRows
にコピーしても良さそうです。
参考
統計情報について特に見逃しやすいので書きましたが、この辺りも纏まっています。
AWSのブログです。統計情報の件も記載ありました。
上記のブログの英語版です。こちらの方が12個ということで、加筆があったようです。