Uwaga
Dostęp do tej strony wymaga autoryzacji. Może spróbować zalogować się lub zmienić katalogi.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
Potoki danych stanowią podstawę wielu rozwiązań do analizy danych. Jak sugeruje nazwa, potok danych pobiera nieprzetworzone dane, czyści i zmienia je zgodnie z potrzebami, a następnie zazwyczaj wykonuje obliczenia lub agregacje przed zapisaniem przetworzonych danych. Przetworzone dane są wykorzystywane przez klientów, raporty lub interfejsy API. Potok danych musi dostarczać powtarzalne wyniki, zarówno zgodnie z harmonogramem, jak i po wyzwoleniu przez nowe dane.
W tym artykule opisano sposób operacjonalizacji potoków danych w celu zapewnienia powtarzalności przy użyciu usługi Oozie działającego w klastrach hadoop usługi HDInsight. Przykładowy scenariusz przeprowadzi Cię przez potok danych, który przygotowuje i przetwarza dane szeregów czasowych lotów lotniczych.
W poniższym scenariuszu dane wejściowe są plikiem płaskim zawierającym partię danych lotniczych za jeden miesiąc. Te dane lotu obejmują informacje, takie jak lotnisko początkowe i docelowe, mile latane, czasy odlotu i przylotu itd. Celem tego procesu jest podsumowanie codziennych wyników linii lotniczych, gdzie każda linia lotnicza ma jeden wiersz dla każdego dnia ze średnim opóźnieniem odlotu i przylotu w minutach oraz łączną liczbą mil przebyta tego dnia.
YEAR | MIESIĄC | DZIEŃ_MIESIĄCA | PRZEWOŹNIK | ŚREDNIE OPÓŹNIENIE ODLOTU | Średnie_Opóźnienie_Przyjazdu | CAŁKOWITA_ODLEGŁOŚĆ |
---|---|---|---|---|---|---|
2017 | 1 | 3 | AA | 10.142229 | 7.862926 | 2644539 |
2017 | 1 | 3 | AS | 9.435449 | 5.482143 | 572289 |
2017 | 1 | 3 | DL | 6.935409 | -2.1893024 | 1909696 |
Przykładowy potok czeka na nadejście danych lotu nowego okresu, a następnie przechowuje szczegółowe informacje o locie w magazynie danych apache Hive na potrzeby długoterminowych analiz. Potok tworzy również znacznie mniejszy zestaw danych, który podsumowuje tylko codzienne dane lotu. Te codzienne dane podsumowania lotów są wysyłane do usługi SQL Database w celu dostarczania raportów, takich jak witryna internetowa.
Na poniższym diagramie przedstawiono przykładowy potok.
Omówienie rozwiązania Apache Oozie
Ten pipeline wykorzystuje Apache Oozie działający na klastrze Hadoop HDInsight.
Oozie opisuje swoje potoki pod względem akcji, przepływów pracy i koordynatorów. Akcje określają rzeczywistą pracę do wykonania, taką jak uruchamianie zapytania hive. Przepływy pracy definiują sekwencję akcji. Koordynatorzy definiują harmonogram uruchamiania przepływu pracy. Koordynatorzy mogą również czekać na dostępność nowych danych przed uruchomieniem wystąpienia przepływu pracy.
Na poniższym diagramie przedstawiono ogólny zarys tego przykładowego potoku Oozie.
Aprowizuj zasoby platformy Azure
Ten przepływ wymaga usługi Azure SQL Database i klastra usługi HDInsight Hadoop w tej samej lokalizacji. Usługa Azure SQL Database przechowuje zarówno dane podsumowania utworzone przez potok, jak i magazyn metadanych Oozie.
Utwórz usługę Azure SQL Database
Tworzenie bazy danych Azure SQL Database. Zobacz Tworzenie bazy danych Azure SQL Database w witrynie Azure Portal.
Aby upewnić się, że klaster usługi HDInsight może uzyskać dostęp do połączonej usługi Azure SQL Database, skonfiguruj reguły zapory usługi Azure SQL Database, aby zezwolić usługom i zasobom platformy Azure na dostęp do serwera. Tę opcję można włączyć w witrynie Azure Portal, wybierając pozycję Ustaw zaporę serwera i wybierając pozycję WŁĄCZONE poniżej pozycji Zezwalaj usługom i zasobom platformy Azure na dostęp do tego serwera dla usługi Azure SQL Database. Aby uzyskać więcej informacji, zobacz Tworzenie reguł zapory bazujących na adresach IP i zarządzanie nimi.
Użyj Edytora zapytań do wykonania następujących poleceń SQL w celu utworzenia tabeli
dailyflights
, która będzie przechowywać podsumowane dane z każdego uruchomienia potoku.CREATE TABLE dailyflights ( YEAR INT, MONTH INT, DAY_OF_MONTH INT, CARRIER CHAR(2), AVG_DEP_DELAY FLOAT, AVG_ARR_DELAY FLOAT, TOTAL_DISTANCE FLOAT ) GO CREATE CLUSTERED INDEX dailyflights_clustered_index on dailyflights(YEAR,MONTH,DAY_OF_MONTH,CARRIER) GO
Usługa Azure SQL Database jest teraz gotowa.
Przygotuj klaster Apache Hadoop
Utwórz klaster Apache Hadoop z niestandardowym magazynem metadanych. Podczas tworzenia klastra z poziomu portalu na karcie Magazyn upewnij się, że wybrano bazę danych SQL Database w obszarze Ustawienia magazynu metadanych. Aby uzyskać więcej informacji na temat wybierania magazynu metadanych, zobacz Wybieranie niestandardowego magazynu metadanych podczas tworzenia klastra. Aby uzyskać więcej informacji na temat tworzenia klastra, zobacz Wprowadzenie do usługi HDInsight w systemie Linux.
Weryfikowanie konfiguracji tunelowania SSH
Aby wyświetlić stan wystąpień koordynatora i przepływu pracy za pomocą konsoli sieci Web Oozie, skonfiguruj tunel SSH w klastrze usługi HDInsight. Aby uzyskać więcej informacji, zobacz Tunel SSH.
Uwaga
Możesz również użyć przeglądarki Chrome z rozszerzeniem Foxy Proxy , aby przeglądać zasoby internetowe klastra w tunelu SSH. Skonfiguruj go, aby proxy kierowało wszystkie żądania przez hosta localhost
na porcie 9876 tunelu. Takie podejście jest zgodne z Podsystem Windows dla systemu Linux, znanym również jako Bash w systemie Windows 10.
Uruchom następujące polecenie, aby otworzyć tunel SSH w klastrze, gdzie
CLUSTERNAME
jest nazwą klastra:ssh -C2qTnNf -D 9876 sshuser@CLUSTERNAME-ssh.azurehdinsight.net
Sprawdź, czy tunel działa, przechodząc do narzędzia Ambari w węźle głównym, przechodząc do:
http://headnodehost:8080
Aby uzyskać dostęp do konsoli sieci Web Oozie z poziomu systemu Ambari, przejdź do Oozie>Quick Links> [Aktywny serwer] >Oozie Web UI.
Konfigurowanie programu Hive
Przekazywanie danych
Pobierz przykładowy plik CSV zawierający dane lotu przez jeden miesiąc. Pobierz plik
2017-01-FlightData.zip
ZIP z repozytorium GitHub usługi HDInsight i rozpakuj go do pliku2017-01-FlightData.csv
CSV .Skopiuj ten plik CSV do konta usługi Azure Storage dołączonego do klastra usługi HDInsight i umieść go w folderze
/example/data/flights
.Użyj SCP, aby skopiować pliki z komputera lokalnego do lokalnego magazynu węzła głównego klastra HDInsight.
scp ./2017-01-FlightData.csv sshuser@CLUSTERNAME-ssh.azurehdinsight.net:2017-01-FlightData.csv
Użyj polecenia ssh, aby nawiązać połączenie z klastrem. Zmodyfikuj poniższe polecenie, zastępując
CLUSTERNAME
nazwą twojego klastra, a następnie wykonaj polecenie.ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net
Z poziomu sesji SSH użyj polecenia HDFS, aby skopiować plik z magazynu lokalnego węzła głównego do usługi Azure Storage.
hadoop fs -mkdir /example/data/flights hdfs dfs -put ./2017-01-FlightData.csv /example/data/flights/2017-01-FlightData.csv
Utwórz tabele
Przykładowe dane są teraz dostępne. Jednak pipeline wymaga dwóch tabel Hive do przetwarzania, jedna dla danych przychodzących (rawFlights
) i jedna dla podsumowanych danych (flights
). Utwórz te tabele w narzędziu Ambari w następujący sposób.
Zaloguj się do systemu Ambari, przechodząc do
http://headnodehost:8080
.Z listy usług wybierz pozycję Hive.
Wybierz pozycję Przejdź do Widoku obok etykiety Widok Hive 2.0.
W obszarze tekstowym zapytania wklej następujące instrukcje, aby utworzyć tabelę
rawFlights
. TabelarawFlights
zawiera schemat do odczytu plików CSV w folderze/example/data/flights
w usłudze Azure Storage.CREATE EXTERNAL TABLE IF NOT EXISTS rawflights ( YEAR INT, MONTH INT, DAY_OF_MONTH INT, FL_DATE STRING, CARRIER STRING, FL_NUM STRING, ORIGIN STRING, DEST STRING, DEP_DELAY FLOAT, ARR_DELAY FLOAT, ACTUAL_ELAPSED_TIME FLOAT, DISTANCE FLOAT) ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde' WITH SERDEPROPERTIES ( "separatorChar" = ",", "quoteChar" = "\"" ) LOCATION '/example/data/flights'
Wybierz pozycję Wykonaj , aby utworzyć tabelę.
Aby utworzyć tabelę
flights
, zastąp tekst w obszarze tekstu zapytania następującymi instrukcjami. Tabelaflights
jest tabelą zarządzaną przez program Hive, która partycjonuje dane załadowane do niej według roku, miesiąca i dnia miesiąca. Ta tabela będzie zawierać wszystkie dane historyczne dotyczące lotów, z najniższym poziomem szczegółowości w danych źródłowych jednego wiersza na lot.SET hive.exec.dynamic.partition.mode=nonstrict; CREATE TABLE flights ( FL_DATE STRING, CARRIER STRING, FL_NUM STRING, ORIGIN STRING, DEST STRING, DEP_DELAY FLOAT, ARR_DELAY FLOAT, ACTUAL_ELAPSED_TIME FLOAT, DISTANCE FLOAT ) PARTITIONED BY (YEAR INT, MONTH INT, DAY_OF_MONTH INT) ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde' WITH SERDEPROPERTIES ( "separatorChar" = ",", "quoteChar" = "\"" );
Wybierz pozycję Wykonaj , aby utworzyć tabelę.
Tworzenie przepływu pracy Oozie
Potoki zwykle przetwarzają dane w partiach według danego interwału czasu. W takim przypadku system przetwarzania przetwarza dane lotu codziennie. Takie podejście umożliwia pobieranie wejściowych plików CSV codziennie, co tydzień, co miesiąc lub co rok.
Przykładowy przepływ pracy przetwarza dane lotu dziennie w trzech głównych krokach:
- Uruchom zapytanie programu Hive, aby wyodrębnić dane z zakresu dat tego dnia z źródłowego pliku CSV reprezentowanego przez
rawFlights
tabelę i wstawić dane doflights
tabeli. - Uruchom zapytanie programu Hive, aby dynamicznie utworzyć tabelę przemieszczania w programie Hive dla dnia, która zawiera kopię danych lotu podsumowanych według dnia i operatora.
- Użyj narzędzia Apache Sqoop, aby skopiować wszystkie dane z dziennej tabeli przejściowej w programie Hive do tabeli docelowej
dailyflights
w usłudze Azure SQL Database. Narzędzie Sqoop odczytuje wiersze źródłowe z danych znajdujących się w tabeli Hive znajdującej się w usłudze Azure Storage i ładuje je do usługi SQL Database przy użyciu połączenia JDBC.
Te trzy kroki są koordynowane przez przepływ pracy Oozie.
Na lokalnej stacji roboczej utwórz plik o nazwie
job.properties
. Użyj poniższego tekstu jako zawartości początkowej pliku. Następnie zaktualizuj wartości dla określonego środowiska. Poniższa tabela zawiera podsumowanie każdej właściwości i wskazuje, gdzie można znaleźć wartości dla własnego środowiska.nameNode=wasbs://[CONTAINERNAME]@[ACCOUNTNAME].blob.core.windows.net jobTracker=[ACTIVERESOURCEMANAGER]:8050 queueName=default oozie.use.system.libpath=true appBase=wasbs://[CONTAINERNAME]@[ACCOUNTNAME].blob.core.windows.net/oozie oozie.wf.application.path=${appBase}/load_flights_by_day hiveScriptLoadPartition=wasbs://[CONTAINERNAME]@[ACCOUNTNAME].blob.core.windows.net/oozie/load_flights_by_day/hive-load-flights-partition.hql hiveScriptCreateDailyTable=wasbs://[CONTAINERNAME]@[ACCOUNTNAME].blob.core.windows.net/oozie/load_flights_by_day/hive-create-daily-summary-table.hql hiveDailyTableName=dailyflights${year}${month}${day} hiveDataFolder=wasbs://[CONTAINERNAME]@[ACCOUNTNAME].blob.core.windows.net/example/data/flights/day/${year}/${month}/${day} sqlDatabaseConnectionString="jdbc:sqlserver://[SERVERNAME].database.windows.net;user=[USERNAME];password=[PASSWORD];database=[DATABASENAME]" sqlDatabaseTableName=dailyflights year=2017 month=01 day=03
Własność Źródło wartości nameNode Pełna ścieżka do kontenera usługi Azure Storage dołączonego do klastra usługi HDInsight. jobTracker Wewnętrzna nazwa hosta węzła głównego YARN w aktywnym klastrze. Na stronie głównej systemu Ambari wybierz pozycję YARN z listy usług, a następnie wybierz pozycję Active Resource Manager. Nazwa hosta w formie URI jest wyświetlana na górze strony. Dołącz port 8050. queueName Nazwa kolejki YARN używana podczas planowania akcji programu Hive. Pozostaw wartość domyślną. oozie.use.system.libpath Pozostaw wartość true. appBase Ścieżka do podfolderu w usłudze Azure Storage, w którym wdrażasz przepływ pracy Oozie i pliki pomocnicze. oozie.wf.application.path Lokalizacja przepływu pracy Oozie workflow.xml
do uruchomienia.hiveScriptLoadPartition Ścieżka w usłudze Azure Storage do pliku zapytania Hive hive-load-flights-partition.hql
.hiveScriptCreateDailyTable Ścieżka w usłudze Azure Storage do pliku zapytania Hive hive-create-daily-summary-table.hql
.hiveDailyTableName Generowana dynamicznie nazwa do użycia dla tabeli przejściowej. hiveDataFolder Ścieżka w usłudze Azure Storage do danych zawartych w tabeli przejściowej. ciąg połączeniowy bazy danych SQL Ciąg połączenia JDBC dla Twojej bazy danych Azure SQL. nazwaTabeliBazyDanychSql Nazwa tabeli w usłudze Azure SQL Database, do której są wstawione wiersze podsumowania. Pozostaw wartość dailyflights
.rok Składnik roku dnia, dla którego są obliczane podsumowania lotów. Pozostaw tak, jak jest. miesiąc Składnik miesiąca dnia, dla którego są obliczane podsumowania lotów. Pozostaw w takiej postaci, w jakiej jest. dzień Składnik dnia miesiąca, dla którego są obliczane podsumowania lotów. Pozostaw jak jest. Na lokalnej stacji roboczej utwórz plik o nazwie
hive-load-flights-partition.hql
. Użyj poniższego kodu jako zawartości pliku.SET hive.exec.dynamic.partition.mode=nonstrict; INSERT OVERWRITE TABLE flights PARTITION (YEAR, MONTH, DAY_OF_MONTH) SELECT FL_DATE, CARRIER, FL_NUM, ORIGIN, DEST, DEP_DELAY, ARR_DELAY, ACTUAL_ELAPSED_TIME, DISTANCE, YEAR, MONTH, DAY_OF_MONTH FROM rawflights WHERE year = ${year} AND month = ${month} AND day_of_month = ${day};
Zmienne Oozie używają składni
${variableName}
. Te zmienne są ustawiane wjob.properties
pliku. Oozie zastępuje rzeczywiste wartości podczas wykonywania.Na lokalnej stacji roboczej utwórz plik o nazwie
hive-create-daily-summary-table.hql
. Użyj poniższego kodu jako zawartości pliku.DROP TABLE ${hiveTableName}; CREATE EXTERNAL TABLE ${hiveTableName} ( YEAR INT, MONTH INT, DAY_OF_MONTH INT, CARRIER STRING, AVG_DEP_DELAY FLOAT, AVG_ARR_DELAY FLOAT, TOTAL_DISTANCE FLOAT ) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' STORED AS TEXTFILE LOCATION '${hiveDataFolder}'; INSERT OVERWRITE TABLE ${hiveTableName} SELECT year, month, day_of_month, carrier, avg(dep_delay) avg_dep_delay, avg(arr_delay) avg_arr_delay, sum(distance) total_distance FROM flights GROUP BY year, month, day_of_month, carrier HAVING year = ${year} AND month = ${month} AND day_of_month = ${day};
To zapytanie tworzy tabelę przejściową, która będzie przechowywać tylko podsumowane dane przez jeden dzień. Zanotuj instrukcję SELECT, która oblicza średnie opóźnienia i łączną odległość przepływaną przez operatora według dnia. Dane wstawione do tej tabeli przechowywane w znanej lokalizacji (ścieżka wskazywana przez zmienną hiveDataFolder), dzięki czemu mogą być używane jako źródło sqoop w następnym kroku.
Na lokalnej stacji roboczej utwórz plik o nazwie
workflow.xml
. Użyj poniższego kodu jako zawartości pliku. Powyższe kroki są wyrażane jako oddzielne akcje w pliku przepływu pracy Oozie.<workflow-app name="loadflightstable" xmlns="uri:oozie:workflow:0.5"> <start to = "RunHiveLoadFlightsScript"/> <action name="RunHiveLoadFlightsScript"> <hive xmlns="uri:oozie:hive-action:0.2"> <job-tracker>${jobTracker}</job-tracker> <name-node>${nameNode}</name-node> <configuration> <property> <name>mapred.job.queue.name</name> <value>${queueName}</value> </property> </configuration> <script>${hiveScriptLoadPartition}</script> <param>year=${year}</param> <param>month=${month}</param> <param>day=${day}</param> </hive> <ok to="RunHiveCreateDailyFlightTableScript"/> <error to="fail"/> </action> <action name="RunHiveCreateDailyFlightTableScript"> <hive xmlns="uri:oozie:hive-action:0.2"> <job-tracker>${jobTracker}</job-tracker> <name-node>${nameNode}</name-node> <configuration> <property> <name>mapred.job.queue.name</name> <value>${queueName}</value> </property> </configuration> <script>${hiveScriptCreateDailyTable}</script> <param>hiveTableName=${hiveDailyTableName}</param> <param>year=${year}</param> <param>month=${month}</param> <param>day=${day}</param> <param>hiveDataFolder=${hiveDataFolder}/${year}/${month}/${day}</param> </hive> <ok to="RunSqoopExport"/> <error to="fail"/> </action> <action name="RunSqoopExport"> <sqoop xmlns="uri:oozie:sqoop-action:0.2"> <job-tracker>${jobTracker}</job-tracker> <name-node>${nameNode}</name-node> <configuration> <property> <name>mapred.compress.map.output</name> <value>true</value> </property> </configuration> <arg>export</arg> <arg>--connect</arg> <arg>${sqlDatabaseConnectionString}</arg> <arg>--table</arg> <arg>${sqlDatabaseTableName}</arg> <arg>--export-dir</arg> <arg>${hiveDataFolder}/${year}/${month}/${day}</arg> <arg>-m</arg> <arg>1</arg> <arg>--input-fields-terminated-by</arg> <arg>"\t"</arg> <archive>mssql-jdbc-7.0.0.jre8.jar</archive> </sqoop> <ok to="end"/> <error to="fail"/> </action> <kill name="fail"> <message>Job failed, error message[${wf:errorMessage(wf:lastErrorNode())}] </message> </kill> <end name="end"/> </workflow-app>
Do dwóch zapytań Hive uzyskuje się dostęp przez ich ścieżkę w usłudze Azure Storage, a pozostałe wartości zmiennych udostępnia plik job.properties
. Ten plik konfiguruje przepływ pracy do uruchomienia dla daty 3 stycznia 2017 r.
Wdrażanie i uruchamianie przepływu pracy Oozie
Użyj SCP z sesji bash, aby wdrożyć przepływ pracy Oozie (workflow.xml
), zapytania Hive (hive-load-flights-partition.hql
i hive-create-daily-summary-table.hql
) oraz konfigurację zadania (job.properties
). W usłudze Oozie tylko plik job.properties
może być przechowywany w lokalnym magazynie node główny. Wszystkie inne pliki muszą być przechowywane w systemie plików HDFS, w tym przypadku w usłudze Azure Storage. Akcja Sqoop używana przez przepływ pracy zależy od sterownika JDBC do komunikacji z Twoją bazą danych SQL, który musi zostać skopiowany z węzła głównego do systemu plików HDFS.
load_flights_by_day
Utwórz podfolder pod ścieżką użytkownika w lokalnym magazynie węzła głównego. W otwartej sesji SSH wykonaj następujące polecenie:mkdir load_flights_by_day
Skopiuj wszystkie pliki w bieżącym katalogu (pliki
workflow.xml
ijob.properties
) do podfolderuload_flights_by_day
. Na lokalnej stacji roboczej wykonaj następujące polecenie:scp ./* sshuser@CLUSTERNAME-ssh.azurehdinsight.net:load_flights_by_day
Skopiuj pliki przepływu pracy do systemu plików HDFS. W otwartej sesji SSH wykonaj następujące polecenia:
cd load_flights_by_day hadoop fs -mkdir -p /oozie/load_flights_by_day hdfs dfs -put ./* /oozie/load_flights_by_day
Skopiuj
mssql-jdbc-7.0.0.jre8.jar
z lokalnego węzła głównego do folderu przepływu pracy w systemie plików HDFS. Popraw polecenie zgodnie z potrzebami, jeśli klaster zawiera inny plik jar. Poprawworkflow.xml
według potrzeby, aby odzwierciedlał inny plik jar. W otwartej sesji SSH wykonaj następujące polecenie:hdfs dfs -put /usr/share/java/sqljdbc_7.0/enu/mssql-jdbc*.jar /oozie/load_flights_by_day
Uruchom przepływ pracy. W otwartej sesji SSH wykonaj następujące polecenie:
oozie job -config job.properties -run
Obserwuj stan przy użyciu konsoli sieci Web Oozie. W aplikacji Ambari wybierz Oozie, Szybkie łącza, a następnie Konsolę sieci Web Oozie. Na karcie Zadania przepływu pracy wybierz pozycję Wszystkie zadania.
Gdy stan to POWODZENIE, wykonaj zapytanie względem tabeli usługi SQL Database, aby wyświetlić wstawione wiersze. W portalu Azure przejdź do okienka dla bazy danych SQL, wybierz Narzędzia i otwórz Edytor zapytań.
SELECT * FROM dailyflights
Teraz, gdy przepływ pracy jest uruchomiony dla pojedynczego dnia testu, możesz opakować ten przepływ pracy za pomocą koordynatora, który planuje przepływ pracy, aby był uruchamiany codziennie.
Uruchom przepływ pracy z koordynatorem
Aby zaplanować ten przepływ pracy tak, aby był uruchamiany codziennie (lub wszystkie dni w zakresie dat), możesz użyć koordynatora. Koordynator jest definiowany przez plik XML, na przykład coordinator.xml
:
<coordinator-app name="daily_export" start="2017-01-01T00:00Z" end="2017-01-05T00:00Z" frequency="${coord:days(1)}" timezone="UTC" xmlns="uri:oozie:coordinator:0.4">
<datasets>
<dataset name="ds_input1" frequency="${coord:days(1)}" initial-instance="2016-12-31T00:00Z" timezone="UTC">
<uri-template>${sourceDataFolder}${YEAR}-${MONTH}-FlightData.csv</uri-template>
<done-flag></done-flag>
</dataset>
</datasets>
<input-events>
<data-in name="event_input1" dataset="ds_input1">
<instance>${coord:current(0)}</instance>
</data-in>
</input-events>
<action>
<workflow>
<app-path>${appBase}/load_flights_by_day</app-path>
<configuration>
<property>
<name>year</name>
<value>${coord:formatTime(coord:nominalTime(), 'yyyy')}</value>
</property>
<property>
<name>month</name>
<value>${coord:formatTime(coord:nominalTime(), 'MM')}</value>
</property>
<property>
<name>day</name>
<value>${coord:formatTime(coord:nominalTime(), 'dd')}</value>
</property>
<property>
<name>hiveScriptLoadPartition</name>
<value>${hiveScriptLoadPartition}</value>
</property>
<property>
<name>hiveScriptCreateDailyTable</name>
<value>${hiveScriptCreateDailyTable}</value>
</property>
<property>
<name>hiveDailyTableNamePrefix</name>
<value>${hiveDailyTableNamePrefix}</value>
</property>
<property>
<name>hiveDailyTableName</name>
<value>${hiveDailyTableNamePrefix}${coord:formatTime(coord:nominalTime(), 'yyyy')}${coord:formatTime(coord:nominalTime(), 'MM')}${coord:formatTime(coord:nominalTime(), 'dd')}</value>
</property>
<property>
<name>hiveDataFolderPrefix</name>
<value>${hiveDataFolderPrefix}</value>
</property>
<property>
<name>hiveDataFolder</name>
<value>${hiveDataFolderPrefix}${coord:formatTime(coord:nominalTime(), 'yyyy')}/${coord:formatTime(coord:nominalTime(), 'MM')}/${coord:formatTime(coord:nominalTime(), 'dd')}</value>
</property>
<property>
<name>sqlDatabaseConnectionString</name>
<value>${sqlDatabaseConnectionString}</value>
</property>
<property>
<name>sqlDatabaseTableName</name>
<value>${sqlDatabaseTableName}</value>
</property>
</configuration>
</workflow>
</action>
</coordinator-app>
Jak widać, większość koordynatora przekazuje tylko informacje o konfiguracji do wystąpienia przepływu pracy. Istnieje jednak kilka ważnych elementów, które należy podkreślić.
Punkt 1: Atrybuty
start
iend
w samym elemenciecoordinator-app
kontrolują interwał czasu, w którym działa koordynator.<coordinator-app ... start="2017-01-01T00:00Z" end="2017-01-05T00:00Z" frequency="${coord:days(1)}" ...>
Koordynator jest odpowiedzialny za planowanie działań w zakresie dat od
start
doend
, zgodnie z interwałem określonym przez atrybutfrequency
. Każda zaplanowana akcja z kolei uruchamia przepływ pracy zgodnie z konfiguracją. W powyższej definicji koordynatora, jest on skonfigurowany do wykonywania akcji od 1 stycznia 2017 r. do 5 stycznia 2017 r. Częstotliwość jest ustawiona na jeden dzień przez wyrażenie częstotliwości języka Wyrażeń Oozie. Powoduje to, że koordynator planuje działania (a tym samym przepływ pracy) raz dziennie. W przypadku zakresów dat, które znajdują się w przeszłości, tak jak w tym przykładzie, akcja zostanie uruchomiona bez opóźnień. Początek daty, od której zaplanowano uruchomienie akcji, jest nazywany godziną nominalną. Na przykład w celu przetworzenia danych z 1 stycznia 2017 r. koordynator zaplanuje akcję z nominalnym czasem 2017-01-01T00:00:00 GMT.Punkt 2: W zakresie dat przepływu pracy element
dataset
określa, gdzie szukać danych w systemie plików HDFS dla określonego zakresu dat, i konfiguruje sposób, w jaki Oozie określa, czy dane są już dostępne do przetworzenia.<dataset name="ds_input1" frequency="${coord:days(1)}" initial-instance="2016-12-31T00:00Z" timezone="UTC"> <uri-template>${sourceDataFolder}${YEAR}-${MONTH}-FlightData.csv</uri-template> <done-flag></done-flag> </dataset>
Ścieżka do danych w systemie plików HDFS jest tworzona dynamicznie zgodnie z wyrażeniem podanym w elemecie
uri-template
. W tym koordynatorze częstotliwość jednego dnia jest również używana z zestawem danych. Podczas gdy daty rozpoczęcia i zakończenia na elemencie koordynatora kontrolują, kiedy akcje są zaplanowane (i definiują ich nominalne czasy), elementyinitial-instance
ifrequency
w zestawie danych kontrolują obliczenie daty używanej przy konstruowaniuuri-template
. W takim przypadku ustaw początkowe wystąpienie na jeden dzień przed rozpoczęciem pracy koordynatora, aby upewnić się, że zostaną zebrane dane z pierwszego dnia (1 stycznia 2017 r.). Obliczanie daty zestawu danych zaczyna od wartościinitial-instance
(12/31/2016), posuwając się w przyrostach zgodnych z częstotliwością zestawu danych (jeden dzień), dopóki nie znajdzie najnowszej daty, która nie przekracza nominalnego czasu ustawionego przez koordynatora dla pierwszej akcji (2017-01-01T00:00:00 GMT).done-flag
Pusty element wskazuje, że gdy Oozie sprawdza obecność danych wejściowych w wyznaczonym czasie, Oozie określa dane, czy są dostępne przez obecność katalogu lub pliku. W takim przypadku jest to obecność pliku CSV. Jeśli obecny jest plik csv, Oozie zakłada, że dane są gotowe i uruchamia instancję przepływu pracy, aby przetworzyć plik. Jeśli nie ma pliku CSV, firma Oozie zakłada, że dane nie są jeszcze gotowe i uruchomienie przepływu pracy przechodzi w stan oczekiwania.Punkt 3. Element
data-in
określa określony znacznik czasu, który ma być używany jako nominalny czas podczas zastępowania wartości wuri-template
skojarzonym zestawie danych.<data-in name="event_input1" dataset="ds_input1"> <instance>${coord:current(0)}</instance> </data-in>
W takim przypadku ustaw wystąpienie na wyrażenie
${coord:current(0)}
, które przekłada się na użycie nominalnego czasu akcji zgodnie z harmonogramem koordynatora. Innymi słowy, gdy koordynator planuje uruchomienie akcji z nominalnym czasem 01.01.2017, wówczas 01.01.2017 zostaje użyty do zastąpienia wartości zmiennych YEAR (2017) i MONTH (01) w szablonie identyfikatora URI. Po obliczeniu szablonu identyfikatora URI dla tego wystąpienia usługa Oozie sprawdza, czy oczekiwany katalog lub plik jest dostępny i odpowiednio planuje następny przebieg przepływu pracy.
Trzy poprzednie punkty łączą się w celu uzyskania sytuacji, w której koordynator planuje przetwarzanie danych źródłowych w sposób codzienny.
Punkt 1: Koordynator zaczyna z nominalną datą 2017-01-01.
Punkt 2: Oozie szuka danych dostępnych w
sourceDataFolder/2017-01-FlightData.csv
.Punkt 3. Gdy Oozie wykryje ten plik, planuje instancję przepływu pracy, która będzie przetwarzać dane z 1 stycznia 2017 r. Oozie kontynuuje przetwarzanie na dzień 2017-01-02. Ta ocena jest powtarzana do (ale nie włącznie z) 2017-01-05.
Podobnie jak w przypadku przepływów pracy, konfiguracja koordynatora jest definiowana w pliku job.properties
, który zawiera rozszerzony zestaw ustawień używanych przez przepływ pracy.
nameNode=wasbs://[CONTAINERNAME]@[ACCOUNTNAME].blob.core.windows.net
jobTracker=[ACTIVERESOURCEMANAGER]:8050
queueName=default
oozie.use.system.libpath=true
appBase=wasbs://[CONTAINERNAME]@[ACCOUNTNAME].blob.core.windows.net/oozie
oozie.coord.application.path=${appBase}
sourceDataFolder=wasbs://[CONTAINERNAME]@[ACCOUNTNAME].blob.core.windows.net/example/data/flights/
hiveScriptLoadPartition=wasbs://[CONTAINERNAME]@[ACCOUNTNAME].blob.core.windows.net/oozie/load_flights_by_day/hive-load-flights-partition.hql
hiveScriptCreateDailyTable=wasbs://[CONTAINERNAME]@[ACCOUNTNAME].blob.core.windows.net/oozie/load_flights_by_day/hive-create-daily-summary-table.hql
hiveDailyTableNamePrefix=dailyflights
hiveDataFolderPrefix=wasbs://[CONTAINERNAME]@[ACCOUNTNAME].blob.core.windows.net/example/data/flights/day/
sqlDatabaseConnectionString="jdbc:sqlserver://[SERVERNAME].database.windows.net;user=[USERNAME];password=[PASSWORD];database=[DATABASENAME]"
sqlDatabaseTableName=dailyflights
Jedyne nowe właściwości wprowadzone w tym job.properties
pliku to:
Własność | Źródło wartości |
---|---|
oozie.coord.application.path | Wskazuje lokalizację coordinator.xml pliku zawierającego koordynatora Oozie do uruchomienia. |
hiveDailyTableNamePrefix | Prefiks używany podczas dynamicznego tworzenia nazwy tabeli przejściowej. |
hiveDataFolderPrefix | Prefiks ścieżki, w której będą przechowywane wszystkie tabele przejściowe. |
Wdrażanie i uruchamianie koordynatora Oozie
Aby uruchomić rurociąg z koordynatorem, postępuj podobnie jak w przypadku przepływu pracy, z tą różnicą, że pracujesz z folderu leżącego o jeden poziom wyżej niż folder zawierający przepływ pracy. Ta konwencja folderów oddziela koordynatorów od przepływów pracy na dysku, dzięki czemu można skojarzyć jednego koordynatora z różnymi podrzędnymi przepływami pracy.
Użyj usługi SCP z komputera lokalnego, aby skopiować pliki koordynatora do lokalnego magazynu węzła głównego klastra.
scp ./* sshuser@CLUSTERNAME-ssh.azurehdinsight.net:~
Połączenie SSH z węzłem głównym.
ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net
Skopiuj pliki koordynatora do systemu plików HDFS.
hdfs dfs -put ./* /oozie/
Uruchom koordynatora.
oozie job -config job.properties -run
Sprawdź stan przy użyciu konsoli sieci Web Oozie, tym razem wybierając kartę Zadania koordynatora, a następnie pozycję Wszystkie zadania.
Wybierz wystąpienie koordynatora, aby wyświetlić listę zaplanowanych działań. W takim przypadku powinny zostać wyświetlone cztery akcje z nominalnymi czasami w zakresie od 1 stycznia 2017 r. do 4 stycznia 2017 r.
Każda akcja na tej liście odpowiada wystąpieniu przepływu pracy, które przetwarza dane o wartości jednego dnia, gdzie początek tego dnia jest wskazywany przez nominalny czas.