Compartilhar via


Conectar-se ao Azure Cosmos DB for Apache Cassandra do Spark

APLICA-SE AO: Cassandra

Este artigo está entre uma série de artigos sobre a integração do Azure Cosmos DB for Apache Cassandra do Spark. Os artigos abordam conectividade, operações de DDL (linguagem de definição de dados), operações de DML (linguagem de manipulação de dados) e integração avançada do Azure Cosmos DB for Apache Cassandra do Spark.

Pré-requisitos

Dependências para conectividade

  • Conector do Spark para Cassandra: o conector do Spark é usado para se conectar ao Azure Cosmos DB for Apache Cassandra. Identifique e use a versão do conector localizada em central do Maven compatível com as versões do Spark e do Scala do seu ambiente do Spark. Recomendamos um ambiente que dá suporte ao Spark 3.2.1 ou superior e ao conector do Spark disponível nas coordenadas do Maven com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.0. Se estiver usando o Spark 2 x, recomendamos um ambiente com o Spark versão 2.4.5, usando o conector do Spark em coordenadas do Maven com.datastax.spark:spark-cassandra-connector_2.11:2.4.3.

  • Biblioteca auxiliar do Azure Cosmos DB para a API do Cassandra: se você estiver usando uma versão Spark 2.x, além do conector do Spark, precisará de outra biblioteca chamada azure-cosmos-cassandra-spark-helper com as coordenadas com.microsoft.azure.cosmosdb:azure-cosmos-cassandra-spark-helper:1.2.0 do Maven do Azure Cosmos DB para lidar com a limitação de taxa. Essa biblioteca contém o connection factory customizado e novas classes de política.

    A política de repetição no Azure Cosmos DB está configurada para lidar com exceções com o código de status HTTP 429 ("Taxa grande de solicitação"). O Azure Cosmos DB for Apache Cassandra converte essas exceções em erros de sobrecarga no protocolo nativo do Cassandra, e você pode repetir com retiradas. Uma vez que o Azure Cosmos DB usa o modelo de taxa de transferência provisionada, exceções de limitação de taxa de solicitação ocorrem quando as taxas de entrada/saída aumentam. A política de repetição protege seus trabalhos do Spark em relação a picos de dados que excedem momentaneamente a taxa de transferência alocada para o contêiner. Se estiver usando o conector do Spark 3.x, a implementação dessa biblioteca não será necessária.

    Observação

    A política de repetição pode proteger seus trabalhos do Spark apenas contra picos momentâneo. Se você não tiver configurado RUs suficientes necessárias para executar sua carga de trabalho, a política de repetição não será aplicável e a classe de política de repetição lançará a exceção novamente.

  • Detalhes da conexão da conta do Azure Cosmos DB: nome da conta, ponto de extremidade da conta e chave da API do Azure para Cassandra.

Otimizando as configuração de taxa de transferência do conector do Spark

Listados na próxima seção estão todos os parâmetros relevantes para controlar a taxa de transferência usando o conector do Spark para o Cassandra. Para otimizar os parâmetros a maximizar a taxa de transferência para trabalhos do Spark, as configurações spark.cassandra.output.concurrent.writes, spark.cassandra.concurrent.reads e spark.cassandra.input.reads_per_sec precisam ser feitas corretamente, para evitar limitação e retirada em excesso (o que, por sua vez, pode levar a uma taxa de transferência menor).

O valor ideal dessas configurações depende de quatro fatores:

  • A quantidade de taxa de transferência (Unidades de Solicitação) configurada para a tabela na qual os dados estão sendo ingeridos.
  • O número de trabalhadores em seu cluster Spark.
  • O número de executores configurados para seu trabalho do Spark (que pode ser controlado usando spark.cassandra.connection.connections_per_executor_max ou spark.cassandra.connection.remoteConnectionsPerExecutor, dependendo da versão do Spark)
  • A latência média de cada solicitação para o Azure Cosmos DB, se você estiver posicionado no mesmo Data Center. Suponha que esse valor seja 10 ms para gravações e 3 ms para leituras.

