
데이터
Airflow
복잡한 계산을 요하는 작업흐름과 데이터 처리 파이프라인을 조율하기 위해 만든 오픈소스 도구
StackOverflow 질문 수: 10932
Github Stars : ★ 39388
사용 기업

핀다

딜리셔스

뤼이드

에이블리

드라마앤컴퍼니

직방

당근

마이리얼트립

그린랩스

버킷플레이스

와디즈

버즈빌

마켓컬리

에이비일팔공

쏘카

매스프레소

마이셀럽스

오피지지
더 보기
베이글코드
Airflow DAG 좀 살려주세요!
에어플로우 모니터링과 알림 노하우안녕하세요, 베이글코드 데이터팀의 데이터 엔지니어 하석윤 그리고 김경훈 입니다. 저희는 Airflow 2nd 밋업에서 “Airflow DAG 좀 살려주세요!”라는 주제로 발표를 진행하였는데요. 이번 포스트에서는 발표 내용을 글로 정리해 전달 드리고자 합니다!데이터팀은 Airflow를 통해 모든 데이터 파이프라인을 자동화 하여 운영하고 있습니다. 원천 데이터 수집과 적재, Spark ETL, 그리고 각종 지표 Alert 그리고 ML/DL 모델 학습까지 Airflow가 모든 것을 관리해주고 있습니다. 그러나 작업을 Airflow에서 DAG를 자동화 하였다고 해서 끝나는게 아닙니다. 아주 여러 이유로 Airflow에 스케쥴링 한 작업이 실패합니다.스케쥴링 했던 작업이 데이터가 늘어나면서 Timeout을 겪을 수도 있고, API 호출을 하는 벤더 사에 장애가 발생하거나, DAG 로직 자체에 오류가 있었을 수 있습니다 데이터 엔지니어의 유지보수 작업은 Airflow 작업 실패에서 시작합니다. 기존에 자동화 해두었던 작업이 실패하게 되면 데이터 엔지니어가 살펴봐야 하죠.Airflow Document: CallbacksAirflow는 Task가 실행되는 과정에서 발생하는 각종 이벤트에 특정 동작을 실행할 수 있도록 callback 인터페이스를 지원 합니다. 성공, 실패, SLA miss, 재실행와 실행, 스킵될 때 특정 행동이 트리거 되도록 지정할 수 있습니다. 이번 글에서는on_failure_callback과 on_retry_callback을 어떻게 커스텀 했는지를 중점적으로 소개 드리고자 합니다.Airflow Document: CallbacksAirflow 공식 문서에 있는 예제 코드인데요. Task에 대한 callback을 DAG 레벨에서도 정의할 수 있고, Task 레벨에서도 정의할 수 있습니다. Airflow 2.6.0 버전부터는 여러 개의 콜백을 정의하는 것도 가능하다고 합니다!Airflow Alert 메시지 개선의 여정Airflow Callback 기능에 대해 간단하게 소개드렸고, 지금부터는 베이글코드가 어떻게 얼럿을 개선했는지 스토리를 소개 해보겠습니다!평소처럼 한명의 데이터 엔지니어가 slack 채널에서 장애 메시지를 발견 합니다.아, 오늘은 어떤 작업이 터졌을까~~위의 캡쳐는 베이글코드에서 몇년 동안 사용하던 에러 메시지인데요. 저도 회사에 입사한 이후로 3년 넘게 저 메시지를 보면서 작업을 디버그 했습니다. 그런데, 어느 순간 이런 생각이 들었습니다.이게 최선일까?Task는 얼마나 실행된건지, 어떤 Operator가 실패한건지, 디버그 해야 하는데 어디를 들어가야 하는지도 안 나와 있었습니다. 저희는 기존 메시지를 더 개선할 수 있다고 생각했고, 작업을 시작 했습니다!Raw URL 대신 링크에 Alias를 사용일단 첫번째 개선에서는 기존에 지저분하게 있던 Raw URL에 Alias를 부여 했습니다. DAG 웹페이지에도 바로 접속할 수 있도록 링크도 추가하였습니다.DAG 페이지에 대한 링크를 추가한
airflow
slack
spark
쏘카
로그 파이프라인 개선기 - 기존 파이프라인 문제 정의 및 해결 방안 적용
안녕하세요. 쏘카 데이터엔지니어링팀 삐약, 루디입니다.내용을 시작하기에 앞서, 저희 팀의 업무와 역할에 대해 간략히 소개해 드리겠습니다.데이터엔지니어링팀은 신뢰할 수 있는 데이터를 쏘카 구성원들이 안정적으로 활용할 수 있도록 기반을 마련하고, 이를 실제 비즈니스에 적용할 수 있는 서비스를 개발하며 환경을 구축하고 있습니다. 데이터 마트 관리, 데이터 인프라 구축, 그리고 데이터 제품(Data as a Product) 개발 등 폭넓은 업무를 수행하고 있습니다.특히 주요 업무로는 배치 및 실시간 스트리밍 파이프라인을 설계하고 개발하여, 쏘카의 모든 서비스에서 발생하는 데이터를 비즈니스 분석에 효과적으로 활용할 수 있도록 지원하는 역할을 하고 있습니다.이번 글에서는 저희 팀이 관리 및 운영하는 데이터 파이프라인 중, 비즈니스 의사 결정 시 지표로 사용되는 서버 로그를 데이터 웨어하우스로 사용하고 있는 BigQuery에 적재하는 로그 파이프라인 개선 과정을 소개드리고자 합니다개선을 하게 된 가장 주요 이유 중 하나는 데이터 스키마 변경으로 인해 겪는 어려움 이었습니다. 이를 해결하기 위해 데이터 컨트랙트를 도입하게 되었고, 이 과정에서 얻은 경험을 나누고자 합니다. 이번 시리즈는 비슷한 문제를 겪고 계신 분들께 도움이 되길 바랍니다.• 데이터 파이프라인을 구축하거나 개선하고자 하는 데이터 엔지니어• 데이터 컨트랙트를 도입하려는 개발자• 데이터 엔지니어의 업무에 대해 궁금한 분2. 기존 로그 파이프라인 현황기존 파이프라인을 설명하기에 앞서, 아키텍처의 문제를 더 명확히 이해하기 위해 원본 데이터의 구조와 요구사항을 먼저 살펴보겠습니다.원본 데이터에서 하나의 로그 파일은 다음과 같은 형식으로 제공됩니다.하나의 파일에는 여러 종류의 로그 데이터가 섞여 있으며, 모든 데이터에는 로그의 종류(type)와 생성 시간(timeMs) 같은 공통 필드가 포함되어 있습니다. 또한 각 로그 타입마다 고유한 필드도 존재합니다.파이프라인의 요구사항은 아래와 같습니다.• BigQuery 테이블화: 로그 데이터는 타입별로 구분된 BigQuery 테이블에 저장되어야 하며, 이를 통해 조회 및 분석이 가능해야 합니다.• 특정 타입 데이터 적재: 요청한 타입의 데이터만 BigQuery에 적재해야합니다.• 배치성 처리: 데이터 처리와 적재는 최소 2시간 이내에 이루어져야 하며, 가능하면 더 빠르게 처리되어야 합니다.이러한 데이터 구조와 요구사항을 바탕으로, 기존 파이프라인이 어떻게 구성되어 있는지 살펴보겠습니다.2-2. 기존 파이프라인 아키텍처우선 어떻게 데이터 파이프라인을 개선할 것인가에 대한 질문에 대한 답을 하기에 앞서 기존 아키텍처에 대하여 이해하고 어떤 부분에서의 문제가 발생하고 있는지의 파악이 필요합니다.기존 파이프라인 설계 당시의 상황과 고려 사항을 살펴보면, 이미 Amazon Kinesis Data Stream(KDS)과 Firehose를 통해 AWS S3에 데이터가 적재되고 있는 환경이었습니다. 더불어 로그 데이터는 주로 분석 용도로 활용될 예정이었기에 실시간성
airflow
googlebigquery
java
kafka
python
현대자동차그룹
(Airflow #1) 데이터 엔지니어들이 선택하는 Apache Airflow 소개
시작말안녕하세요.저는 ICT본부 CDO 기업데이터실에서 글로벌 기업데이터허브를 구성 & 관리 & 서비스 하고 있습니다.데이터 엔지니어링 업무를 수행하며 고민하거나 현재하고 있는 부분들을 공유 드리려고 합니다.저는 현재 Airflow 를 활용하여 데이터 수집, 처리, 저장 등의 다양한 Task들을 스케줄링 하고 있습니다.먼저 Airflow 가 어떤 것이고 왜 Airflow 를 활용하고 있는지에 대해서 공유 드리려고 합니다.Airflowhttps://airflow.apache.org/docs/apache-airflow/stable/index.html#what-is-airflowApache Airflow 는 Airbnb 에서 workflow 들을 관리하고 스케줄링 하기 위해 만든 파이썬 기반의 오픈 소스 입니다.Workflow 를 Python code 로 작성할 수 있으며, DAG(Directed Acyclic Graph) 라는 대분류 안에 workflow 들이 속하여 스케줄링 됩니다.간단한 모니터링 화면을 제공하지만 제법 필요한 기능들을 포함하고 있습니다.https://airflow.apache.org/docs/apache-airflow/stable/index.html#workflows-as-codeApache Airflow 공식 문서 - DAG 시각화Apache Airflow 공식 문서 - DAG 시각화왜 Airflow 를 사용하는지?기존의 Hadoop 생태계를 사용하셨다면 Oozie 를 많이 활용했던 경험이 있으실 듯 합니다.Oozie 와 비슷한 개념들을 많이 가지고 있지만, 확실히 차별적으로 가지고 있는 기능들도 많이 있기 때문에 Airflow 를 메인 스케줄러로 활용합니다.아래와 같은 이유로 Airflow 사용을 고려했습니다.Open Source 입니다.Python 기반의 Workflow 작성을 제공합니다.팀구성원 분들이 다양하게 사용하는 언어 중 공통적으로 일치하는 부분이 Python 입니다.앞으로 함께 일 하실 분들의 대다수가 Python 활용 가능자 일 듯 합니다.빌드를 하지 않고 수정할 수 있는 스크립트 언어가 유지보수하기 편합니다.Kubernetes 를 지원합니다.물론 Local Executor 나 Celery (자체 클러스터) 를 지원합니다.https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/executor/index.html#local-executorshttps://airflow.apache.org/docs/apache-airflow-providers-celery/stable/celery_executor.html현재 Kubernetes 플랫폼을 구축 중이며 상당히 이식이 편리할것으로 생각됩니다.https://airflow.apache.org/docs/apache-airflow-providers-cncf-kubernetes/stable/kubernetes_executor.htmlHelm 차트를 활용할 수 있습니다.https://airflow.apache.org/docs/helm-chart/stable/index.html다만 Hadoop YARN 을 지원하지 않습니다.Airflow 를 활용하여 배치 플랫폼 구축이 용이 합니다.Airflow 에서는 Python 파일을 DAGS 경로에 넣어주면 읽히기 때문에 모듈화된 데이터 수집 프로세스를 쉽게 사용할 수 있도록 구성할 수 있습니다.https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/overview.html#distributed-airflow-architectureAirflow 는 API 관련 문서를 잘 정리해두기 때문에 스케줄링, Workflow, Task 들을 API 를 활용하여 관리하기 편리합니다.https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.htmlXCOM 을 통하여 Task 간 간단한 정보 전달이 용이 합니다.Pool 을 지정하여 더욱 타이트한 스케줄 자원 관리를 할 수 있습니다.Sensor 나 Trigger 를 활용하여 이벤트에 의한 배치 flow 를 구성할 수도 있습니다.https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/sensors.html#sensorshttps://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/deferring.htmlAirflow 아키텍처https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/plugins.htmlhttps://airflow.apache.org/docs/apache-airflow/stable/core-concepts/overview.html#distributed-airflow-architecturehttps://airflow.apache.org/docs/apache-airflow/stable/core-concepts/overview.html#separate-dag-processing-architectureAirflow 의 구성에 대해서 소개해 드리겠습니다.Airflow 는 3가지 형태로 구성할 수 있습니다.Basic Airflow deployment기본적으로 Airflow 를 설치할 때 필수로 구성되는 아키텍처 입니다.Apache Airflow 공식 문서 - Basic Airflow deploymentDistributed Airflow architecture흔히들 설치 시 많이 따르고 있는 아키텍처 입니다.Apache Airflow 공식 문서 - Distributed Airflow architectureSeparate DAG processing architectureDAG Processor 를 보안상의 이유로 별도로 분리한 아키텍처 입니다. 기존의 scheduler 에서 DAG 파일을 읽어 들였다면, 해당
airflow
kubernetes
python
당근
당근페이 재무 결산 사례로 보는 백엔드와 데이터의 만남
안녕하세요. 저는 당근페이 머니서비스팀 백엔드 엔지니어 클로버(Clover)에요! 당근페이에서는 동네에서 쉽고 편하게 쓸 수 있는 금융 서비스를 만들고 있어요. 당근에서 일어나는 거래에는 당근머니가 사용되는데요. 머니서비스팀에서는 당근머니와 관련된 일들에 더해, 동네에서 쓰면 다양한 혜택이 적립되는 당근카드와 관련된 일들을 하고 있어요. 다시 말해 저희 팀은 지역에서 생기는 다양한 거래를 연결하는 게 목표예요!재무 결산의 중요성당근페이에서는 매월 셀 수 없이 많은 거래가 이루어지는데요. 매월 이 거래들을 바탕으로 당근페이 내에서 돈이 어떤 경우에 어디에서 어디로 흘러갔는지 재무 결산을 진행해요. 예를 들면 당근머니를 충전하면 사용자의 계좌에서 당근페이 모계좌로 돈이 이동하는데, 당근머니 충전액의 총합은 모계좌 입금액의 총합과 같아야 해요. 재무 결산을 했는데 단 1원이라도 맞지 않는다면 그 1원을 찾기 위해 모든 업무를 중단해야 해요. 그만큼 핀테크 회사인 당근페이에서는 재무 결산이 중요해요.당근페이 초기에는 거래량이나 거래 종류가 많지 않았기 때문에 데이터를 직접 SQL 쿼리로 뽑았었어요. 그러다가 당근페이가 커지면서…결산 어드민 도입 전 머니 결산 스레드위와 같이 매월 초에 열리는 머니 결산 스레드에서, 백엔드 엔지니어들이 태그되기 시작했어요. 머니서비스팀에 도메인 담당자가 많아질수록 매월 결산 슬랙 스레드에 태그되는 사람도 점점 많아졌죠. 결국 머니서비스팀 백엔드 개발자들은 매월 첫 주에는 정기적으로 코드가 아닌 SQL 쿼리를 치고 있는 상황이 일어났어요.이런 비효율을 어떻게 개선해 볼 수 있을까요? 모두가 SQL 쿼리를 더 정확하고 빠르게 작성하도록 SQLD 자격증 스터디를 열어볼 수도 있을 듯해요. 하지만 저희 팀에서는 이를 시스템적으로 개선해보고자 했어요.Spring Batch와 MySQL기반 결산가장 쉽게 떠오르는 방법은 스프링 배치를 이용하는 방법이에요. 대용량 데이터를 처리할 때 가장 흔하게 사용하는 기술이죠. 처음에는 저희 팀도 이 방법으로 결산을 진행했어요. 대략적인 구조는 다음과 같았어요.당근페이 머니서비스팀은 마이크로서비스로 컴포넌트들이 구성돼 있어서, 각자 다른 데이터베이스를 가지고 있어요. 여러 데이터들을 종합해서 진행하는 결산의 특성상, 배치는 여러 개의 데이터베이스를 바라보고 진행해요.이러한 결산은 크게 두 단계로 구성돼 있어요.원본 DB에서 성공 거래를 바탕으로 원장(raw data)을 구성해 결산 전용 DB에 밀어 넣기위에서 가져온 원장을 기반으로 집계해 일별 결산 리포트를 만들고 결산 전용 DB에 쌓기이런 단계별로 구성된 배치를 매일 00시에 Cron을 통해 트리거해요. 재무 담당자나 결산 이해관계자들은 이렇게 단계별, 일자별로 잘 집계된 데이터들을 스프링 기반 백엔드 및 결산 프론트 화면을 통해 언제든지 조회할 수 있어요. 여기까지는 괜찮은 것 같아요. 그러던 어느 날 문제를 마주했죠.구조의 한계배치 실패 알림 발생어느 날 갑자기 이런 배치 작업 실패 알림을 받게 됐어요. 보통 이렇게 결산 배치를 실
airflow
awsredshift
hibernate
mysql
spring
연관 기술 스택

Luigi