次の方法で共有


Python を使用し、Azure Cosmos DB for PostgreSQL に接続して SQL コマンドを実行する

適用対象: Azure Cosmos DB for PostgreSQL (PostgreSQL の Citus データベース拡張機能を利用)

このクイックスタートでは、Python コードを使用してクラスターに接続する方法と、SQL ステートメントを使用してテーブルを作成する方法について説明します。 その後で、データベース内のデータの挿入、クエリ、更新、削除を実行します。 この記事内のステップでは、Python の開発には慣れているものの、Azure Cosmos DB for PostgreSQL を使用するのは初めてというユーザーを対象とします。

PostgreSQL ライブラリをインストールする

この記事のコード例では、psycopg2 ライブラリが必要です。 言語パッケージ マネージャー (pip など) を使用して psycopg2 をインストールする必要があります。

接続、テーブルの作成、およびデータの挿入

次のコード例では、Postgres データベースへの接続プールを作成します。 続いて、cursor.execute 関数と SQL CREATE TABLE ステートメントおよび INSERT INTO ステートメントを併用し、テーブルを作成してデータを挿入します。

ヒント

下のサンプル コードでは、接続プールを使用して PostgreSQL への接続を作成および管理します。 次の理由から、アプリケーション側の接続プールを強くお勧めします。

  • アプリケーションがデータベースへの接続を生成しすぎないようにするため、接続制限の超過を回避できます。
  • 待機時間とスループットの両方のパフォーマンスを大幅に向上させるのに役立ちます。 PostgreSQL サーバー プロセスでは、新しい各接続を処理するためにフォークする必要があり、接続を再利用すると、そのオーバーヘッドが回避されます。

以下のコードでは、<cluster> をクラスター名に置き換え、<password> を管理者パスワードまたは Microsoft Entra ID トークンに置き換えます。

Note

この例では、最後に接続を閉じます。そのため、同じセッションでこの記事の他のサンプルを実行しようとしている場合は、このサンプルの実行時に # Clean up セクションを含めないでください。

import psycopg2
from psycopg2 import pool

# NOTE: fill in these variables for your own cluster
host = "c-<cluster>.<uniqueID>.postgres.cosmos.azure.com"
dbname = "citus"
user = "citus"
password = "<password>"
sslmode = "require"

# Build a connection string from the variables
conn_string = "host={0} user={1} dbname={2} password={3} sslmode={4}".format(host, user, dbname, password, sslmode)

postgreSQL_pool = psycopg2.pool.SimpleConnectionPool(1, 20,conn_string)
if (postgreSQL_pool):
    print("Connection pool created successfully")

# Use getconn() to get a connection from the connection pool
conn = postgreSQL_pool.getconn()

cursor = conn.cursor()

# Drop previous table of same name if one exists
cursor.execute("DROP TABLE IF EXISTS pharmacy;")
print("Finished dropping table (if existed)")

# Create a table
cursor.execute("CREATE TABLE pharmacy (pharmacy_id integer, pharmacy_name text, city text, state text, zip_code integer);")
print("Finished creating table")

# Create a index
cursor.execute("CREATE INDEX idx_pharmacy_id ON pharmacy(pharmacy_id);")
print("Finished creating index")

# Insert some data into the table
cursor.execute("INSERT INTO pharmacy  (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (%s, %s, %s, %s,%s);", (1,"Target","Sunnyvale","California",94001))
cursor.execute("INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (%s, %s, %s, %s,%s);", (2,"CVS","San Francisco","California",94002))
print("Inserted 2 rows of data")

# Clean up
conn.commit()
cursor.close()
conn.close()

コードが正常に実行されると、次の出力が生成されます。

Connection established
Finished dropping table
Finished creating table
Finished creating index
Inserted 2 rows of data

テーブルの分散

Azure Cosmos DB for PostgreSQL は、スケーラビリティを確保するために複数のノード全体にテーブルを分散させる優れた能力をユーザーに提供するものです。 テーブルを分散させるには、次のコマンドを使用します。 create_distributed_table および分散列の詳細については、こちらを参照してください。

注意

テーブルを分散させると、クラスターに追加されたすべてのワーカー ノード全体にテーブルを拡張することができます。

# Create distributed table
cursor.execute("select create_distributed_table('pharmacy','pharmacy_id');")
print("Finished distributing the table")

