Interaktív adatkonvergálás az Apache Sparkkal az Azure Machine Learningben
Az adatmeghatolás a gépi tanulási projektek egyik legfontosabb aspektusává válik. Az Azure Machine Learning-integráció és az Azure Synapse Analytics integrációja hozzáférést biztosít az Azure Synapse által támogatott Apache Spark-készlethez az Azure Machine Learning Notebookokat használó interaktív adatszervezéshez.
Ebből a cikkből megtudhatja, hogyan kezelheti az adatkonvergálást a
- Kiszolgáló nélküli Spark-számítás
- Csatolt Synapse Spark-készlet
Előfeltételek
- Azure-előfizetés; ha nem rendelkezik Azure-előfizetéssel, a kezdés előtt hozzon létre egy ingyenes fiókot .
- Egy Azure Machine Learning-munkaterület. További információért látogasson el a Munkaterület-erőforrások létrehozása webhelyre.
- Egy Azure Data Lake Storage (ADLS) Gen 2-tárfiók. További információért látogasson el az Azure Data Lake Storage (ADLS) Gen 2-tárfiókjának létrehozására.
- (Nem kötelező): Azure Key Vault. További információért látogasson el a Create an Azure Key Vault (Azure Key Vault létrehozása) webhelyre.
- (Nem kötelező): Szolgáltatásnév. További információért tekintse meg a Szolgáltatásnév létrehozása című témakört.
- (Nem kötelező): Csatolt Synapse Spark-készlet az Azure Machine Learning-munkaterületen.
Mielőtt elkezdené az adatátszervezési feladatokat, ismerkedjen meg a titkos kulcsok tárolásának folyamatával
- Azure Blob Storage-fiók hozzáférési kulcsa
- Közös hozzáférésű jogosultságkód (SAS) jogkivonat
- Az Azure Data Lake Storage (ADLS) Gen 2 szolgáltatásnévre vonatkozó információi
az Azure Key Vaultban. Azt is tudnia kell, hogyan kezelheti a szerepkör-hozzárendeléseket az Azure Storage-fiókokban. A dokumentum alábbi szakaszai ezeket a fogalmakat ismertetik. Ezután megismerkedünk az interaktív adatszervezés részleteivel az Azure Machine Learning Notebooks Spark-készleteinek használatával.
Tipp.
Ha többet szeretne megtudni az Azure Storage-fiók szerepkör-hozzárendelési konfigurációjáról, vagy ha felhasználói identitásátadással fér hozzá a tárfiókjaiban lévő adatokhoz, további információt a Szerepkör-hozzárendelések hozzáadása az Azure Storage-fiókokban című témakörben talál.
Interaktív adatkonvergálás az Apache Sparkkal
Az Azure Machine Learning notebookokban az Apache Sparkkal való interaktív adatkonvergáláshoz az Azure Machine Learning kiszolgáló nélküli Spark-számítást és csatolt Synapse Spark-készletet kínál. A kiszolgáló nélküli Spark-számításhoz nem szükséges erőforrások létrehozása az Azure Synapse-munkaterületen. Ehelyett egy teljes mértékben felügyelt kiszolgáló nélküli Spark-számítás közvetlenül elérhetővé válik az Azure Machine Learning Notebooksban. A kiszolgáló nélküli Spark-számítás használata a legegyszerűbb módja a Spark-fürtök elérésének az Azure Machine Learningben.
Kiszolgáló nélküli Spark-számítás az Azure Machine Learning Notebooksban
Alapértelmezés szerint egy kiszolgáló nélküli Spark-számítás érhető el az Azure Machine Learning Notebooksban. Ha egy jegyzetfüzetben szeretné elérni, válassza a Kiszolgáló nélküli Spark Compute lehetőséget az Azure Machine Learning Kiszolgáló nélküli Spark alatt a Számítási kijelölés menüben.
A Jegyzetfüzetek felhasználói felülete a Spark-munkamenet konfigurálásának lehetőségeit is biztosítja a kiszolgáló nélküli Spark-számításhoz. Spark-munkamenet konfigurálása:
- Válassza a munkamenet konfigurálása lehetőséget a képernyő tetején.
- Válassza ki az Apache Spark-verziót a legördülő menüből.
Fontos
Azure Synapse Runtime for Apache Spark: Announcements
- Azure Synapse Runtime for Apache Spark 3.2:
- EOLA közlemény dátuma: 2023. július 8.
- Támogatási dátum vége: 2024. július 8. A dátum után a futtatókörnyezet le lesz tiltva.
- Apache Spark 3.3:
- EOLA közlemény dátuma: 2024. július 12.
- Támogatási dátum vége: 2025. március 31. A dátum után a futtatókörnyezet le lesz tiltva.
- A folyamatos támogatás és az optimális teljesítmény érdekében javasoljuk az Apache Spark 3.4-be való migrálást
- Azure Synapse Runtime for Apache Spark 3.2:
- Válassza a Példánytípus lehetőséget a legördülő menüben. Jelenleg az alábbi típusok támogatottak:
Standard_E4s_v3
Standard_E8s_v3
Standard_E16s_v3
Standard_E32s_v3
Standard_E64s_v3
- Adjon meg egy Spark-munkamenet időtúllépési értékét percben.
- Válassza ki, hogy dinamikusan lefoglalja-e a végrehajtókat
- Válassza ki a Spark-munkamenet végrehajtóinak számát.
- Válassza a Végrehajtó mérete lehetőséget a legördülő menüben.
- Válassza az Illesztőprogram mérete lehetőséget a legördülő menüben.
- Ha Conda-fájlt szeretne használni a Spark-munkamenet konfigurálásához, jelölje be a Conda-fájl feltöltése jelölőnégyzetet. Ezután válassza a Tallózás lehetőséget, és válassza ki a Kívánt Spark-munkamenet-konfigurációval rendelkező Conda-fájlt.
- Adja hozzá a konfigurációs beállítások tulajdonságait, a bemeneti értékeket a Tulajdonság és az Érték szövegmezőkben, és válassza a Hozzáadás lehetőséget.
- Válassza az Alkalmazás lehetőséget.
- Az Új munkamenet konfigurálása? előugró ablakban válassza a Munkamenet leállítása lehetőséget.
A munkamenetkonfiguráció módosításai megmaradnak, és elérhetővé válnak egy másik jegyzetfüzet-munkamenet számára, amely a kiszolgáló nélküli Spark-számítással kezdődött.
Tipp.
Ha munkamenetszintű Conda-csomagokat használ, javíthatja a Spark-munkamenet hideg kezdési idejét, ha igazra állítja a konfigurációs változótspark.hadoop.aml.enable_cache
. A munkamenet hideg kezdése a munkamenetszintű Conda-csomagokkal általában 10–15 percet vesz igénybe, amikor a munkamenet első alkalommal elindul. Az ezt követő munkamenet hidegben azonban a konfigurációs változó igaz értékre állításával kezdődik, amely általában 3-5 percet vesz igénybe.
Adatok importálása és leküldése az Azure Data Lake Storage-ból (ADLS) Gen 2
Az Azure Data Lake Storage (ADLS) Gen 2 storage-fiókokban tárolt adatokat adat URL-címekkel abfss://
érheti el és helyezheti el. Ehhez a két adatelérési mechanizmus egyikét kell követnie:
- Felhasználói identitás átadása
- Szolgáltatásnév-alapú adathozzáférés
Tipp.
A kiszolgáló nélküli Spark-számítással való adatátengedés és a felhasználói identitás átadása az Azure Data Lake Storage (ADLS) Gen 2 storage-fiók adatainak eléréséhez a legkisebb számú konfigurációs lépést igényli.
Az interaktív adatátengedés elindítása a felhasználói identitás átadásával:
Ellenőrizze, hogy a felhasználói identitás rendelkezik-e közreműködői és tárolási blobadat-közreműködői szerepkör-hozzárendelésekkel az Azure Data Lake Storage (ADLS) Gen 2 storage-fiókban.
A kiszolgáló nélküli Spark-számítás használatához válassza a Kiszolgáló nélküli Spark-számítást az Azure Machine Learning Kiszolgáló nélküli Spark alatt a Számítás kiválasztása menüből.
Csatolt Synapse Spark-készlet használatához válasszon ki egy csatolt Synapse Spark-készletet a Synapse Spark-készletek alatt a Számítási kijelölés menüből.
Ez a Titanic-adatküldő kódminta egy adat URI használatát mutatja be a következő
pyspark.pandas
formátumbanabfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net/<PATH_TO_DATA>
: éspyspark.ml.feature.Imputer
.import pyspark.pandas as pd from pyspark.ml.feature import Imputer df = pd.read_csv( "abfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net/data/titanic.csv", index_col="PassengerId", ) imputer = Imputer(inputCols=["Age"], outputCol="Age").setStrategy( "mean" ) # Replace missing values in Age column with the mean value df.fillna( value={"Cabin": "None"}, inplace=True ) # Fill Cabin column with value "None" if missing df.dropna(inplace=True) # Drop the rows which still have any missing value df.to_csv( "abfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net/data/wrangled", index_col="PassengerId", )
Feljegyzés
Ez a Python-kódminta a következőt használja
pyspark.pandas
: . Ezt csak a Spark 3.2-es vagy újabb verziója támogatja.
Adatok leküldése szolgáltatásnéven keresztüli hozzáféréssel:
Ellenőrizze, hogy a szolgáltatásnév rendelkezik-e közreműködői és tárolási blobadat-közreműködői szerepkör-hozzárendelésekkel az Azure Data Lake Storage (ADLS) Gen 2 storage-fiókban.
Azure Key Vault-titkos kulcsokat hozhat létre a szolgáltatásnév bérlőazonosítójának, ügyfélazonosítójának és titkos ügyfélkulcs-értékeinek.
A Számítás kiválasztása menüben válassza a Kiszolgáló nélküli Spark-számítás lehetőséget az Azure Machine Learning Kiszolgáló nélküli Spark alatt. A Számítási kijelölés menüben a Synapse Spark-készletek alatt is kiválaszthat egy csatolt Synapse Spark-készletet.
Állítsa be a szolgáltatásnév bérlőazonosítóját, ügyfél-azonosítóját és titkos ügyfélkulcs-értékeit a konfigurációban, és hajtsa végre a következő kódmintát.
A
get_secret()
kódban szereplő hívás az Azure Key Vault nevétől és a szolgáltatásnév bérlőazonosítója, ügyfélazonosítója és ügyféltitkához létrehozott Azure Key Vault-titkos kulcsok nevétől függ. Adja meg a megfelelő tulajdonságnevet/értékeket a konfigurációban:- Ügyfélazonosító tulajdonság:
fs.azure.account.oauth2.client.id.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net
- Titkos ügyféltulajdonság:
fs.azure.account.oauth2.client.secret.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net
- Bérlőazonosító tulajdonság:
fs.azure.account.oauth2.client.endpoint.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net
- Bérlőazonosító értéke:
https://login.microsoftonline.com/<TENANT_ID>/oauth2/token
from pyspark.sql import SparkSession sc = SparkSession.builder.getOrCreate() token_library = sc._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary # Set up service principal tenant ID, client ID and secret from Azure Key Vault client_id = token_library.getSecret("<KEY_VAULT_NAME>", "<CLIENT_ID_SECRET_NAME>") tenant_id = token_library.getSecret("<KEY_VAULT_NAME>", "<TENANT_ID_SECRET_NAME>") client_secret = token_library.getSecret("<KEY_VAULT_NAME>", "<CLIENT_SECRET_NAME>") # Set up service principal which has access of the data sc._jsc.hadoopConfiguration().set( "fs.azure.account.auth.type.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net", "OAuth" ) sc._jsc.hadoopConfiguration().set( "fs.azure.account.oauth.provider.type.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider", ) sc._jsc.hadoopConfiguration().set( "fs.azure.account.oauth2.client.id.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net", client_id, ) sc._jsc.hadoopConfiguration().set( "fs.azure.account.oauth2.client.secret.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net", client_secret, ) sc._jsc.hadoopConfiguration().set( "fs.azure.account.oauth2.client.endpoint.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net", "https://login.microsoftonline.com/" + tenant_id + "/oauth2/token", )
- Ügyfélazonosító tulajdonság:
A Titanic-adatok használatával importálja és leküldi az adatokat az adat URI-jának használatával a
abfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net/<PATH_TO_DATA>
kódmintában látható módon.
Adatok importálása és leküldése az Azure Blob Storage-ból
Az Azure Blob Storage-adatokat a tárfiók hozzáférési kulcsával vagy egy közös hozzáférésű jogosultságkód (SAS) jogkivonattal érheti el. Ezeket a hitelesítő adatokat titkos kulcsként kell tárolnia az Azure Key Vaultban, és tulajdonságokként kell beállítania őket a munkamenet-konfigurációban.
Interaktív adatmegrendezés indítása:
Az Azure Machine Learning Studio bal oldali paneljén válassza a Jegyzetfüzetek lehetőséget.
A Számítás kiválasztása menüben válassza a Kiszolgáló nélküli Spark-számítás lehetőséget az Azure Machine Learning Kiszolgáló nélküli Spark alatt. A Számítási kijelölés menüben a Synapse Spark-készletek alatt is kiválaszthat egy csatolt Synapse Spark-készletet.
A tárfiók hozzáférési kulcsának vagy a közös hozzáférésű jogosultságkód (SAS) jogkivonatának konfigurálása az Azure Machine Learning-jegyzetfüzetekben való adathozzáféréshez:
A hozzáférési kulcshoz állítsa be a
fs.azure.account.key.<STORAGE_ACCOUNT_NAME>.blob.core.windows.net
tulajdonságot a kódrészletben látható módon:from pyspark.sql import SparkSession sc = SparkSession.builder.getOrCreate() token_library = sc._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary access_key = token_library.getSecret("<KEY_VAULT_NAME>", "<ACCESS_KEY_SECRET_NAME>") sc._jsc.hadoopConfiguration().set( "fs.azure.account.key.<STORAGE_ACCOUNT_NAME>.blob.core.windows.net", access_key )
Az SAS-jogkivonathoz állítsa be a
fs.azure.sas.<BLOB_CONTAINER_NAME>.<STORAGE_ACCOUNT_NAME>.blob.core.windows.net
tulajdonságot az alábbi kódrészletben látható módon:from pyspark.sql import SparkSession sc = SparkSession.builder.getOrCreate() token_library = sc._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary sas_token = token_library.getSecret("<KEY_VAULT_NAME>", "<SAS_TOKEN_SECRET_NAME>") sc._jsc.hadoopConfiguration().set( "fs.azure.sas.<BLOB_CONTAINER_NAME>.<STORAGE_ACCOUNT_NAME>.blob.core.windows.net", sas_token, )
Feljegyzés
A
get_secret()
korábbi kódrészletekben szereplő hívásokhoz szükség van az Azure Key Vault nevére, valamint az Azure Blob Storage-fiók hozzáférési kulcsához vagy SAS-jogkivonatához létrehozott titkos kódok nevére.
Hajtsa végre az adatküldő kódot ugyanabban a jegyzetfüzetben. Formázza az adat URI-t
wasbs://<BLOB_CONTAINER_NAME>@<STORAGE_ACCOUNT_NAME>.blob.core.windows.net/<PATH_TO_DATA>
a kódrészlethez hasonlóan:import pyspark.pandas as pd from pyspark.ml.feature import Imputer df = pd.read_csv( "wasbs://<BLOB_CONTAINER_NAME>@<STORAGE_ACCOUNT_NAME>.blob.core.windows.net/data/titanic.csv", index_col="PassengerId", ) imputer = Imputer(inputCols=["Age"], outputCol="Age").setStrategy( "mean" ) # Replace missing values in Age column with the mean value df.fillna( value={"Cabin": "None"}, inplace=True ) # Fill Cabin column with value "None" if missing df.dropna(inplace=True) # Drop the rows which still have any missing value df.to_csv( "wasbs://<BLOB_CONTAINER_NAME>@<STORAGE_ACCOUNT_NAME>.blob.core.windows.net/data/wrangled", index_col="PassengerId", )
Feljegyzés
Ez a Python-kódminta a következőt használja
pyspark.pandas
: . Ezt csak a Spark 3.2-es vagy újabb verziója támogatja.
Adatok importálása és leküldése az Azure Machine Learning Datastore-ból
Ha az Azure Machine Learning Datastore-ból szeretne adatokat elérni, adjon meg egy elérési utat az adattár adataihoz URI formátumbanazureml://datastores/<DATASTORE_NAME>/paths/<PATH_TO_DATA>
. Az Azure Machine Learning Datastore adatainak interaktív módon történő leküldése egy jegyzetfüzet-munkamenetben:
Válassza a Kiszolgáló nélküli Spark-számítást az Azure Machine Learning Kiszolgáló nélküli Spark területén a Számítási kijelölés menüből, vagy válasszon egy csatolt Synapse Spark-készletet a Synapse Spark-készletek alatt a Számítási kijelölés menüből.
Ez a kódminta bemutatja, hogyan olvashatók be és távolíthat el Titanic-adatokat egy Azure Machine Learning Datastore-ból az adattár URI
pyspark.ml.feature.Imputer
pyspark.pandas
ésazureml://
.import pyspark.pandas as pd from pyspark.ml.feature import Imputer df = pd.read_csv( "azureml://datastores/workspaceblobstore/paths/data/titanic.csv", index_col="PassengerId", ) imputer = Imputer(inputCols=["Age"], outputCol="Age").setStrategy( "mean" ) # Replace missing values in Age column with the mean value df.fillna( value={"Cabin": "None"}, inplace=True ) # Fill Cabin column with value "None" if missing df.dropna(inplace=True) # Drop the rows which still have any missing value df.to_csv( "azureml://datastores/workspaceblobstore/paths/data/wrangled", index_col="PassengerId", )
Feljegyzés
Ez a Python-kódminta a következőt használja
pyspark.pandas
: . Ezt csak a Spark 3.2-es vagy újabb verziója támogatja.
Az Azure Machine Learning-adattárak az Azure Storage-fiók hitelesítő adataival férhetnek hozzá az adatokhoz
- hozzáférési kulcs
- SAS-jogkivonat
- szolgáltatásnév
vagy hitelesítőadat-ritkábban használnak adathozzáférést. Az adattár típusától és az alapul szolgáló Azure Storage-fiók típusától függően válasszon ki egy megfelelő hitelesítési mechanizmust az adathozzáférés biztosításához. Ez a táblázat az Azure Machine Learning-adattárakban lévő adatokhoz való hozzáférés hitelesítési mechanizmusait foglalja össze:
* A felhasználói identitás átadása az Azure Blob Storage-fiókokra mutató hitelesítő adatok nélküli adattárak esetében működik, csak akkor, ha a helyreállítható törlés nincs engedélyezve.
Adatok elérése az alapértelmezett fájlmegosztáson
Az alapértelmezett fájlmegosztás a kiszolgáló nélküli Spark-számításhoz és a csatolt Synapse Spark-készletekhez is csatlakoztatva van.
Az Azure Machine Learning Studióban az alapértelmezett fájlmegosztásban lévő fájlok a Fájlok lap könyvtárfája alatt jelennek meg. A jegyzetfüzetkód további konfigurációk nélkül közvetlenül elérheti az ebben a fájlmegosztásban tárolt fájlokat a file://
protokollal, valamint a fájl abszolút elérési útját. Ez a kódrészlet bemutatja, hogyan férhet hozzá az alapértelmezett fájlmegosztáson tárolt fájlokhoz:
import os
import pyspark.pandas as pd
from pyspark.ml.feature import Imputer
abspath = os.path.abspath(".")
file = "file://" + abspath + "/Users/<USER>/data/titanic.csv"
print(file)
df = pd.read_csv(file, index_col="PassengerId")
imputer = Imputer(
inputCols=["Age"],
outputCol="Age").setStrategy("mean") # Replace missing values in Age column with the mean value
df.fillna(value={"Cabin" : "None"}, inplace=True) # Fill Cabin column with value "None" if missing
df.dropna(inplace=True) # Drop the rows which still have any missing value
output_path = "file://" + abspath + "/Users/<USER>/data/wrangled"
df.to_csv(output_path, index_col="PassengerId")
Feljegyzés
Ez a Python-kódminta a következőt használja pyspark.pandas
: . Ezt csak a Spark 3.2-es vagy újabb verziója támogatja.