#apache-airflow #dag-file-processing
![[apache-airflow-logo.png]]
***=="Apache Airflow 2.10.4 버전을 기준으로 작성하였습니다."==***
![[airflow-dag-registration 1.png]]
이 글은 Apache Airflow에서 새로 등록된 DAG 파일이 어떤 과정을 통해 Meta Database에 등록되고 실행 가능한 상태가 되는지에 대해 **분석**하여 **코드레벨**로 설명한 글입니다.
- 목차:
1. 뜯어보기로 결심한 이유
2. 실행 과정의 간략한 정리
3. 코드 분석하기
4. 마치며
## 1. 뜯어보기로 결심한 이유
DAG 파일이 `dags/`에 등록되면 어떤 과정을 통해 Meta Database에 등록되고 실행 가능한 상태가 되어 Web UI에 표시되고 활용이 가능한지 궁금해져서, 시간을 들여 천천히 코드 저 아래로 내려가 보았습니다.
Apache Airflow 프로젝트는 Apache 프로젝트 인기도(Star수) 4위인 만큼, 전세계의 기여자들로부터 많은 기여가 발생하는 큰 프로젝트입니다. 그에 걸맞게 드넓은 코드 베이스를 가지고 있으며, 각 컴포넌트 간의 추상화 레벨이 높습니다.
그렇기에 이곳저곳 헤매며 많은 시간을 썼지만, 결국 제가 원하는 동작을 찾아내어 기쁜 마음으로 여러분께 글로써 공유드리고자 합니다.
제가 설명하는 코드의 흐름을 물 흐르듯 따라오시고, 제가 정성스럽게 연결해놓은 각 로직에 해당하는 코드를 통해 이 과정을 저와 같이 깊게 이해해보시면 좋을 것 같습니다.
## 2. 실행 과정의 간략한 정리
먼저, 우리가 작성한 DAG 파일이 Meta Database에 등록되는 간략한 과정을 설명드리고자 합니다.
DAG 파일이 DAGs 폴더에 저장되면, Scheduler에 의해 실행된 **DAG File Processing**을 통해 Meta Database에 저장됩니다.
[공식 문서](https://airflow.apache.org/docs/apache-airflow/2.10.4/authoring-and-scheduling/dagfile-processing.html#dag-file-processing)에서는 DAG File Processing을 다음과 같이 정의하고 있습니다.
> [!NOTE] DAG File Processing
> DAG File Processing refers to the process of turning Python files contained in the DAGs folder into DAG objects that contain tasks to be scheduled.
>
> There are two primary components involved in DAG file processing.
> The `DagFileProcessorManager` is a process executing an infinite loop that determines which files need to be processed, and the `DagFileProcessorProcess` is a separate process that is started to convert an individual file into one or more DAG objects.
> > **DAG File Processing**은 DAGs 폴더의 Python 파일을 분석해서 스케줄링에 사용할 수 있는 DAG 객체로 만드는 작업입니다.
> >
> > 이 작업에는 어떤 파일을 처리할지 결정하는 무한 루프인 DagFileProcessorManager와 개별 파일을 DAG 객체로 변환하는 DagFileProcessorProcess로 구성되어 있습니다.
이로써 **DAG File Processing**이 어떤 작업인지 간략하게나마 알 수 있게 되었습니다.
그렇다면 **DAG File Processing**은 언제 실행될까요? 이 또한 공식문서에 친절하게 설명되어 있습니다.
> ... otherwise, starting the scheduler process (`airflow scheduler`) also starts the DagFileProcessorManager.
**DAG File Processing**의 주요 요소 중 하나인 **DagFileProcessorManager**는 `airflow scheduler` CLI를 통해 실행된다는 것을 확인할 수 있었습니다.
지금까지의 설명을 정리하면 다음과 같습니다.
> 1. **DAG File Processing** 작업은 DAGs 폴더에 저장된 Python 파일 내부의 DAG 객체를 실행 가능한 형태로 만든다.
>
> 2. **DAG File Processing**에는 두 가지의 주요 요소가 있으며, 이는 **DagFileProcessorManager**와 **DagFileProcessorProcess**이다.
>
> 3. **DagFileProcessorManager**는 어떤 파일을 처리할지 결정하는 무한 루프를 실행하는 프로세스이며, **DagFileProcessorProcess**는 각 Python 파일을 분석하여 포함된 DAG 정보를 객체로 변환하는 작업을 수행한다.
>
> 4. **DAG File Processing** 작업은 Airflow CLI 명령 중 하나인 `airflow scheduler`에서 실행된다.
> [!CHECK] 현재까지의 과정
> `airflow scheduler` 명령어에 의해 **DagFileProcessorManager**가 실행되고, 이후 **Meta Database**에 처리된 DAG가 저장됩니다.
> ```mermaid
> graph TD;
> AS[<code> airflow scheduler </code>]
> --> DagFileProcessorManager
> --> meta_database[(Meta Database)]
> ```
같은 페이지에는 다음의 다이어그램도 존재합니다. 이를 통해 시각적으로 **DAG File Processing**의 자세한 동작을 이해할 수 있습니다.
![[airflow-docs-dag-file-processing.png]]
크게 두 가지의 컴포넌트로 나뉘어 있는 것을 확인할 수 있으며, `DagFileProcessorManager`의 실행과정 중 `DagFileProcessorProcess`가 호출되는 것을 알 수 있습니다.
위에서 설명했던 것과 같이, `DagFileProcessorManager`는 어떤 파일을 처리할지 결정하는 무한루프를 실행하는 프로세스이며, 이 프로세스가 지속적으로 실행되면서 `DagFileProcessorProcess`를 실행하는 것을 알 수 있습니다.
아래의 순서는 위 다이어그램을 풀어서 설명한 것입니다.
1. **DagFileProcessorManager**
1. 신규 파일 확인
2. 최근 처리된 파일 제외
3. Queue에 처리할 파일 경로 삽입
4. 처리할 파일들을 지정하여 DagFileProcessorProcess 실행
2. **DagFileProcessorProcess**
1. DagFileProcessorManager에 의해 전달된 파일을 하나씩 처리
2. 전달된 파일에 명시된 모듈들을 로드
3. 모듈들을 처리
4. **DagBag** 반환
3. 다시 **DagFileProcessorManager**
1. 처리 결과 수집
2. 통계치 로깅
위 내용을 기반으로, 처리된 DAG를 Meta Database에 저장하는 로직은 **DagFileProcessorProcess**의 내부에 있을 것으로 예상됩니다. **DagBag**을 반환하는 과정의 가까운 곳에 위치할 것 같네요.
아직 어느 부분에서 DAG를 Meta Database에 저장하는 것인지 정확히는 알 수 없습니다. 이제 코드를 직접 뜯어 볼 차례입니다.
## 3. 코드 분석하기
> [!info]
> 실행과정의 코드를 전부 보여드리며 설명드리려 했으나 글이 많이 지루해지는 것 같습니다. 따라서 전부를 보여드리는 것이 아닌 실행 과정만 보여드리고 메인 로직에 대한 설명을 하는 방식으로 진행하였습니다.
>
> 만약 전체 코드를 보고 싶으시면, 각 영역에 걸어둔 링크를 통해 확인하시길 바랍니다(각 링크마다 해당 코드의 시작점에 해당하는 Line을 함께 넣은 디테일을 확인해주세요).
먼저, `airflow scheduler`를 진입점으로 시작하여, 최종 처리된 DAG가 Meta Database에 저장되는 과정의 코드를 살펴보도록 하겠습니다.
### 3-1. Scheduler에서 DAG File Processing 작업 실행
공식문서의 내용과 같이, DAG File Processing 작업은 Scheduler에 의해 실행됩니다.
문서의 내용을 미루어 보았을 때, "Scheduler는 DagFileProcessorManager를 통해 DAG File Processing 작업을 실행하는구나."라고 생각할 수 있습니다.
하지만 코드를 직접 들여다보니 실제로는 좀 더 세분화된 작업에 의해 실행되는 것을 확인할 수 있었습니다.
**Scheduler**에 의해 `SchedulerJobRunner`가 호출되고,
`SchedulerJobRunner`를 통해 `DagFileProcessorAgent`가 호출된다는 것을 말이죠.
이어서 `DagFileProcessorAgent`에 의해 `DagFileProcessorManager`가 호출되는 것을 확인하였습니다.
과정이 실행되는 순서를 코드로 나열해보았습니다.
(각 메서드에 포함된 파라미터들은 제외하였습니다. 이 또한 걸어둔 링크를 통해 확인해보시길 바랍니다)
1. **Scheduler** - **"Start Airflow Scheduler"**
1. [`scheduler()`](https://github.com/apache/airflow/blob/2.10.4/airflow/cli/commands/scheduler_command.py#L53)
1. [`_run_scheduler_job()`](https://github.com/apache/airflow/blob/2.10.4/airflow/cli/commands/scheduler_command.py#L41)
2. [**SchedulerJobRunner**](https://github.com/apache/airflow/blob/2.10.4/airflow/jobs/scheduler_job_runner.py#L139) - **"SchedulerJobRunner runs for a specific time interval and schedules jobs that are ready to run."**
1. [`_execute()`](https://github.com/apache/airflow/blob/2.10.4/airflow/jobs/scheduler_job_runner.py#L947)
3. [**DagFileProcessorAgent**](https://github.com/apache/airflow/blob/2.10.4/airflow/dag_processing/manager.py#L104) - **"Agent for DAG file processing."**
1. [`start()`](https://github.com/apache/airflow/blob/2.10.4/airflow/dag_processing/manager.py#L152)
1. [`_run_processor_manager()`](https://github.com/apache/airflow/blob/2.10.4/airflow/dag_processing/manager.py#L220)
4. **DagFileProcessorManager**
그렇다면, DAG를 Meta Database에 저장하는 로직은 어디에 있을까요? **DagFileProcessorManager** 부터 쭉 따라가보도록 하겠습니다.
4. [**DagFileProcessorManager**](https://github.com/apache/airflow/blob/2.10.4/airflow/dag_processing/manager.py#L238) - **"Manage processes responsible for parsing DAGs."**
1. [`start()`](https://github.com/apache/airflow/blob/2.10.4/airflow/dag_processing/manager.py#L472)
1. [`_run_parsing_loop()`](https://github.com/apache/airflow/blob/2.10.4/airflow/dag_processing/manager.py#L555)
1. `hearbeat()`
1. `_heartbeat_manager()`
2. [`start_new_processes()`](https://github.com/apache/airflow/blob/2.10.4/airflow/dag_processing/manager.py#L1214)
5. [**DagFileProcessorProcess**](https://github.com/apache/airflow/blob/2.10.4/airflow/dag_processing/manager.py#L1227) - **"Runs DAG processing in a seperate process using DagFileProcessor."**
1. [`start()`](https://github.com/apache/airflow/blob/2.10.4/airflow/dag_processing/processor.py#L220)
1. [`_run_file_processor()`](https://github.com/apache/airflow/blob/2.10.4/airflow/dag_processing/processor.py#L142)
1. [`_handle_dag_file_processing()`](https://github.com/apache/airflow/blob/2.10.4/airflow/dag_processing/processor.py#L177)
6. **DagFileProcessor**
위 순서는 공식문서의 내용과 (당연하게도) 일치합니다.
`DagFileProcessorManger`의 `start()`를 시작으로, 그 안에서 `_run_parsing_loop()`를 호출하고, `_run_parsing_loop()`의 내부에서 `heartbeat()`를 호출합니다.
`heartbeat()`는 `_heartbeat_manager()`를 호출하고, `_heartbeat_manager()`는 또 다시 `start()`를 호출합니다.
이로써 공식문서의 내용인 **==DagFileProcessorManager는 어떤 파일을 처리할지 결정하는 무한루프 프로세스==** 라는 것을 코드에서 확인할 수 있었습니다.
**DagFileProcessorManager**의 Process files 작업은 **DagFileProcessorProcess**을 호출을 통해 수행됩니다.
6. [**DagFileProcessor**](https://github.com/apache/airflow/blob/2.10.4/airflow/dag_processing/processor.py#L415) - **"Process a Python file containing Airflow DAGs."**
1. [`process_file()`](https://github.com/apache/airflow/blob/2.10.4/airflow/dag_processing/processor.py#L889)
1. [`dagbag = DagFileProcessor._get_dagbag()`](https://github.com/apache/airflow/blob/2.10.4/airflow/dag_processing/processor.py#L880)
2. [`save_dag_to_db()`](https://github.com/apache/airflow/blob/2.10.4/airflow/dag_processing/processor.py#L976)
7. [**DagBag**](https://github.com/apache/airflow/blob/2.10.4/airflow/models/dagbag.py#L99) - **"A dagbag is a collection of dags, parsed out of a folder tree and has high level configuration settings."**
1. [`_sync_to_db()`](https://github.com/apache/airflow/blob/2.10.4/airflow/models/dagbag.py#L663)
1. [`_serialize_dag_capturing_errors()`](https://github.com/apache/airflow/blob/2.10.4/airflow/models/dagbag.py#L676)
- `_sync_to_db(dags)`를 통해 전달된 dags를 하나씩 순회하며 실행됩니다.
8. [**SerializedDagModel**](https://github.com/apache/airflow/blob/2.10.4/airflow/models/serialized_dag.py#L57) - **"A table for serialized DAGs"**
1. [`write_dag()`](https://github.com/apache/airflow/blob/2.10.4/airflow/models/serialized_dag.py#L137)
1. `dag_hash`로 기존 버전 확인
2. [`session.merge(new_serialized_dag)`](https://github.com/apache/airflow/blob/2.10.4/airflow/models/serialized_dag.py#L183)
위와 같은 코드의 동작을 통해 최종적으로 DAG 파일이 Meta Database에 저장되는 메인 로직인 `write_dag()`가 실행됩니다.
### 3-2. DAG 파일을 Meta Database에 저장하는 코드 분석
DAG 파일을 저장하는 메서드를 찾았으니, 자세히 분석해보도록 하겠습니다.
```python
class SerializedDagModel(Base):
"""
A table for serialized DAGs.
serialized_dag table is a snapshot of DAG files synchronized by scheduler.
This feature is controlled by:
...
"""
...
@classmethod
@provide_session
def write_dag(
cls,
dag: DAG,
min_update_interval: int | None = None,
processor_subdir: str | None = None,
session: Session = NEW_SESSION,
) -> bool:
"""
Serialize a DAG and writes it into database.
If the record already exists, it checks if the Serialized DAG changed or not.
If it is changed, it updates the record, ignores otherwise.
...
"""
# Checks if (Current Time - Time when the DAG was written to DB) < min_update_interval
# If Yes, does nothing
# If No or the DAG does not exists, updates / writes Serialized DAG to DB
# 기준에 맞지 않다면 처리를 생략합니다.
if min_update_interval is not None:
if session.scalar(
select(literal(True)).where(
cls.dag_id == dag.dag_id,
(timezone.utcnow() - timedelta(seconds=min_update_interval)) < cls.last_updated,
)
):
return False
log.debug("Checking if DAG (%s) changed", dag.dag_id)
# cls의 dag와 processor_subdir에 대해 더 알아봐야 해석할 수 있는 구문이네요.
new_serialized_dag = cls(dag, processor_subdir)
# 새로 저장할 DAG와 비교하기 위해, 기존 DB에 저장된 레코드를 가져옵니다.
serialized_dag_db = session.execute(
select(cls.dag_hash, cls.processor_subdir).where(cls.dag_id == dag.dag_id)
).first()
# 새로 저장할 DAG와 앞서 가져온 기존의 레코드를 비교합니다.
if (
serialized_dag_db is not None
and serialized_dag_db.dag_hash == new_serialized_dag.dag_hash
and serialized_dag_db.processor_subdir == new_serialized_dag.processor_subdir
):
log.debug("Serialized DAG (%s) is unchanged. Skipping writing to DB", dag.dag_id)
return False
log.debug("Writing Serialized DAG: %s to the DB", dag.dag_id)
# 여기에서 최종적으로 저장이 되겠네요.
session.merge(new_serialized_dag)
log.debug("DAG: %s written to the DB", dag.dag_id)
return True
```
`session.merge(new_serialized_dag)`를 통해 Meta Database에 DAG가 등록되기까지의 과정을 살펴보았습니다.
이어지는 내용으로 [SQLAlchemy](https://github.com/sqlalchemy/sqlalchemy?tab=readme-ov-file#sqlalchemy)의 `session.merge()` 동작 방식, `new_serialized_dag`에 담긴 실제 데이터 형태, 그리고 Meta Database의 테이블에 저장된 결과까지 다루고 싶었으나, 하나의 글에 모두 담기엔 내용이 방대하여 [[DAG 파일이 Meta Database에 등록되는 과정 - session.merge와 new_serialized_dag|다음 글]]에서 이어서 설명드리겠습니다.
현재까지 분석한 내용을 정리하면 다음과 같습니다.
```mermaid
graph TD;
classDef scheduler fill:#e1f5fe,stroke:#01579b,stroke-width:2px;
classDef processor fill:#fff3e0,stroke:#e65100,stroke-width:2px;
classDef storage fill:#e8f5e9,stroke:#1b5e20,stroke-width:2px;
classDef database fill:#f3e5f5,stroke:#4a148c,stroke-width:2px;
%% 1. Scheduler 영역
subgraph SG1 [Scheduler]
S[Scheduler]
--> SJR[SchedulerJobRunner]
--> DFPA[DagFileProcessorAgent]
end
%% 2. DagFileProcessor 영역
subgraph SG2 [DagFileProcessor]
DFPA
--> DFPM[DagFileProcessorManager]
--> DFPP[DagFileProcessorProcess]
--> DFP[DagFileProcessor]
end
%% 3. Dag 파싱 및 직렬화 영역
subgraph SG3 [Serialization]
DFP
--> DBG[DagBag]
--> SDM[SerializedDagModel]
end
%% 4. 최종 저장소
SDM -- "session.merge()" --> MDB[(Meta Database)]
%% 클래스 적용
class SG1 scheduler;
class SG2 processor;
class SG3 storage;
class MDB database;
%% 스타일 보정
style SG1 color:#01579b,stroke-dasharray: 5 5
style SG2 color:#e65100,stroke-dasharray: 5 5
style SG3 color:#1b5e20,stroke-dasharray: 5 5
```
<br>
## 4. 마치며
Apache Airflow 오랫동안 Apache 재단의 프로젝트 최상위권을 차지하고 있는 매우 인기있는 프로젝트입니다.
아래의 그래프를 보시면, 이 프로젝트가 얼마나 많은 인기를 가지고 있는지 바로 이해하실 수 있습니다(이미 많은 Star 수를 가지고 있음에도 꾸준하게 우상향으로 나아가는 것을 볼 수 있습니다).

Apache Airflow는 이 만큼이나 많은 사람들이 관심을 가지고 지켜보는 프로젝트이며, 많은 기여자들이 꾸준히 발전시키고 있습니다.
업무를 하면서 프레임워크를 잘 사용하는 것도 물론 중요하지만, 단순한 사용을 넘어, 그 안의 동작을 이해하면 훨씬 더 깊이있는 프로그래밍 생활이 될 것입니다. 그리고 내부 동작 원리를 이해한다면 디버깅을 빠르게 할 수 있고 더 효율적으로 동작하는 코드를 작성할 수 있겠죠.
그렇기에, 저는 앞으로도 꾸준히 관심있는 오픈소스 프로젝트들의 코드베이스를 탐구할 예정입니다. 이런 습관을 지속한다면, Airflow와 같은 대규모 프로젝트의 코드레벨에 직접 기여하거나, 더 나아가 직접 만들어 볼 수 있게 되지 않을까요? (Creator of something...)
아, 여기까지 읽으신 분들에게만 재미있는 정보를 공유드립니다. 혹시 여러분은 Apache Airflow의 초기 프로젝트명을 알고 계신가요? 단순한 호기심으로 Airflow 창시자분의 초기 Repositary를 들춰보던 중에 발견하였습니다.
==그 이름은 바로 ***Flux***.==
아래의 코드에서 발견할 수 있었습니다.
([Apache Airflow Github Repositary의 첫 Commit 중 일부](https://github.com/apache/airflow/commit/1047940ca4363b04044c4963b9c88f7632746407#diff-b335630551682c19a781afebcf4d07bf978fb1f8ac04c6bf87428ed5106870f5R1))
![[apache-airflow-first_official_commit_by_mistercrunch.png]]
참고로 Airflow 창시자는 Maxime Beauchemin입니다([Github 닉네임은 mistercrunch](https://github.com/mistercrunch)).
창시자 본인이 직접 커밋한 내용들을 Github에서 찾아보면, Airbnb와 관련된 프로젝트라는 것을 확인할 수 있습니다(Airflow는 Airbnb에서 오픈소스로 공개하였으며, 이후 Airflow 재단에 인수되었습니다).
긴 글 끝까지 읽어주셔서 정말 감사합니다!