Apache Airflow 사용 REST API - Amazon Managed Workflows for Apache Airflow

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

Apache Airflow 사용 REST API

Amazon Managed Workflows for Apache Airflow(Amazon MWAA)는 Apache Airflow v2.4.3 이상을 실행하는 환경에서 Apache Airflow를 사용하여 Apache Airflow REST API 환경과 직접 상호 작용할 수 있도록 지원합니다. 이를 통해 프로그래밍 방식으로 Amazon MWAA 환경에 액세스하고 관리할 수 있으며, 데이터 오케스트레이션 워크플로를 호출하고,를 관리하고DAGs, 메타데이터 데이터베이스, 트리거 및 스케줄러와 같은 다양한 Apache Airflow 구성 요소의 상태를 모니터링하는 표준화된 방법을 제공합니다.

APIAmazonMWAA은 Apache Airflow REST를 사용하는 동안 확장성을 지원하기 위해 REST API 요청, 명령줄 인터페이스(CLI) 사용 또는 더 많은 동시 Apache Airflow 사용자 인터페이스(UI) 사용자 등 수요 증가를 처리하기 위해 웹 서버 용량을 수평적으로 확장할 수 있는 옵션을 제공합니다. Amazon이 웹 서버를 MWAA 확장하는 방법에 대한 자세한 내용은 섹션을 참조하세요Amazon MWAA 웹 서버 자동 조정 구성.

Apache Airflow를 사용하여 환경에 대해 다음 사용 사례를 구현RESTAPI할 수 있습니다.

  • 프로그래밍 방식 액세스 - 이제 Apache Airflow UI 또는에 의존하지 않고 Apache Airflow DAG 실행을 시작하고, 데이터 세트를 관리하고, 메타데이터 데이터베이스, 트리거 및 스케줄러와 같은 다양한 구성 요소의 상태를 검색할 수 있습니다CLI.

  • 외부 애플리케이션 및 마이크로서비스와 통합 - REST API 지원을 통해 Amazon MWAA 환경을 다른 시스템과 통합하는 사용자 지정 솔루션을 구축할 수 있습니다. 예를 들어 완료된 데이터베이스 작업 또는 새 사용자 등록과 같은 외부 시스템의 이벤트에 대한 응답으로 워크플로를 시작할 수 있습니다.

  • 중앙 집중식 모니터링 - DAGs 여러 Amazon MWAA 환경에서의 상태를 집계하는 모니터링 대시보드를 구축하여 중앙 집중식 모니터링 및 관리를 활성화할 수 있습니다.

Apache Airflow REST에 대한 자세한 내용은 Apache Airflow 참조API를 참조하세요. REST API

를 사용하면 자격 증명을 REST API 사용하여 AWS Apache Airflow에 액세스할 InvokeRestApi수 있습니다. 또는 웹 서버 액세스 토큰을 얻은 다음, 이 토큰을 사용해서 직접적으로 호출하여 액세스할 수도 있습니다.

참고
  • InvokeRestApi 작업을 사용하는 동안 "사용하도록 환경 업데이트InvokeRestApi" 메시지에 오류가 발생하면 Amazon MWAA 환경을 업데이트해야 함을 나타냅니다. 이 오류는 Amazon MWAA 환경이 InvokeRestApi 기능과 관련된 최신 변경 사항과 호환되지 않을 때 발생합니다. 이 문제를 해결하려면 Amazon MWAA 환경을 업데이트하여 InvokeRestApi 기능에 필요한 변경 사항을 통합합니다.

  • InvokeRestApi 작업의 기본 제한 시간은 10초입니다. 이 10초 기간 내에 작업이 완료되지 않으면 작업이 자동으로 종료되고 오류가 발생합니다. 오류가 발생하지 않도록이 제한 시간 내에 REST API 통화를 완료하도록 설계되었는지 확인합니다.

다음 예제에서는 Apache Airflow를 API 호출RESTAPI하고 새 DAG 실행을 시작하는 방법을 보여줍니다.

Apache Airflow에 대한 액세스 권한 부여RESTAPI: airflow:InvokeRestApi

AWS 자격 증명을 REST API 사용하여 Apache Airflow에 액세스하려면 IAM 정책에 airflow:InvokeRestApi 권한을 부여해야 합니다. 다음 정책 샘플에서, {airflow-role}에서 Admin, Op, User, Viewer 또는 Public 역할을 지정하여 사용자 액세스 수준을 지정합니다. 자세한 내용은 Apache Airflow 참조 가이드기본 역할을 참조하십시오.

