다음을 통해 공유


Notebook에 대한 단위 테스트

단위 테스트를 사용하여 Notebook 코드의 품질과 일관성을 향상시킬 수 있습니다. 단위 테스트는 함수와 같은 자체적으로 포함된 코드 단위를 초기에 자주 테스트하는 방법입니다. 이렇게 하면 코드 문제를 더욱 빠르게 찾고, 코드에 대한 잘못된 가정을 더욱 빠르게 파악하고, 전체 코딩 작업을 간소화할 수 있습니다.

이 문서에서는 함수에 대한 기본 단위 테스트를 소개합니다. 단위 테스트 클래스 및 인터페이스와 같은 고급 개념과 Notebook용 단위 테스트 시 지원되는 스텁, 모의테스트 도구의 사용은 이 문서의 범위에 속하지 않습니다. 이 문서에서는 또한 통합 테스트, 시스템 테스트, 수용 테스트와 같은 다른 종류의 테스트 방법, 또는 성능 테스트유용성 테스트와 같은 비기능적 테스트 방법을 다루지 않습니다.

이 문서에서는 다음을 수행하는 방법을 보여줍니다.

  • 함수 및 해당 단위 테스트를 구성하는 방법.
  • 단위 테스트에 맞춰 잘 설계된 SQL의 사용자 정의 함수뿐만 아니라 Python, R, Scala에서 함수를 작성하는 방법.
  • Python, R, Scala 및 SQL Notebook에서 이러한 함수를 호출하는 방법.
  • Python용 pytest, R용 testthat, Scala용 ScalaTest 등 자주 사용되는 테스트 프레임워크를 사용하여 Python, R 및 Scala에서 단위 테스트를 작성하는 방법. 또한 SQL UDF(사용자 정의 함수)를 테스트하는 SQL을 작성하는 방법.
  • Python, R, Scala 및 SQL Notebook에서 이러한 단위 테스트를 실행하는 방법.

함수 및 단위 테스트 구성

Notebook을 사용하여 함수 및 해당 단위 테스트를 구성하는 몇 가지 일반적인 방법이 있습니다. 각 방법에는 이점과 문제가 있습니다.

Python, R 및 Scala Notebook의 경우 일반적인 방법은 다음과 같습니다.

  • 함수 및 해당 단위 테스트를 Notebook 외부에 저장.
    • 이점: Notebook에서, 그리고 Notebook 외부에서 이러한 함수를 호출할 수 있습니다. 테스트 프레임워크는 Notebook 외부에서 테스트를 실행하는 데 적합하도록 설계되었습니다.
    • 문제: 이 방법은 Scala Notebook에서 지원되지 않습니다. 이 방법을 사용하면 추적 및 유지 관리할 파일 수도 증가합니다.
  • 함수를 하나의 Notebook에 저장하고 해당 단위 테스트를 별도의 Notebook에 저장.
    • 이점: 이러한 함수는 Notebook에서 재사용하기 쉽습니다.
    • 문제: 추적 및 유지 관리할 Notebook의 수가 증가합니다. 이러한 함수는 Notebook 외부에서 사용할 수 없습니다. 이러한 함수는 Notebook 외부에서 테스트하기가 더 어려울 수도 있습니다.
  • 함수와 해당 단위 테스트를 동일한 Notebook 내에 저장.
    • 이점: 함수 및 해당 단위 테스트가 더욱 쉬운 추적 및 유지 관리를 위해 단일 Notebook 내에 저장됩니다.
    • 문제: 이러한 함수는 Notebook 전체에서 재사용이 더 어려울 수 있습니다. 이러한 함수는 Notebook 외부에서 사용할 수 없습니다. 이러한 함수는 Notebook 외부에서 테스트하기가 더 어려울 수도 있습니다.

Python 및 R Notebook의 경우 Databricks는 Notebook 외부에 함수 및 해당 단위 테스트를 저장하는 것을 권장합니다. Scala Notebook의 경우 Databricks는 하나의 Notebook에 함수를 포함하고 별도의 Notebook에 단위 테스트를 포함하는 것을 권장합니다.

