Delen via


Delta table streaming-lees- en schrijfbewerkingen

Delta Lake is diep geïntegreerd met Spark Structured Streaming via readStream en writeStream. Delta Lake overwint veel van de beperkingen die doorgaans worden geassocieerd met streamingsystemen en bestanden, waaronder:

  • Het samenvoegen van kleine bestanden die worden geproduceerd door opname met lage latentie.
  • Het 'exactly-once' verwerken met meer dan één stroom (of gelijktijdige batchtaken).
  • Efficiënt detecteren welke bestanden nieuw zijn bij het gebruik van bestanden als bron voor een stream.

Notitie

In dit artikel wordt beschreven hoe u Delta Lake tables gebruikt als streamingbronnen en sinks. Zie Gegevens laden met behulp van streaming tables in Databricks SQL voor meer informatie over het laden van gegevens met behulp van streaming-tables in Databricks SQL.

Zie Stream-static joins voor meer informatie over stream-statische joins met Delta Lake.

Delta-table als bron

Structured Streaming leest delta-tablesincrementeel. Hoewel een streamingquery actief is voor een Delta-table, worden nieuwe records idempotent verwerkt als nieuwe table versies aan de bron-tableworden toegewijd.

In de volgende codevoorbeelden ziet u hoe u een streaming-leesbewerking configureert met behulp van de table naam of bestandspad.

Python

spark.readStream.table("table_name")

spark.readStream.load("/path/to/table")

Scala

spark.readStream.table("table_name")

spark.readStream.load("/path/to/table")

Belangrijk

Als de schema voor een Delta-table wordt gewijzigd nadat een streaming-leesbewerking begint met de table, mislukt de query. Voor de meeste schema wijzigingen kunt u de stream opnieuw starten om schema mismatch op te lossen en om verder te gaan met verwerken.

In Databricks Runtime 12.2 LTS en hieronder kun je niet streamen vanuit een Delta-table met ingeschakelde column-mapping die een niet-additieve schema-evolutie heeft ondergaan, zoals het hernoemen of verwijderen van een columns. Zie Streaming met column toewijzing en schema wijzigingenvoor meer informatie.

Limit invoersnelheid

De volgende opties zijn beschikbaar voor het beheren van microbatches:

  • maxFilesPerTrigger: hoeveel nieuwe bestanden in elke microbatch moeten worden overwogen. De standaardwaarde is 1000.
  • maxBytesPerTrigger: Hoeveel gegevens worden verwerkt in elke microbatch. Met deze optie wordt een 'voorlopig maximum' ingesteld, wat betekent dat een batch ongeveer deze hoeveelheid gegevens verwerkt en meer dan de limit kan verwerken om de streamingquery vooruit te laten gaan in gevallen waarin de kleinste invoereenheid groter is dan deze limit. Dit is niet standaard set.

Als u maxBytesPerTrigger gebruikt in combinatie met maxFilesPerTrigger, verwerkt de microbatch gegevens totdat de maxFilesPerTrigger of maxBytesPerTriggerlimit is bereikt.

Notitie

In gevallen waarin de bron-table transacties worden opgeschoond vanwege de logRetentionDurationconfiguratie en de streamingquery probeert deze versies te verwerken, kan de query standaard geen gegevensverlies voorkomen. U kunt de optie set naar failOnDataLoss gebruiken om verloren gegevens false te negeren en door te gaan met verwerken.

Een CDC-feed (Delta Lake Change Data Capture) streamen

Delta Lake wijzigingen in de gegevensfeed registreert wijzigingen in een Delta-table, inclusief updates en verwijderingen. Wanneer deze functie is ingeschakeld, kunt u streamen vanuit een wijzigingengegevensfeed en logica schrijven om invoegingen, updates en verwijderingen te verwerken in downstream-tables. Hoewel de uitvoer van wijzigingsgegevensstromen enigszins verschilt van de Delta-table die het beschrijft, biedt dit een oplossing voor het verspreiden van incrementele wijzigingen naar downstream-tables in een medaillonarchitectuur.

Belangrijk

