Python용 Azure Monitor 쿼리 클라이언트 라이브러리 - 버전 1.2.0

Azure Monitor 쿼리 클라이언트 라이브러리는 Azure Monitor의 두 데이터 플랫폼에 대해 읽기 전용 쿼리를 실행하는 데 사용됩니다.

  • 로그 - 모니터링되는 리소스에서 로그 및 성능 데이터를 수집하고 구성합니다. Azure 서비스의 플랫폼 로그, 가상 머신 에이전트의 로그 및 성능 데이터, 앱의 사용량 및 성능 데이터와 같은 다양한 원본의 데이터를 단일 Azure Log Analytics 작업 영역으로 통합할 수 있습니다. Kusto 쿼리 언어 사용하여 다양한 데이터 형식을 함께 분석할 수 있습니다.
  • 메트릭 - 모니터링되는 리소스에서 시계열 데이터베이스로 숫자 데이터를 수집합니다. 메트릭은 정기적으로 수집되며 특정 시간에 시스템의 일부 측면을 설명하는 수치 값입니다. 메트릭은 가볍고 거의 실시간 시나리오를 지원할 수 있으므로 문제를 경고하고 빠르게 검색하는 데 유용합니다.

리소스:

시작

필수 구성 요소

패키지 설치

pip를 사용하여 Python용 Azure Monitor 쿼리 클라이언트 라이브러리를 설치합니다.

pip install azure-monitor-query

클라이언트 만들기

로그 또는 메트릭을 쿼리하려면 인증된 클라이언트가 필요합니다. 라이브러리에는 클라이언트의 동기 및 비동기 형태가 모두 포함됩니다. 인증하려면 토큰 자격 증명의 instance 만듭니다. 또는 MetricsQueryClient를 만들 LogsQueryClient 때 해당 instance 사용합니다. 다음 예제에서는 azure-identity 패키지에서 를 사용합니다DefaultAzureCredential.

동기 클라이언트

로그 및 메트릭 쿼리 모두에 대한 동기 클라이언트를 만드는 다음 예제를 살펴보겠습니다.

from azure.identity import DefaultAzureCredential
from azure.monitor.query import LogsQueryClient, MetricsQueryClient

credential = DefaultAzureCredential()
logs_client = LogsQueryClient(credential)
metrics_client = MetricsQueryClient(credential)

비동기 클라이언트

쿼리 클라이언트 API의 비동기 형태는 접미사가 있는 네임스페이 .aio스에 있습니다. 예를 들면 다음과 같습니다.

from azure.identity.aio import DefaultAzureCredential
from azure.monitor.query.aio import LogsQueryClient, MetricsQueryClient

credential = DefaultAzureCredential()
async_logs_client = LogsQueryClient(credential)
async_metrics_client = MetricsQueryClient(credential)

공용이 아닌 Azure 클라우드에 대한 클라이언트 구성

기본적으로 LogsQueryClientMetricsQueryClient 는 공용 Azure 클라우드에 연결하도록 구성됩니다. 올바른 endpoint 인수를 전달하여 공용이 아닌 Azure 클라우드에 연결하도록 구성할 수 있습니다. 예를 들면 다음과 같습니다.

logs_client = LogsQueryClient(credential, endpoint="https://api.loganalytics.azure.cn/v1")
metrics_client = MetricsQueryClient(credential, endpoint="https://management.chinacloudapi.cn")

참고: 현재 는 MetricsQueryClient 메트릭을 쿼리하는 데 ARM(Azure Resource Manager) 엔드포인트를 사용하므로 이 클라이언트를 사용할 때 클라우드에 해당하는 관리 엔드포인트가 필요합니다. 이는 향후 변경될 수 있습니다.

쿼리 실행

로그 및 메트릭 쿼리의 예는 예제 섹션을 참조하세요 .

주요 개념

쿼리 속도 제한 및 제한을 기록합니다.

Log Analytics 서비스는 요청 속도가 너무 높을 때 제한을 적용합니다. 반환되는 최대 행 수와 같은 제한은 Kusto 쿼리에도 적용됩니다. 자세한 내용은 쿼리 API를 참조하세요.