SQL Notebook의 경우 Databricks는 함수를 스키마(데이터베이스라고도 함)의 SQL UDF(SQL 사용자 정의 함수)로 저장하는 것을 권장합니다. 이후 SQL Notebook에서 이러한 SQL UDF 및 해당 단위 테스트를 호출할 수 있습니다.

함수 작성

이 섹션에서는 다음을 결정하는 간단한 예제 함수 집합에 대해 설명합니다.

  • 테이블이 데이터베이스에 있는지 여부.
  • 열이 테이블에 있는지 여부.
  • 해당 열 내의 값에 대한 열에 존재하는 행의 수.

이러한 함수는 함수 자체에 집중하지 않고 이 문서의 단위 테스트 세부 정보에 집중할 수 있도록 간단해야 합니다.

최상의 단위 테스트 결과를 얻으려면 함수는 예측 가능한 단일 결과를 반환하고 단일 데이터 형식이어야 합니다. 예를 들어 어떤 항목이 있는지 확인하려면 함수가 true 또는 false의 부울 값을 반환해야 합니다. 존재하는 행의 수를 반환하려면 함수가 음수가 아닌 정수를 반환해야 합니다. 첫 번째 예제에서는 무언가 존재하지 않는 경우 false를 반환하거나 존재하는 경우 그 자체를 반환해서는 안 됩니다. 마찬가지로 두 번째 예제에서는 존재하는 행의 수를 반환하거나 행이 없는 경우 false를 반환해서는 안 됩니다.

Python, R, Scala 또는 SQL에서 다음과 같이 이러한 함수를 기존 Azure Databricks 작업 영역에 추가할 수 있습니다.

Python

다음 코드에서는 Databricks Git 폴더(Repos)를 설정하고, 리포지토리를 추가했으며, 리포지토리가 Azure Databricks 작업 영역에서 열려 있다고 가정합니다.

리포지토리 내에 이름이 myfunctions.py파일을 만들고 파일에 다음 콘텐츠를 추가합니다. 이 문서의 다른 예제에서는 이 파일의 이름을 myfunctions.py로 지정해야 합니다. 자체 파일에는 다른 이름을 사용할 수 있습니다.

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Because this file is not a Databricks notebook, you
# must create a Spark session. Databricks notebooks
# create a Spark session for you by default.
spark = SparkSession.builder \
                    .appName('integrity-tests') \
                    .getOrCreate()

# Does the specified table exist in the specified database?
def tableExists(tableName, dbName):
  return spark.catalog.tableExists(f"{dbName}.{tableName}")

# Does the specified column exist in the given DataFrame?
def columnExists(dataFrame, columnName):
  if columnName in dataFrame.columns:
    return True
  else:
    return False

# How many rows are there for the specified value in the specified column
# in the given DataFrame?
def numRowsInColumnForValue(dataFrame, columnName, columnValue):
  df = dataFrame.filter(col(columnName) == columnValue)

  return df.count()

R

다음 코드에서는 Databricks Git 폴더(Repos)를 설정하고, 리포지토리를 추가했으며, 리포지토리가 Azure Databricks 작업 영역에서 열려 있다고 가정합니다.

리포지토리 내에 이름이 myfunctions.r파일을 만들고 파일에 다음 콘텐츠를 추가합니다. 이 문서의 다른 예제에서는 이 파일의 이름을 myfunctions.r로 지정해야 합니다. 자체 파일에는 다른 이름을 사용할 수 있습니다.

library(SparkR)

# Does the specified table exist in the specified database?
table_exists <- function(table_name, db_name) {
  tableExists(paste(db_name, ".", table_name, sep = ""))
}

# Does the specified column exist in the given DataFrame?
column_exists <- function(dataframe, column_name) {
  column_name %in% colnames(dataframe)
}

