Use o Apache Spark MLlib para criar um aplicativo de aprendizado de máquina e analisar um conjunto de dados

Saiba como usar o Apache Spark MLlib para criar um aplicativo de aprendizado de máquina. O aplicativo faz análise preditiva em um conjunto de dados aberto. A partir das bibliotecas de aprendizado de máquina integradas do Spark, este exemplo usa a classificação por meio de regressão logística.

MLlib é uma biblioteca Spark principal que fornece muitos utilitários úteis para tarefas de aprendizado de máquina, como:

  • Classificação
  • Regressão
  • Clustering
  • Modelação
  • Decomposição de valor singular (SVD) e análise de componentes principais (ACP)
  • Teste de hipóteses e cálculo de estatísticas amostrais

Compreender a classificação e a regressão logística

A classificação, uma tarefa popular de aprendizado de máquina, é o processo de classificar dados de entrada em categorias. É o trabalho de um algoritmo de classificação descobrir como atribuir "rótulos" aos dados de entrada que você fornece. Por exemplo, você pode pensar em um algoritmo de aprendizado de máquina que aceita informações de estoque como entrada. Em seguida, divide o estoque em duas categorias: ações que você deve vender e ações que você deve manter.

Regressão logística é o algoritmo que você usa para classificação. A API de regressão logística do Spark é útil para classificação binária ou classificação de dados de entrada em um de dois grupos. Para obter mais informações sobre regressões logísticas, consulte a Wikipédia.

Em resumo, o processo de regressão logística produz uma função logística. Use a função para prever a probabilidade de que um vetor de entrada pertença a um grupo ou outro.

Exemplo de análise preditiva de dados de inspeção de alimentos

Neste exemplo, você usa o Spark para fazer algumas análises preditivas em dados de inspeção de alimentos (Food_Inspections1.csv). Dados adquiridos através do portal de dados da cidade de Chicago. Este conjunto de dados contém informações sobre inspeções de estabelecimentos de alimentos que foram realizadas em Chicago. Incluindo informações sobre cada estabelecimento, as infrações encontradas (se houver) e os resultados da inspeção. O arquivo de dados CSV já está disponível na conta de armazenamento associada ao cluster em /HdiSamples/HdiSamples/FoodInspectionData/Food_Inspections1.csv.

Nas etapas a seguir, você desenvolve um modelo para ver o que é necessário para passar ou falhar em uma inspeção de alimentos.

Criar um aplicativo de aprendizado de máquina Apache Spark MLlib

  1. Crie um Jupyter Notebook usando o kernel PySpark. Para obter instruções, consulte Criar um arquivo do Jupyter Notebook.

  2. Importe os tipos necessários para este aplicativo. Copie e cole o código a seguir em uma célula vazia e pressione SHIFT + ENTER.

    from pyspark.ml import Pipeline
    from pyspark.ml.classification import LogisticRegression
    from pyspark.ml.feature import HashingTF, Tokenizer
    from pyspark.sql import Row
    from pyspark.sql.functions import UserDefinedFunction
    from pyspark.sql.types import *
    

    Devido ao kernel do PySpark, você não precisa criar nenhum contexto explicitamente. Os contextos Spark e Hive são criados automaticamente quando você executa a primeira célula de código.

Construir o dataframe de entrada

