Verwenden von Python zum Herstellen einer Verbindung und zum Ausführen von SQL-Befehlen in Azure Cosmos DB for PostgreSQL
GILT FÜR: Azure Cosmos DB for PostgreSQL (unterstützt von der Citus-Datenbankerweiterung auf PostgreSQL)
In dieser Schnellstartanleitung erfahren Sie, wie Sie mithilfe von Python-Code eine Verbindung mit einem Cluster herstellen und SQL-Anweisungen zum Erstellen einer Tabelle verwenden. Anschließend werden Sie Daten in die Datenbank einfügen, abfragen, aktualisieren und löschen. Für die Anleitung in diesem Artikel benötigen Sie Kenntnisse in der Entwicklung in Python. Es ist jedoch keine Erfahrung im Umgang mit Azure Cosmos DB for PostgreSQL erforderlich.
Installieren der PostgreSQL-Bibliothek
Für die Codebeispiele in diesem Artikel ist die psycopg2-Bibliothek erforderlich. Sie müssen psycopg2 mit Ihrem Sprachpaket-Manager (z. B. pip) installieren.
Herstellen einer Verbindung, Erstellen einer Tabelle und Einfügen von Daten
Das folgende Codebeispiel erstellt einen Verbindungspool zu Ihrer Postgres-Datenbank. Anschließend werden cursor.execute-Funktionen mit den SQL-Anweisungen CREATE TABLE und INSERT INTO verwendet, um eine Tabelle zu erstellen und Daten einzufügen.
Tipp
Der folgende Beispielcode verwendet einen Verbindungspool, um Verbindungen zu PostgreSQL zu erstellen und zu verwalten. Anwendungsseitiges Verbindungspooling wird dringend empfohlen, weil:
- Es stellt sicher, dass die Anwendung nicht zu viele Verbindungen zur Datenbank herstellt und somit ein Überschreiten der Verbindungslimits vermieden wird.
- Sie kann dazu beitragen, die Leistung drastisch zu verbessern - sowohl die Latenzzeit als auch den Durchsatz. Der PostgreSQL-Serverprozess muss für jede neue Verbindung einen Fork ausführen, und die Wiederverwendung einer Verbindung vermeidet diesen Overhead.
Ersetzen Sie im folgenden Code <Cluster> durch Ihren Clusternamen und <Passwort> durch Ihr Administratorpasswort oder Microsoft Entra ID-Token.
Hinweis
In diesem Beispiel wird die Verbindung am Ende geschlossen. Wenn Sie also die anderen Stichproben im Artikel in derselben Sitzung ausführen möchten, sollten Sie den Abschnitt # Clean up
beim Ausführen dieser Stichprobe nicht einschließen.
import psycopg2
from psycopg2 import pool
# NOTE: fill in these variables for your own cluster
host = "c-<cluster>.<uniqueID>.postgres.cosmos.azure.com"
dbname = "citus"
user = "citus"
password = "<password>"
sslmode = "require"
# Build a connection string from the variables
conn_string = "host={0} user={1} dbname={2} password={3} sslmode={4}".format(host, user, dbname, password, sslmode)
postgreSQL_pool = psycopg2.pool.SimpleConnectionPool(1, 20,conn_string)
if (postgreSQL_pool):
print("Connection pool created successfully")
# Use getconn() to get a connection from the connection pool
conn = postgreSQL_pool.getconn()
cursor = conn.cursor()
# Drop previous table of same name if one exists
cursor.execute("DROP TABLE IF EXISTS pharmacy;")
print("Finished dropping table (if existed)")
# Create a table
cursor.execute("CREATE TABLE pharmacy (pharmacy_id integer, pharmacy_name text, city text, state text, zip_code integer);")
print("Finished creating table")
# Create a index
cursor.execute("CREATE INDEX idx_pharmacy_id ON pharmacy(pharmacy_id);")
print("Finished creating index")
# Insert some data into the table
cursor.execute("INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (%s, %s, %s, %s,%s);", (1,"Target","Sunnyvale","California",94001))
cursor.execute("INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (%s, %s, %s, %s,%s);", (2,"CVS","San Francisco","California",94002))
print("Inserted 2 rows of data")
# Clean up
conn.commit()
cursor.close()
conn.close()
Nach erfolgreicher Ausführung generiert der Code die folgende Ausgabe:
Connection established
Finished dropping table
Finished creating table
Finished creating index
Inserted 2 rows of data
Verteilen von Tabellen
Mit Azure Cosmos DB for PostgreSQL können Sie Tabellen knotenübergreifend verteilen, um Skalierbarkeit zu ermöglichen. Mit dem folgenden Befehl können Sie eine Tabelle verteilen. Weitere Informationen zu create_distributed_table
und zur Verteilungsspalte finden Sie hier.
Hinweis
Verteilte Tabellen werden auf alle Workerknoten platziert, die dem Cluster hinzugefügt werden.
# Create distributed table
cursor.execute("select create_distributed_table('pharmacy','pharmacy_id');")
print("Finished distributing the table")
Lesen von Daten
Im folgenden Codebeispiel werden die folgenden APIs zum Lesen von Daten aus der Datenbank verwendet:
- cursor.execute mit der SQL-Anweisung SELECT zum Lesen der Daten
- cursor.fetchall() akzeptiert eine Abfrage und gibt ein Resultset zurück, das durchlaufen werden kann.
# Fetch all rows from table
cursor.execute("SELECT * FROM pharmacy;")
rows = cursor.fetchall()
# Print all rows
for row in rows:
print("Data row = (%s, %s)" %(str(row[0]), str(row[1])))
Aktualisieren von Daten
Im folgenden Codebeispiel wird cursor.execute
mit der SQL-Anweisung UPDATE zum Aktualisieren der Daten verwendet.
# Update a data row in the table
cursor.execute("UPDATE pharmacy SET city = %s WHERE pharmacy_id = %s;", ("guntur",1))
print("Updated 1 row of data")
Löschen von Daten
Im folgenden Codebeispiel wird cursor.execute
mit der DELETE-SQL-Anweisung zum Löschen der Daten verwendet.
# Delete data row from table
cursor.execute("DELETE FROM pharmacy WHERE pharmacy_name = %s;", ("Target",))
print("Deleted 1 row of data")
COPY-Befehl für eine schnelle Datenerfassung
Der COPY-Befehl kann bei der Erfassung von Daten in Azure Cosmos DB for PostgreSQL einen enormen Durchsatz erzielen. Der COPY-Befehl kann Daten in Dateien erfassen oder Mikrobatches von Daten im Arbeitsspeicher in Echtzeit erfassen.
COPY-Befehl zum Laden von Daten aus einer Datei
Mit dem folgenden Code werden Daten aus einer CSV-Datei in eine Datenbanktabelle kopiert. Für den Code ist die Datei pharmacies.csv erforderlich.
with open('pharmacies.csv', 'r') as f:
# Notice that we don't need the `csv` module.
next(f) # Skip the header row.
cursor.copy_from(f, 'pharmacy', sep=',')
print("copying data completed")
COPY-Befehl zum Laden von Daten im Arbeitsspeicher
Mit dem folgenden Code werden speicherinterne Daten in eine Tabelle kopiert.
data = [[3,"Walgreens","Sunnyvale","California",94006], [4,"Target","Sunnyvale","California",94016]]
buf = io.StringIO()
writer = csv.writer(buf)
writer.writerows(data)
buf.seek(0)
with conn.cursor() as cur:
cur.copy_from(buf, "pharmacy", sep=",")
conn.commit()
conn.close()
App-Wiederholung bei Datenbankanforderungsfehlern
Manchmal passiert es, dass Datenbankanforderungen an Ihre Anwendung fehlschlagen. Solche Probleme können in verschiedenen Szenarien auftreten, z. B. bei Netzwerkfehlern zwischen App und Datenbank, bei einem falschen Kennwort usw. Manche Probleme sind temporär und lassen sich in wenigen Sekunden bis Minuten lösen. Sie können die Retry-Logik in Ihrer App konfigurieren, um die vorübergehenden Fehler zu bewältigen.
Durch das Konfigurieren der Retry-Logik in Ihrer App lässt sich die Benutzerfreundlichkeit für den Endbenutzers verbessern. In einem Fehlerszenario wartet der Benutzer lediglich etwas länger, bis die Anwendung Anforderungen ausgibt und keine Fehler mehr auftreten.
Im nachfolgenden Beispiel sehen Sie, wie Sie die Retry-Logik in Ihrer App implementieren. Der Beispielcodeschnipsel versucht es alle 60 Sekunden mit einer Datenbankanforderung (bis zu fünf Mal), bis er erfolgreich ist. Die Anzahl und Häufigkeit der Retries kann basierend auf den Anforderungen Ihrer Anwendung konfiguriert werden.
Ersetzen Sie in diesem Code <cluster> durch den Namen Ihres Clusters und <password> durch Ihr Administratorkennwort.
import psycopg2
import time
from psycopg2 import pool
host = "c-<cluster>.<uniqueID>.postgres.cosmos.azure.com"
dbname = "citus"
user = "citus"
password = "<password>"
sslmode = "require"
conn_string = "host={0} user={1} dbname={2} password={3} sslmode={4}".format(
host, user, dbname, password, sslmode)
postgreSQL_pool = psycopg2.pool.SimpleConnectionPool(1, 20, conn_string)
def executeRetry(query, retryCount):
for x in range(retryCount):
try:
if (postgreSQL_pool):
# Use getconn() to Get Connection from connection pool
conn = postgreSQL_pool.getconn()
cursor = conn.cursor()
cursor.execute(query)
return cursor.fetchall()
break
except Exception as err:
print(err)
postgreSQL_pool.putconn(conn)
time.sleep(60)
return None
print(executeRetry("select 1", 5))
Nächste Schritte
- Hier erfahren Sie, wie PostgreSQL durch die Azure Cosmos DB for PostgreSQL-API erweitert wird, und lernen nützliche Diagnoseabfragen kennen.
- Auswählen der optimalen Clustergröße für Ihre Workload
- Überwachen der Clusterleistung