# How many rows are there for the specified value in the specified column
# in the given DataFrame?
num_rows_in_column_for_value <- function(dataframe, column_name, column_value) {
  df = filter(dataframe, dataframe[[column_name]] == column_value)

  count(df)
}

Scala

다음 콘텐츠가 포함된 myfunctions Scala Notebook을 만듭니다. 이 문서의 다른 예제에서는 이 Notebook의 이름을 myfunctions로 지정해야 합니다. 자체 Notebook에는 다른 이름을 사용할 수 있습니다.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// Does the specified table exist in the specified database?
def tableExists(tableName: String, dbName: String) : Boolean = {
  return spark.catalog.tableExists(dbName + "." + tableName)
}

// Does the specified column exist in the given DataFrame?
def columnExists(dataFrame: DataFrame, columnName: String) : Boolean = {
  val nameOfColumn = null

  for(nameOfColumn <- dataFrame.columns) {
    if (nameOfColumn == columnName) {
      return true
    }
  }

  return false
}

// How many rows are there for the specified value in the specified column
// in the given DataFrame?
def numRowsInColumnForValue(dataFrame: DataFrame, columnName: String, columnValue: String) : Long = {
  val df = dataFrame.filter(col(columnName) === columnValue)

  return df.count()
}

SQL

다음 코드에서는 Azure Databricks 작업 영역에서 액세스할 수 있는 main 카탈로그 내에 있는 default 스키마에 있는 타사 샘플 데이터 세트 다이아몬드가 있다고 가정합니다. 사용하려는 카탈로그 또는 스키마의 이름이 다른 경우 다음 USE 문 중 하나 또는 두 가지 모두를 일치하도록 변경합니다.

SQL Notebook을 만들고 이 새 Notebook에 다음 콘텐츠를 추가합니다. 이후 Notebook을 클러스터에 연결하고 Notebook을 실행하여 지정된 카탈로그 및 스키마에 다음 SQL UDF를 추가합니다.

참고 항목

SQL UDF table_existscolumn_exists는 Unity 카탈로그에서만 사용됩니다. Unity 카탈로그에 대한 SQL UDF 지원은 공개 미리 보기로 제공됩니다.

USE CATALOG main;
USE SCHEMA default;

CREATE OR REPLACE FUNCTION table_exists(catalog_name STRING,
                                        db_name      STRING,
                                        table_name   STRING)
  RETURNS BOOLEAN
  RETURN if(
    (SELECT count(*) FROM system.information_schema.tables
     WHERE table_catalog = table_exists.catalog_name
       AND table_schema  = table_exists.db_name
       AND table_name    = table_exists.table_name) > 0,
    true,
    false
  );

CREATE OR REPLACE FUNCTION column_exists(catalog_name STRING,
                                         db_name      STRING,
                                         table_name   STRING,
                                         column_name  STRING)
  RETURNS BOOLEAN
  RETURN if(
    (SELECT count(*) FROM system.information_schema.columns
     WHERE table_catalog = column_exists.catalog_name
       AND table_schema  = column_exists.db_name
       AND table_name    = column_exists.table_name
       AND column_name   = column_exists.column_name) > 0,
    true,
    false
  );

CREATE OR REPLACE FUNCTION num_rows_for_clarity_in_diamonds(clarity_value STRING)
  RETURNS BIGINT
  RETURN SELECT count(*)
         FROM main.default.diamonds
         WHERE clarity = clarity_value

함수 호출

이 섹션에서는 이전 함수를 호출하는 코드에 대해 설명합니다. 예를 들어 이러한 함수를 사용하여 지정된 값이 특정 열 내에 있는 테이블의 행 수를 계산할 수 있습니다. 하지만 계속하기 전에 테이블이 실제로 존재하는지 여부와 해당 테이블에 실제로 열이 있는지 확인하려고 합니다. 다음 코드는 이러한 조건을 확인합니다.

이전 섹션의 함수를 Azure Databricks 작업 영역에 추가한 경우 다음과 같이 작업 영역에서 이러한 함수를 호출할 수 있습니다.

Python