{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowMwaaRestApiAccess", "Effect": "Allow", "Action": "airflow:InvokeRestApi", "Resource": [ "arn:aws:airflow:{your-region}:YOUR_ACCOUNT_ID:role/{your-environment-name}/{airflow-role}" ] } ] }
참고

프라이빗 웹 서버를 구성하는 동안 Virtual Private Cloud() 외부에서 InvokeRestApi 작업을 호출할 수 없습니다VPC. aws:SourceVpc 키를 사용하여 이 작업에 대해 더 세분화된 액세스 제어를 적용할 수 있습니다. 자세한 내용은 aws:SourceVpc를 참조하세요.

Apache Airflow 호출 REST API

다음 샘플 스크립트에서는 Apache Airflow를 사용하여 환경에서 사용 가능한를 REST API 나열하는 방법과 Apache Airflow 변수를 생성하는 방법을 다룹니다DAGs.

import boto3 env_name = "MyAirflowEnvironment" def list_dags(client): request_params = { "Name": env_name, "Path": "/dags", "Method": "GET", "QueryParameters": { "paused": False } } response = client.invoke_rest_api( **request_params ) print("Airflow REST API response: ", response['RestApiResponse']) def create_variable(client): request_params = { "Name": env_name, "Path": "/variables", "Method": "POST", "Body": { "key": "test-restapi-key", "value": "test-restapi-value", "description": "Test variable created by MWAA InvokeRestApi API", } } response = client.invoke_rest_api( **request_params ) print("Airflow REST API response: ", response['RestApiResponse']) if __name__ == "__main__": client = boto3.client("mwaa") list_dags(client) create_variable(client)

웹 서버 세션 토큰 생성 및 Apache Airflow 호출 REST API

웹 서버 액세스 토큰을 생성하려면 다음 Python 함수를 사용합니다. 이 함수는 먼저 Amazon을 호출MWAAAPI하여 웹 로그인 토큰을 가져옵니다. 60초 후에 만료되는 웹 로그인 토큰은 웹 세션 토큰으로 교환되며, 이를 통해 웹 서버에 액세스하고 Apache Airflow REST를 사용할 수 있습니다API. 초당 10개 이상의 트랜잭션(TPS)의 제한 용량이 필요한 경우이 방법을 사용하여 Apache Airflow REST에 액세스할 수 있습니다API.

참고

세션 토큰은 12시간 후에 만료됩니다.

def get_session_info(region, env_name): logging.basicConfig(level=logging.INFO) try: # Initialize MWAA client and request a web login token mwaa = boto3.client('mwaa', region_name=region) response = mwaa.create_web_login_token(Name=env_name) # Extract the web server hostname and login token web_server_host_name = response["WebServerHostname"] web_token = response["WebToken"] # Construct the URL needed for authentication login_url = f"https://{web_server_host_name}/aws_mwaa/login" login_payload = {"token": web_token} # Make a POST request to the MWAA login url using the login payload response = requests.post( login_url, data=login_payload, timeout=10 ) # Check if login was succesfull if response.status_code == 200: # Return the hostname and the session cookie return ( web_server_host_name, response.cookies["session"] ) else: # Log an error logging.error("Failed to log in: HTTP %d", response.status_code) return None except requests.RequestException as e: # Log any exceptions raised during the request to the MWAA login endpoint logging.error("Request failed: %s", str(e)) return None except Exception as e: # Log any other unexpected exceptions logging.error("An unexpected error occurred: %s", str(e)) return None

인증이 완료되면 API 엔드포인트에 요청을 보내기 시작할 수 있는 자격 증명이 있습니다. 아래 예제에서 dags/{dag_id}/dagRuns 엔드포인트를 사용합니다.

def trigger_dag(region, env_name, dag_name): """ Triggers a DAG in a specified MWAA environment using the Airflow REST API. Args: region (str): AWS region where the MWAA environment is hosted. env_name (str): Name of the MWAA environment. dag_name (str): Name of the DAG to trigger. """ logging.info(f"Attempting to trigger DAG {dag_name} in environment {env_name} at region {region}") # Retrieve the web server hostname and session cookie for authentication try: web_server_host_name, session_cookie = get_session_info(region, env_name) if not session_cookie: logging.error("Authentication failed, no session cookie retrieved.") return except Exception as e: logging.error(f"Error retrieving session info: {str(e)}") return # Prepare headers and payload for the request cookies = {"session": session_cookie} json_body = {"conf": {}} # Construct the URL for triggering the DAG url = f"https://{web_server_host_name}/api/v1/dags/{dag_id}/dagRuns" # Send the POST request to trigger the DAG try: response = requests.post(url, cookies=cookies, json=json_body) # Check the response status code to determine if the DAG was triggered successfully if response.status_code == 200: logging.info("DAG triggered successfully.") else: logging.error(f"Failed to trigger DAG: HTTP {response.status_code} - {response.text}") except requests.RequestException as e: logging.error(f"Request to trigger DAG failed: {str(e)}") if __name__ == "__main__": logging.basicConfig(level=logging.INFO) # Check if the correct number of arguments is provided if len(sys.argv) != 4: logging.error("Incorrect usage. Proper format: python script_name.py {region} {env_name} {dag_name}") sys.exit(1) region = sys.argv[1] env_name = sys.argv[2] dag_name = sys.argv[3] # Trigger the DAG with the provided arguments trigger_dag(region, env_name, dag_name)