Use o contexto do Spark para extrair os dados CSV brutos para a memória como texto não estruturado. Em seguida, use a biblioteca CSV do Python para analisar cada linha dos dados.

  1. Execute as seguintes linhas para criar um RDD (Resilient Distributed Dataset) importando e analisando os dados de entrada.

    def csvParse(s):
        import csv
        from io import StringIO
        sio = StringIO(s)
        value = next(csv.reader(sio))
        sio.close()
        return value
    
    inspections = sc.textFile('/HdiSamples/HdiSamples/FoodInspectionData/Food_Inspections1.csv')\
                    .map(csvParse)
    
  2. Execute o código a seguir para recuperar uma linha do RDD, para que você possa dar uma olhada no esquema de dados:

    inspections.take(1)
    

    A saída é:

    [['413707',
        'LUNA PARK INC',
        'LUNA PARK  DAY CARE',
        '2049789',
        "Children's Services Facility",
        'Risk 1 (High)',
        '3250 W FOSTER AVE ',
        'CHICAGO',
        'IL',
        '60625',
        '09/21/2010',
        'License-Task Force',
        'Fail',
        '24. DISH WASHING FACILITIES: PROPERLY DESIGNED, CONSTRUCTED, MAINTAINED, INSTALLED, LOCATED AND OPERATED - Comments: All dishwashing machines must be of a type that complies with all requirements of the plumbing section of the Municipal Code of Chicago and Rules and Regulation of the Board of Health. OBSEVERD THE 3 COMPARTMENT SINK BACKING UP INTO THE 1ST AND 2ND COMPARTMENT WITH CLEAR WATER AND SLOWLY DRAINING OUT. INST NEED HAVE IT REPAIR. CITATION ISSUED, SERIOUS VIOLATION 7-38-030 H000062369-10 COURT DATE 10-28-10 TIME 1 P.M. ROOM 107 400 W. SURPERIOR. | 36. LIGHTING: REQUIRED MINIMUM FOOT-CANDLES OF LIGHT PROVIDED, FIXTURES SHIELDED - Comments: Shielding to protect against broken glass falling into food shall be provided for all artificial lighting sources in preparation, service, and display facilities. LIGHT SHIELD ARE MISSING UNDER HOOD OF  COOKING EQUIPMENT AND NEED TO REPLACE LIGHT UNDER UNIT. 4 LIGHTS ARE OUT IN THE REAR CHILDREN AREA,IN THE KINDERGARDEN CLASS ROOM. 2 LIGHT ARE OUT EAST REAR, LIGHT FRONT WEST ROOM. NEED TO REPLACE ALL LIGHT THAT ARE NOT WORKING. | 35. WALLS, CEILINGS, ATTACHED EQUIPMENT CONSTRUCTED PER CODE: GOOD REPAIR, SURFACES CLEAN AND DUST-LESS CLEANING METHODS - Comments: The walls and ceilings shall be in good repair and easily cleaned. MISSING CEILING TILES WITH STAINS IN WEST,EAST, IN FRONT AREA WEST, AND BY THE 15MOS AREA. NEED TO BE REPLACED. | 32. FOOD AND NON-FOOD CONTACT SURFACES PROPERLY DESIGNED, CONSTRUCTED AND MAINTAINED - Comments: All food and non-food contact equipment and utensils shall be smooth, easily cleanable, and durable, and shall be in good repair. SPLASH GUARDED ARE NEEDED BY THE EXPOSED HAND SINK IN THE KITCHEN AREA | 34. FLOORS: CONSTRUCTED PER CODE, CLEANED, GOOD REPAIR, COVING INSTALLED, DUST-LESS CLEANING METHODS USED - Comments: The floors shall be constructed per code, be smooth and easily cleaned, and be kept clean and in good repair. INST NEED TO ELEVATE ALL FOOD ITEMS 6INCH OFF THE FLOOR 6 INCH AWAY FORM WALL.  ',
        '41.97583445690982',
        '-87.7107455232781',
        '(41.97583445690982, -87.7107455232781)']]
    

    A saída dá uma ideia do esquema do arquivo de entrada. Inclui o nome de cada estabelecimento e o tipo de estabelecimento. Além disso, o endereço, os dados das inspeções e a localização, entre outras coisas.

  3. Execute o código a seguir para criar um dataframe (df) e uma tabela temporária (CountResults) com algumas colunas que são úteis para a análise preditiva. sqlContext é usado para fazer transformações em dados estruturados.

    schema = StructType([
    StructField("id", IntegerType(), False),
    StructField("name", StringType(), False),
    StructField("results", StringType(), False),
    StructField("violations", StringType(), True)])
    
    df = spark.createDataFrame(inspections.map(lambda l: (int(l[0]), l[1], l[12], l[13])) , schema)
    df.registerTempTable('CountResults')
    

    As quatro colunas de interesse no dataframe são ID, nome, resultados e violações.

  4. Execute o seguinte código para obter uma pequena amostra dos dados:

    df.show(5)
    

    A saída é:

    +------+--------------------+-------+--------------------+
    |    id|                name|results|          violations|
    +------+--------------------+-------+--------------------+
    |413707|       LUNA PARK INC|   Fail|24. DISH WASHING ...|
    |391234|       CAFE SELMARIE|   Fail|2. FACILITIES TO ...|
    |413751|          MANCHU WOK|   Pass|33. FOOD AND NON-...|
    |413708|BENCHMARK HOSPITA...|   Pass|                    |
    |413722|           JJ BURGER|   Pass|                    |
    +------+--------------------+-------+--------------------+
    