리포지토리의 이전 myfunctions.py 파일과 동일한 폴더에 Python Notebook을 만들고 Notebook에 다음 콘텐츠를 추가합니다. 필요한 경우 테이블 이름, 스키마(데이터베이스) 이름, 열 이름 및 열 값에 대한 변수 값을 변경합니다. 이후 Notebook을 클러스터에 연결하고 Notebook을 실행하여 결과를 확인합니다.

from myfunctions import *

tableName   = "diamonds"
dbName      = "default"
columnName  = "clarity"
columnValue = "VVS2"

# If the table exists in the specified database...
if tableExists(tableName, dbName):

  df = spark.sql(f"SELECT * FROM {dbName}.{tableName}")

  # And the specified column exists in that table...
  if columnExists(df, columnName):
    # Then report the number of rows for the specified value in that column.
    numRows = numRowsInColumnForValue(df, columnName, columnValue)

    print(f"There are {numRows} rows in '{tableName}' where '{columnName}' equals '{columnValue}'.")
  else:
    print(f"Column '{columnName}' does not exist in table '{tableName}' in schema (database) '{dbName}'.")
else:
  print(f"Table '{tableName}' does not exist in schema (database) '{dbName}'.") 

R

리포지토리의 이전 myfunctions.r 파일과 동일한 폴더에 R Notebook을 만들고 다음 콘텐츠를 Notebook에 추가합니다. 필요한 경우 테이블 이름, 스키마(데이터베이스) 이름, 열 이름 및 열 값에 대한 변수 값을 변경합니다. 이후 Notebook을 클러스터에 연결하고 Notebook을 실행하여 결과를 확인합니다.

library(SparkR)
source("myfunctions.r")

table_name   <- "diamonds"
db_name      <- "default"
column_name  <- "clarity"
column_value <- "VVS2"

# If the table exists in the specified database...
if (table_exists(table_name, db_name)) {

  df = sql(paste("SELECT * FROM ", db_name, ".", table_name, sep = ""))

  # And the specified column exists in that table...
  if (column_exists(df, column_name)) {
    # Then report the number of rows for the specified value in that column.
    num_rows = num_rows_in_column_for_value(df, column_name, column_value)

    print(paste("There are ", num_rows, " rows in table '", table_name, "' where '", column_name, "' equals '", column_value, "'.", sep = "")) 
  } else {
    print(paste("Column '", column_name, "' does not exist in table '", table_name, "' in schema (database) '", db_name, "'.", sep = ""))
  }

} else {
  print(paste("Table '", table_name, "' does not exist in schema (database) '", db_name, "'.", sep = ""))
}

Scala

Scala Notebook의 이전 myfunctions와 동일한 폴더에 다른 Scala Notebook을 만들고 다음 콘텐츠를 새 Notebook에 추가합니다.

이 새 Notebook의 첫 번째 셀에서 %run 매직을 호출하는 다음 코드를 추가합니다. 이 매직은 myfunctions Notebook의 콘텐츠를 새 노트북에 제공합니다.

%run ./myfunctions

새 Notebook의 두 번째 셀에서 다음 코드를 추가합니다. 필요한 경우 테이블 이름, 스키마(데이터베이스) 이름, 열 이름 및 열 값에 대한 변수 값을 변경합니다. 이후 Notebook을 클러스터에 연결하고 Notebook을 실행하여 결과를 확인합니다.

val tableName   = "diamonds"
val dbName      = "default"
val columnName  = "clarity"
val columnValue = "VVS2"

// If the table exists in the specified database...
if (tableExists(tableName, dbName)) {

  val df = spark.sql("SELECT * FROM " + dbName + "." + tableName)

  // And the specified column exists in that table...
  if (columnExists(df, columnName)) {
    // Then report the number of rows for the specified value in that column.
    val numRows = numRowsInColumnForValue(df, columnName, columnValue)

    println("There are " + numRows + " rows in '" + tableName + "' where '" + columnName + "' equals '" + columnValue + "'.")
  } else {
    println("Column '" + columnName + "' does not exist in table '" + tableName + "' in database '" + dbName + "'.")
  }

} else {
  println("Table '" + tableName + "' does not exist in database '" + dbName + "'.")
}