일괄 처리 로그 쿼리를 실행하는 경우 제한된 요청이 개체를 LogsQueryError 반환합니다. 해당 개체의 값은 code 입니다 ThrottledError.

메트릭 데이터 구조

각 메트릭 값 집합은 다음과 같은 특성을 가진 시계열입니다.

  • 값이 수집된 시간
  • 값과 연결된 리소스
  • 메트릭의 범주처럼 작동하는 네임스페이스
  • 메트릭 이름
  • 값 자체
  • 일부 메트릭에는 다차원 메트릭에 설명된 대로 여러 차원이 있을 수 있습니다. 사용자 지정 메트릭에는 최대 10개의 차원이 있을 수 있습니다.

예제

로그 쿼리

이 예제에서는 Log Analytics 작업 영역을 쿼리하는 방법을 보여 줍니다. 응답을 처리하고 테이블 형식으로 보려면 pandas 라이브러리가 사용됩니다. pandas를 사용하지 않도록 선택한 경우 샘플을 참조하세요.

시간 범위 지정

매개 변수는 timespan 데이터를 쿼리할 기간을 지정합니다. 이 값은 다음 중 하나일 수 있습니다.

  • a timedelta
  • timedelta 시작 날짜/시간
  • start datetime/end datetime

예를 들면 다음과 같습니다.

import os
import pandas as pd
from datetime import datetime, timezone
from azure.monitor.query import LogsQueryClient, LogsQueryStatus
from azure.identity import DefaultAzureCredential
from azure.core.exceptions import HttpResponseError

credential = DefaultAzureCredential()
client = LogsQueryClient(credential)

query = """AppRequests | take 5"""

start_time=datetime(2021, 7, 2, tzinfo=timezone.utc)
end_time=datetime(2021, 7, 4, tzinfo=timezone.utc)

try:
    response = client.query_workspace(
        workspace_id=os.environ['LOG_WORKSPACE_ID'],
        query=query,
        timespan=(start_time, end_time)
        )
    if response.status == LogsQueryStatus.PARTIAL:
        error = response.partial_error
        data = response.partial_data
        print(error)
    elif response.status == LogsQueryStatus.SUCCESS:
        data = response.tables
    for table in data:
        df = pd.DataFrame(data=table.rows, columns=table.columns)
        print(df)
except HttpResponseError as err:
    print("something fatal happened")
    print(err)

로그 쿼리 응답 처리

API는 query_workspace 또는 개체를 LogsQueryResultLogsQueryPartialResult 반환합니다. API는 batch_query , LogsQueryPartialResultLogsQueryError 개체를 포함LogsQueryResult할 수 있는 목록을 반환합니다. 응답의 계층 구조는 다음과 같습니다.

LogsQueryResult
|---statistics
|---visualization
|---tables (list of `LogsTable` objects)
    |---name
    |---rows
    |---columns
    |---columns_types

LogsQueryPartialResult
|---statistics
|---visualization
|---partial_error (a `LogsQueryError` object)
    |---code
    |---message
    |---details
    |---status
|---partial_data (list of `LogsTable` objects)
    |---name
    |---rows
    |---columns
    |---columns_types

LogsQueryResult 편의상 테이블을 직접 반복합니다. 예를 들어 테이블로 로그 쿼리 응답을 처리하고 pandas를 사용하여 표시하려면 다음을 수행합니다.

response = client.query(...)
for table in response:
    df = pd.DataFrame(table.rows, columns=[col.name for col in table.columns])

전체 샘플은 여기에서 찾을 수 있습니다.

비슷한 방식으로 일괄 처리 로그 쿼리 응답을 처리합니다.

for result in response:
    if result.status == LogsQueryStatus.SUCCESS:
        for table in result:
            df = pd.DataFrame(table.rows, columns=table.columns)
            print(df)

전체 샘플은 여기에서 찾을 수 있습니다.

일괄 처리 로그 쿼리