In Databricks Runtime 12.2 LTS en lager kunt u niet streamen vanuit de change data feed voor een Delta-table waarvoor column-toewijzing is ingeschakeld en die niet-additieve schema-evolutie heeft ondergaan, zoals het hernoemen of verwijderen van columns. Zie Streaming met column toewijzing en schema wijzigingen.

Updates en verwijderingen negeren

Structured Streaming verwerkt geen invoer die geen toevoeging is en geeft een uitzondering als er wijzigingen optreden in de table die als bron wordt gebruikt. Er zijn twee belangrijke strategieën voor het verwerken van wijzigingen die niet automatisch downstream kunnen worden doorgegeven:

  • U kunt de uitvoer en het controlepunt verwijderen en de stream opnieuw starten vanaf het begin.
  • U kunt een van de volgende twee opties set:
    • ignoreDeletes: transacties negeren die gegevens op partition grenzen verwijderen.
    • skipChangeCommits: transacties negeren die bestaande records verwijderen of wijzigen. skipChangeCommits ondergaat ignoreDeletes.

Notitie

In Databricks Runtime 12.2 LTS en hoger skipChangeCommits wordt de vorige instelling ignoreChangesafgeschaft. In Databricks Runtime 11.3 LTS en lager ignoreChanges is dit de enige ondersteunde optie.

De semantiek voor ignoreChanges verschilt sterk van skipChangeCommits. Als ignoreChanges ingeschakeld, worden herschreven gegevensbestanden in de bron-table opnieuw verzonden na een bewerking voor het wijzigen van gegevens, zoals UPDATE, MERGE INTO, DELETE (binnen partities) of OVERWRITE. Ongewijzigde rijen worden vaak samen met nieuwe rijen verzonden, dus downstreamgebruikers moeten dubbele waarden kunnen verwerken. Verwijderingen worden niet downstream doorgegeven. ignoreChanges ondergaat ignoreDeletes.

skipChangeCommits negeert bewerkingen voor het wijzigen van bestanden volledig. Gegevensbestanden die in de bron table worden herschreven vanwege een bewerking voor het wijzigen van gegevens, zoals UPDATE, MERGE INTO, DELETEen OVERWRITE worden volledig genegeerd. Als u wijzigingen in upstream-bron tableswilt weergeven, moet u afzonderlijke logica implementeren om deze wijzigingen door te geven.

Workloads die zijn geconfigureerd met ignoreChanges continue werking met behulp van bekende semantiek, maar Databricks raadt het gebruik skipChangeCommits aan voor alle nieuwe workloads. Voor het migreren van workloads die worden gebruikt ignoreChanges om herstructureringslogica te skipChangeCommits vereisen.

Opmerking

Stel dat u een tableuser_events hebt met date, user_emailen actioncolumns die door dateis gepartitioneerd. U streamt uit de user_eventstable en u moet er gegevens uit verwijderen vanwege de AVG.

Wanneer u bij partition grenzen verwijdert (dat wil gezegd, de WHERE zich op een partitioncolumnbevindt), worden de bestanden al gesegmenteerd op waarde, zodat de verwijdering deze bestanden alleen uit de metagegevens verwijdert. Wanneer u een volledige partition met gegevens verwijdert, kunt u het volgende gebruiken:

spark.readStream
  .option("ignoreDeletes", "true")
  .table("user_events")

Als u gegevens in meerdere partities verwijdert (in dit voorbeeld filteren op user_email), gebruikt u de volgende syntaxis:

spark.readStream
  .option("skipChangeCommits", "true")
  .table("user_events")

Als u een user_emailupdate met de instructie UPDATE, wordt het bestand met de user_email in kwestie herschreven. Gebruik skipChangeCommits dit om de gewijzigde gegevensbestanden te negeren.

Beginpositie opgeven