SQL

이전 Notebook의 새 셀 또는 별도의 Notebook에 있는 셀에 다음 코드를 추가합니다. 필요한 경우 스키마 또는 카탈로그 이름이 일치하도록 변경한 다음, 이 셀을 실행하여 결과를 확인합니다.

SELECT CASE
-- If the table exists in the specified catalog and schema...
WHEN
  table_exists("main", "default", "diamonds")
THEN
  -- And the specified column exists in that table...
  (SELECT CASE
   WHEN
     column_exists("main", "default", "diamonds", "clarity")
   THEN
     -- Then report the number of rows for the specified value in that column.
     printf("There are %d rows in table 'main.default.diamonds' where 'clarity' equals 'VVS2'.",
            num_rows_for_clarity_in_diamonds("VVS2"))
   ELSE
     printf("Column 'clarity' does not exist in table 'main.default.diamonds'.")
   END)
ELSE
  printf("Table 'main.default.diamonds' does not exist.")
END

단위 테스트 작성

이 섹션에서는 이 문서의 시작 부분에 설명된 각 함수를 테스트하는 코드를 설명합니다. 나중에 함수를 변경하는 경우 단위 테스트를 사용하여 해당 함수가 예상대로 작동하는지 여부를 확인할 수 있습니다.

이 문서의 시작 부분에 함수를 Azure Databricks 작업 영역에 추가한 경우 다음과 같이 이러한 함수에 대한 단위 테스트를 작업 영역에 추가할 수 있습니다.

Python

리포지토리의 이전 myfunctions.py 파일과 동일한 폴더에 test_myfunctions.py 파일을 만들고 다음 콘텐츠를 Notebook에 추가합니다. 기본적으로 pytest는 이름이 test_로 시작하거나 _test로 끝나는 .py 파일을 찾아 테스트합니다. 마찬가지로 기본적으로 pytest는 이름이 test_로 시작하는 함수에 대한 파일 내부를 살펴보고 테스트합니다.

일반적으로 프로덕션에서 데이터를 사용하는 함수에 대해 단위 테스트를 실행하지 않는 것이 좋습니다. 이는 데이터를 추가, 제거 또는 변경하는 함수에 있어 특히 중요합니다. 예기치 않은 방식의 단위 테스트로 인해 프로덕션 데이터가 손상되지 않도록 보호하려면 비프로덕션 데이터에 대해 단위 테스트를 실행해야 합니다. 한 가지 일반적인 방법은 프로덕션 데이터에 최대한 가까운 가짜 데이터를 만드는 것입니다. 다음 코드 예제에서는 실행할 단위 테스트에 대한 가짜 데이터를 만듭니다.

import pytest
import pyspark
from myfunctions import *
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, FloatType, StringType

tableName    = "diamonds"
dbName       = "default"
columnName   = "clarity"
columnValue  = "SI2"

# Because this file is not a Databricks notebook, you
# must create a Spark session. Databricks notebooks
# create a Spark session for you by default.
spark = SparkSession.builder \
                    .appName('integrity-tests') \
                    .getOrCreate()

# Create fake data for the unit tests to run against.
# In general, it is a best practice to not run unit tests
# against functions that work with data in production.
schema = StructType([ \
  StructField("_c0",     IntegerType(), True), \
  StructField("carat",   FloatType(),   True), \
  StructField("cut",     StringType(),  True), \
  StructField("color",   StringType(),  True), \
  StructField("clarity", StringType(),  True), \
  StructField("depth",   FloatType(),   True), \
  StructField("table",   IntegerType(), True), \
  StructField("price",   IntegerType(), True), \
  StructField("x",       FloatType(),   True), \
  StructField("y",       FloatType(),   True), \
  StructField("z",       FloatType(),   True), \
])

