SyntaxHighlighter

2022年5月5日木曜日

SQLAlchemy を用いた Snowflake へのアクセス

SQLAlchemy を用いた Snowflake へのアクセス

今回は、Snowflake SQLAlchemy ツールキットおよびPythonコネクターの使用にあるような、 SQLAlchemy を用いた Snowflake データベースへのアクセスについてまとめようと思います。

また、 Snowflake の JSON データへアクセスするのを容易にするために開発したライブラリである snowflake-sqlalchemy-json についても触れようと思います。

Snowflake と半構造化データ

Snowflake は、いわゆるクラウドでビッグデータを処理するためのデータプラットフォームです。
細かく最適化を考えなくても、それなりによいパフォーマンスでデータの保存/読み出しができ、 SQL でのアクセスもサポートされています。

2022年5月時点では、30日の無料トライアルを受け付けており、どんな雰囲気かは無料で試すことができます。

Snowflake は、一般的なデータ型をサポートしており、他のデータベースを使っていた人であれば、違和感なく使えるように思います。
なお、ビッグデータ向けのデータベースでよくあるように、 Unique 制約などは強制されないので、そこには注意が必要です。

少し特徴的な型として、半構造化データ型というものがあります。
これは、 VARIANT などの型をカラムに指定することで、カラム内に JSON, Avro などの半構造化データを入れておき、 SQL から直接アクセスできるようにするものです。

例えば SELECT column_name:key1 FROM table; のように記述することで、 table テーブルの column_name カラムにある {"key1": "value1", "key2": "value2"} の中の "value1" にアクセスすることができます。
WHERE の条件部分で指定することも可能で、通常のカラムと同様にアクセスすることができます。

SQLAlchemy による SQL の生成

話は変わりますが、 Python のライブラリで SQLAlchemy というものがあります。
このライブラリは ORM として使うこともできますが、生の SQL をラップし Python から扱いやすくするという、 core と呼ばれる使い方もあります。

これは、以下のように Python のコードとして SQL を組み立てられるものです。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from sqlalchemy import Table, Column, Integer, String, MetaData
from sqlalchemy import select
 
metadata_obj = MetaData()
User = Table('users', metadata_obj,
    Column('id', Integer, primary_key=True),
    Column('name', String),
)
 
query = select(
    User.c.id,
    User.c.name,
).select_from(
    User,
).where(
    User.c.name.startswith("A"),
).order_by(
    User.c.name.desc(),
).limit(10)

SQLite の場合、上記の query は、以下のような SQL に変換されます。

1
2
3
4
SELECT users.id, users.name
FROM users
WHERE (users.name LIKE 'A' || '%') ORDER BY users.name DESC
 LIMIT 10 OFFSET 0

このように SQLAlchemy を用いると、 Python のコードとして SQL を作ることができます。

他にも JOIN や CTE (Common Table Expression) を用いた例を書いておくと以下のようになります。
この例では、テーブルの定義に declarative_base を使っています。この場合は、 Table を使う場合と異なり、 .c の部分が不要になり、補完も効きやすくなるので、書きやすいかもしれません。
なお、例自体には特に意味はありません。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
from sqlalchemy import Column, Integer, String, ForeignKey
from sqlalchemy import select, or_
from sqlalchemy.ext.declarative import declarative_base
 
Base = declarative_base()
 
 
class User(Base):
    __tablename__ = "users"
 
    id = Column(Integer, primary_key=True)
    name = Column(String)
 
 
class Book(Base):
    __tablename__ = "books"
 
    id = Column(Integer, primary_key=True)
    title = Column(String)
    author_id = Column(None, ForeignKey('users.id'))
 
 
books_with_title_a = select(
    Book.title,
    Book.author_id,
).select_from(
    Book,
).where(
    or_(
        Book.title.startswith("A"),
        Book.title.startswith("a"),
    ),
).cte()
 
query = select(
    books_with_title_a.c.title,
    User.name,
).select_from(
    books_with_title_a,
).join(
    User,
    User.id == books_with_title_a.c.author_id,
).order_by(
    books_with_title_a.c.title,
)

Python で記述することで、 CTE 部分を変数として保持でき、それなりに分かりやすく記述できているように思います。