U kunt de volgende opties gebruiken om het beginpunt van de Delta Lake-streamingbron op te geven zonder de volledige tablete verwerken.

  • startingVersion: De Delta Lake-versie om van te beginnen. Databricks raadt aan deze optie voor de meeste workloads weg te laten. Indien niet set, begint de stream bij de meest recente beschikbare versie, inclusief een volledige momentopname van de table op dat moment.

    Indien opgegeven, leest de stream alle wijzigingen in de Delta-table vanaf de opgegeven versie (inclusief). Als de opgegeven versie niet meer beschikbaar is, kan de stream niet worden gestart. U kunt de doorvoerversies verkrijgen uit de versioncolumn van de DESCRIBE HISTORY opdrachtuitvoer.

    Als u alleen de meest recente wijzigingen wilt retourneren, geeft u op latest.

  • startingTimestamp: het tijdstempel waaruit moet worden gestart. Alle table wijzigingen die zijn doorgevoerd op of na de tijdstempel (inclusief de tijdstempel zelf) worden gelezen door de streamlezer. Als de opgegeven tijdstempel voorafgaat aan alle table doorvoeringen, begint de streaming-leesbewerking met de vroegste beschikbare tijdstempel. Een van de volgende:

    • Een tijdstempeltekenreeks. Bijvoorbeeld: "2019-01-01T00:00:00.000Z".
    • Een datumtekenreeks. Bijvoorbeeld: "2019-01-01".

U kunt beide opties niet tegelijkertijd uitvoeren (set). Ze worden alleen van kracht wanneer een nieuwe streamingquery wordt gestart. Als een streamingquery is gestart en de voortgang is vastgelegd in het controlepunt, worden deze opties genegeerd.

Belangrijk

Hoewel u de streamingbron vanuit een opgegeven versie of tijdstempel kunt starten, is de schema van de streamingbron altijd de meest recente schema van de Delta-table. U moet ervoor zorgen dat er na de opgegeven versie of tijdstempel geen incompatibele schema-wijziging voor de Delta table is. Anders kan de streamingbron onjuiste resultaten retourneren bij het lezen van de gegevens met een onjuiste schema.

Opmerking

Stel dat u een tableuser_eventshebt. Als u wijzigingen wilt lezen sinds versie 5, gebruikt u:

spark.readStream
  .option("startingVersion", "5")
  .table("user_events")

Als u wijzigingen wilt lezen sinds 2018-10-18, gebruikt u:

spark.readStream
  .option("startingTimestamp", "2018-10-18")
  .table("user_events")

Eerste momentopname verwerken zonder dat gegevens worden verwijderd

Notitie

Deze functie is beschikbaar in Databricks Runtime 11.3 LTS en hoger. Deze functie is beschikbaar als openbare preview.

Wanneer u een Delta-table als een stroombron gebruikt, verwerkt de query eerst alle gegevens die aanwezig zijn in de table. Delta table in deze versie wordt de eerste momentopname genoemd. Standaard worden de gegevensbestanden van delta tableverwerkt op basis van welk bestand het laatst is gewijzigd. De laatste wijzigingstijd vertegenwoordigt echter niet noodzakelijkerwijs de volgorde van de gebeurtenistijd van de record.

In een stateful streaming-query met een gedefinieerde watermarkkan het verwerken van bestanden op basis van wijzigingstijd ertoe leiden dat records in de verkeerde volgorde worden verwerkt. Dit kan ertoe leiden dat records worden verwijderd als latere gebeurtenissen door de watermark.

U kunt het probleem met gegevensuitval voorkomen door de volgende optie in te schakelen:

  • withEventTimeOrder: Hiermee wordt aangegeven of de eerste momentopname moet worden verwerkt met gebeurtenistijdvolgorde.

Als gebeurtenistijdvolgorde is ingeschakeld, wordt het tijdsbereik van de eerste momentopnamegegevens onderverdeeld in tijdbuckets. Elke microbatch verwerkt een bucket door gegevens binnen het tijdsbereik te filteren. De configuratieopties maxFilesPerTrigger en maxBytesPerTrigger zijn nog steeds van toepassing om de microbatchgrootte te beheren, maar alleen op een geschatte manier vanwege de aard van de verwerking.

In de onderstaande afbeelding ziet u dit proces:

Eerste momentopname

