Compartir vía


Uso de Python para conectarse y ejecutar comandos SQL en Azure Cosmos DB for PostgreSQL

SE APLICA A: Azure Cosmos DB for PostgreSQL (con tecnología de la extensión de base de datos de Citus en PostgreSQL)

En este inicio rápido se muestra cómo usar el código de Python para conectarse a un clúster y usar instrucciones SQL para crear una tabla. A continuación, insertará, consultará, actualizará y eliminará datos de la base de datos. En los pasos de este artículo se da por hecho que está familiarizado con el desarrollo en Python, pero que nunca ha trabajado con Azure Cosmos DB for PostgreSQL.

Instalación de la biblioteca de PostgreSQL

Los ejemplos de código de este artículo requieren la biblioteca psycopg2. Deberá instalar psycopg2 con el administrador de paquetes de idioma (por ejemplo, pip).

Conexión, creación de una tabla e inserción de datos

En el ejemplo de código siguiente se crea un grupo de conexiones a la base de datos de Postgres. A continuación, usa funciones cursor.execute con instrucciones SQL CREATE TABLE e INSERT INTO para crear una tabla e insertar datos.

Sugerencia

El siguiente código de ejemplo usa un grupo de conexiones para crear y administrar las conexiones a PostgreSQL. Se recomienda encarecidamente la agrupación de conexiones en el lado de la aplicación porque:

  • Garantiza que la aplicación no genere demasiadas conexiones a la base de datos, lo que evita que se superen los límites de conexiones.
  • Puede ayudar a mejorar drásticamente el rendimiento, tanto la latencia como el procesamiento. El proceso del servidor PostgreSQL debe bifurcarse para controlar cada nueva conexión y reutilizar una conexión evita esa sobrecarga.

En el código siguiente, reemplace <clúster> por el nombre del clúster y <contraseña> por la contraseña de administrador o el token de Microsoft Entra ID.

Nota:

En este ejemplo se cierra la conexión al final, por lo que si desea ejecutar los otros ejemplos del artículo de la misma sesión, no incluya la sección # Clean up al ejecutar este ejemplo.

import psycopg2
from psycopg2 import pool

# NOTE: fill in these variables for your own cluster
host = "c-<cluster>.<uniqueID>.postgres.cosmos.azure.com"
dbname = "citus"
user = "citus"
password = "<password>"
sslmode = "require"

# Build a connection string from the variables
conn_string = "host={0} user={1} dbname={2} password={3} sslmode={4}".format(host, user, dbname, password, sslmode)

postgreSQL_pool = psycopg2.pool.SimpleConnectionPool(1, 20,conn_string)
if (postgreSQL_pool):
    print("Connection pool created successfully")

# Use getconn() to get a connection from the connection pool
conn = postgreSQL_pool.getconn()

cursor = conn.cursor()

# Drop previous table of same name if one exists
cursor.execute("DROP TABLE IF EXISTS pharmacy;")
print("Finished dropping table (if existed)")

# Create a table
cursor.execute("CREATE TABLE pharmacy (pharmacy_id integer, pharmacy_name text, city text, state text, zip_code integer);")
print("Finished creating table")

# Create a index
cursor.execute("CREATE INDEX idx_pharmacy_id ON pharmacy(pharmacy_id);")
print("Finished creating index")

# Insert some data into the table
cursor.execute("INSERT INTO pharmacy  (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (%s, %s, %s, %s,%s);", (1,"Target","Sunnyvale","California",94001))
cursor.execute("INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (%s, %s, %s, %s,%s);", (2,"CVS","San Francisco","California",94002))
print("Inserted 2 rows of data")

# Clean up
conn.commit()
cursor.close()
conn.close()

Cuando el código se ejecuta correctamente, genera el siguiente resultado:

Connection established
Finished dropping table
Finished creating table
Finished creating index
Inserted 2 rows of data

Distribución de tablas

Azure Cosmos DB for PostgreSQL le proporciona la habilidad de distribuir tablas entre varios nodos para mejorar la escalabilidad. El uso del siguiente comando le permitirá distribuir una tabla. Puede obtener más información sobre create_distributed_table y la columna de distribución aquí.

Nota

La distribución de tablas les permite crecer en todos los nodos de trabajo que se han agregado al clúster.

# Create distributed table
cursor.execute("select create_distributed_table('pharmacy','pharmacy_id');")
print("Finished distributing the table")

