チュートリアル:Azure HDInsight で対話型クエリを使用してデータの抽出、変換、読み込みを行う
このチュートリアルでは、一般に公開されているフライト データの生の CSV データ ファイルをダウンロードします。 それを HDInsight クラスター ストレージにインポートしてから、Azure HDInsight の Interactive Query を使用してデータを変換します。 データを変換したら、Apache Sqoop を使用して Azure SQL Database のデータベースにデータを読み込みます。
このチュートリアルに含まれるタスクは次のとおりです。
- サンプルのフライト データをダウンロードする
- HDInsight クラスターにデータをアップロードする
- 対話型クエリを使用してデータを変換する
- Azure SQL Database のデータベースにテーブルを作成する
- Sqoop を使用して Azure SQL Database のデータベースにデータをエクスポートする
前提条件
HDInsight 上の対話型クエリ クラスター。 Azure portal を使用した Apache Hadoop クラスターの作成に関するページを参照して、 [クラスターの種類] として [対話型クエリ] を選択します。
Azure SQL Database 内のデータベース。 保存先データ ストアとしてデータベースを使用します。 Azure SQL Database のデータベースがない場合は、Azure portal での Azure SQL Database のデータベースの作成に関するページを参照してください。
SSH クライアント 詳細については、SSH を使用して HDInsight (Apache Hadoop) に接続する方法に関するページを参照してください。
フライト データのダウンロード
米国運輸省研究・革新技術庁/運輸統計局のページに移動します。
ページで、すべてのフィールドをクリアしてから、次の値を選択します。
名前 値 Filter Year 2019 Filter Period January フィールド Year, FlightDate, Reporting_Airline, DOT_ID_Reporting_Airline, Flight_Number_Reporting_Airline, OriginAirportID, Origin, OriginCityName, OriginState, DestAirportID, Dest, DestCityName, DestState, DepDelayMinutes, ArrDelay, ArrDelayMinutes, CarrierDelay, WeatherDelay, NASDelay, SecurityDelay, LateAircraftDelay
.[Download] を選択します。 選択したデータ フィールドを含む .ZIP ファイルがダウンロードされます。
HDInsight クラスターにデータをアップロードする
HDInsight クラスターに関連付けられたストレージにデータをアップロードする方法はたくさんあります。 このセクションでは、scp
を使用してデータをアップロードします。 データをアップロードする他の方法については、HDInsight へのデータのアップロードに関する記事をご覧ください。
.zip ファイルを HDInsight クラスターのヘッド ノードにアップロードします。
FILENAME
を .zip ファイルの名前に、CLUSTERNAME
を HDInsight クラスターの名前に置き換えて、以下のコマンドを編集します。 その後、コマンド プロンプトを開き、ファイルの場所に作業ディレクトリを設定してから、コマンドを入力します。scp FILENAME.zip sshuser@CLUSTERNAME-ssh.azurehdinsight.net:FILENAME.zip
確認を求められたら、「yes」または「no」を入力して続行します。 入力するとき、ウィンドウにテキストは表示されません。
アップロードが完了したら、SSH を使用してクラスターに接続します。
CLUSTERNAME
を HDInsight クラスターの名前に置き換えて、以下のコマンドを編集します。 次のコマンドを入力します。ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net
SSH 接続が確立されたら、環境変数を設定します。
FILE_NAME
、SQL_SERVERNAME
、SQL_DATABASE
、SQL_USER
、SQL_PASWORD
を適切な値に置き換えます。 その後、コマンドを入力します。export FILENAME=FILE_NAME export SQLSERVERNAME=SQL_SERVERNAME export DATABASE=SQL_DATABASE export SQLUSER=SQL_USER export SQLPASWORD='SQL_PASWORD'
以下のコマンドを入力して、.zip ファイルを解凍します。
unzip $FILENAME.zip
次のコマンドを入力し、HDInsight ストレージにディレクトリを作成してから、そのディレクトリに .csv ファイルをコピーします。
hdfs dfs -mkdir -p /tutorials/flightdelays/data hdfs dfs -put $FILENAME.csv /tutorials/flightdelays/data/
Hive クエリを使用したデータの変換
HDInsight クラスター上で Hive ジョブを実行する方法はたくさんあります。 このセクションでは、Beeline を使用して Hive ジョブを実行します。 Hive ジョブを実行するその他の方法については、HDInsight での Apache Hive の使用に関するページを参照してください。
Hive ジョブの一環として、.csv ファイルから Delays という名前の Hive テーブルにデータをインポートします。
HDInsight クラスター用に既に開いている SSH プロンプトから、次のコマンドを使用して flightdelays.hql という名前の新しいファイルを作成して編集します。
nano flightdelays.hql
このファイルの内容として、次のテキストを使用します。
DROP TABLE delays_raw; -- Creates an external table over the csv file CREATE EXTERNAL TABLE delays_raw ( YEAR string, FL_DATE string, UNIQUE_CARRIER string, CARRIER string, FL_NUM string, ORIGIN_AIRPORT_ID string, ORIGIN string, ORIGIN_CITY_NAME string, ORIGIN_CITY_NAME_TEMP string, ORIGIN_STATE_ABR string, DEST_AIRPORT_ID string, DEST string, DEST_CITY_NAME string, DEST_CITY_NAME_TEMP string, DEST_STATE_ABR string, DEP_DELAY_NEW float, ARR_DELAY_NEW float, CARRIER_DELAY float, WEATHER_DELAY float, NAS_DELAY float, SECURITY_DELAY float, LATE_AIRCRAFT_DELAY float) -- The following lines describe the format and location of the file ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' STORED AS TEXTFILE LOCATION '/tutorials/flightdelays/data'; -- Drop the delays table if it exists DROP TABLE delays; -- Create the delays table and populate it with data -- pulled in from the CSV file (via the external table defined previously) CREATE TABLE delays AS SELECT YEAR AS year, FL_DATE AS flight_date, substring(UNIQUE_CARRIER, 2, length(UNIQUE_CARRIER) -1) AS unique_carrier, substring(CARRIER, 2, length(CARRIER) -1) AS carrier, substring(FL_NUM, 2, length(FL_NUM) -1) AS flight_num, ORIGIN_AIRPORT_ID AS origin_airport_id, substring(ORIGIN, 2, length(ORIGIN) -1) AS origin_airport_code, substring(ORIGIN_CITY_NAME, 2) AS origin_city_name, substring(ORIGIN_STATE_ABR, 2, length(ORIGIN_STATE_ABR) -1) AS origin_state_abr, DEST_AIRPORT_ID AS dest_airport_id, substring(DEST, 2, length(DEST) -1) AS dest_airport_code, substring(DEST_CITY_NAME,2) AS dest_city_name, substring(DEST_STATE_ABR, 2, length(DEST_STATE_ABR) -1) AS dest_state_abr, DEP_DELAY_NEW AS dep_delay_new, ARR_DELAY_NEW AS arr_delay_new, CARRIER_DELAY AS carrier_delay, WEATHER_DELAY AS weather_delay, NAS_DELAY AS nas_delay, SECURITY_DELAY AS security_delay, LATE_AIRCRAFT_DELAY AS late_aircraft_delay FROM delays_raw;
ファイルを保存するには、Ctrl + X キー、Y キー、Enter キーの順に押します。
Hive を起動し、flightdelays.hql ファイルを実行するには、次のコマンドを使用します。
beeline -u 'jdbc:hive2://localhost:10001/;transportMode=http' -f flightdelays.hql
flightdelays.hql スクリプトの実行が完了したら、次のコマンドを使用して対話型 Beeline セッションを開きます。
beeline -u 'jdbc:hive2://localhost:10001/;transportMode=http'
jdbc:hive2://localhost:10001/>
プロンプトが表示されたら、次のクエリを使用してインポートされたフライト遅延データからデータを取得します。INSERT OVERWRITE DIRECTORY '/tutorials/flightdelays/output' ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' SELECT regexp_replace(origin_city_name, '''', ''), avg(weather_delay) FROM delays WHERE weather_delay IS NOT NULL GROUP BY origin_city_name;
このクエリにより、悪天候による遅延が発生した都市の一覧と平均遅延時間が取得され、
/tutorials/flightdelays/output
に保存されます。 その後、Sqoop がこの場所からデータを読み取り、Azure SQL Database にエクスポートします。Beeline を終了するには、プロンプトで「
!quit
」と入力します。
SQL データベース テーブルの作成
SQL Database に接続してテーブルを作成するには、多くの方法があります。 次の手順では、HDInsight クラスターから FreeTDS を使用します。
FreeTDS をインストールするには、クラスターへの開いている SSH 接続から、次のコマンドを使用します。
sudo apt-get --assume-yes install freetds-dev freetds-bin
インストールが完了したら、次のコマンドを使用して SQL Database に接続します。
TDSVER=8.0 tsql -H $SQLSERVERNAME.database.windows.net -U $SQLUSER -p 1433 -D $DATABASE -P $SQLPASWORD
次のテキストのような出力が返されます。
locale is "en_US.UTF-8" locale charset is "UTF-8" using default charset "UTF-8" Default database being set to <yourdatabase> 1>
1>
プロンプトで、以下の行を入力します。CREATE TABLE [dbo].[delays]( [origin_city_name] [nvarchar](50) NOT NULL, [weather_delay] float, CONSTRAINT [PK_delays] PRIMARY KEY CLUSTERED ([origin_city_name] ASC)) GO
GO
ステートメントを入力すると、前のステートメントが評価されます。 このステートメントにより、クラスター化インデックス付きの、delays という名前のテーブルが作成されます。次のクエリを使用して、テーブルが作成されたことを確認します。
SELECT * FROM information_schema.tables GO
出力は次のテキストのようになります。
TABLE_CATALOG TABLE_SCHEMA TABLE_NAME TABLE_TYPE databaseName dbo delays BASE TABLE
Enter
exit
at the1>
」と入力して、tsql ユーティリティを終了します。
Apache Sqoop を使用して SQL Database にデータをエクスポートする
前のセクションで、変換済みデータを /tutorials/flightdelays/output
にコピーしました。 このセクションでは、Sqoop を使用して、/tutorials/flightdelays/output
のデータを、Azure SQL Database に作成したテーブルにエクスポートします。
以下のコマンドを入力して、Sqoop で SQL データベースを認識できることを確認します。
sqoop list-databases --connect jdbc:sqlserver://$SQLSERVERNAME.database.windows.net:1433 --username $SQLUSER --password $SQLPASWORD
このコマンドにより、先ほど
delays
テーブルを作成したデータベースを含む、データベースのリストが返されます。以下のコマンドを入力して、
/tutorials/flightdelays/output
からdelays
テーブルにデータをエクスポートします。sqoop export --connect "jdbc:sqlserver://$SQLSERVERNAME.database.windows.net:1433;database=$DATABASE" --username $SQLUSER --password $SQLPASWORD --table 'delays' --export-dir '/tutorials/flightdelays/output' --fields-terminated-by '\t' -m 1
Sqoop が、
delays
テーブルを含むデータベースに接続され、/tutorials/flightdelays/output
ディレクトリからdelays
テーブルにデータがエクスポートされます。sqoop コマンドが完了した後、以下のコマンドを入力し、tsql ユーティリティを使ってデータベースに接続します。
TDSVER=8.0 tsql -H $SQLSERVERNAME.database.windows.net -U $SQLUSER -p 1433 -D $DATABASE -P $SQLPASWORD
次のステートメントを使って、データが delays テーブルにエクスポートされたことを確認します。
SELECT * FROM delays GO
テーブル内のデータの一覧が表示されます。 テーブルには、都市の名前と、その都市のフライトの平均遅延時間が含まれます。
「
exit
」と入力して、tsql ユーティリティを終了します。
リソースをクリーンアップする
チュートリアルを完了したら、必要に応じてクラスターを削除できます。 HDInsight を使用すると、データは Azure Storage に格納されるため、クラスターは、使用されていない場合に安全に削除できます。 また、HDInsight クラスターは、使用していない場合でも課金されます。 クラスターの料金は Storage の料金の何倍にもなるため、クラスターを使用しない場合は削除するのが経済的にも合理的です。
クラスターを削除するには、「ブラウザー、PowerShell、または Azure CLI を使用して HDInsight クラスターを削除する」を参照してください。
次のステップ
このチュートリアルでは、生の CSV データ ファイルを取得し、それを HDInsight クラスター ストレージにインポートしてから、Azure HDInsight で対話型クエリを使用してデータを変換しました。 次のチュートリアルに進んで、Apache Hive Warehouse Connector について確認してください。