Verwenden von Node.js zum Herstellen einer Verbindung und zum Ausführen von SQL-Befehlen in Azure Cosmos DB for PostgreSQL
GILT FÜR: Azure Cosmos DB for PostgreSQL (unterstützt von der Citus-Datenbankerweiterung auf PostgreSQL)
In dieser Schnellstartanleitung erfahren Sie, wie Sie mithilfe von Node.js-Code eine Verbindung mit einem Cluster herstellen und SQL-Anweisungen zum Erstellen einer Tabelle verwenden. Anschließend werden Sie Daten in die Datenbank einfügen, abfragen, aktualisieren und löschen. Bei den Schritten in diesem Artikel wird davon vorausgesetzt, dass Sie mit der Entwicklung in Node.js vertraut sind. Es ist jedoch keine Erfahrung im Umgang mit Azure Cosmos DB for PostgreSQL erforderlich.
Installieren der PostgreSQL-Bibliothek
Für die Codebeispiele in diesem Artikel ist die pg-Bibliothek zur Verbindung mit dem PostgreSQL-Server erforderlich. Sie müssen pg mit Ihrem Sprachpaket-Manager (z. B. npm) installieren.
Herstellen einer Verbindung, Erstellen einer Tabelle und Einfügen von Daten
Erstellen des allgemeinen Verbindungsmoduls
Tipp
Der folgende Beispielcode verwendet einen Verbindungspool, um Verbindungen zu PostgreSQL zu erstellen und zu verwalten. Anwendungsseitiges Verbindungspooling wird dringend empfohlen, weil:
- Es stellt sicher, dass die Anwendung nicht zu viele Verbindungen zur Datenbank herstellt und somit ein Überschreiten der Verbindungslimits vermieden wird.
- Sie kann dazu beitragen, die Leistung drastisch zu verbessern - sowohl die Latenzzeit als auch den Durchsatz. Der PostgreSQL-Serverprozess muss freihand geben, um jede neue Verbindung zu behandeln und eine Verbindung wiederzuverwenden, um diesen Aufwand zu vermeiden.
Erstellen Sie einen Ordner namens db und in diesem Ordner eine Datei namens citus.js, die den folgenden allgemeinen Verbindungscode enthält. Ersetzen Sie in diesem Code <cluster> durch den Namen Ihres Clusters und <password> durch Ihr Administratorkennwort.
/**
* file: db/citus.js
*/
const { Pool } = require('pg');
const pool = new Pool({
max: 300,
connectionTimeoutMillis: 5000,
host: 'c-<cluster>.<uniqueID>.postgres.cosmos.azure.com',
port: 5432,
user: 'citus',
password: '<password>',
database: 'citus',
ssl: true,
});
module.exports = {
pool,
};
Erstellen einer Tabelle
Verwenden Sie den folgenden Code, um eine Verbindung herzustellen und die Daten zu laden, indem Sie die SQL-Anweisungen CREATE TABLE und INSERT INTO verwenden. Der Code erstellt eine neue Tabelle vom Typ pharmacy
und fügt einige Beispieldaten ein.
/**
* file: create.js
*/
const { pool } = require('./db/citus');
async function queryDatabase() {
const queryString = `
DROP TABLE IF EXISTS pharmacy;
CREATE TABLE pharmacy (pharmacy_id integer,pharmacy_name text,city text,state text,zip_code integer);
INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (0,'Target','Sunnyvale','California',94001);
INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (1,'CVS','San Francisco','California',94002);
INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (2,'Walgreens','San Diego','California',94003);
CREATE INDEX idx_pharmacy_id ON pharmacy(pharmacy_id);
`;
try {
/* Real application code would probably request a dedicated client with
pool.connect() and run multiple queries with the client. In this
example, you're running only one query, so you use the pool.query()
helper method to run it on the first available idle client.
*/
await pool.query(queryString);
console.log('Created the Pharmacy table and inserted rows.');
} catch (err) {
console.log(err.stack);
} finally {
pool.end();
}
}
queryDatabase();
Verteilen von Tabellen
Mit Azure Cosmos DB for PostgreSQL können Sie Tabellen knotenübergreifend verteilen, um Skalierbarkeit zu ermöglichen. Mit dem folgenden Befehl können Sie eine Tabelle verteilen. Weitere Informationen zu create_distributed_table
und zur Verteilungsspalte finden Sie hier.
Hinweis
Verteilte Tabellen werden auf alle Workerknoten platziert, die dem Cluster hinzugefügt werden.
Verwenden Sie den folgenden Code, um eine Verbindung mit der Datenbank herzustellen und die Tabelle zu verteilen.
/**
* file: distribute-table.js
*/
const { pool } = require('./db/citus');
async function queryDatabase() {
const queryString = `
SELECT create_distributed_table('pharmacy', 'pharmacy_id');
`;
try {
await pool.query(queryString);
console.log('Distributed pharmacy table.');
} catch (err) {
console.log(err.stack);
} finally {
pool.end();
}
}
queryDatabase();
Lesen von Daten
Verwenden Sie den folgenden Code, um die Daten mit einer SQL-Anweisung des Typs SELECT zu verbinden und zu lesen.
/**
* file: read.js
*/
const { pool } = require('./db/citus');
async function queryDatabase() {
const queryString = `
SELECT * FROM pharmacy;
`;
try {
const res = await pool.query(queryString);
console.log(res.rows);
} catch (err) {
console.log(err.stack);
} finally {
pool.end();
}
}
queryDatabase();
Aktualisieren von Daten
Verwenden Sie den folgenden Code, um die Daten mit einer SQL-Anweisung des Typs UPDATE zu verbinden und zu aktualisieren.
/**
* file: update.js
*/
const { pool } = require('./db/citus');
async function queryDatabase() {
const queryString = `
UPDATE pharmacy SET city = 'Long Beach'
WHERE pharmacy_id = 1;
`;
try {
const result = await pool.query(queryString);
console.log('Update completed.');
console.log(`Rows affected: ${result.rowCount}`);
} catch (err) {
console.log(err.stack);
} finally {
pool.end();
}
}
queryDatabase();
Löschen von Daten
Verwenden Sie den folgenden Code, um die Daten mit einer SQL-Anweisung des Typs DELETE zu verbinden und zu löschen.
/**
* file: delete.js
*/
const { pool } = require('./db/citus');
async function queryDatabase() {
const queryString = `
DELETE FROM pharmacy
WHERE pharmacy_name = 'Target';
`;
try {
const result = await pool.query(queryString);
console.log('Delete completed.');
console.log(`Rows affected: ${result.rowCount}`);
} catch (err) {
console.log(err.stack);
} finally {
pool.end();
}
}
queryDatabase();
COPY-Befehl für eine schnelle Datenerfassung
Der COPY-Befehl kann bei der Erfassung von Daten in Azure Cosmos DB for PostgreSQL einen enormen Durchsatz erzielen. Der COPY-Befehl kann Daten in Dateien erfassen oder Mikrobatches von Daten im Arbeitsspeicher in Echtzeit erfassen.
COPY-Befehl zum Laden von Daten aus einer Datei
Mit dem folgenden Code werden Daten aus einer CSV-Datei in eine Datenbanktabelle kopiert. Für den Code werden das Paket pg-copy-streams und die Datei pharmacies.csv benötigt.
/**
* file: copycsv.js
*/
const inputFile = require('path').join(__dirname, '/pharmacies.csv');
const fileStream = require('fs').createReadStream(inputFile);
const copyFrom = require('pg-copy-streams').from;
const { pool } = require('./db/citus');
async function importCsvDatabase() {
return new Promise((resolve, reject) => {
const queryString = `
COPY pharmacy FROM STDIN WITH (FORMAT CSV, HEADER true, NULL '');
`;
fileStream.on('error', reject);
pool
.connect()
.then(client => {
const stream = client
.query(copyFrom(queryString))
.on('error', reject)
.on('end', () => {
reject(new Error('Connection closed!'));
})
.on('finish', () => {
client.release();
resolve();
});
fileStream.pipe(stream);
})
.catch(err => {
reject(new Error(err));
});
});
}
(async () => {
console.log('Copying from CSV...');
await importCsvDatabase();
await pool.end();
console.log('Inserted csv successfully');
})();
COPY-Befehl zum Laden von In-Memory-Daten
Mit dem folgenden Code werden In-Memory-Daten in eine Tabelle kopiert. Für den Code wird das Paket through2 benötigt, um die Verkettung von Pipes zu ermöglichen.
/**
* file: copyinmemory.js
*/
const through2 = require('through2');
const copyFrom = require('pg-copy-streams').from;
const { pool } = require('./db/citus');
async function importInMemoryDatabase() {
return new Promise((resolve, reject) => {
pool
.connect()
.then(client => {
const stream = client
.query(copyFrom('COPY pharmacy FROM STDIN'))
.on('error', reject)
.on('end', () => {
reject(new Error('Connection closed!'));
})
.on('finish', () => {
client.release();
resolve();
});
const internDataset = [
['100', 'Target', 'Sunnyvale', 'California', '94001'],
['101', 'CVS', 'San Francisco', 'California', '94002'],
];
let started = false;
const internStream = through2.obj((arr, _enc, cb) => {
const rowText = (started ? '\n' : '') + arr.join('\t');
started = true;
cb(null, rowText);
});
internStream.on('error', reject).pipe(stream);
internDataset.forEach((record) => {
internStream.write(record);
});
internStream.end();
})
.catch(err => {
reject(new Error(err));
});
});
}
(async () => {
await importInMemoryDatabase();
await pool.end();
console.log('Inserted inmemory data successfully.');
})();
App-Wiederholung bei Datenbankanforderungsfehlern
Manchmal passiert es, dass Datenbankanforderungen an Ihre Anwendung fehlschlagen. Solche Probleme können in verschiedenen Szenarien auftreten, z. B. bei Netzwerkfehlern zwischen App und Datenbank, bei einem falschen Kennwort usw. Manche Probleme sind temporär und lassen sich in wenigen Sekunden bis Minuten lösen. Sie können die Retry-Logik in Ihrer App konfigurieren, um die vorübergehenden Fehler zu bewältigen.
Durch das Konfigurieren der Retry-Logik in Ihrer App lässt sich die Benutzerfreundlichkeit für den Endbenutzers verbessern. In einem Fehlerszenario wartet der Benutzer lediglich etwas länger, bis die Anwendung Anforderungen ausgibt und keine Fehler mehr auftreten.
Im nachfolgenden Beispiel sehen Sie, wie Sie die Retry-Logik in Ihrer App implementieren. Der Beispielcodeschnipsel versucht es alle 60 Sekunden mit einer Datenbankanforderung (bis zu fünf Mal), bis er erfolgreich ist. Die Anzahl und Häufigkeit der Retries kann basierend auf den Anforderungen Ihrer Anwendung konfiguriert werden.
Ersetzen Sie in diesem Code <cluster> durch den Namen Ihres Clusters und <password> durch Ihr Administratorkennwort.
const { Pool } = require('pg');
const { sleep } = require('sleep');
const pool = new Pool({
host: 'c-<cluster>.<uniqueID>.postgres.cosmos.azure.com',
port: 5432,
user: 'citus',
password: '<password>',
database: 'citus',
ssl: true,
connectionTimeoutMillis: 0,
idleTimeoutMillis: 0,
min: 10,
max: 20,
});
(async function() {
res = await executeRetry('select nonexistent_thing;',5);
console.log(res);
process.exit(res ? 0 : 1);
})();
async function executeRetry(sql,retryCount)
{
for (let i = 0; i < retryCount; i++) {
try {
result = await pool.query(sql)
return result;
} catch (err) {
console.log(err.message);
sleep(60);
}
}
// didn't succeed after all the tries
return null;
}
Nächste Schritte
- Hier erfahren Sie, wie PostgreSQL durch die Azure Cosmos DB for PostgreSQL-API erweitert wird, und lernen nützliche Diagnoseabfragen kennen.
- Auswählen der optimalen Clustergröße für Ihre Workload
- Überwachen der Clusterleistung