上記の query からは、下記の SQL が生成されます。

1
2
3
4
5
6
WITH anon_1 AS
(SELECT books.title AS title, books.author_id AS author_id
FROM books
WHERE (books.title LIKE 'A' || '%') OR (books.title LIKE 'a' || '%'))
 SELECT anon_1.title, users.name
FROM anon_1 JOIN users ON users.id = anon_1.author_id ORDER BY anon_1.title

snowflake-sqlalchemy

Snowflake でも、上述した SQLAlchemy を使うことができます。

公式のページで使い方がまとめられているので、こちらのページを参考にするのがよいと思います。

なお、 Snowflake の場合、テーブルを指定する際には DATABASE.SCHEMA.TABLE のように、データベース名とスキーマ名でテーブル名を修飾することで、一意にテーブルを指定することができます。
SQLAlchemy で、このようにデータベース名やスキーマ名も含めて修飾したテーブル名を指定する場合、以下のように quoted_name を用いてテーブル名を記述することで、実現できます。

1
2
3
4
5
6
7
8
9
10
11
12
13
from sqlalchemy import Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.sql.elements import quoted_name
 
Base = declarative_base()
 
 
class User(Base):
    # 下記のように、 quoted_name でテーブル名を指定する
    __tablename__ = quoted_name("database.schema.users", False)
 
    id = Column(Integer, primary_key=True)
    name = Column(String)

snowflake-sqlalchemy-json

上述したように、 Snowflake では VARIANT 型のカラムに対しては、 JSON 形式などの値に対して SQL 内から所定の書式でアクセスすることができます。

snowflake-sqlalchemy では、その辺りの処理は 2022年5月時点では実装されていないようだったので、 snowflake-sqlalchemy-json でいくつかを実装してみました。

現時点では読み込みのみしかサポートしておらず、機能も限定的ですが、それなりに使えそうに思うので、このライブラリを使った例を挙げておきます。
ここでは、 Snowflake のアカウント作成時に例として提供されていた SNOWFLAKE_SAMPLE_DATA.WEATHER.DAILY_14_TOTAL テーブルに含まれているデータを読み込む際の例について挙げます。データのサンプルは、末尾に挙げておきます。

VARIANT 型を SQLAlchemy の JSON にマッピングしアクセス

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
from sqlalchemy import create_engine, select, Column, DateTime, JSON
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.sql.elements import quoted_name
import snowflake_sqlalchemy_json
 
snowflake_sqlalchemy_json.register_json_handler()
 
Base = declarative_base()
 
 
class Daily14Total(Base):
    __tablename__ = quoted_name("SNOWFLAKE_SAMPLE_DATA.WEATHER.DAILY_14_TOTAL",
                                False)
 
    T = Column(DateTime, primary_key=True)
    V = Column(JSON)
 
 
query = select(
    Daily14Total.V["city"]["id"],
    Daily14Total.V["city"]["name"],
).select_from(
    Daily14Total,
).where(
    Daily14Total.V["city"]["country"] == "IN"
).limit(10)

上記の query で、 V カラムに入っているデータの中の city キーで取得できるオブジェクト内の要素にアクセスすることができます。
基本的に、 Python の dict と同様のアクセス方法になり、直感的に書けているのが分かると思います。

lateralflatten を用いたアクセス

Snowflake の VARIANT 型へのアクセスで重要な概念である lateral 結合を用いたアクセスについては、下記のように記述できます。
なお、 lateral 結合については結局 LATERAL とは何者なのか?の記事が分かりやすいように思います。

flatten は、あくまで通常の関数なので、 SQLAlchemy で関数を呼び出す際の形式である func.flatten(xxx) として記載します。
また、 lateral は SQLAlchemy がサポートしているので、公式のチュートリアルの通りに呼び出します。
lateral() で返されるテーブルの value カラムにアクセスすると、 flatten で展開された要素にアクセスすることができます。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
from sqlalchemy import and_, create_engine, func, select, Column, DateTime, JSON
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.sql.elements import quoted_name
import snowflake_sqlalchemy_json
 
snowflake_sqlalchemy_json.register_json_handler()
 