データの読み取り

以下のコード例では、次の API を使用してデータベースからデータを読み取ります。

  • cursor.execute と SQL SELECT ステートメントを使用してデータを読み取ります。
  • cursor.fetchall() を使用して、クエリを受け取り、反復処理する結果セットを返します。
# Fetch all rows from table
cursor.execute("SELECT * FROM pharmacy;")
rows = cursor.fetchall()

# Print all rows
for row in rows:
    print("Data row = (%s, %s)" %(str(row[0]), str(row[1])))

データの更新

次のコード例では、cursor.execute と SQL UPDATE ステートメントを併用してデータを更新します。

# Update a data row in the table
cursor.execute("UPDATE pharmacy SET city = %s WHERE pharmacy_id = %s;", ("guntur",1))
print("Updated 1 row of data")

データの削除

次のコード例では、cursor.execute と SQL DELETE ステートメントを実行してデータを削除します。

# Delete data row from table
cursor.execute("DELETE FROM pharmacy WHERE pharmacy_name = %s;", ("Target",))
print("Deleted 1 row of data")

高速取り込み用の COPY コマンド

COPY コマンドでは、Azure Cosmos DB for PostgreSQL にデータを取り込みながら、驚異的なスループットを達成できます。 COPY コマンドでは、ファイル内のデータを取り込んだり、メモリ内データのマイクロバッチからデータを取り込んだりすることで、リアルタイム インジェストを実現できます。

ファイルからデータを読み込む COPY コマンド

次のコードは、CSV ファイルからデータベース テーブルにデータをコピーします。 コードに pharmacies.csv というファイルが必要になります。

with open('pharmacies.csv', 'r') as f:
    # Notice that we don't need the `csv` module.
    next(f) # Skip the header row.
    cursor.copy_from(f, 'pharmacy', sep=',')
    print("copying data completed")

メモリ内データを読み込むための COPY コマンド

次のコードは、メモリ内データをテーブルにコピーします。

data = [[3,"Walgreens","Sunnyvale","California",94006], [4,"Target","Sunnyvale","California",94016]]
buf = io.StringIO()
writer = csv.writer(buf)
writer.writerows(data)

buf.seek(0)
with conn.cursor() as cur:
    cur.copy_from(buf, "pharmacy", sep=",")

conn.commit()
conn.close()

データベース要求の失敗によるアプリの再試行

アプリケーションからのデータベース要求が失敗する場合があります。 このような問題は、アプリとデータベース間のネットワーク障害や、パスワードの誤りなど、さまざまなシナリオで発生することがあります。一部の問題は一時的なもので、数秒から数分で自然に解決されます。 一時的なエラーを克服するために、アプリで再試行ロジックを構成することができます。

アプリで再試行ロジックを構成すると、エンド ユーザー エクスペリエンスの向上に役立ちます。 障害シナリオでは、ユーザーはエラーを経験するかわりに、アプリケーションで要求が処理されるまで少し長く待つことになります。

次の例は、アプリに再試行ロジックを実装する方法を示しています。 サンプル コード スニペットでは、成功するまで 60 秒ごとに (最大 5 回) データベース要求を試行します。 再試行の回数と頻度はアプリケーションのニーズに基づいて構成できます。

このコードでは、<cluster> をクラスター名に置き換え、<password> を管理者パスワードに置き換えます。

import psycopg2
import time
from psycopg2 import pool

host = "c-<cluster>.<uniqueID>.postgres.cosmos.azure.com"
dbname = "citus"
user = "citus"
password = "<password>"
sslmode = "require"

conn_string = "host={0} user={1} dbname={2} password={3} sslmode={4}".format(
        host, user, dbname, password, sslmode)
postgreSQL_pool = psycopg2.pool.SimpleConnectionPool(1, 20, conn_string)

def executeRetry(query, retryCount):
    for x in range(retryCount):
        try:
            if (postgreSQL_pool):
                # Use getconn() to Get Connection from connection pool
                conn = postgreSQL_pool.getconn()
                cursor = conn.cursor()
                cursor.execute(query)
                return cursor.fetchall()
            break
        except Exception as err:
            print(err)
            postgreSQL_pool.putconn(conn)
            time.sleep(60)
    return None

print(executeRetry("select 1", 5))

次のステップ