Por exemplo, se tivermos cinco trabalhadores e um valor de spark.cassandra.output.concurrent.writes = 1, e um valor de spark.cassandra.connection.remoteConnectionsPerExecutor = 1, teremos cinco trabalhadores que estão gravando simultaneamente na tabela, cada um com um thread. Se leva 10 ms para executar uma única gravação, podemos enviar 100 solicitações (1000 milissegundos divididos por 10) por segundo, por thread. Com cinco trabalhadores, isso seriam 500 gravações por segundo. Com um custo médio de cinco unidades de solicitação (RUs) por gravação, a tabela de destino precisaria de um mínimo de 2500 unidades de solicitação provisionadas (5 RUs x 500 gravações por segundo).

Aumentar o número de executores pode aumentar o número de threads em um determinado trabalho, o que pode, por sua vez, aumentar a taxa de transferência. No entanto, o impacto exato disso pode ser variável dependendo do trabalho, enquanto o controle da taxa de transferência com o número de trabalhadores é mais determinístico. Você também pode determinar o custo exato de uma determinada solicitação criando um perfil para ela, a fim de obter os custos da RU (unidade de solicitação). Isso o ajudará a ser mais preciso ao provisionar a taxa de transferência para sua tabela ou keyspace. Vejamos nosso artigo aqui para entender como obter os custos de unidade de solicitação em um nível por solicitação.

Colocar em escala a taxa de transferência no banco de dados

O conector Cassandra Spark saturará a taxa de transferência no Azure Cosmos DB de maneira eficiente. Como resultado, mesmo com novas tentativas efetivas, será necessário garantir que há taxa de transferência (RUs) suficiente provisionada no nível da tabela ou do keyspace para evitar erros relacionados à limitação da taxa. A configuração mínima de 400 RUs em uma determinada tabela ou keyspace não será suficiente. Mesmo com definições de configuração de taxa de transferência mínima, o conector Spark poderá gravar a uma taxa correspondente a cerca de 6.000 unidades de solicitação ou mais.

Se a configuração de RU necessária para a movimentação de dados usando o Spark for maior que a necessária para a carga de trabalho em estado estável, você poderá facilmente escalar e reduzir verticalmente a taxa de transferência de forma sistemática no Azure Cosmos DB para atender às necessidades da carga de trabalho por um determinado período de tempo. Leia nosso artigo sobre dimensionamento elástico na API para Cassandra para entender as diferentes opções de colocação em escala programaticamente e dinamicamente.

Observação

As diretrizes acima pressupõem uma distribuição razoavelmente uniforme dos dados. Se você tiver uma distorção significativa nos dados (ou seja, um número muito grande de leituras/gravações no mesmo valor de chave de partição), ainda poderá experimentar gargalos, mesmo que tenha um grande número de unidades de solicitação provisionadas na tabela. As unidades de solicitação são divididas igualmente entre as partições físicas e a distorção de dados pesada pode causar um gargalo de solicitações para uma única partição.

Parâmetros de configuração de taxa de transferência do conector do Spark

A tabela a seguir lista os parâmetros de configuração de taxa de transferência específicos do Azure Cosmos DB for Apache Cassandra fornecidos pelo conector. Para obter uma lista detalhada de todos os parâmetros de configuração, veja a página referência de configuração do repositório GitHub do Conector do Cassandra do Spark.

Nome da Propriedade Valor padrão Descrição
spark.cassandra.output.batch.size.rows 1 Número de linhas por lote único. Defina esse parâmetro como 1. Esse parâmetro é usado para atingir uma taxa de transferência maior para cargas de trabalho pesadas.
spark.cassandra.connection.connections_per_executor_max (Spark 2.x) spark.cassandra.connection.remoteConnectionsPerExecutor (Spark 3.x) Nenhum Número máximo de conexões por nó por executor. 10*n é equivalente a 10 conexões por nó em um cluster do Cassandra de n nós. Assim, se você precisar de cinco conexões por nó por executor para um cluster do Cassandra de cinco nós, deverá definir essa configuração como 25. Modifique esse valor com base no grau de paralelismo ou no número de executores para os quais seus trabalhos do Spark estão configurados.
spark.cassandra.output.concurrent.writes 100 Define o número de gravações paralelas que podem ocorrer por executor. Uma vez que você define "batch.size.rows" como 1, aumente esse valor de acordo. Modifique esse valor com base no grau de paralelismo ou na taxa de transferência que você deseja alcançar para sua carga de trabalho.
spark.cassandra.concurrent.reads 512 Define o número de leituras paralelas que podem ocorrer por executor. Modifique esse valor com base no grau de paralelismo ou na taxa de transferência que você deseja alcançar para sua carga de trabalho
spark.cassandra.output.throughput_mb_per_sec Nenhum Define a taxa de transferência de gravação total por executor. Esse parâmetro pode ser usado como um limite superior limite para a taxa de transferência do trabalho do Spark e a baseará na taxa de transferência provisionada do contêiner do Azure Cosmos DB.
spark.cassandra.input.reads_per_sec Nenhum Define a taxa de transferência de leitura total por executor. Esse parâmetro pode ser usado como um limite superior limite para a taxa de transferência do trabalho do Spark e a baseará na taxa de transferência provisionada do contêiner do Azure Cosmos DB.
spark.cassandra.output.batch.grouping.buffer.size 1000 Define o número de lotes por tarefa única do Spark que podem ser armazenados na memória antes do envio à API para Cassandra
spark.cassandra.connection.keep_alive_ms 60000 Define o período até o qual as conexões não usadas estão disponíveis.

