Κοινή χρήση μέσω


Εγκατάσταση ιδιωτικού πακέτου ως απαίτηση στην εργασία airflow Apache

Σημείωμα

Η εργασία Airflow Apache υποστηρίζεται από το Apache Airflow.

Ένα πακέτο python είναι ένας τρόπος για να οργανώσετε σχετικές εκπαιδευτικές ενότητες Python σε μια μοναδική ιεραρχία καταλόγου. Ένα πακέτο συνήθως αναπαρίσταται ως κατάλογος που περιέχει ένα ειδικό αρχείο που ονομάζεται init.py. Μέσα σε έναν κατάλογο πακέτων, μπορείτε να έχετε πολλά αρχεία λειτουργικής μονάδας Python (.py αρχεία) που ορίζουν συναρτήσεις, και μεταβλητές. Στο πλαίσιο της Εργασίας airflow Apache, μπορείτε να αναπτύξετε ιδιωτικά πακέτα για να προσθέσετε προσαρμοσμένους τελεστές ροής αέρα Apache, άγκιστρα, αισθητήρες, προσθήκες κ.λπ.

Σε αυτή την εκμάθηση, θα δημιουργήσετε έναν απλό προσαρμοσμένο τελεστή ως πακέτο Python, θα τον προσθέσετε ως απαίτηση στο περιβάλλον εργασίας Airflow apache και θα εισαγάγετε το ιδιωτικό πακέτο ως λειτουργική μονάδα μέσα στο αρχείο DAG.

Ανάπτυξη προσαρμοσμένου τελεστή και δοκιμή με μια Dag ροής αέρα Apache

  1. Δημιουργήστε ένα αρχείο sample_operator.py και μετατρέψτε το σε ιδιωτικό πακέτο. Ανατρέξτε στον οδηγό: Δημιουργία πακέτου σε python

    from airflow.models.baseoperator import BaseOperator
    
    
    class SampleOperator(BaseOperator):
        def __init__(self, name: str, **kwargs) -> None:
            super().__init__(**kwargs)
            self.name = name
    
        def execute(self, context):
            message = f"Hello {self.name}"
            return message
    
    
  2. Δημιουργήστε το αρχείο sample_dag.py DAG Airflow Apache για να ελέγξετε τον τελεστή που ορίστηκε στο βήμα 1.

    from datetime import datetime
    from airflow import DAG
    
     # Import from private package
    from airflow_operator.sample_operator import SampleOperator
    
    
    with DAG(
    "test-custom-package",
    tags=["example"]
    description="A simple tutorial DAG",
    schedule_interval=None,
    start_date=datetime(2021, 1, 1),
    ) as dag:
        task = SampleOperator(task_id="sample-task", name="foo_bar")
    
        task
    
  3. Δημιουργήστε ένα αποθετήριο GitHub που περιέχει τον φάκελο in Dags και το sample_dag.py αρχείο του ιδιωτικού πακέτου σας. Οι συνήθεις μορφές αρχείων περιλαμβάνουν zip, .whlή tar.gz. Τοποθετήστε το αρχείο είτε στον φάκελο "Dags" είτε στο φάκελο "Plugins", ανάλογα με την περίπτωση. Συγχρονίστε το αποθετήριο Git με την εργασία airflow Apache ή μπορείτε να χρησιμοποιήσετε προρυθμισμένο αποθετήριο εγκατάστασης-ιδιωτικούπακέτου

Προσθέστε το πακέτο σας ως απαίτηση

Προσθέστε το πακέτο ως απαίτηση στην περιοχή Airflow requirements. Χρήση της μορφής /opt/airflow/git/<repoName>.git/<pathToPrivatePackage>

Για παράδειγμα, εάν το ιδιωτικό πακέτο βρίσκεται σε /dags/test/private.whl ένα αποθετήριο δεδομένων GitHub, προσθέστε την απαίτηση /opt/airflow/git/<repoName>.git/dags/test/private.whl στο περιβάλλον Ροή αέρα.

Στιγμιότυπο οθόνης που εμφανίζει το ιδιωτικό πακέτο που έχει προστεθεί ως απαίτηση.

Γρήγορη εκκίνηση: Δημιουργία εργασίας ροής αέρα Apache