Entenda os dados

Vamos começar a ter uma noção do que o conjunto de dados contém.

  1. Execute o seguinte código para mostrar os valores distintos na coluna de resultados :

    df.select('results').distinct().show()
    

    A saída é:

    +--------------------+
    |             results|
    +--------------------+
    |                Fail|
    |Business Not Located|
    |                Pass|
    |  Pass w/ Conditions|
    |     Out of Business|
    +--------------------+
    
  2. Execute o seguinte código para visualizar a distribuição desses resultados:

    %%sql -o countResultsdf
    SELECT COUNT(results) AS cnt, results FROM CountResults GROUP BY results
    

    A %%sql magia seguida garante -o countResultsdf que a saída da consulta seja mantida localmente no servidor Jupyter (normalmente o nó principal do cluster). A saída é mantida como um dataframe Pandas com o nome especificado countResultsdf. Para obter mais informações sobre a %%sql magia e outras magias disponíveis com o kernel PySpark, consulte Kernels disponíveis em Notebooks Jupyter com clusters Apache Spark HDInsight.

    A saída é:

    Saída de consulta SQL.

  3. Você também pode usar Matplotlib, uma biblioteca usada para construir visualização de dados, para criar um gráfico. Como o gráfico deve ser criado a partir do dataframe countResultsdf persistido localmente, o trecho de código deve começar com a %%local magia. Essa ação garante que o código seja executado localmente no servidor Jupyter.

    %%local
    %matplotlib inline
    import matplotlib.pyplot as plt
    
    labels = countResultsdf['results']
    sizes = countResultsdf['cnt']
    colors = ['turquoise', 'seagreen', 'mediumslateblue', 'palegreen', 'coral']
    plt.pie(sizes, labels=labels, autopct='%1.1f%%', colors=colors)
    plt.axis('equal')
    

    Para prever um resultado de inspeção de alimentos, você precisa desenvolver um modelo com base nas violações. Como a regressão logística é um método de classificação binária, faz sentido agrupar os dados de resultado em duas categorias: Reprovado e Aprovado:

    • Aprovação

      • Aprovação
      • Passe c/ condições
    • Reprovado

      • Reprovado
    • Eliminar

      • Empresa não localizada
      • Fora do negócio

      Os dados com os outros resultados ("Business Not Located" ou "out of Business") não são úteis e representam uma pequena percentagem dos resultados de qualquer forma.

  4. Execute o código a seguir para converter o dataframe(df) existente em um novo dataframe onde cada inspeção é representada como um par de violações de rótulo. Neste caso, um rótulo de 0.0 representa um fracasso, um rótulo de 1.0 representa um sucesso e um rótulo de -1.0 representa alguns resultados além desses dois resultados.

    def labelForResults(s):
        if s == 'Fail':
            return 0.0
        elif s == 'Pass w/ Conditions' or s == 'Pass':
            return 1.0
        else:
            return -1.0
    label = UserDefinedFunction(labelForResults, DoubleType())
    labeledData = df.select(label(df.results).alias('label'), df.violations).where('label >= 0')
    
  5. Execute o seguinte código para mostrar uma linha dos dados rotulados:

    labeledData.take(1)
    

    A saída é:

    [Row(label=0.0, violations=u"41. PREMISES MAINTAINED FREE OF LITTER, UNNECESSARY ARTICLES, CLEANING  EQUIPMENT PROPERLY STORED - Comments: All parts of the food establishment and all parts of the property used in connection with the operation of the establishment shall be kept neat and clean and should not produce any offensive odors.  REMOVE MATTRESS FROM SMALL DUMPSTER. | 35. WALLS, CEILINGS, ATTACHED EQUIPMENT CONSTRUCTED PER CODE: GOOD REPAIR, SURFACES CLEAN AND DUST-LESS CLEANING METHODS - Comments: The walls and ceilings shall be in good repair and easily cleaned.  REPAIR MISALIGNED DOORS AND DOOR NEAR ELEVATOR.  DETAIL CLEAN BLACK MOLD LIKE SUBSTANCE FROM WALLS BY BOTH DISH MACHINES.  REPAIR OR REMOVE BASEBOARD UNDER DISH MACHINE (LEFT REAR KITCHEN). SEAL ALL GAPS.  REPLACE MILK CRATES USED IN WALK IN COOLERS AND STORAGE AREAS WITH PROPER SHELVING AT LEAST 6' OFF THE FLOOR.  | 38. VENTILATION: ROOMS AND EQUIPMENT VENTED AS REQUIRED: PLUMBING: INSTALLED AND MAINTAINED - Comments: The flow of air discharged from kitchen fans shall always be through a duct to a point above the roofline.  REPAIR BROKEN VENTILATION IN MEN'S AND WOMEN'S WASHROOMS NEXT TO DINING AREA. | 32. FOOD AND NON-FOOD CONTACT SURFACES PROPERLY DESIGNED, CONSTRUCTED AND MAINTAINED - Comments: All food and non-food contact equipment and utensils shall be smooth, easily cleanable, and durable, and shall be in good repair.  REPAIR DAMAGED PLUG ON LEFT SIDE OF 2 COMPARTMENT SINK.  REPAIR SELF CLOSER ON BOTTOM LEFT DOOR OF 4 DOOR PREP UNIT NEXT TO OFFICE.")]
    

