Apache Airflow 사용 REST API
Amazon Managed Workflows for Apache Airflow(Amazon MWAA)는 Apache Airflow v2.4.3 이상을 실행하는 환경에서 Apache Airflow를 사용하여 Apache Airflow REST API 환경과 직접 상호 작용할 수 있도록 지원합니다. 이를 통해 프로그래밍 방식으로 Amazon MWAA 환경에 액세스하고 관리할 수 있으며, 데이터 오케스트레이션 워크플로를 호출하고,를 관리하고DAGs, 메타데이터 데이터베이스, 트리거 및 스케줄러와 같은 다양한 Apache Airflow 구성 요소의 상태를 모니터링하는 표준화된 방법을 제공합니다.
APIAmazonMWAA은 Apache Airflow REST를 사용하는 동안 확장성을 지원하기 위해 REST API 요청, 명령줄 인터페이스(CLI) 사용 또는 더 많은 동시 Apache Airflow 사용자 인터페이스(UI) 사용자 등 수요 증가를 처리하기 위해 웹 서버 용량을 수평적으로 확장할 수 있는 옵션을 제공합니다. Amazon이 웹 서버를 MWAA 확장하는 방법에 대한 자세한 내용은 섹션을 참조하세요Amazon MWAA 웹 서버 자동 조정 구성.
Apache Airflow를 사용하여 환경에 대해 다음 사용 사례를 구현RESTAPI할 수 있습니다.
프로그래밍 방식 액세스 - 이제 Apache Airflow UI 또는에 의존하지 않고 Apache Airflow DAG 실행을 시작하고, 데이터 세트를 관리하고, 메타데이터 데이터베이스, 트리거 및 스케줄러와 같은 다양한 구성 요소의 상태를 검색할 수 있습니다CLI.
외부 애플리케이션 및 마이크로서비스와 통합 - REST API 지원을 통해 Amazon MWAA 환경을 다른 시스템과 통합하는 사용자 지정 솔루션을 구축할 수 있습니다. 예를 들어 완료된 데이터베이스 작업 또는 새 사용자 등록과 같은 외부 시스템의 이벤트에 대한 응답으로 워크플로를 시작할 수 있습니다.
중앙 집중식 모니터링 - DAGs 여러 Amazon MWAA 환경에서의 상태를 집계하는 모니터링 대시보드를 구축하여 중앙 집중식 모니터링 및 관리를 활성화할 수 있습니다.
Apache Airflow REST에 대한 자세한 내용은 Apache Airflow 참조API를 참조하세요. REST API
를 사용하면 자격 증명을 REST API 사용하여 AWS Apache Airflow에 액세스할 InvokeRestApi
수 있습니다. 또는 웹 서버 액세스 토큰을 얻은 다음, 이 토큰을 사용해서 직접적으로 호출하여 액세스할 수도 있습니다.
작업을 사용하는 동안 "사용하도록 환경 업데이트InvokeRestApi
" 메시지에 오류가 발생하면 Amazon MWAA 환경을 업데이트해야 함을 나타냅니다. 이 오류는 Amazon MWAA 환경이InvokeRestApi
기능과 관련된 최신 변경 사항과 호환되지 않을 때 발생합니다. 이 문제를 해결하려면 Amazon MWAA 환경을 업데이트하여InvokeRestApi
기능에 필요한 변경 사항을 통합합니다. -
작업의 기본 제한 시간은 10초입니다. 이 10초 기간 내에 작업이 완료되지 않으면 작업이 자동으로 종료되고 오류가 발생합니다. 오류가 발생하지 않도록이 제한 시간 내에 REST API 통화를 완료하도록 설계되었는지 확인합니다.
다음 예제에서는 Apache Airflow를 API 호출RESTAPI하고 새 DAG 실행을 시작하는 방법을 보여줍니다.
Apache Airflow에 대한 액세스 권한 부여RESTAPI: airflow:InvokeRestApi
AWS 자격 증명을 REST API 사용하여 Apache Airflow에 액세스하려면 IAM 정책에 airflow:InvokeRestApi
권한을 부여해야 합니다. 다음 정책 샘플에서, {airflow-role}
에서 Admin
, Op
, User
, Viewer
또는 Public
역할을 지정하여 사용자 액세스 수준을 지정합니다. 자세한 내용은 Apache Airflow 참조 가이드의 기본 역할
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowMwaaRestApiAccess", "Effect": "Allow", "Action": "airflow:InvokeRestApi", "Resource": [ "arn:aws:airflow:{your-region}:YOUR_ACCOUNT_ID:role/{your-environment-name}/{airflow-role}" ] } ] }
프라이빗 웹 서버를 구성하는 동안 Virtual Private Cloud() 외부에서 InvokeRestApi
작업을 호출할 수 없습니다VPC. aws:SourceVpc
키를 사용하여 이 작업에 대해 더 세분화된 액세스 제어를 적용할 수 있습니다. 자세한 내용은 aws:SourceVpc를 참조하세요.
Apache Airflow 호출 REST API
다음 샘플 스크립트에서는 Apache Airflow를 사용하여 환경에서 사용 가능한를 REST API 나열하는 방법과 Apache Airflow 변수를 생성하는 방법을 다룹니다DAGs.
import boto3 env_name = "MyAirflowEnvironment" def list_dags(client): request_params = { "Name": env_name, "Path": "/dags", "Method": "GET", "QueryParameters": { "paused": False } } response = client.invoke_rest_api( **request_params ) print("Airflow REST API response: ", response['RestApiResponse']) def create_variable(client): request_params = { "Name": env_name, "Path": "/variables", "Method": "POST", "Body": { "key": "test-restapi-key", "value": "test-restapi-value", "description": "Test variable created by MWAA InvokeRestApi API", } } response = client.invoke_rest_api( **request_params ) print("Airflow REST API response: ", response['RestApiResponse']) if __name__ == "__main__": client = boto3.client("mwaa") list_dags(client) create_variable(client)
웹 서버 세션 토큰 생성 및 Apache Airflow 호출 REST API
웹 서버 액세스 토큰을 생성하려면 다음 Python 함수를 사용합니다. 이 함수는 먼저 Amazon을 호출MWAAAPI하여 웹 로그인 토큰을 가져옵니다. 60초 후에 만료되는 웹 로그인 토큰은 웹 세션 토큰으로 교환되며, 이를 통해 웹 서버에 액세스하고 Apache Airflow REST를 사용할 수 있습니다API. 초당 10개 이상의 트랜잭션(TPS)의 제한 용량이 필요한 경우이 방법을 사용하여 Apache Airflow REST에 액세스할 수 있습니다API.
세션 토큰은 12시간 후에 만료됩니다.
def get_session_info(region, env_name): logging.basicConfig(level=logging.INFO) try: # Initialize MWAA client and request a web login token mwaa = boto3.client('mwaa', region_name=region) response = mwaa.create_web_login_token(Name=env_name) # Extract the web server hostname and login token web_server_host_name = response["WebServerHostname"] web_token = response["WebToken"] # Construct the URL needed for authentication login_url = f"https://{web_server_host_name}/aws_mwaa/login" login_payload = {"token": web_token} # Make a POST request to the MWAA login url using the login payload response = requests.post( login_url, data=login_payload, timeout=10 ) # Check if login was succesfull if response.status_code == 200: # Return the hostname and the session cookie return ( web_server_host_name, response.cookies["session"] ) else: # Log an error logging.error("Failed to log in: HTTP %d", response.status_code) return None except requests.RequestException as e: # Log any exceptions raised during the request to the MWAA login endpoint logging.error("Request failed: %s", str(e)) return None except Exception as e: # Log any other unexpected exceptions logging.error("An unexpected error occurred: %s", str(e)) return None
인증이 완료되면 API 엔드포인트에 요청을 보내기 시작할 수 있는 자격 증명이 있습니다. 아래 예제에서 dags/{dag_id}/dagRuns
엔드포인트를 사용합니다.
def trigger_dag(region, env_name, dag_name): """ Triggers a DAG in a specified MWAA environment using the Airflow REST API. Args: region (str): AWS region where the MWAA environment is hosted. env_name (str): Name of the MWAA environment. dag_name (str): Name of the DAG to trigger. """ logging.info(f"Attempting to trigger DAG {dag_name} in environment {env_name} at region {region}") # Retrieve the web server hostname and session cookie for authentication try: web_server_host_name, session_cookie = get_session_info(region, env_name) if not session_cookie: logging.error("Authentication failed, no session cookie retrieved.") return except Exception as e: logging.error(f"Error retrieving session info: {str(e)}") return # Prepare headers and payload for the request cookies = {"session": session_cookie} json_body = {"conf": {}} # Construct the URL for triggering the DAG url = f"https://{web_server_host_name}/api/v1/dags/{dag_id}/dagRuns" # Send the POST request to trigger the DAG try: response = requests.post(url, cookies=cookies, json=json_body) # Check the response status code to determine if the DAG was triggered successfully if response.status_code == 200: logging.info("DAG triggered successfully.") else: logging.error(f"Failed to trigger DAG: HTTP {response.status_code} - {response.text}") except requests.RequestException as e: logging.error(f"Request to trigger DAG failed: {str(e)}") if __name__ == "__main__": logging.basicConfig(level=logging.INFO) # Check if the correct number of arguments is provided if len(sys.argv) != 4: logging.error("Incorrect usage. Proper format: python script_name.py {region} {env_name} {dag_name}") sys.exit(1) region = sys.argv[1] env_name = sys.argv[2] dag_name = sys.argv[3] # Trigger the DAG with the provided arguments trigger_dag(region, env_name, dag_name)