使用 C# 在 Azure Cosmos DB for PostgreSQL 上連線和執行 SQL 命令
適用於: Azure Cosmos DB for PostgreSQL (由 PostgreSQL 的超大規模 (Citus) 資料庫延伸模組提供)
此快速入門說明如何使用 C# 程式碼來連線到叢集,並使用 SQL 陳述式來建立資料表。 接著,您會在資料庫中插入、查詢、更新和刪除資料。 此文章中的步驟假設您已熟悉 C# 開發,但不熟悉使用 Azure Cosmos DB for PostgreSQL。
安裝 PostgreSQL 程式庫
此文章中的程式碼範例需要 Npgsql (英文) 程式庫。 您必須使用語言套件管理員安裝 Npgsql (例如 Visual Studio 中的 NuGet。)
連線、建立資料表及插入資料
我們將使用 CREATE TABLE 和 INSERT INTO SQL 陳述式連線到叢集並載入資料。 程式碼會使用這些 NpgsqlCommand
類別方法:
- Open() (英文) 以建立與 Azure Cosmos DB for PostgreSQL 的連線
- CreateCommand(),用以設定 CommandText 屬性
- ExecuteNonQuery() (英文) 以執行資料庫命令
提示
以下範例程式碼使用連線集區來建立及管理與 PostgreSQL 的連線。 我們強烈建議使用應用程式端連線共用,原因如下:
- 可確保應用程式不會產生過多資料庫連線,進而避免超過連線限制。
- 可協助大幅改善效能,包括延遲和輸送量。 PostgreSQL 伺服器處理序必須進行派生才能處理每個新連線,而重複使用連線可避免派生帶來的負擔。
在下列程式代碼中,以您的系統管理員密碼取代 <叢集> 名稱和 <密碼> ,或Microsoft Entra ID 令牌。
using System;
using Npgsql;
namespace Driver
{
public class AzurePostgresCreate
{
static void Main(string[] args)
{
// Replace <cluster> with your cluster name and <password> with your password:
var connStr = new NpgsqlConnectionStringBuilder("Server = c-<cluster>.<uniqueID>.postgres.cosmos.azure.com; Database = citus; Port = 5432; User Id = citus; Password = <password>; Ssl Mode = Require; Pooling = true; Minimum Pool Size=0; Maximum Pool Size =50 ");
connStr.TrustServerCertificate = true;
using (var conn = new NpgsqlConnection(connStr.ToString()))
{
Console.Out.WriteLine("Opening connection");
conn.Open();
using (var command = new NpgsqlCommand("DROP TABLE IF EXISTS pharmacy;", conn))
{
command.ExecuteNonQuery();
Console.Out.WriteLine("Finished dropping table (if existed)");
}
using (var command = new NpgsqlCommand("CREATE TABLE pharmacy (pharmacy_id integer ,pharmacy_name text,city text,state text,zip_code integer);", conn))
{
command.ExecuteNonQuery();
Console.Out.WriteLine("Finished creating table");
}
using (var command = new NpgsqlCommand("CREATE INDEX idx_pharmacy_id ON pharmacy(pharmacy_id);", conn))
{
command.ExecuteNonQuery();
Console.Out.WriteLine("Finished creating index");
}
using (var command = new NpgsqlCommand("INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (@n1, @q1, @a, @b, @c)", conn))
{
command.Parameters.AddWithValue("n1", 0);
command.Parameters.AddWithValue("q1", "Target");
command.Parameters.AddWithValue("a", "Sunnyvale");
command.Parameters.AddWithValue("b", "California");
command.Parameters.AddWithValue("c", 94001);
int nRows = command.ExecuteNonQuery();
Console.Out.WriteLine(String.Format("Number of rows inserted={0}", nRows));
}
}
Console.WriteLine("Press RETURN to exit");
Console.ReadLine();
}
}
}
散發資料表
Azure Cosmos DB for PostgreSQL 可提供您在多個節點之間散發資料表的強大功能,以取得可擴縮性。 使用下列程式碼來散發資料表。 您可以在散發資料行 (也稱為分區索引鍵) 中深入了解 create_distributed_table
和散發資料行。
注意
散發資料表可讓其在新增至叢集的任何背景工作節點上成長。
在下列程式碼中,以您的叢集名稱取代 <cluster>,並以管理員密碼取代 <password>。
using System;
using Npgsql;
namespace Driver
{
public class AzurePostgresCreate
{
static void Main(string[] args)
{
// Replace <cluster> with your cluster name and <password> with your password:
var connStr = new NpgsqlConnectionStringBuilder("Server = c-<cluster>.<uniqueID>.postgres.cosmos.azure.com; Database = citus; Port = 5432; User Id = citus; Password = {your password}; Ssl Mode = Require; Pooling = true; Minimum Pool Size=0; Maximum Pool Size =50");
connStr.TrustServerCertificate = true;
using (var conn = new NpgsqlConnection(connStr.ToString()))
{
Console.Out.WriteLine("Opening connection");
conn.Open();
using (var command = new NpgsqlCommand("select create_distributed_table('pharmacy','pharmacy_id');", conn))
{
command.ExecuteNonQuery();
Console.Out.WriteLine("Finished distributing the table");
}
}
Console.WriteLine("Press RETURN to exit");
Console.ReadLine();
}
}
}
讀取資料
使用下列程式碼搭配 SELECT SQL 陳述式來連線和讀取資料。 程式碼會使用這些 NpgsqlCommand
類別方法:
- Open() (英文) 以建立與 Azure Cosmos DB for PostgreSQL 的連線。
- CreateCommand() 和 ExecuteReader() 執行資料庫命令。
- Read() 前往結果中的記錄。
- GetInt32() 和 GetString() 剖析記錄中的值。
在下列程式碼中,以您的叢集名稱取代 <cluster>,並以管理員密碼取代 <password>。
using System;
using Npgsql;
namespace Driver
{
public class read
{
static void Main(string[] args)
{
// Replace <cluster> with your cluster name and <password> with your password:
var connStr = new NpgsqlConnectionStringBuilder("Server = c-<cluster>.<uniqueID>.postgres.cosmos.azure.com; Database = citus; Port = 5432; User Id = citus; Password = <password>; Ssl Mode = Require; Pooling = true; Minimum Pool Size=0; Maximum Pool Size =50 ");
connStr.TrustServerCertificate = true;
using (var conn = new NpgsqlConnection(connStr.ToString()))
{
Console.Out.WriteLine("Opening connection");
conn.Open();
using (var command = new NpgsqlCommand("SELECT * FROM pharmacy", conn))
{
var reader = command.ExecuteReader();
while (reader.Read())
{
Console.WriteLine(
string.Format(
"Reading from table=({0}, {1}, {2}, {3}, {4})",
reader.GetInt32(0).ToString(),
reader.GetString(1),
reader.GetString(2),
reader.GetString(3),
reader.GetInt32(4).ToString()
)
);
}
reader.Close();
}
}
Console.WriteLine("Press RETURN to exit");
Console.ReadLine();
}
}
}
更新資料
使用 UPDATE SQL 陳述式並搭配使用下列程式碼以連接和更新資料。 在程式碼中,以您的叢集名稱取代 <cluster>,並以您的系統管理員密碼取代 <password>。
using System;
using Npgsql;
namespace Driver
{
public class AzurePostgresUpdate
{
static void Main(string[] args)
{
// Replace <cluster> with your cluster name and <password> with your password:
var connStr = new NpgsqlConnectionStringBuilder("Server = c-<cluster>.<uniqueID>.postgres.cosmos.azure.com; Database = citus; Port = 5432; User Id = citus; Password = <password>; Ssl Mode = Require; Pooling = true; Minimum Pool Size=0; Maximum Pool Size =50 ");
connStr.TrustServerCertificate = true;
using (var conn = new NpgsqlConnection(connStr.ToString()))
{
Console.Out.WriteLine("Opening connection");
conn.Open();
using (var command = new NpgsqlCommand("UPDATE pharmacy SET city = @q WHERE pharmacy_id = @n", conn))
{
command.Parameters.AddWithValue("n", 0);
command.Parameters.AddWithValue("q", "guntur");
int nRows = command.ExecuteNonQuery();
Console.Out.WriteLine(String.Format("Number of rows updated={0}", nRows));
}
}
Console.WriteLine("Press RETURN to exit");
Console.ReadLine();
}
}
}
刪除資料
使用 DELETE SQL 陳述式並搭配使用下列程式碼以連接和刪除資料。 在程式碼中,以您的叢集名稱取代 <cluster>,並以您的系統管理員密碼取代 <password>。
using System;
using Npgsql;
namespace Driver
{
public class AzurePostgresDelete
{
static void Main(string[] args)
{
// Replace <cluster> with your cluster name and <password> with your password:
var connStr = new NpgsqlConnectionStringBuilder("Server = c-<cluster>.<uniqueID>.postgres.cosmos.azure.com; Database = citus; Port = 5432; User Id = citus; Password = {your password}; Ssl Mode = Require; Pooling = true; Minimum Pool Size=0; Maximum Pool Size =50 ");
connStr.TrustServerCertificate = true;
using (var conn = new NpgsqlConnection(connStr.ToString()))
{
Console.Out.WriteLine("Opening connection");
conn.Open();
using (var command = new NpgsqlCommand("DELETE FROM pharmacy WHERE pharmacy_id = @n", conn))
{
command.Parameters.AddWithValue("n", 0);
int nRows = command.ExecuteNonQuery();
Console.Out.WriteLine(String.Format("Number of rows deleted={0}", nRows));
}
}
Console.WriteLine("Press RETURN to exit");
Console.ReadLine();
}
}
}
適用於快速擷取的 COPY 命令
COPY 命令會在將資料內嵌至 Azure Cosmos DB for PostgreSQL 時產生極大的輸送量。 COPY 命令可以在檔案中擷取資料,或從記憶體中的微批次資料擷取以進行即時擷取。
用來從檔案載入資料的 COPY 命令
下列範例程式碼會將資料從 CSV 檔案複製到資料庫資料表。
程式碼範例需要您將 pharmacies.csv 檔案置於您的 [文件] 資料夾中。 在程式碼中,以您的叢集名稱取代 <cluster>,並以您的系統管理員密碼取代 <password>。
using Npgsql;
public class csvtotable
{
static void Main(string[] args)
{
String sDestinationSchemaAndTableName = "pharmacy";
String sFromFilePath = "C:\\Users\\Documents\\pharmacies.csv";
// Replace <cluster> with your cluster name and <password> with your password:
var connStr = new NpgsqlConnectionStringBuilder("Server = c-<cluster>.<uniqueID>.postgres.cosmos.azure.com; Database = citus; Port = 5432; User Id = citus; Password = <password>; Ssl Mode = Require; Pooling = true; Minimum Pool Size=0; Maximum Pool Size =50 ");
connStr.TrustServerCertificate = true;
NpgsqlConnection conn = new NpgsqlConnection(connStr.ToString());
NpgsqlCommand cmd = new NpgsqlCommand();
conn.Open();
if (File.Exists(sFromFilePath))
{
using (var writer = conn.BeginTextImport("COPY " + sDestinationSchemaAndTableName + " FROM STDIN WITH(FORMAT CSV, HEADER true,NULL ''); "))
{
foreach (String sLine in File.ReadAllLines(sFromFilePath))
{
writer.WriteLine(sLine);
}
}
Console.WriteLine("csv file data copied sucessfully");
}
}
}
用來載入記憶體內部資料的 COPY 命令
下列範例程式碼會將記憶體內部資料複製到資料表。 在程式碼中,以您的叢集名稱取代 <cluster>,並以您的系統管理員密碼取代 <password>。
using Npgsql;
using NpgsqlTypes;
namespace Driver
{
public class InMemory
{
static async Task Main(string[] args)
{
// Replace <cluster> with your cluster name and <password> with your password:
var connStr = new NpgsqlConnectionStringBuilder("Server = c-<cluster>.<uniqueID>.postgres.cosmos.azure.com; Database = citus; Port = 5432; User Id = citus; Password = <password>; Ssl Mode = Require; Pooling = true; Minimum Pool Size=0; Maximum Pool Size =50 ");
connStr.TrustServerCertificate = true;
using (var conn = new NpgsqlConnection(connStr.ToString()))
{
conn.Open();
var text = new dynamic[] { 0, "Target", "Sunnyvale", "California", 94001 };
using (var writer = conn.BeginBinaryImport("COPY pharmacy FROM STDIN (FORMAT BINARY)"))
{
writer.StartRow();
foreach (var item in text)
{
writer.Write(item);
}
writer.Complete();
}
Console.WriteLine("in-memory data copied sucessfully");
}
}
}
}
適用於資料庫要求失敗的應用程式重試
有時候,來自您應用程式的資料庫要求可能會失敗。 這類問題可能在不同的情況下發生,例如應用程式與資料庫之間的網路失敗、密碼不正確等。有些問題可能是暫時性的,而且會在幾秒到幾分鐘內自行解決。 您可以在應用程式中設定重試邏輯,以克服暫時性錯誤。
在應用程式中設定重試邏輯有助於改善使用者體驗。 在失敗案例中,使用者只會多等一些時間讓應用程式服務要求,而不會遇到錯誤。
下列範例示範如何在應用程式中實作重試邏輯。 範例程式碼片段會每隔 60 秒嘗試一次資料庫要求 (最多五次) 直到成功為止。 您可以根據應用程式的需求來設定重試次數和頻率。
在此程式碼中,以叢集名稱取代 <cluster>,並以管理員密碼取代 <password>。
using System;
using System.Data;
using System.Runtime.InteropServices;
using System.Text;
using Npgsql;
namespace Driver
{
public class Reconnect
{
// Replace <cluster> with your cluster name and <password> with your password:
static string connStr = new NpgsqlConnectionStringBuilder("Server = c-<cluster>.<uniqueID>.postgres.cosmos.azure.com; Database = citus; Port = 5432; User Id = citus; Password = <password>; Ssl Mode = Require; Pooling = true; Minimum Pool Size=0; Maximum Pool Size =50;TrustServerCertificate = true").ToString();
static string executeRetry(string sql, int retryCount)
{
for (int i = 0; i < retryCount; i++)
{
try
{
using (var conn = new NpgsqlConnection(connStr))
{
conn.Open();
DataTable dt = new DataTable();
using (var _cmd = new NpgsqlCommand(sql, conn))
{
NpgsqlDataAdapter _dap = new NpgsqlDataAdapter(_cmd);
_dap.Fill(dt);
conn.Close();
if (dt != null)
{
if (dt.Rows.Count > 0)
{
int J = dt.Rows.Count;
StringBuilder sb = new StringBuilder();
for (int k = 0; k < dt.Rows.Count; k++)
{
for (int j = 0; j < dt.Columns.Count; j++)
{
sb.Append(dt.Rows[k][j] + ",");
}
sb.Remove(sb.Length - 1, 1);
sb.Append("\n");
}
return sb.ToString();
}
}
}
}
return null;
}
catch (Exception e)
{
Thread.Sleep(60000);
Console.WriteLine(e.Message);
}
}
return null;
}
static void Main(string[] args)
{
string result = executeRetry("select 1",5);
Console.WriteLine(result);
}
}
}
下一步
- 瞭解 Azure Cosmos DB for PostgreSQL API 如何延伸 PostgreSQL,並嘗試實用的診斷查詢
- 為工作負載挑選最佳的叢集大小
- 監視叢集效能