Поделиться через


Использование служб ИИ Azure с SynapseML в Microsoft Fabric

службах ИИ Azure помочь разработчикам и организациям быстро создавать интеллектуальные, передовые, готовые к рынкам и ответственные приложения с готовыми и предварительно созданными и настраиваемыми API и моделями. В этой статье вы будете использовать различные службы, доступные в службах ИИ Azure, для выполнения задач, которые включают: анализ текста, анализ документов, распознавание документов, поиск изображений, речь в текст и текст в преобразование речи, обнаружение аномалий и извлечение данных из веб-API.

Цель служб искусственного интеллекта Azure — помочь разработчикам создавать приложения, которые могут видеть, слышать, говорить, понимать и даже начинать рассуждать. Каталог служб в службах ИИ Azure можно разделить на пять основных принципов: визуального распознавания, распознавания речи, языка, веб-поискаи принятия решений.

Необходимые условия

  • Получите подписку Microsoft Fabric. Или зарегистрируйтесь на бесплатную пробную версию Microsoft Fabric .

  • Войдите в Microsoft Fabric.

  • Используйте переключатель интерфейса в левой нижней части домашней страницы, чтобы перейти на Fabric.

    снимок экрана меню переключателя режимов, в котором показано, где выбрать науку о данных.

Подготовьте вашу систему

Чтобы начать, импортируйте необходимые библиотеки и инициализировать сеанс Spark.

from pyspark.sql.functions import udf, col
from synapse.ml.io.http import HTTPTransformer, http_udf
from requests import Request
from pyspark.sql.functions import lit
from pyspark.ml import PipelineModel
from pyspark.sql.functions import col
import os
from pyspark.sql import SparkSession
from synapse.ml.core.platform import *

# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()

Импортируйте библиотеки служб ИИ Azure и замените ключи и местоположения в следующем фрагменте кода вашими ключом и местоположением служб ИИ Azure.

from synapse.ml.cognitive import *

# A general Azure AI services key for Text Analytics, Vision and Document Intelligence (or use separate keys that belong to each service)
service_key = "<YOUR-KEY-VALUE>" # Replace <YOUR-KEY-VALUE> with your Azure AI service key, check prerequisites for more details
service_loc = "eastus"

# A Bing Search v7 subscription key
bing_search_key =  "<YOUR-KEY-VALUE>" # Replace <YOUR-KEY-VALUE> with your Bing v7 subscription key, check prerequisites for more details

# An Anomaly Detector subscription key
anomaly_key = <"YOUR-KEY-VALUE"> # Replace <YOUR-KEY-VALUE> with your anomaly service key, check prerequisites for more details
anomaly_loc = "westus2"

# A Translator subscription key
translator_key = "<YOUR-KEY-VALUE>" # Replace <YOUR-KEY-VALUE> with your translator service key, check prerequisites for more details
translator_loc = "eastus"

# An Azure search key
search_key = "<YOUR-KEY-VALUE>" # Replace <YOUR-KEY-VALUE> with your search key, check prerequisites for more details

Выполнить анализ тональности текста

Служба анализа текста предоставляет несколько алгоритмов для извлечения интеллектуальных инсайтов из текста. Например, службу можно использовать для поиска тональности определенного входного текста. Служба вернет оценку от 0,0 до 1,0, где низкие оценки указывают на отрицательное настроение, а высокие оценки указывают на положительное настроение.

В следующем примере кода возвращается тональность для трех простых предложений.

# Create a dataframe that's tied to it's column names
df = spark.createDataFrame(
    [
        ("I am so happy today, its sunny!", "en-US"),
        ("I am frustrated by this rush hour traffic", "en-US"),
        ("The cognitive services on spark aint bad", "en-US"),
    ],
    ["text", "language"],
)

# Run the Text Analytics service with options
sentiment = (
    TextSentiment()
    .setTextCol("text")
    .setLocation(service_loc)
    .setSubscriptionKey(service_key)
    .setOutputCol("sentiment")
    .setErrorCol("error")
    .setLanguageCol("language")
)

# Show the results of your text query in a table format
display(
    sentiment.transform(df).select(
        "text", col("sentiment.document.sentiment").alias("sentiment")
    )
)

Анализ текста данных о здоровье

Анализ текста для медицинских служб извлекает и маркирует соответствующую медицинскую информацию из неструктурированного текста, такие как примечания врачей, выписки о выписке, клинические документы и электронные медицинские записи.

Следующий пример кода анализирует и преобразует текст из заметок врачей в структурированные данные.

df = spark.createDataFrame(
    [
        ("20mg of ibuprofen twice a day",),
        ("1tsp of Tylenol every 4 hours",),
        ("6-drops of Vitamin B-12 every evening",),
    ],
    ["text"],
)

healthcare = (
    AnalyzeHealthText()
    .setSubscriptionKey(service_key)
    .setLocation(service_loc)
    .setLanguage("en")
    .setOutputCol("response")
)