Base = declarative_base()
 
 
class Daily14Total(Base):
    __tablename__ = quoted_name("SNOWFLAKE_SAMPLE_DATA.WEATHER.DAILY_14_TOTAL",
                                False)
 
    T = Column(DateTime, primary_key=True)
    V = Column(JSON)
 
 
lateral_flatten = func.flatten(Daily14Total.V["data"]).lateral()
query = select(
    Daily14Total.T,
    lateral_flatten.c.value["clouds"],
    lateral_flatten.c.value["deg"],
    lateral_flatten.c.value["rain"],
).select_from(
    Daily14Total,
    lateral_flatten,
).where(
    and_(
        lateral_flatten.c.value["deg"] > 273,
        lateral_flatten.c.value["rain"] > 0,
    ),
).limit(10)

なお、上記は FROM 句に FROM DAILY_14_TOTAL, LATERAL FLATTEN(xxx) のように lateral を記述する方法ですが、この書き方では Warning が出るかもしれません。
その場合、以下のように、 JOIN 句として独立させて書くこともできます。
私はこちらの方が分かりやすいように思います。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
from sqlalchemy import and_, create_engine, func, select, Column, DateTime, JSON
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.sql.elements import quoted_name
import snowflake_sqlalchemy_json
 
snowflake_sqlalchemy_json.register_json_handler()
 
Base = declarative_base()
 
 
class Daily14Total(Base):
    __tablename__ = quoted_name("SNOWFLAKE_SAMPLE_DATA.WEATHER.DAILY_14_TOTAL",
                                False)
 
    T = Column(DateTime, primary_key=True)
    V = Column(JSON)
 
 
lateral_flatten = func.flatten(Daily14Total.V["data"]).lateral()
query = select(
    Daily14Total.T,
    lateral_flatten.c.value["clouds"],
    lateral_flatten.c.value["deg"],
    lateral_flatten.c.value["rain"],
).select_from(Daily14Total).join(
    lateral_flatten,
    True,
).where(
    and_(
        lateral_flatten.c.value["deg"] > 273,
        lateral_flatten.c.value["rain"] > 0,
    ),
).limit(10)

用いたデータの形式

最後に、 DAILY_14_TOTAL テーブルの V カラムに入っている JSON データの形式を挙げておきます。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
{
    "city": {
        "coord": {
            "lat": 9.7,
            "lon": 78.48333
        },
        "country": "IN",
        "id": 1263965,
        "name": "Manamadurai"
    },
    "data": [{
        "clouds": 80,
        "deg": 277,
        "dt": 1500357600,
        "humidity": 47,
        "pressure": 980.94,
        "speed": 2.13,
        "temp": {
            "day": 303.15,
            "eve": 303.15,
            "max": 303.15,
            "min": 300.13,
            "morn": 303.15,
            "night": 300.13
        },
        "uvi": 12.892,
        "weather": [{
            "description": "broken clouds",
            "icon": "04n",
            "id": 803,
            "main": "Clouds"
        }]
    }, {
        "clouds": 80,
        "deg": 234,
        "dt": 1500444000,
        "humidity": 57,
        "pressure": 982.09,
        "rain": 0.85,
        "speed": 2.9,
        "temp": {
            "day": 302.81,
            "eve": 304.33,
            "max": 304.82,
            "min": 296.22,
            "morn": 296.22,
            "night": 298.93
        },
        "uvi": 12.679,
        "weather": [{
            "description": "light rain",
            "icon": "10d",
            "id": 500,
            "main": "Rain"
        }]
    }, {
        "clouds": 76,
        "deg": 243,
        "dt": 1500530400,
        "humidity": 61,
        "pressure": 980.75,
        "rain": 3,
        "speed": 1.71,
        "temp": {
            "day": 303.18,
            "eve": 304.86,
            "max": 304.86,
            "min": 297.51,
            "morn": 298.36,
            "night": 297.51
        },
        "uvi": 12.986,
        "weather": [{
            "description": "light rain",
            "icon": "10d",
            "id": 500,
            "main": "Rain"
        }]
    }],
    "time":
    1500396663
}

以上のようにすることで、 Snowflake の VARIANT 型へのアクセスを SQLAlchemy を用いて記載することができるようになります。