다음 예제에서는 일괄 처리 쿼리 API를 사용하여 동시에 여러 쿼리를 보내는 방법을 보여 줍니다. 쿼리는 개체 목록 LogsBatchQuery 또는 사전으로 나타낼 수 있습니다. 이 예제에서는 이전 접근 방식을 사용합니다.

import os
from datetime import timedelta, datetime, timezone
import pandas as pd
from azure.monitor.query import LogsQueryClient, LogsBatchQuery, LogsQueryStatus
from azure.identity import DefaultAzureCredential

credential = DefaultAzureCredential()
client = LogsQueryClient(credential)
requests = [
    LogsBatchQuery(
        query="AzureActivity | summarize count()",
        timespan=timedelta(hours=1),
        workspace_id=os.environ['LOG_WORKSPACE_ID']
    ),
    LogsBatchQuery(
        query= """bad query""",
        timespan=timedelta(days=1),
        workspace_id=os.environ['LOG_WORKSPACE_ID']
    ),
    LogsBatchQuery(
        query= """let Weight = 92233720368547758;
        range x from 1 to 3 step 1
        | summarize percentilesw(x, Weight * 100, 50)""",
        workspace_id=os.environ['LOG_WORKSPACE_ID'],
        timespan=(datetime(2021, 6, 2, tzinfo=timezone.utc), datetime(2021, 6, 5, tzinfo=timezone.utc)), # (start, end)
        include_statistics=True
    ),
]
results = client.query_batch(requests)

for res in results:
    if res.status == LogsQueryStatus.FAILURE:
        # this will be a LogsQueryError
        print(res.message)
    elif res.status == LogsQueryStatus.PARTIAL:
        ## this will be a LogsQueryPartialResult
        print(res.partial_error)
        for table in res.partial_data:
            df = pd.DataFrame(table.rows, columns=table.columns)
            print(df)
    elif res.status == LogsQueryStatus.SUCCESS:
        ## this will be a LogsQueryResult
        table = res.tables[0]
        df = pd.DataFrame(table.rows, columns=table.columns)
        print(df)

리소스 로그 쿼리

다음 예제에서는 Log Analytics 작업 영역을 사용하지 않고 Azure 리소스에서 직접 로그를 쿼리하는 방법을 보여 줍니다. query_resource 여기서 메서드는 대신 query_workspace사용되며 작업 영역 ID 대신 Azure 리소스 식별자가 전달됩니다(예: /subscriptions/{subscription-id}/resourceGroups/{resource-group-name}/providers/{resource-provider}/{resource-type}/{resource-name}).

import os
import pandas as pd
from datetime import timedelta
from azure.monitor.query import LogsQueryClient, LogsQueryStatus
from azure.core.exceptions import HttpResponseError
from azure.identity import DefaultAzureCredential

credential  = DefaultAzureCredential()
client = LogsQueryClient(credential)

query = """AzureActivity | take 5"""

try:
    response = client.query_resource(os.environ['LOGS_RESOURCE_ID'], query, timespan=timedelta(days=1))
    if response.status == LogsQueryStatus.PARTIAL:
        error = response.partial_error
        data = response.partial_data
        print(error)
    elif response.status == LogsQueryStatus.SUCCESS:
        data = response.tables
    for table in data:
        df = pd.DataFrame(data=table.rows, columns=table.columns)
        print(df)
except HttpResponseError as err:
    print("something fatal happened")
    print(err)

고급 로그 쿼리 시나리오

로그 쿼리 시간 제한 설정

다음 예제에서는 서버 시간 제한(초)을 설정하는 방법을 보여줍니다. 쿼리가 언급된 시간 제한보다 더 많은 시간이 걸리는 경우 게이트웨이 시간 제한이 발생합니다. 기본값은 180초이며 최대 10분(600초)까지 설정할 수 있습니다.

import os
from azure.monitor.query import LogsQueryClient
from azure.identity import DefaultAzureCredential

credential = DefaultAzureCredential()
client = LogsQueryClient(credential)

response = client.query_workspace(
    os.environ['LOG_WORKSPACE_ID'],
    "range x from 1 to 10000000000 step 1 | count",
    timespan=timedelta(days=1),
    server_timeout=600 # sets the timeout to 10 minutes
    )