Ajuste a taxa de transferência e o grau de paralelismo desses parâmetros com base na carga de trabalho que você espera para seus trabalhos do Spark e a taxa de transferência que você provisionou para sua conta do Azure Cosmos DB.

Conectando-se ao Azure Cosmos DB for Apache Cassandra do Spark

cqlsh

Os comandos a seguir detalham como se conectar ao Azure Cosmos DB for Apache Cassandra usando cqlsh. Isso é útil para validação conforme você percorre as amostras no Spark.
Do Linux/Unix/Mac:

export SSL_VERSION=TLSv1_2
export SSL_VALIDATE=false
cqlsh.py YOUR-COSMOSDB-ACCOUNT-NAME.cassandra.cosmosdb.azure.com 10350 -u YOUR-COSMOSDB-ACCOUNT-NAME -p YOUR-COSMOSDB-ACCOUNT-KEY --ssl

1. Azure Databricks

O artigo a seguir aborda o provisionamento de cluster do Azure Databricks, a configuração de cluster para conexão ao Azure Cosmos DB for Apache Cassandra e vários notebooks de exemplo que cobrem operações de DDL, DML e muito mais.
Trabalhar com o Azure Cosmos DB for Apache Cassandra do Azure Databricks

2. Azure HDInsight-Spark

O artigo a seguir aborda o serviço HDInsight-Spark, o provisionamento, a configuração do cluster para conexão ao Azure Cosmos DB for Apache Cassandra e vários notebooks de exemplo que cobrem operações de DDL, operações de DML e muito mais.
Trabalhar com o Azure Cosmos DB for Apache Cassandra do Azure HDInsight-Spark

3. Ambiente do Spark em geral

Embora as seções acima fossem específicas para os serviços de PaaS baseados no Spark para Azure, esta seção aborda qualquer ambiente geral do Spark. Dependências de conector, importações e configuração de sessão do Spark são detalhadas abaixo. A seção "Próximas etapas" aborda os exemplos de código para operações de DDL, operações de DML e muito mais.

Dependências do conector:

  1. Adicione as coordenadas do Maven para obter o conector do Cassandra para o Spark
  2. Adicione as coordenadas do Maven para a biblioteca do auxiliar do Azure Cosmos DB para a API do Cassandra

Importações:

import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector

//CosmosDB library for multiple retry
import com.microsoft.azure.cosmosdb.cassandra

Configuração de sessão do Spark:

 spark.cassandra.connection.host  YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.com  
 spark.cassandra.connection.port  10350  
 spark.cassandra.connection.ssl.enabled  true  
 spark.cassandra.auth.username  YOUR_ACCOUNT_NAME  
 spark.cassandra.auth.password  YOUR_ACCOUNT_KEY  
// if using Spark 2.x
// spark.cassandra.connection.factory  com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory  

//Throughput-related...adjust as needed
 spark.cassandra.output.batch.size.rows  1  
// spark.cassandra.connection.connections_per_executor_max  10   // Spark 2.x
 spark.cassandra.connection.remoteConnectionsPerExecutor  10   // Spark 3.x
 spark.cassandra.output.concurrent.writes  1000  
 spark.cassandra.concurrent.reads  512  
 spark.cassandra.output.batch.grouping.buffer.size  1000  
 spark.cassandra.connection.keep_alive_ms  600000000 

Próximas etapas

Os artigos a seguir demonstram a integração do Spark com o Azure Cosmos DB for Apache Cassandra.