SQLAlchemy を用いた Snowflake へのアクセス
今回は、Snowflake SQLAlchemy ツールキットおよびPythonコネクターの使用にあるような、 SQLAlchemy を用いた Snowflake データベースへのアクセスについてまとめようと思います。
また、 Snowflake の JSON データへアクセスするのを容易にするために開発したライブラリである snowflake-sqlalchemy-json についても触れようと思います。
Snowflake と半構造化データ
Snowflake は、いわゆるクラウドでビッグデータを処理するためのデータプラットフォームです。
細かく最適化を考えなくても、それなりによいパフォーマンスでデータの保存/読み出しができ、 SQL でのアクセスもサポートされています。
2022年5月時点では、30日の無料トライアルを受け付けており、どんな雰囲気かは無料で試すことができます。
Snowflake は、一般的なデータ型をサポートしており、他のデータベースを使っていた人であれば、違和感なく使えるように思います。
なお、ビッグデータ向けのデータベースでよくあるように、 Unique 制約などは強制されないので、そこには注意が必要です。
少し特徴的な型として、半構造化データ型というものがあります。
これは、 VARIANT
などの型をカラムに指定することで、カラム内に JSON, Avro などの半構造化データを入れておき、 SQL から直接アクセスできるようにするものです。
例えば SELECT column_name:key1 FROM table;
のように記述することで、 table
テーブルの column_name
カラムにある {"key1": "value1", "key2": "value2"}
の中の "value1"
にアクセスすることができます。
WHERE
の条件部分で指定することも可能で、通常のカラムと同様にアクセスすることができます。
SQLAlchemy による SQL の生成
話は変わりますが、 Python のライブラリで SQLAlchemy というものがあります。
このライブラリは ORM として使うこともできますが、生の SQL をラップし Python から扱いやすくするという、 core と呼ばれる使い方もあります。
これは、以下のように Python のコードとして SQL を組み立てられるものです。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 | from sqlalchemy import Table, Column, Integer, String, MetaData from sqlalchemy import select metadata_obj = MetaData() User = Table( 'users' , metadata_obj, Column( 'id' , Integer, primary_key = True ), Column( 'name' , String), ) query = select( User.c. id , User.c.name, ).select_from( User, ).where( User.c.name.startswith( "A" ), ).order_by( User.c.name.desc(), ).limit( 10 ) |
SQLite の場合、上記の query
は、以下のような SQL に変換されます。
1 2 3 4 | SELECT users.id, users. name FROM users WHERE (users. name LIKE 'A' || '%' ) ORDER BY users. name DESC LIMIT 10 OFFSET 0 |
このように SQLAlchemy を用いると、 Python のコードとして SQL を作ることができます。
他にも JOIN や CTE (Common Table Expression) を用いた例を書いておくと以下のようになります。
この例では、テーブルの定義に declarative_base
を使っています。この場合は、 Table
を使う場合と異なり、 .c
の部分が不要になり、補完も効きやすくなるので、書きやすいかもしれません。
なお、例自体には特に意味はありません。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 | from sqlalchemy import Column, Integer, String, ForeignKey from sqlalchemy import select, or_ from sqlalchemy.ext.declarative import declarative_base Base = declarative_base() class User(Base): __tablename__ = "users" id = Column(Integer, primary_key = True ) name = Column(String) class Book(Base): __tablename__ = "books" id = Column(Integer, primary_key = True ) title = Column(String) author_id = Column( None , ForeignKey( 'users.id' )) books_with_title_a = select( Book.title, Book.author_id, ).select_from( Book, ).where( or_( Book.title.startswith( "A" ), Book.title.startswith( "a" ), ), ).cte() query = select( books_with_title_a.c.title, User.name, ).select_from( books_with_title_a, ).join( User, User. id = = books_with_title_a.c.author_id, ).order_by( books_with_title_a.c.title, ) |
Python で記述することで、 CTE 部分を変数として保持でき、それなりに分かりやすく記述できているように思います。
上記の query
からは、下記の SQL が生成されます。
1 2 3 4 5 6 | WITH anon_1 AS ( SELECT books.title AS title, books.author_id AS author_id FROM books WHERE (books.title LIKE 'A' || '%' ) OR (books.title LIKE 'a' || '%' )) SELECT anon_1.title, users. name FROM anon_1 JOIN users ON users.id = anon_1.author_id ORDER BY anon_1.title |
snowflake-sqlalchemy
Snowflake でも、上述した SQLAlchemy を使うことができます。
公式のページで使い方がまとめられているので、こちらのページを参考にするのがよいと思います。
なお、 Snowflake の場合、テーブルを指定する際には DATABASE.SCHEMA.TABLE
のように、データベース名とスキーマ名でテーブル名を修飾することで、一意にテーブルを指定することができます。
SQLAlchemy で、このようにデータベース名やスキーマ名も含めて修飾したテーブル名を指定する場合、以下のように quoted_name
を用いてテーブル名を記述することで、実現できます。
1 2 3 4 5 6 7 8 9 10 11 12 13 | from sqlalchemy import Column, Integer, String from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.sql.elements import quoted_name Base = declarative_base() class User(Base): # 下記のように、 quoted_name でテーブル名を指定する __tablename__ = quoted_name( "database.schema.users" , False ) id = Column(Integer, primary_key = True ) name = Column(String) |
snowflake-sqlalchemy-json
上述したように、 Snowflake では VARIANT
型のカラムに対しては、 JSON 形式などの値に対して SQL 内から所定の書式でアクセスすることができます。
snowflake-sqlalchemy では、その辺りの処理は 2022年5月時点では実装されていないようだったので、 snowflake-sqlalchemy-json でいくつかを実装してみました。
現時点では読み込みのみしかサポートしておらず、機能も限定的ですが、それなりに使えそうに思うので、このライブラリを使った例を挙げておきます。
ここでは、 Snowflake のアカウント作成時に例として提供されていた SNOWFLAKE_SAMPLE_DATA.WEATHER.DAILY_14_TOTAL
テーブルに含まれているデータを読み込む際の例について挙げます。データのサンプルは、末尾に挙げておきます。
VARIANT
型を SQLAlchemy の JSON にマッピングしアクセス
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 | from sqlalchemy import create_engine, select, Column, DateTime, JSON from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.sql.elements import quoted_name import snowflake_sqlalchemy_json snowflake_sqlalchemy_json.register_json_handler() Base = declarative_base() class Daily14Total(Base): __tablename__ = quoted_name( "SNOWFLAKE_SAMPLE_DATA.WEATHER.DAILY_14_TOTAL" , False ) T = Column(DateTime, primary_key = True ) V = Column(JSON) query = select( Daily14Total.V[ "city" ][ "id" ], Daily14Total.V[ "city" ][ "name" ], ).select_from( Daily14Total, ).where( Daily14Total.V[ "city" ][ "country" ] = = "IN" ).limit( 10 ) |
上記の query
で、 V
カラムに入っているデータの中の city
キーで取得できるオブジェクト内の要素にアクセスすることができます。
基本的に、 Python の dict
と同様のアクセス方法になり、直感的に書けているのが分かると思います。
lateral
と flatten
を用いたアクセス
Snowflake の VARIANT
型へのアクセスで重要な概念である lateral
結合を用いたアクセスについては、下記のように記述できます。
なお、 lateral
結合については結局 LATERAL とは何者なのか?の記事が分かりやすいように思います。
flatten
は、あくまで通常の関数なので、 SQLAlchemy で関数を呼び出す際の形式である func.flatten(xxx)
として記載します。
また、 lateral
は SQLAlchemy がサポートしているので、公式のチュートリアルの通りに呼び出します。
lateral()
で返されるテーブルの value
カラムにアクセスすると、 flatten
で展開された要素にアクセスすることができます。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 | from sqlalchemy import and_, create_engine, func, select, Column, DateTime, JSON from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.sql.elements import quoted_name import snowflake_sqlalchemy_json snowflake_sqlalchemy_json.register_json_handler() Base = declarative_base() class Daily14Total(Base): __tablename__ = quoted_name( "SNOWFLAKE_SAMPLE_DATA.WEATHER.DAILY_14_TOTAL" , False ) T = Column(DateTime, primary_key = True ) V = Column(JSON) lateral_flatten = func.flatten(Daily14Total.V[ "data" ]).lateral() query = select( Daily14Total.T, lateral_flatten.c.value[ "clouds" ], lateral_flatten.c.value[ "deg" ], lateral_flatten.c.value[ "rain" ], ).select_from( Daily14Total, lateral_flatten, ).where( and_( lateral_flatten.c.value[ "deg" ] > 273 , lateral_flatten.c.value[ "rain" ] > 0 , ), ).limit( 10 ) |
なお、上記は FROM 句に FROM DAILY_14_TOTAL, LATERAL FLATTEN(xxx)
のように lateral
を記述する方法ですが、この書き方では Warning が出るかもしれません。
その場合、以下のように、 JOIN 句として独立させて書くこともできます。
私はこちらの方が分かりやすいように思います。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 | from sqlalchemy import and_, create_engine, func, select, Column, DateTime, JSON from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.sql.elements import quoted_name import snowflake_sqlalchemy_json snowflake_sqlalchemy_json.register_json_handler() Base = declarative_base() class Daily14Total(Base): __tablename__ = quoted_name( "SNOWFLAKE_SAMPLE_DATA.WEATHER.DAILY_14_TOTAL" , False ) T = Column(DateTime, primary_key = True ) V = Column(JSON) lateral_flatten = func.flatten(Daily14Total.V[ "data" ]).lateral() query = select( Daily14Total.T, lateral_flatten.c.value[ "clouds" ], lateral_flatten.c.value[ "deg" ], lateral_flatten.c.value[ "rain" ], ).select_from(Daily14Total).join( lateral_flatten, True , ).where( and_( lateral_flatten.c.value[ "deg" ] > 273 , lateral_flatten.c.value[ "rain" ] > 0 , ), ).limit( 10 ) |
用いたデータの形式
最後に、 DAILY_14_TOTAL
テーブルの V
カラムに入っている JSON データの形式を挙げておきます。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 | { "city" : { "coord" : { "lat" : 9.7, "lon" : 78.48333 }, "country" : "IN" , "id" : 1263965, "name" : "Manamadurai" }, "data" : [{ "clouds" : 80, "deg" : 277, "dt" : 1500357600, "humidity" : 47, "pressure" : 980.94, "speed" : 2.13, "temp" : { "day" : 303.15, "eve" : 303.15, "max" : 303.15, "min" : 300.13, "morn" : 303.15, "night" : 300.13 }, "uvi" : 12.892, "weather" : [{ "description" : "broken clouds" , "icon" : "04n" , "id" : 803, "main" : "Clouds" }] }, { "clouds" : 80, "deg" : 234, "dt" : 1500444000, "humidity" : 57, "pressure" : 982.09, "rain" : 0.85, "speed" : 2.9, "temp" : { "day" : 302.81, "eve" : 304.33, "max" : 304.82, "min" : 296.22, "morn" : 296.22, "night" : 298.93 }, "uvi" : 12.679, "weather" : [{ "description" : "light rain" , "icon" : "10d" , "id" : 500, "main" : "Rain" }] }, { "clouds" : 76, "deg" : 243, "dt" : 1500530400, "humidity" : 61, "pressure" : 980.75, "rain" : 3, "speed" : 1.71, "temp" : { "day" : 303.18, "eve" : 304.86, "max" : 304.86, "min" : 297.51, "morn" : 298.36, "night" : 297.51 }, "uvi" : 12.986, "weather" : [{ "description" : "light rain" , "icon" : "10d" , "id" : 500, "main" : "Rain" }] }], "time" : 1500396663 } |
以上のようにすることで、 Snowflake の VARIANT
型へのアクセスを SQLAlchemy を用いて記載することができるようになります。
0 件のコメント:
コメントを投稿