Lectura de datos

En el ejemplo de código siguiente se usan las siguientes API para leer datos de la base de datos:

  • cursor.execute con la instrucción SQL SELECT para leer los datos.
  • cursor.fetchall() para aceptar una consulta y devolver un conjunto de resultados que iterar.
# Fetch all rows from table
cursor.execute("SELECT * FROM pharmacy;")
rows = cursor.fetchall()

# Print all rows
for row in rows:
    print("Data row = (%s, %s)" %(str(row[0]), str(row[1])))

Actualización de datos

En el ejemplo de código siguiente se usa cursor.execute con la instrucción SQL UPDATE para actualizar los datos.

# Update a data row in the table
cursor.execute("UPDATE pharmacy SET city = %s WHERE pharmacy_id = %s;", ("guntur",1))
print("Updated 1 row of data")

Eliminación de datos

En el siguiente ejemplo de código se usa la API cursor.execute con la instrucción SQL DELETE para eliminar los datos.

# Delete data row from table
cursor.execute("DELETE FROM pharmacy WHERE pharmacy_name = %s;", ("Target",))
print("Deleted 1 row of data")

Comando COPY para llevar a cabo una ingesta rápida

El comando COPY es capaz de conseguir un rendimiento enorme cuando ingiere datos en Azure Cosmos DB for PostgreSQL. El comando COPY puede ingerir datos ubicados en archivos o microprocesos de datos ubicados en memoria durante un proceso de ingesta en tiempo real.

Uso del comando COPY para cargar datos desde un archivo

El código siguiente copia datos de un archivo .csv a una tabla de base de datos. El código requiere el archivo pharmacies.csv.

with open('pharmacies.csv', 'r') as f:
    # Notice that we don't need the `csv` module.
    next(f) # Skip the header row.
    cursor.copy_from(f, 'pharmacy', sep=',')
    print("copying data completed")

Comando COPY para cargar datos en memoria

El código siguiente copia en una tabla los datos en memoria.

data = [[3,"Walgreens","Sunnyvale","California",94006], [4,"Target","Sunnyvale","California",94016]]
buf = io.StringIO()
writer = csv.writer(buf)
writer.writerows(data)

buf.seek(0)
with conn.cursor() as cur:
    cur.copy_from(buf, "pharmacy", sep=",")

conn.commit()
conn.close()

Reintento de la aplicación para errores de solicitud de la base de datos

A veces es posible que las solicitudes de base de datos de la aplicación produzcan un error. Estos problemas pueden producirse en diferentes escenarios, como errores de red entre la aplicación y la base de datos, contraseñas incorrectas, etc. Algunos problemas pueden ser transitorios y resolverse por sí mismos en unos segundos o minutos. Puede configurar la lógica de reintento en la aplicación para resolver los errores transitorios.

Configurar la lógica de reintento en la aplicación ayuda a mejorar la experiencia del usuario final. En escenarios de error, los usuarios simplemente esperarán un poco más para que la aplicación atienda las solicitudes, en lugar de experimentar errores.

En el ejemplo siguiente se muestra cómo implementar la lógica de reintento en la aplicación. El fragmento de código de ejemplo intenta una solicitud de base de datos cada 60 segundos (hasta cinco veces) hasta que se realiza correctamente. El número y la frecuencia de los reintentos se pueden configurar en función de las necesidades de la aplicación.

En este código, reemplace el <clúster> por el nombre del clúster y la <contraseña> por la contraseña de administrador.

import psycopg2
import time
from psycopg2 import pool

host = "c-<cluster>.<uniqueID>.postgres.cosmos.azure.com"
dbname = "citus"
user = "citus"
password = "<password>"
sslmode = "require"

conn_string = "host={0} user={1} dbname={2} password={3} sslmode={4}".format(
        host, user, dbname, password, sslmode)
postgreSQL_pool = psycopg2.pool.SimpleConnectionPool(1, 20, conn_string)

def executeRetry(query, retryCount):
    for x in range(retryCount):
        try:
            if (postgreSQL_pool):
                # Use getconn() to Get Connection from connection pool
                conn = postgreSQL_pool.getconn()
                cursor = conn.cursor()
                cursor.execute(query)
                return cursor.fetchall()
            break
        except Exception as err:
            print(err)
            postgreSQL_pool.putconn(conn)
            time.sleep(60)
    return None

print(executeRetry("select 1", 5))

Pasos siguientes