Belangrijke informatie over deze functie:

  • Het probleem met het verwijderen van gegevens treedt alleen op wanneer de eerste Delta-momentopname van een stateful streaming-query wordt verwerkt in de standaardvolgorde.
  • U kunt niet wijzigen withEventTimeOrder zodra de streamquery is gestart terwijl de eerste momentopname nog steeds wordt verwerkt. Als u opnieuw wilt opstarten met withEventTimeOrder gewijzigd, moet u het controlepunt verwijderen.
  • Als u een streamquery uitvoert waarvoorEventTimeOrder is ingeschakeld, kunt u deze niet downgraden naar een DBR-versie die deze functie pas ondersteunt als de eerste momentopnameverwerking is voltooid. Als u een downgrade wilt uitvoeren, kunt u wachten tot de eerste momentopname is voltooid of het controlepunt verwijderen en de query opnieuw starten.
  • Deze functie wordt niet ondersteund in de volgende ongebruikelijke scenario's:
    • De gebeurtenistijd column is een gegenereerde column en er zijn niet-projectietransformaties tussen de Delta-bron en watermark.
    • Er is een watermark met meer dan één Delta-bron in de stroomquery.
  • Als gebeurtenistijdvolgorde is ingeschakeld, kunnen de prestaties van de initiële verwerking van de Delta-momentopname langzamer zijn.
  • Elke microbatch scant de eerste momentopname om gegevens binnen het bijbehorende tijdsbereik van de gebeurtenis te filteren. Voor een snellere filteractie is het raadzaam om een Delta-bron te gebruiken column als gebeurtenistijd, zodat gegevens kunnen worden overgeslagen (controleer Gegevens overslaan op Delta Lake wanneer deze van toepassing is). Daarnaast kan table partitionering langs de gebeurtenistijd column de verwerking verder versnellen. U kunt de Spark-gebruikersinterface controleren om te zien hoeveel Delta-bestanden worden gescand op een specifieke microbatch.

Opmerking

Stel dat u een tableuser_events hebt met een event_timecolumn. Uw streamingquery is een aggregatiequery. Als u er zeker van wilt zijn dat er geen gegevens worden wegvallen tijdens de eerste momentopnameverwerking, kunt u het volgende gebruiken:

spark.readStream
  .option("withEventTimeOrder", "true")
  .table("user_events")
  .withWatermark("event_time", "10 seconds")

Notitie

U kunt dit ook inschakelen met Spark-configuratie op het cluster dat van toepassing is op alle streamingquery's: spark.databricks.delta.withEventTimeOrder.enabled true

Delta-table als opvang

U kunt ook gegevens naar een Delta-table schrijven met behulp van Structured Streaming. Met het transactielogboek kan Delta Lake exact één keer verwerken, zelfs wanneer er andere streams of batchquery's gelijktijdig worden uitgevoerd op de table.

Notitie

De functie Delta Lake VACUUM verwijdert alle bestanden die niet worden beheerd door Delta Lake, maar slaat alle mappen over die beginnen met _. U kunt controlepunten veilig opslaan naast andere gegevens en metagegevens voor een Delta-table met behulp van een mapstructuur zoals <table-name>/_checkpoints.

Metrische gegevens

U vindt het aantal bytes en het aantal bestanden dat nog moet worden verwerkt in een streamingqueryproces als de numBytesOutstanding metrische numFilesOutstanding gegevens. Aanvullende metrische gegevens zijn onder andere:

  • numNewListedFiles: Het aantal Delta Lake-bestanden dat is vermeld om de achterstand voor deze batch te berekenen.
    • backlogEndOffset: de table versie die wordt gebruikt om de achterstand te berekenen.

Als u de stream uitvoert in een notebook, ziet u deze metrische gegevens op het tabblad Onbewerkte gegevens in het voortgangsdashboard voor streamingquery's:

{
  "sources" : [
    {
      "description" : "DeltaSource[file:/path/to/source]",
      "metrics" : {
        "numBytesOutstanding" : "3456",
        "numFilesOutstanding" : "8"
      },
    }
  ]
}

Toevoegmodus

Streams worden standaard uitgevoerd in de toevoegmodus, waarmee nieuwe records worden toegevoegd aan de table.

Gebruik de methode toTable bij het streamen naar tables, zoals in het volgende voorbeeld:

Python

(events.writeStream
   .outputMode("append")
   .option("checkpointLocation", "/tmp/delta/events/_checkpoints/")
   .toTable("events")
)