Criar um modelo de regressão logística a partir do dataframe de entrada

A tarefa final é converter os dados rotulados. Converta os dados em um formato analisado por regressão logística. A entrada para um algoritmo de regressão logística precisa de um conjunto de pares vetoriais label-feature. Onde o "vetor de recurso" é um vetor de números que representam o ponto de entrada. Então, você precisa converter a coluna "violações", que é semi-estruturada e contém muitos comentários em texto livre. Converta a coluna em uma matriz de números reais que uma máquina poderia entender facilmente.

Uma abordagem padrão de aprendizado de máquina para processar linguagem natural é atribuir a cada palavra distinta um índice. Em seguida, passe um vetor para o algoritmo de aprendizado de máquina. De tal forma que o valor de cada índice contém a frequência relativa dessa palavra na cadeia de texto.

MLlib fornece uma maneira fácil de fazer essa operação. Primeiro, "tokenize" cada string de violações para obter as palavras individuais em cada string. Em seguida, use a HashingTF para converter cada conjunto de tokens em um vetor de recurso que pode ser passado para o algoritmo de regressão logística para construir um modelo. Você conduz todas essas etapas em sequência usando um pipeline.

tokenizer = Tokenizer(inputCol="violations", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.01)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

model = pipeline.fit(labeledData)

Avaliar o modelo usando outro conjunto de dados