display(healthcare.transform(df))

Перевод текста на другой язык

Переводчик — это облачная служба машинного перевода и является частью семейства служб искусственного интеллекта Azure когнитивных API, используемых для создания интеллектуальных приложений. Переводчик легко интегрировать в приложения, веб-сайты, инструменты и решения. Он позволяет добавлять многоязычные пользовательские интерфейсы на 90 языках и диалектах и использовать для перевода текста с любой операционной системой.

В следующем примере кода выполняется простой перевод текста, предоставляя предложения, которые необходимо перевести и на целевые языки, на которые они нужно перевести.

from pyspark.sql.functions import col, flatten

# Create a dataframe including sentences you want to translate
df = spark.createDataFrame(
    [(["Hello, what is your name?", "Bye"],)],
    [
        "text",
    ],
)

# Run the Translator service with options
translate = (
    Translate()
    .setSubscriptionKey(translator_key)
    .setLocation(translator_loc)
    .setTextCol("text")
    .setToLanguage(["zh-Hans"])
    .setOutputCol("translation")
)

# Show the results of the translation.
display(
    translate.transform(df)
    .withColumn("translation", flatten(col("translation.translations")))
    .withColumn("translation", col("translation.text"))
    .select("translation")
)

Извлечение сведений из документа в структурированные данные

Аналитика документов Azure — это часть служб ИИ Azure, которая позволяет создавать автоматизированное программное обеспечение для обработки данных с помощью технологии машинного обучения. С помощью Azure AI Document Intelligence вы можете определять и извлекать текст, пары ключ/значение, отметки выбора, таблицы и структуру из документов. Служба выводит структурированные данные, которые включают взаимосвязи в исходном файле, ограничивающие рамки, степень уверенности и многое другое.

В следующем примере кода анализируется изображение визитной карточки и извлекается его информация в структурированные данные.

from pyspark.sql.functions import col, explode

# Create a dataframe containing the source files
imageDf = spark.createDataFrame(
    [
        (
            "https://mmlspark.blob.core.windows.net/datasets/FormRecognizer/business_card.jpg",
        )
    ],
    [
        "source",
    ],
)

# Run the Form Recognizer service
analyzeBusinessCards = (
    AnalyzeBusinessCards()
    .setSubscriptionKey(service_key)
    .setLocation(service_loc)
    .setImageUrlCol("source")
    .setOutputCol("businessCards")
)

# Show the results of recognition.
display(
    analyzeBusinessCards.transform(imageDf)
    .withColumn(
        "documents", explode(col("businessCards.analyzeResult.documentResults.fields"))
    )
    .select("source", "documents")
)

Анализ изображений и тегов

Компьютерное зрение анализирует изображения, чтобы выявить такие элементы структуры, как лица, объекты и описания на естественном языке.

Следующий пример кода анализирует изображения и маркирует их тегами . Теги — это одно слово описания вещей в изображении, такие как узнаваемые объекты, люди, пейзажи и действия.

# Create a dataframe with the image URLs
base_url = "https://raw.githubusercontent.com/Azure-Samples/cognitive-services-sample-data-files/master/ComputerVision/Images/"
df = spark.createDataFrame(
    [
        (base_url + "objects.jpg",),
        (base_url + "dog.jpg",),
        (base_url + "house.jpg",),
    ],
    [
        "image",
    ],
)

# Run the Computer Vision service. Analyze Image extracts information from/about the images.
analysis = (
    AnalyzeImage()
    .setLocation(service_loc)
    .setSubscriptionKey(service_key)
    .setVisualFeatures(
        ["Categories", "Color", "Description", "Faces", "Objects", "Tags"]
    )
    .setOutputCol("analysis_results")
    .setImageUrlCol("image")
    .setErrorCol("error")
)

# Show the results of what you wanted to pull out of the images.
display(analysis.transform(df).select("image", "analysis_results.description.tags"))

Bing Image Search выполняет поиск в Интернете, чтобы извлечь изображения, которые связаны с вопросом на естественном языке пользователя.

В следующем примере кода используется текстовый запрос, который ищет изображения с кавычками. Выходные данные кода — это список URL-адресов изображений, содержащих фотографии, связанные с запросом.

# Number of images Bing will return per query
imgsPerBatch = 10
# A list of offsets, used to page into the search results
offsets = [(i * imgsPerBatch,) for i in range(100)]
# Since web content is our data, we create a dataframe with options on that data: offsets
bingParameters = spark.createDataFrame(offsets, ["offset"])

# Run the Bing Image Search service with our text query
bingSearch = (
    BingImageSearch()
    .setSubscriptionKey(bing_search_key)
    .setOffsetCol("offset")
    .setQuery("Martin Luther King Jr. quotes")
    .setCount(imgsPerBatch)
    .setOutputCol("images")
)

# Transformer that extracts and flattens the richly structured output of Bing Image Search into a simple URL column
getUrls = BingImageSearch.getUrlTransformer("images", "url")