Scala

events.writeStream
  .outputMode("append")
  .option("checkpointLocation", "/tmp/delta/events/_checkpoints/")
  .toTable("events")

Volledige modus

U kunt ook Structured Streaming gebruiken om de gehele table te vervangen met elke batch. Een voorbeeld van een use case is het berekenen van een samenvatting met behulp van aggregatie:

Python

(spark.readStream
  .table("events")
  .groupBy("customerId")
  .count()
  .writeStream
  .outputMode("complete")
  .option("checkpointLocation", "/tmp/delta/eventsByCustomer/_checkpoints/")
  .toTable("events_by_customer")
)

Scala

spark.readStream
  .table("events")
  .groupBy("customerId")
  .count()
  .writeStream
  .outputMode("complete")
  .option("checkpointLocation", "/tmp/delta/eventsByCustomer/_checkpoints/")
  .toTable("events_by_customer")

In het voorgaande voorbeeld wordt continu een table bijgewerkt die het geaggregeerde aantal gebeurtenissen per klant bevat.

Voor toepassingen met meer soepele latentievereisten kunt u rekenresources besparen met eenmalige triggers. Gebruik deze om samenvattingsaggregatie te updatetables volgens een bepaald schema, waarbij alleen nieuwe gegevens worden verwerkt die sinds de laatste updatezijn aangekomen.

Upsert van streamingquery's met behulp van foreachBatch

U kunt een combinatie van merge en foreachBatch gebruiken om complexe upserts van een streamingquery naar een Delta-tablete schrijven. Zie foreachBatch gebruiken om naar willekeurige gegevens-sinks te schrijven.

Dit patroon heeft veel toepassingen, waaronder de volgende:

  • schrijf streaming-aggregaten in modus Update: Dit is veel efficiënter dan de Complete Mode.
  • Een stroom databasewijzigingen schrijven naar een Delta table: de merge-query voor het schrijven van veranderingsgegevens kan in foreachBatch worden gebruikt om continu een stroom wijzigingen toe te passen op een Delta table.
  • Een gegevensstroom naar Delta table schrijven met ontdubbeling: de insert-only merge-query voor ontdubbeling kan in foreachBatch worden gebruikt om continu gegevens (met duplicaten) naar een Delta-table te schrijven met automatische ontdubbeling.

Notitie

  • Zorg ervoor dat uw merge instructie foreachBatch idempotent is als het opnieuw opstarten van de streamingquery de bewerking meerdere keren op dezelfde batch met gegevens kan toepassen.
  • Wanneer merge deze wordt gebruikt foreachBatch, kan de invoergegevenssnelheid van de streamingquery (gerapporteerd via StreamingQueryProgress en zichtbaar in de grafiek van de notebooksnelheid) worden gerapporteerd als een veelvoud van de werkelijke snelheid waarmee gegevens worden gegenereerd bij de bron. Dit komt doordat merge de invoergegevens meerdere keren leest, waardoor de metrische invoergegevens worden vermenigvuldigd. Als dit een knelpunt is, kunt u de batch DataFrame vóór merge in de cache opslaan en de cache vervolgens na merge uit de cache halen.

In het volgende voorbeeld ziet u hoe u SQL kunt gebruiken om foreachBatch deze taak uit te voeren:

Scala

// Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {
  // Set the dataframe to view name
  microBatchOutputDF.createOrReplaceTempView("updates")

  // Use the view name to apply MERGE
  // NOTE: You have to use the SparkSession that has been used to define the `updates` dataframe
  microBatchOutputDF.sparkSession.sql(s"""
    MERGE INTO aggregates t
    USING updates s
    ON s.key = t.key
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
  """)
}

// Write the output of a streaming aggregation query into Delta table
streamingAggregatesDF.writeStream
  .foreachBatch(upsertToDelta _)
  .outputMode("update")
  .start()

Python

# Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF, batchId):
  # Set the dataframe to view name
  microBatchOutputDF.createOrReplaceTempView("updates")

  # Use the view name to apply MERGE
  # NOTE: You have to use the SparkSession that has been used to define the `updates` dataframe

  # In Databricks Runtime 10.5 and below, you must use the following:
  # microBatchOutputDF._jdf.sparkSession().sql("""
  microBatchOutputDF.sparkSession.sql("""
    MERGE INTO aggregates t
    USING updates s
    ON s.key = t.key
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
  """)

# Write the output of a streaming aggregation query into Delta table
(streamingAggregatesDF.writeStream
  .foreachBatch(upsertToDelta)
  .outputMode("update")
  .start()
)

U kunt er ook voor kiezen om de Delta Lake-API's te gebruiken om streaming-upserts uit te voeren, zoals in het volgende voorbeeld:

Scala

import io.delta.tables.*

val deltaTable = DeltaTable.forName(spark, "table_name")

// Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {
  deltaTable.as("t")
    .merge(
      microBatchOutputDF.as("s"),
      "s.key = t.key")
    .whenMatched().updateAll()
    .whenNotMatched().insertAll()
    .execute()
}

// Write the output of a streaming aggregation query into Delta table
streamingAggregatesDF.writeStream
  .foreachBatch(upsertToDelta _)
  .outputMode("update")
  .start()

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "table_name")

# Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF, batchId):
  (deltaTable.alias("t").merge(
      microBatchOutputDF.alias("s"),
      "s.key = t.key")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
  )

# Write the output of a streaming aggregation query into Delta table
(streamingAggregatesDF.writeStream
  .foreachBatch(upsertToDelta)
  .outputMode("update")
  .start()
)

Idempotent table schrijft in foreachBatch

Notitie

Databricks raadt u aan een afzonderlijke streaming-schrijfbewerking te configureren voor elke sink die u wilt update. Het gebruik van foreachBatch om te schrijven naar meerdere tables serialiseert schrijfbewerkingen, waardoor parallelle uitvoering wordt verminderd en de algehele latentie toeneemt.

Delta tables ondersteunt de volgende DataFrameWriter opties om schrijfbewerkingen naar meerdere tables te maken binnen foreachBatch idempotent:

  • txnAppId: Een unieke tekenreeks die u kunt doorgeven aan elke DataFrame-schrijfbewerking. U kunt bijvoorbeeld de StreamingQuery-id gebruiken als txnAppId.
  • txnVersion: Een monotonisch toenemend getal dat fungeert als transactieversie.

Delta Lake gebruikt de combinatie van txnAppId en txnVersion om dubbele schrijfbewerkingen te identificeren en te negeren.

Als een batch-schrijfbewerking wordt onderbroken door een fout, gebruikt het opnieuw uitvoeren van de batch dezelfde toepassing en batch-id om de runtime te helpen dubbele schrijfbewerkingen correct te identificeren en te negeren. Toepassings-id (txnAppId) kan elke door de gebruiker gegenereerde unieke tekenreeks zijn en hoeft niet gerelateerd te zijn aan de stream-id. Zie foreachBatch gebruiken om naar willekeurige gegevens-sinks te schrijven.

Waarschuwing

Als u het streamingcontrolepunt verwijdert en de query opnieuw start met een nieuw controlepunt, moet u een ander txnAppIditem opgeven. Nieuwe controlepunten beginnen met een batch-id van 0. Delta Lake maakt gebruik van de batch-id en txnAppId als een unieke sleutel en slaat batches met reeds geziene valuesover.

In het volgende codevoorbeeld ziet u dit patroon:

Python

app_id = ... # A unique string that is used as an application ID.

def writeToDeltaLakeTableIdempotent(batch_df, batch_id):
  batch_df.write.format(...).option("txnVersion", batch_id).option("txnAppId", app_id).save(...) # location 1
  batch_df.write.format(...).option("txnVersion", batch_id).option("txnAppId", app_id).save(...) # location 2

streamingDF.writeStream.foreachBatch(writeToDeltaLakeTableIdempotent).start()

Scala

val appId = ... // A unique string that is used as an application ID.
streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  batchDF.write.format(...).option("txnVersion", batchId).option("txnAppId", appId).save(...)  // location 1
  batchDF.write.format(...).option("txnVersion", batchId).option("txnAppId", appId).save(...)  // location 2
}