여러 작업 영역 쿼리

여러 Log Analytics 작업 영역에서 동일한 로그 쿼리를 실행할 수 있습니다. Kusto 쿼리 외에도 다음 매개 변수가 필요합니다.

  • workspace_id - 첫 번째(기본) 작업 영역 ID입니다.
  • additional_workspaces - 매개 변수에 제공된 작업 영역을 제외한 작업 영역 목록 workspace_id 입니다. 매개 변수의 목록 항목은 다음 식별자 형식으로 구성됩니다.
    • 정규화된 작업 영역 이름
    • 작업 영역 ID
    • Azure 리소스 ID

예를 들어 다음 쿼리는 세 개의 작업 영역에서 실행됩니다.

client.query_workspace(
    <workspace_id>,
    query,
    timespan=timedelta(days=1),
    additional_workspaces=['<workspace 2>', '<workspace 3>']
    )

전체 샘플은 여기에서 찾을 수 있습니다.

통계 포함

CPU 및 메모리 사용량과 같은 로그 쿼리 실행 통계를 얻으려면 다음을 수행합니다.

  1. include_statistics 매개 변수를 True로 설정합니다.
  2. 개체 내의 statistics 필드에 액세스합니다 LogsQueryResult .

다음 예제에서는 쿼리 실행 시간을 출력합니다.

query = "AzureActivity | top 10 by TimeGenerated"
result = client.query_workspace(
    <workspace_id>,
    query,
    timespan=timedelta(days=1),
    include_statistics=True
    )

execution_time = result.statistics.get("query", {}).get("executionTime")
print(f"Query execution time: {execution_time}")

statistics 필드는 dict 원시 JSON 응답에 해당하는 이며 해당 구조는 쿼리에 따라 달라질 수 있습니다. 통계는 속성 내에서 찾을 수 query 있습니다. 예를 들면 다음과 같습니다.

{
  "query": {
    "executionTime": 0.0156478,
    "resourceUsage": {...},
    "inputDatasetStatistics": {...},
    "datasetStatistics": [{...}]
  }
}

시각화 포함

render 연산자를 사용하여 로그 쿼리에 대한 시각화 데이터를 얻으려면 다음을 수행합니다.

  1. include_visualization 속성을 True로 설정합니다.
  2. 개체 내의 visualization 필드에 액세스합니다 LogsQueryResult .

예를 들면 다음과 같습니다.

query = (
    "StormEvents"
    "| summarize event_count = count() by State"
    "| where event_count > 10"
    "| project State, event_count"
    "| render columnchart"
)
result = client.query_workspace(
    <workspace_id>,
    query,
    timespan=timedelta(days=1),
    include_visualization=True
    )

print(f"Visualization result: {result.visualization}")

visualization 필드는 dict 원시 JSON 응답에 해당하는 이며 해당 구조는 쿼리에 따라 달라질 수 있습니다. 예를 들면 다음과 같습니다.

{
  "visualization": "columnchart",
  "title": "the chart title",
  "accumulate": False,
  "isQuerySorted": False,
  "kind": None,
  "legend": None,
  "series": None,
  "yMin": "NaN",
  "yMax": "NaN",
  "xAxis": None,
  "xColumn": None,
  "xTitle": "x axis title",
  "yAxis": None,
  "yColumns": None,
  "ySplit": None,
  "yTitle": None,
  "anomalyColumns": None
}

메트릭 쿼리

다음 예제에서는 Event Grid 구독에 대한 메트릭을 가져옵니다. 리소스 URI는 Event Grid 토픽의 URI입니다.

리소스 URI는 메트릭이 쿼리되는 리소스의 URI여야 합니다. 일반적으로 형식 /subscriptions/<id>/resourceGroups/<rg-name>/providers/<source>/topics/<resource-name>입니다.

리소스 URI를 찾으려면 다음을 수행합니다.

  1. Azure Portal 리소스 페이지로 이동합니다.
  2. 개요 블레이드에서 JSON 보기 링크를 선택합니다.
  3. 결과 JSON에서 속성 값을 복사합니다 id .