# This displays the full results returned, uncomment to use
# display(bingSearch.transform(bingParameters))

# Since we have two services, they are put into a pipeline
pipeline = PipelineModel(stages=[bingSearch, getUrls])

# Show the results of your search: image URLs
display(pipeline.transform(bingParameters))

Преобразование речи в текст

Служба преобразования речи в текст конвертирует потоки или файлы с голосовыми записями в текст. В следующем примере кода выполняется транскрибирование одного звукового файла в текст.

# Create a dataframe with our audio URLs, tied to the column called "url"
df = spark.createDataFrame(
    [("https://mmlspark.blob.core.windows.net/datasets/Speech/audio2.wav",)], ["url"]
)

# Run the Speech-to-text service to translate the audio into text
speech_to_text = (
    SpeechToTextSDK()
    .setSubscriptionKey(service_key)
    .setLocation(service_loc)
    .setOutputCol("text")
    .setAudioDataCol("url")
    .setLanguage("en-US")
    .setProfanity("Masked")
)

# Show the results of the translation
display(speech_to_text.transform(df).select("url", "text.DisplayText"))

Преобразование текста в речь

Текст в речь — это служба, которая позволяет создавать программные приложения и службы с естественной речью, используя более чем 270 нейронных голосов на 119 языках и вариантах.

В следующем примере кода текст преобразуется в звуковой файл, содержащий содержимое текста.

from synapse.ml.cognitive import TextToSpeech

fs = ""
if running_on_databricks():
    fs = "dbfs:"
elif running_on_synapse_internal():
    fs = "Files"

# Create a dataframe with text and an output file location
df = spark.createDataFrame(
    [
        (
            "Reading out loud is fun! Check out aka.ms/spark for more information",
            fs + "/output.mp3",
        )
    ],
    ["text", "output_file"],
)

tts = (
    TextToSpeech()
    .setSubscriptionKey(service_key)
    .setTextCol("text")
    .setLocation(service_loc)
    .setVoiceName("en-US-JennyNeural")
    .setOutputFileCol("output_file")
)

# Check to make sure there were no errors during audio creation
display(tts.transform(df))

Обнаружение аномалий в данных временных рядов

детектор аномалий отлично подходит для обнаружения нарушений в данных временных рядов. В следующем примере кода служба детектора аномалий используется для поиска аномалий во всех данных временных рядов.

# Create a dataframe with the point data that Anomaly Detector requires
df = spark.createDataFrame(
    [
        ("1972-01-01T00:00:00Z", 826.0),
        ("1972-02-01T00:00:00Z", 799.0),
        ("1972-03-01T00:00:00Z", 890.0),
        ("1972-04-01T00:00:00Z", 900.0),
        ("1972-05-01T00:00:00Z", 766.0),
        ("1972-06-01T00:00:00Z", 805.0),
        ("1972-07-01T00:00:00Z", 821.0),
        ("1972-08-01T00:00:00Z", 20000.0),
        ("1972-09-01T00:00:00Z", 883.0),
        ("1972-10-01T00:00:00Z", 898.0),
        ("1972-11-01T00:00:00Z", 957.0),
        ("1972-12-01T00:00:00Z", 924.0),
        ("1973-01-01T00:00:00Z", 881.0),
        ("1973-02-01T00:00:00Z", 837.0),
        ("1973-03-01T00:00:00Z", 9000.0),
    ],
    ["timestamp", "value"],
).withColumn("group", lit("series1"))

# Run the Anomaly Detector service to look for irregular data
anamoly_detector = (
    SimpleDetectAnomalies()
    .setSubscriptionKey(anomaly_key)
    .setLocation(anomaly_loc)
    .setTimestampCol("timestamp")
    .setValueCol("value")
    .setOutputCol("anomalies")
    .setGroupbyCol("group")
    .setGranularity("monthly")
)

# Show the full results of the analysis with the anomalies marked as "True"
display(
    anamoly_detector.transform(df).select("timestamp", "value", "anomalies.isAnomaly")
)

Получение сведений из произвольных веб-API

С помощью HTTP в Spark можно использовать любую веб-службу в конвейере больших данных. В следующем примере кода используется API Всемирного банка для получения информации о различных странах по всему миру.

# Use any requests from the python requests library


def world_bank_request(country):
    return Request(
        "GET", "http://api.worldbank.org/v2/country/{}?format=json".format(country)
    )


# Create a dataframe with specifies which countries we want data on
df = spark.createDataFrame([("br",), ("usa",)], ["country"]).withColumn(
    "request", http_udf(world_bank_request)(col("country"))
)

# Much faster for big data because of the concurrency :)
client = (
    HTTPTransformer().setConcurrency(3).setInputCol("request").setOutputCol("response")
)

# Get the body of the response


def get_response_body(resp):
    return resp.entity.content.decode()


# Show the details of the country data returned
display(
    client.transform(df).select(
        "country", udf(get_response_body)(col("response")).alias("response")
    )
)