data = [ (1, 0.23, "Ideal",   "E", "SI2", 61.5, 55, 326, 3.95, 3.98, 2.43 ), \
         (2, 0.21, "Premium", "E", "SI1", 59.8, 61, 326, 3.89, 3.84, 2.31 ) ]

df = spark.createDataFrame(data, schema)

# Does the table exist?
def test_tableExists():
  assert tableExists(tableName, dbName) is True

# Does the column exist?
def test_columnExists():
  assert columnExists(df, columnName) is True

# Is there at least one row for the value in the specified column?
def test_numRowsInColumnForValue():
  assert numRowsInColumnForValue(df, columnName, columnValue) > 0

R

리포지토리의 이전 myfunctions.r 파일과 동일한 폴더에 test_myfunctions.r 파일을 만들고 다음 콘텐츠를 Notebook에 추가합니다. 기본적으로 testthat는 이름이 test로 시작하는 .r 파일을 찾아 테스트합니다.

일반적으로 프로덕션에서 데이터를 사용하는 함수에 대해 단위 테스트를 실행하지 않는 것이 좋습니다. 이는 데이터를 추가, 제거 또는 변경하는 함수에 있어 특히 중요합니다. 예기치 않은 방식의 단위 테스트로 인해 프로덕션 데이터가 손상되지 않도록 보호하려면 비프로덕션 데이터에 대해 단위 테스트를 실행해야 합니다. 한 가지 일반적인 방법은 프로덕션 데이터에 최대한 가까운 가짜 데이터를 만드는 것입니다. 다음 코드 예제에서는 실행할 단위 테스트에 대한 가짜 데이터를 만듭니다.

library(testthat)
source("myfunctions.r")

table_name   <- "diamonds"
db_name      <- "default"
column_name  <- "clarity"
column_value <- "SI2"

# Create fake data for the unit tests to run against.
# In general, it is a best practice to not run unit tests
# against functions that work with data in production.
schema <- structType(
  structField("_c0",     "integer"),
  structField("carat",   "float"),
  structField("cut",     "string"),
  structField("color",   "string"),
  structField("clarity", "string"),
  structField("depth",   "float"),
  structField("table",   "integer"),
  structField("price",   "integer"),
  structField("x",       "float"),
  structField("y",       "float"),
  structField("z",       "float"))

data <- list(list(as.integer(1), 0.23, "Ideal",   "E", "SI2", 61.5, as.integer(55), as.integer(326), 3.95, 3.98, 2.43),
             list(as.integer(2), 0.21, "Premium", "E", "SI1", 59.8, as.integer(61), as.integer(326), 3.89, 3.84, 2.31))

df <- createDataFrame(data, schema)

# Does the table exist?
test_that ("The table exists.", {
  expect_true(table_exists(table_name, db_name))
})

# Does the column exist?
test_that ("The column exists in the table.", {
  expect_true(column_exists(df, column_name))
})

# Is there at least one row for the value in the specified column?
test_that ("There is at least one row in the query result.", {
  expect_true(num_rows_in_column_for_value(df, column_name, column_value) > 0)
})

Scala

Scala Notebook의 이전 myfunctions와 동일한 폴더에 다른 Scala Notebook을 만들고 다음 콘텐츠를 새 Notebook에 추가합니다.

이 새 Notebook의 첫 번째 셀에서 %run 매직을 호출하는 다음 코드를 추가합니다. 이 매직은 myfunctions Notebook의 콘텐츠를 새 노트북에 제공합니다.

%run ./myfunctions

두 번째 셀에서 다음 코드를 추가합니다. 이 코드는 단위 테스트를 정의하고 실행하는 방법을 지정합니다.

일반적으로 프로덕션에서 데이터를 사용하는 함수에 대해 단위 테스트를 실행하지 않는 것이 좋습니다. 이는 데이터를 추가, 제거 또는 변경하는 함수에 있어 특히 중요합니다. 예기치 않은 방식의 단위 테스트로 인해 프로덕션 데이터가 손상되지 않도록 보호하려면 비프로덕션 데이터에 대해 단위 테스트를 실행해야 합니다. 한 가지 일반적인 방법은 프로덕션 데이터에 최대한 가까운 가짜 데이터를 만드는 것입니다. 다음 코드 예제에서는 실행할 단위 테스트에 대한 가짜 데이터를 만듭니다.

