Python gebruiken om SQL-opdrachten te verbinden en uit te voeren in Azure Cosmos DB for PostgreSQL
VAN TOEPASSING OP: Azure Cosmos DB for PostgreSQL (mogelijk gemaakt door de Citus-database-extensie naar PostgreSQL)
In deze quickstart ziet u hoe u Python-code gebruikt om verbinding te maken met een cluster en SQL-instructies gebruikt om een tabel te maken. Vervolgens voegt u gegevens in de database in, voert u query's uit, werkt u deze bij en verwijdert u deze. In de stappen in dit artikel wordt ervan uitgegaan dat u bekend bent met de ontwikkeling van Python en geen ervaring hebt met het werken met Azure Cosmos DB voor PostgreSQL.
PostgreSQL-bibliotheek installeren
Voor de codevoorbeelden in dit artikel is de psycopg2-bibliotheek vereist. U moet psycopg2 installeren met uw taalpakketbeheer (zoals pip).
Verbinding maken, een tabel maken en gegevens invoegen
In het volgende codevoorbeeld wordt een verbindingsgroep met uw Postgres-database gemaakt. Vervolgens worden cursor.execute-functies gebruikt met SQL CREATE TABLE- en INSERT INTO-instructies om een tabel te maken en gegevens in te voegen.
Tip
In de onderstaande voorbeeldcode wordt een verbindingsgroep gebruikt om verbindingen met PostgreSQL te maken en te beheren. Groepsgewijze verbindingen aan de toepassingszijde wordt sterk aanbevolen omdat:
- Het zorgt ervoor dat de toepassing niet te veel verbindingen met de database genereert en dus geen verbindingslimieten overschrijdt.
- Het kan helpen de prestaties drastisch te verbeteren, zowel latentie als doorvoer. Het PostgreSQL-serverproces moet fork gebruiken om elke nieuwe verbinding te verwerken en het hergebruik van een verbinding voorkomt die overhead.
Vervang in de volgende code <het cluster> door de clusternaam en <het wachtwoord> door het beheerderswachtwoord of het Microsoft Entra ID-token.
Notitie
In dit voorbeeld wordt de verbinding aan het einde gesloten, dus als u de andere voorbeelden in het artikel in dezelfde sessie wilt uitvoeren, neemt u de # Clean up
sectie niet op wanneer u dit voorbeeld uitvoert.
import psycopg2
from psycopg2 import pool
# NOTE: fill in these variables for your own cluster
host = "c-<cluster>.<uniqueID>.postgres.cosmos.azure.com"
dbname = "citus"
user = "citus"
password = "<password>"
sslmode = "require"
# Build a connection string from the variables
conn_string = "host={0} user={1} dbname={2} password={3} sslmode={4}".format(host, user, dbname, password, sslmode)
postgreSQL_pool = psycopg2.pool.SimpleConnectionPool(1, 20,conn_string)
if (postgreSQL_pool):
print("Connection pool created successfully")
# Use getconn() to get a connection from the connection pool
conn = postgreSQL_pool.getconn()
cursor = conn.cursor()
# Drop previous table of same name if one exists
cursor.execute("DROP TABLE IF EXISTS pharmacy;")
print("Finished dropping table (if existed)")
# Create a table
cursor.execute("CREATE TABLE pharmacy (pharmacy_id integer, pharmacy_name text, city text, state text, zip_code integer);")
print("Finished creating table")
# Create a index
cursor.execute("CREATE INDEX idx_pharmacy_id ON pharmacy(pharmacy_id);")
print("Finished creating index")
# Insert some data into the table
cursor.execute("INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (%s, %s, %s, %s,%s);", (1,"Target","Sunnyvale","California",94001))
cursor.execute("INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (%s, %s, %s, %s,%s);", (2,"CVS","San Francisco","California",94002))
print("Inserted 2 rows of data")
# Clean up
conn.commit()
cursor.close()
conn.close()
Wanneer de code wordt uitgevoerd, wordt de volgende uitvoer gegenereerd:
Connection established
Finished dropping table
Finished creating table
Finished creating index
Inserted 2 rows of data
Tabellen distribueren
Azure Cosmos DB for PostgreSQL biedt u de superkracht van het distribueren van tabellen over meerdere knooppunten voor schaalbaarheid. Met de onderstaande opdracht kunt u een tabel distribueren. Hier vindt u meer informatie over create_distributed_table
en de distributiekolom.
Notitie
Als u tabellen distribueert, kunnen ze groeien over alle werkknooppunten die aan het cluster zijn toegevoegd.
# Create distributed table
cursor.execute("select create_distributed_table('pharmacy','pharmacy_id');")
print("Finished distributing the table")
Gegevens lezen
In het volgende codevoorbeeld worden de volgende API's gebruikt om gegevens uit de database te lezen:
- cursor.execute with the SQL SELECT statement to read data.
- cursor.fetchall() om een query te accepteren en een resultatenset te retourneren om deze te herhalen.
# Fetch all rows from table
cursor.execute("SELECT * FROM pharmacy;")
rows = cursor.fetchall()
# Print all rows
for row in rows:
print("Data row = (%s, %s)" %(str(row[0]), str(row[1])))
Gegevens bijwerken
In het volgende codevoorbeeld wordt de SQL UPDATE-instructie gebruikt cursor.execute
om gegevens bij te werken.
# Update a data row in the table
cursor.execute("UPDATE pharmacy SET city = %s WHERE pharmacy_id = %s;", ("guntur",1))
print("Updated 1 row of data")
Gegevens verwijderen
Het volgende codevoorbeeld wordt uitgevoerd cursor.execute
met de SQL DELETE-instructie om de gegevens te verwijderen.
# Delete data row from table
cursor.execute("DELETE FROM pharmacy WHERE pharmacy_name = %s;", ("Target",))
print("Deleted 1 row of data")
COPY-opdracht voor snelle opname
De opdracht COPY kan een enorme doorvoer opleveren tijdens het opnemen van gegevens in Azure Cosmos DB for PostgreSQL. Met de opdracht COPY kunt u gegevens opnemen in bestanden of uit microbatches met gegevens in het geheugen voor realtime opname.
De opdracht COPY om gegevens uit een bestand te laden
Met de volgende code worden gegevens uit een CSV-bestand gekopieerd naar een databasetabel. Voor de code is het bestand pharmacies.csv vereist.
with open('pharmacies.csv', 'r') as f:
# Notice that we don't need the `csv` module.
next(f) # Skip the header row.
cursor.copy_from(f, 'pharmacy', sep=',')
print("copying data completed")
COPY-opdracht voor het laden van in-memory gegevens
Met de volgende code worden gegevens in het geheugen gekopieerd naar een tabel.
data = [[3,"Walgreens","Sunnyvale","California",94006], [4,"Target","Sunnyvale","California",94016]]
buf = io.StringIO()
writer = csv.writer(buf)
writer.writerows(data)
buf.seek(0)
with conn.cursor() as cur:
cur.copy_from(buf, "pharmacy", sep=",")
conn.commit()
conn.close()
App-nieuwe poging voor mislukte databaseaanvragen
Het is soms mogelijk dat databaseaanvragen van uw toepassing mislukken. Dergelijke problemen kunnen zich voordoen in verschillende scenario's, zoals netwerkfouten tussen app en database, onjuist wachtwoord, enzovoort. Sommige problemen kunnen tijdelijk zijn en zichzelf binnen enkele seconden tot minuten oplossen. U kunt logica voor opnieuw proberen in uw app configureren om de tijdelijke fouten op te lossen.
Het configureren van logica voor opnieuw proberen in uw app helpt de eindgebruikerservaring te verbeteren. In scenario's met fouten wachten gebruikers slechts wat langer totdat de toepassing aanvragen verwerkt, in plaats van fouten te ervaren.
In het onderstaande voorbeeld ziet u hoe u logica voor opnieuw proberen implementeert in uw app. Het voorbeeldcodefragment probeert elke 60 seconden (maximaal vijf keer) een databaseaanvraag uit te voeren totdat dit lukt. Het aantal en de frequentie van nieuwe pogingen kunnen worden geconfigureerd op basis van de behoeften van uw toepassing.
Vervang in deze code het cluster> door <uw clusternaam en <wachtwoord> door het beheerderswachtwoord.
import psycopg2
import time
from psycopg2 import pool
host = "c-<cluster>.<uniqueID>.postgres.cosmos.azure.com"
dbname = "citus"
user = "citus"
password = "<password>"
sslmode = "require"
conn_string = "host={0} user={1} dbname={2} password={3} sslmode={4}".format(
host, user, dbname, password, sslmode)
postgreSQL_pool = psycopg2.pool.SimpleConnectionPool(1, 20, conn_string)
def executeRetry(query, retryCount):
for x in range(retryCount):
try:
if (postgreSQL_pool):
# Use getconn() to Get Connection from connection pool
conn = postgreSQL_pool.getconn()
cursor = conn.cursor()
cursor.execute(query)
return cursor.fetchall()
break
except Exception as err:
print(err)
postgreSQL_pool.putconn(conn)
time.sleep(60)
return None
print(executeRetry("select 1", 5))
Volgende stappen
- Bekijk hoe de Azure Cosmos DB for PostgreSQL-API PostgreSQL uitbreidt en nuttige diagnostische query's probeert uit te proberen
- Kies de beste clustergrootte voor uw workload
- Cluster-prestaties bewaken