참고: 메트릭은 전송된 metric_names 순서대로 반환됩니다.

import os
from datetime import timedelta, datetime
from azure.monitor.query import MetricsQueryClient
from azure.identity import DefaultAzureCredential

credential = DefaultAzureCredential()
client = MetricsQueryClient(credential)
start_time = datetime(2021, 5, 25)
duration = timedelta(days=1)
metrics_uri = os.environ['METRICS_RESOURCE_URI']
response = client.query_resource(
    metrics_uri,
    metric_names=["PublishSuccessCount"],
    timespan=(start_time, duration)
    )

for metric in response.metrics:
    print(metric.name)
    for time_series_element in metric.timeseries:
        for metric_value in time_series_element.data:
            print(metric_value.time_stamp)

메트릭 쿼리 응답 처리

메트릭 쿼리 API는 개체를 MetricsQueryResult 반환합니다. 개체에는 MetricsQueryResult 형식화된 개체granularity, , namespace및 의 Metric목록과 timespan같은 속성이 포함됩니다. Metric 매개 변수를 사용하여 metrics 개체 목록에 액세스할 수 있습니다. 이 목록의 TimeSeriesElementMetric 개체에는 개체 목록이 포함됩니다. 각 TimeSeriesElement 개체에는 및 metadata_values 속성이 포함됩니다data. 시각적 형식에서 응답의 개체 계층 구조는 다음 구조와 유사합니다.

MetricsQueryResult
|---granularity
|---timespan
|---cost
|---namespace
|---resource_region
|---metrics (list of `Metric` objects)
    |---id
    |---type
    |---name
    |---unit
    |---timeseries (list of `TimeSeriesElement` objects)
        |---metadata_values
        |---data (list of data points represented by `MetricValue` objects)

응답 처리 예제

import os
from azure.monitor.query import MetricsQueryClient, MetricAggregationType
from azure.identity import DefaultAzureCredential

credential = DefaultAzureCredential()
client = MetricsQueryClient(credential)

metrics_uri = os.environ['METRICS_RESOURCE_URI']
response = client.query_resource(
    metrics_uri,
    metric_names=["MatchedEventCount"],
    aggregations=[MetricAggregationType.COUNT]
    )

for metric in response.metrics:
    print(metric.name)
    for time_series_element in metric.timeseries:
        for metric_value in time_series_element.data:
            if metric_value.count != 0:
                print(
                    "There are {} matched events at {}".format(
                        metric_value.count,
                        metric_value.time_stamp
                    )
                )

문제 해결

다양한 오류 시나리오를 진단하는 방법에 대한 자세한 내용은 문제 해결 가이드 를 참조하세요.

다음 단계

Azure Monitor에 대한 자세한 내용은 Azure Monitor 서비스 설명서를 참조하세요.

샘플

다음 코드 샘플은 Azure Monitor 쿼리 클라이언트 라이브러리의 일반적인 시나리오를 보여 줍니다.

로그 쿼리 샘플

메트릭 쿼리 샘플

참여

이 프로젝트에 대한 기여와 제안을 환영합니다. 대부분의 경우 기여하려면 권한을 부여하며 실제로 기여를 사용할 권한을 당사에 부여한다고 선언하는 CLA(기여자 라이선스 계약)에 동의해야 합니다. 자세한 내용은 cla.microsoft.com.

끌어오기 요청을 제출하면 CLA-bot은 CLA를 제공하고 PR을 적절하게 데코레이팅해야 하는지 여부를 자동으로 결정합니다(예: 레이블, 설명). 봇에서 제공하는 지침을 따르기만 하면 됩니다. CLA를 사용하여 모든 리포지토리에서 한 번만 수행하면 됩니다.

이 프로젝트에는 Microsoft Open Source Code of Conduct(Microsoft 오픈 소스 준수 사항)가 적용됩니다. 자세한 내용은 Code of Conduct FAQ(규정 FAQ)를 참조하세요. 또는 추가 질문이나 의견은 opencode@microsoft.com으로 문의하세요.