import org.scalatest._
import org.apache.spark.sql.types.{StructType, StructField, IntegerType, FloatType, StringType}
import scala.collection.JavaConverters._

class DataTests extends AsyncFunSuite {

  val tableName   = "diamonds"
  val dbName      = "default"
  val columnName  = "clarity"
  val columnValue = "SI2"

  // Create fake data for the unit tests to run against.
  // In general, it is a best practice to not run unit tests
  // against functions that work with data in production.
  val schema = StructType(Array(
                 StructField("_c0",     IntegerType),
                 StructField("carat",   FloatType),
                 StructField("cut",     StringType),
                 StructField("color",   StringType),
                 StructField("clarity", StringType),
                 StructField("depth",   FloatType),
                 StructField("table",   IntegerType),
                 StructField("price",   IntegerType),
                 StructField("x",       FloatType),
                 StructField("y",       FloatType),
                 StructField("z",       FloatType)
               ))

  val data = Seq(
                  Row(1, 0.23, "Ideal",   "E", "SI2", 61.5, 55, 326, 3.95, 3.98, 2.43),
                  Row(2, 0.21, "Premium", "E", "SI1", 59.8, 61, 326, 3.89, 3.84, 2.31)
                ).asJava

  val df = spark.createDataFrame(data, schema)

  // Does the table exist?
  test("The table exists") {
    assert(tableExists(tableName, dbName) == true)
  }

  // Does the column exist?
  test("The column exists") {
    assert(columnExists(df, columnName) == true)
  }

  // Is there at least one row for the value in the specified column?
  test("There is at least one matching row") {
    assert(numRowsInColumnForValue(df, columnName, columnValue) > 0)
  }
}

nocolor.nodurations.nostacks.stats.run(new DataTests)

참고 항목

이 코드 예제에서는 ScalaTest에서 FunSuite 테스트 스타일을 사용합니다. 사용 가능한 다른 테스트 스타일은 프로젝트에 대한 테스트 스타일 선택을 참조하세요.

SQL

단위 테스트를 추가하기 전에 일반적으로 프로덕션의 데이터를 사용하는 함수에 대해 단위 테스트를 실행하지 않는 것이 좋습니다. 이는 데이터를 추가, 제거 또는 변경하는 함수에 있어 특히 중요합니다. 예기치 않은 방식의 단위 테스트로 인해 프로덕션 데이터가 손상되지 않도록 보호하려면 비프로덕션 데이터에 대해 단위 테스트를 실행해야 합니다. 한 가지 일반적인 방법은 테이블 대신 에 대해 단위 테스트를 실행하는 것입니다.

뷰를 만들려면 이전 Notebook 또는 별도의 Notebook의 새 셀에서 CREATE VIEW 명령을 호출할 수 있습니다. 다음 예제에서는 main이라는 카탈로그 내에서 이름이 default인 스키마(데이터베이스) 내에 이름이 diamonds인 기존 테이블이 있다고 가정합니다. 필요한 경우 이러한 이름을 사용자 고유의 이름과 일치하도록 변경한 다음 해당 셀만 실행합니다.

USE CATALOG main;
USE SCHEMA default;

CREATE VIEW view_diamonds AS
SELECT * FROM diamonds;

뷰를 만든 후 다음 SELECT 문을 각각 이전 Notebook의 새 셀 또는 별도의 Notebook에 있는 새 셀에 추가합니다. 필요한 경우 고유의 이름과 일치하도록 변경합니다.

SELECT if(table_exists("main", "default", "view_diamonds"),
          printf("PASS: The table 'main.default.view_diamonds' exists."),
          printf("FAIL: The table 'main.default.view_diamonds' does not exist."));

