Python 用 Azure Schema Registry Avro Encoder クライアント ライブラリ - バージョン 1.0.0
Azure Schema Registry は、スキーマ ストレージ、バージョン管理、管理を提供する、Azure Event Hubsによってホストされるスキーマ リポジトリ サービスです。 このパッケージは、スキーマ レジストリ スキーマ識別子と Avro でエンコードされたコンテンツを含むペイロードをエンコードおよびデコードできる Avro エンコーダーを提供します。
ソースコード | パッケージ (PyPi) | API リファレンス ドキュメント | サンプル | Changelog
免責事項
Python 2.7 の Azure SDK Python パッケージのサポートは、2022 年 1 月 1 日に終了しました。 詳細と質問については、https://github.com/Azure/azure-sdk-for-python/issues/20691 を参照してください
作業の開始
パッケージをインストールする
pip を使用して Python 用 Azure Schema Registry Avro Encoder クライアント ライブラリをインストールします。
pip install azure-schemaregistry-avroencoder
前提条件:
このパッケージを使用するには、次が必要です。
- Azure サブスクリプション - 無料アカウントを作成できます
- Azure Schema Registry - Azure portalを使用してスキーマ レジストリ グループを作成するためのクイックスタート ガイドを次に示します。
- Python 3.6 以降 - Python をインストールする
クライアントを認証する
スキーマ レジストリ Avro Encoder との対話は、まず AvroEncoder クラスのインスタンスから始まり、スキーマ グループ名と スキーマ レジストリ クライアント クラスを受け取ります。 クライアント コンストラクターは、Event Hubs の完全修飾名前空間と Azure Active Directory 資格情報を受け取ります。
スキーマ レジストリ インスタンスの完全修飾名前空間は、 の形式
<yournamespace>.servicebus.windows.net
に従う必要があります。TokenCredential プロトコルを実装する AAD 資格情報をコンストラクターに渡す必要があります。 azure-identity パッケージで使用できるプロトコルの実装
TokenCredential
があります。 でazure-identity
提供される資格情報の種類を使用するには、 pip を使用して Python 用の Azure Identity クライアント ライブラリをインストールしてください。
pip install azure-identity
- さらに、非同期 API を使用するには、まず、 aiohttp などの非同期トランスポートをインストールする必要があります。
pip install aiohttp
azure-schemaregistry ライブラリを使用して AvroEncoder を作成します。
import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential
credential = DefaultAzureCredential()
# Namespace should be similar to: '<your-eventhub-namespace>.servicebus.windows.net'
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, credential)
encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)
主要な概念
AvroEncoder
Avro Binary Encoding にエンコードしてデコードする API と、スキーマ ID を持つコンテンツ タイプを提供します。 SchemaRegistryClient を使用して、スキーマコンテンツからスキーマ ID を取得するか、またはその逆を行います。
サポートされているメッセージ モデル
との相互運用性のために、特定の Azure Messaging SDK モデル クラスにサポートが AvroEncoder
追加されました。 これらのモデルは、 名前空間で定義されている MessageType
プロトコルの azure.schemaregistry.encoder.avroencoder
サブタイプです。 現在、サポートされているモデル クラスは次のとおりです。
azure-eventhub>=5.9.0
のazure.eventhub.EventData
メッセージの形式
MessageType プロトコルに従うメッセージの種類をエンコード用にエンコーダーに提供すると、対応するコンテンツとコンテンツ タイプのプロパティが設定されます。ここで、次のようになります。
content
: Avro ペイロード (一般に、形式固有のペイロード)- Avro バイナリ エンコード
- NOT Avro Object Container File。スキーマが含まれており、メッセージ ペイロードからスキーマをスキーマ レジストリに移動するために、このエンコーダーの目的を無効にします。
content type
: という形式avro/binary+<schema ID>
の文字列。ここで、avro/binary
は書式インジケーターです<schema ID>
は、GUID の 16 進数表現で、スキーマ レジストリ サービスの文字列と同じ形式とバイト順です。
がメッセージの種類として渡された場合 EventData
、オブジェクトに次のプロパティが EventData
設定されます。
- プロパティは
body
コンテンツ値に設定されます。 - プロパティは
content_type
コンテンツ タイプの値に設定されます。
メッセージの種類が指定されていない場合、エンコーダーは既定で次の dict を作成します。 {"content": <Avro encoded payload>, "content_type": 'avro/binary+<schema ID>' }
例
次のセクションでは、次のような最も一般的なスキーマ レジストリ タスクの一部をカバーするいくつかのコード スニペットを示します。
エンコード
メソッドを AvroEncoder.encode
使用して、指定された Avro スキーマでコンテンツをエンコードします。
メソッドは、以前にスキーマ レジストリ サービスに登録されたスキーマを使用し、後でエンコードを使用するためにスキーマをキャッシュに保持します。 スキーマをサービスに事前登録し、 メソッドに自動的に登録 encode
しないようにするには、キーワード引数 auto_register=True
をコンストラクターに AvroEncoder
渡す必要があります。
import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential
from azure.eventhub import EventData
token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
name = "example.avro.User"
format = "Avro"
definition = """
{"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": ["int", "null"]},
{"name": "favorite_color", "type": ["string", "null"]}
]
}"""
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
schema_registry_client.register_schema(group_name, name, definition, format)
encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)
with encoder:
dict_content = {"name": "Ben", "favorite_number": 7, "favorite_color": "red"}
event_data = encoder.encode(dict_content, schema=definition, message_type=EventData)
# OR
message_content_dict = encoder.encode(dict_content, schema=definition)
event_data = EventData.from_message_content(message_content_dict["content"], message_content_dict["content_type"])
デコード
メソッドを AvroEncoder.decode
使用して、Avro でエンコードされたコンテンツを次のいずれかの方法でデコードします。
- MessageType プロトコルのサブタイプであるメッセージ オブジェクトを渡します。
- キー
content
(型バイト) と (型文字列) を使用してcontent_type
dict を渡します。 メソッドは、スキーマ レジストリ サービスからスキーマを自動的に取得し、将来のデコードの使用のためにスキーマをキャッシュに保持します。
import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential
token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = "<your-group-name>"
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
encoder = AvroEncoder(client=schema_registry_client)
with encoder:
# event_data is an EventData object with Avro encoded body
dict_content = {"name": "Ben", "favorite_number": 7, "favorite_color": "red"}
event_data = encoder.encode(dict_content, schema=definition, message_type=EventData)
decoded_content = encoder.decode(event_data)
# OR
encoded_bytes = b'<content_encoded_by_azure_schema_registry_avro_encoder>'
content_type = 'avro/binary+<schema_id_of_corresponding_schema>'
content_dict = {"content": encoded_bytes, "content_type": content_type}
decoded_content = encoder.decode(content_dict)
統合を送信する Event Hubs
Event Hubs と統合して、EventData
Avro でエンコードされたコンテンツとbody
対応する に設定された オブジェクトを送信しますcontent_type
。
import os
from azure.eventhub import EventHubProducerClient, EventData
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential
token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
eventhub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
definition = """
{"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": ["int", "null"]},
{"name": "favorite_color", "type": ["string", "null"]}
]
}"""
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
avro_encoder = AvroEncoder(client=schema_registry_client, group_name=group_name, auto_register=True)
eventhub_producer = EventHubProducerClient.from_connection_string(
conn_str=eventhub_connection_str,
eventhub_name=eventhub_name
)
with eventhub_producer, avro_encoder:
event_data_batch = eventhub_producer.create_batch()
dict_content = {"name": "Bob", "favorite_number": 7, "favorite_color": "red"}
event_data = avro_encoder.encode(dict_content, schema=definition, message_type=EventData)
event_data_batch.add(event_data)
eventhub_producer.send_batch(event_data_batch)
統合を受け取る Event Hubs
Event Hubs と統合してオブジェクトをEventData
受信し、Avro でエンコードされたbody
値をデコードします。
import os
from azure.eventhub import EventHubConsumerClient
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential
token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
eventhub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
avro_encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)
eventhub_consumer = EventHubConsumerClient.from_connection_string(
conn_str=eventhub_connection_str,
consumer_group='$Default',
eventhub_name=eventhub_name,
)
def on_event(partition_context, event):
decoded_content = avro_encoder.decode(event)
with eventhub_consumer, avro_encoder:
eventhub_consumer.receive(on_event=on_event, starting_position="-1")
トラブルシューティング
全般
Azure Schema Registry Avro Encoder は、スキーマ レジストリ サービスとの通信時にエラーが発生した場合に 、Azure Core で定義されている例外を発生させます。 無効なコンテンツ/コンテンツ タイプと無効なスキーマに関連するエラーはそれぞれ と azure.schemaregistry.encoder.avroencoder.InvalidSchemaError
としてazure.schemaregistry.encoder.avroencoder.InvalidContentError
発生します。このエラー__cause__
には、Apache Avro ライブラリによって発生した基になる例外が含まれます。
ログの記録
このライブラリでは、ログ記録に標準 のログ ライブラリが使用されます。 HTTP セッションに関する基本情報 (URL、ヘッダーなど) は INFO レベルでログに記録されます。
要求/応答本文、未変換ヘッダーなど、詳細な DEBUG レベルのログは、 引数を使用してクライアントで logging_enable
有効にすることができます。
import sys
import os
import logging
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential
# Create a logger for the SDK
logger = logging.getLogger('azure.schemaregistry')
logger.setLevel(logging.DEBUG)
# Configure a console output
handler = logging.StreamHandler(stream=sys.stdout)
logger.addHandler(handler)
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
credential = DefaultAzureCredential()
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, credential, logging_enable=True)
# This client will log detailed information about its HTTP sessions, at DEBUG level
encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)
同様に、logging_enable
は、詳細なログ記録がクライアントで有効になっていない場合でも、1 回の操作のために有効にすることができます。
encoder.encode(dict_content, schema=definition, logging_enable=True)
次のステップ
その他のサンプル コード
一般的な Azure Schema Registry Avro Encoder シナリオを示すその他の例については、 サンプル ディレクトリを参照してください。
共同作成
このプロジェクトでは、共同作成と提案を歓迎しています。 ほとんどの共同作成では、共同作成者使用許諾契約書 (CLA) にご同意いただき、ご自身の共同作成内容を使用する権利を Microsoft に供与する権利をお持ちであり、かつ実際に供与することを宣言していただく必要があります。 詳細については、 https://cla.microsoft.com を参照してください。
pull request を送信すると、CLA を提供して PR (ラベル、コメントなど) を適宜装飾する必要があるかどうかを CLA ボットが自動的に決定します。 ボットによって提供される手順にそのまま従ってください。 この操作は、Microsoft の CLA を使用するすべてのリポジトリについて、1 回だけ行う必要があります。
このプロジェクトでは、Microsoft オープン ソースの倫理規定を採用しています。 詳しくは、「Code of Conduct FAQ (倫理規定についてよくある質問)」を参照するか、opencode@microsoft.com 宛てに質問またはコメントをお送りください。
Azure SDK for Python