Você pode usar o modelo criado anteriormente para prever quais são os resultados de novas inspeções. As previsões baseiam-se nas violações observadas. Você treinou esse modelo no conjunto de dados Food_Inspections1.csv. Você pode usar um segundo conjunto de dados, Food_Inspections2.csv, para avaliar a força desse modelo nos novos dados. Esse segundo conjunto de dados (Food_Inspections2.csv) está no contêiner de armazenamento padrão associado ao cluster.

  1. Execute o código a seguir para criar um novo dataframe, predictionsDf que contém a previsão gerada pelo modelo. O snippet também cria uma tabela temporária chamada Previsões com base no dataframe.

    testData = sc.textFile('wasbs:///HdiSamples/HdiSamples/FoodInspectionData/Food_Inspections2.csv')\
                .map(csvParse) \
                .map(lambda l: (int(l[0]), l[1], l[12], l[13]))
    testDf = spark.createDataFrame(testData, schema).where("results = 'Fail' OR results = 'Pass' OR results = 'Pass w/ Conditions'")
    predictionsDf = model.transform(testDf)
    predictionsDf.registerTempTable('Predictions')
    predictionsDf.columns
    

    Você deve ver uma saída como o seguinte texto:

    ['id',
        'name',
        'results',
        'violations',
        'words',
        'features',
        'rawPrediction',
        'probability',
        'prediction']
    
  2. Veja uma das previsões. Execute este trecho:

    predictionsDf.take(1)
    

    Há uma previsão para a primeira entrada no conjunto de dados de teste.

  3. O model.transform() método aplica a mesma transformação a quaisquer novos dados com o mesmo esquema e chega a uma previsão de como classificar os dados. Você pode fazer algumas estatísticas para ter uma noção de como as previsões eram:

    numSuccesses = predictionsDf.where("""(prediction = 0 AND results = 'Fail') OR
                                            (prediction = 1 AND (results = 'Pass' OR
                                                                results = 'Pass w/ Conditions'))""").count()
    numInspections = predictionsDf.count()
    
    print ("There were", numInspections, "inspections and there were", numSuccesses, "successful predictions")
    print ("This is a", str((float(numSuccesses) / float(numInspections)) * 100) + "%", "success rate")
    

    A saída se parece com o seguinte texto:

    There were 9315 inspections and there were 8087 successful predictions
    This is a 86.8169618894% success rate
    

    O uso da regressão logística com o Spark fornece um modelo da relação entre as descrições de violações em inglês. E se uma determinada empresa passaria ou reprovaria uma inspeção de alimentos.

Criar uma representação visual da previsão

Agora você pode construir uma visualização final para ajudá-lo a raciocinar sobre os resultados deste teste.

  1. Você começa extraindo as diferentes previsões e resultados da tabela temporária Previsões criada anteriormente. As consultas a seguir separam a saída como true_positive, false_positive, true_negative e false_negative. Nas consultas abaixo, você desativa a visualização usando -q e também salva a saída (usando -o) como dataframes que podem ser usados com a %%local mágica.

    %%sql -q -o true_positive
    SELECT count(*) AS cnt FROM Predictions WHERE prediction = 0 AND results = 'Fail'
    
    %%sql -q -o false_positive
    SELECT count(*) AS cnt FROM Predictions WHERE prediction = 0 AND (results = 'Pass' OR results = 'Pass w/ Conditions')
    
    %%sql -q -o true_negative
    SELECT count(*) AS cnt FROM Predictions WHERE prediction = 1 AND results = 'Fail'
    
    %%sql -q -o false_negative
    SELECT count(*) AS cnt FROM Predictions WHERE prediction = 1 AND (results = 'Pass' OR results = 'Pass w/ Conditions')
    
  2. Finalmente, use o seguinte trecho para gerar o gráfico usando Matplotlib.

    %%local
    %matplotlib inline
    import matplotlib.pyplot as plt
    
    labels = ['True positive', 'False positive', 'True negative', 'False negative']
    sizes = [true_positive['cnt'], false_positive['cnt'], false_negative['cnt'], true_negative['cnt']]
    colors = ['turquoise', 'seagreen', 'mediumslateblue', 'palegreen', 'coral']
    plt.pie(sizes, labels=labels, autopct='%1.1f%%', colors=colors)
    plt.axis('equal')
    

    Deverá ver o seguinte resultado:

    Saída do aplicativo de aprendizado de máquina Spark - porcentagens de gráficos de pizza de inspeções de alimentos com falha.

    Neste gráfico, um resultado "positivo" refere-se à inspeção de alimentos falhada, enquanto um resultado negativo refere-se a uma inspeção aprovada.

Desligue o bloco de notas

Depois de executar o aplicativo, você deve desligar o bloco de anotações para liberar os recursos. Para tal, no menu File (Ficheiro) do bloco de notas, selecione Close and Halt (Fechar e Parar). Esta ação encerra e fecha o bloco de notas.

Próximos passos