SELECT if(column_exists("main", "default", "view_diamonds", "clarity"),
          printf("PASS: The column 'clarity' exists in the table 'main.default.view_diamonds'."),
          printf("FAIL: The column 'clarity' does not exists in the table 'main.default.view_diamonds'."));

SELECT if(num_rows_for_clarity_in_diamonds("VVS2") > 0,
          printf("PASS: The table 'main.default.view_diamonds' has at least one row where the column 'clarity' equals 'VVS2'."),
          printf("FAIL: The table 'main.default.view_diamonds' does not have at least one row where the column 'clarity' equals 'VVS2'."));

단위 테스트 실행

이 섹션에서는 이전 섹션에서 코딩한 단위 테스트를 실행하는 방법을 설명합니다. 단위 테스트를 실행하면 통과 및 실패한 단위 테스트를 보여주는 결과가 표시됩니다.

이전 섹션의 단위 테스트를 Azure Databricks 작업 영역에 추가한 경우 작업 영역에서 이러한 단위 테스트를 실행할 수 있습니다. 단위 테스트를 수동으로 실행하거나 일정에 따라 실행할 수 있습니다.

Python

리포지토리의 이전 test_myfunctions.py 파일과 동일한 폴더에 Python Notebook을 만들고 다음 콘텐츠를 추가합니다.

이 새 Notebook의 첫 번째 셀에서 다음 코드를 추가하고, 셀을 실행하면 %pip 매직이 호출됩니다. 이 매직은 pytest를 설치합니다.

%pip install pytest

두 번째 셀에서 다음 코드를 추가한 다음 셀을 실행합니다. 결과에는 통과 및 실패한 단위 테스트가 표시됩니다.

import pytest
import sys

# Skip writing pyc files on a readonly filesystem.
sys.dont_write_bytecode = True

# Run pytest.
retcode = pytest.main([".", "-v", "-p", "no:cacheprovider"])

# Fail the cell execution if there are any test failures.
assert retcode == 0, "The pytest invocation failed. See the log for details."

R

리포지토리의 이전 test_myfunctions.r 파일과 동일한 폴더에 R Notebook을 만들고 다음 콘텐츠를 추가합니다.

첫 번째 셀에서 다음 코드를 추가하고, 셀을 실행하면 install.packages 함수가 호출됩니다. 이 함수는 testthat을 설치합니다.

install.packages("testthat")

두 번째 셀에서 다음 코드를 추가하고 셀을 실행합니다. 결과에는 통과 및 실패한 단위 테스트가 표시됩니다.

library(testthat)
source("myfunctions.r")

test_dir(".", reporter = "tap")

Scala

이전 섹션의 Notebook에서 첫 번째 셀과 두 번째 셀을 실행합니다. 결과에는 통과 및 실패한 단위 테스트가 표시됩니다.

SQL

이전 섹션의 Notebook에서 세 셀을 각각 실행합니다. 결과에는 각 단위 테스트가 통과 또는 실패했는지 여부가 표시됩니다.

단위 테스트를 실행한 후 더 이상 뷰가 필요하지 않은 경우 뷰를 삭제할 수 있습니다. 이 뷰를 삭제하려면 이전 Notebook 중 하나의 새 셀에 다음 코드를 추가한 다음 해당 셀만 실행할 수 있습니다.

DROP VIEW view_diamonds;

클러스터의 드라이버 로그에서 Notebook 실행 결과(단위 테스트 결과 포함)를 볼 수 있습니다. 클러스터의 로그 전달 위치를 지정할 수도 있습니다.

GitHub Actions와 같은 CI/CD(연속 통합 및 지속적인 업데이트 또는 배포) 시스템을 설정하여 코드가 변경될 때마다 단위 테스트를 자동으로 실행할 수 있습니다. 예를 들어 Notebook에 대한 소프트웨어 엔지니어링 모범 사례에서 GitHub Actions 적용 범위를 참조하세요.

추가 리소스

pytest

testthat

ScalaTest

SQL