> [!WARNING] 작성중입니다.
이전 글인 [[DAG 파일이 Meta Database에 등록되는 과정]]에서 이어지는 내용입니다. 아직 이전 글을 보지 않으셨다면 꼭 보고 들어오세요!
- 선수 지식:
- Apache Airflow가 사용하는 ORM인 [SQLAlchemy](https://github.com/sqlalchemy/sqlalchemy?tab=readme-ov-file#sqlalchemy)
#### 3-2-1. `new_serialized_dag` 더 자세히 알아보기
`SerializedDagModel.write_dag()`의 맨 마지막은 `session.merge(new_serialized_dag)`를 실행하는 것입니다.
```python
class SerializedDagModel(Base):
...
@classmethod
@provide_session
def write_dag(
cls,
dag: DAG,
min_update_interval: int | None = None,
processor_subdir: str | None = None,
session: Session = NEW_SESSION,
) -> bool:
...
new_serialized_dag = cls(dag, processor_subdir) # [!blue]
...
session.merge(new_serialized_dag) # [!green]
```
`session.merge()`에 사용되는 new_serialized_dag가 어떤 것인지 알아야, 비로소 DAG가 Meta Database에 저장되는 과정을 이해할 수 있을 것 같습니다.
그렇다면, `new_serialized_dag`는 어떤 값을 가지고 있을까요? 한번 찾아가보겠습니다.
우선 `cls()`에 대해 알아보겠습니다.
이 함수는 상위 함수인 `write_dag(cls, ...)`를 경유하여 전달된 값입니다.
쉽게 설명하면, 함수가 속한 class 객체에 접근할 수 있도록 해주는 기능입니다.
따라서 아래의 코드와 같이 동작합니다.
```python
new_serialized_dag = SerializedDagModel(dag, processor_subdir)
```
`SerializedDagModel`의 `__init__()`은 다음과 같이 정의되어 있습니다.
```python
class SerializedDagModel(Base):
...
def __init__(
self,
dag: DAG,
processor_subdir: str | None = None
) -> None:
self.dag_id = dag.dag_id
self.fileloc = dag.fileloc
self.fileloc_hash = DagCode.dag_fileloc_hash(self.fileloc)
self.last_updated = timezone.utcnow()
self.processor_subdir = processor_subdir
dag_data = SerializedDAG.to_dict(dag)
dag_data_json = json.dumps(dag_data, sort_keys=True).encode("utf-8")
self.dag_hash = md5(dag_data_json).hexdigest()
if COMPRESS_SERIALIZED_DAGS:
self._data = None
self._data_compressed = zlib.compress(dag_data_json)
else:
self._data = dag_data
self._data_compressed = None
self.__data_cache = dag_data
```
`__init__(dag, processor_dir)`에 전달된 파라미터와 이를 `__init__()`의 내부에서 어떻게 활용하는지 분석해보아야 합니다.
이 파라미터들은 어디에서 선언되어 넘어오는 것일까요?
이를 확인하기 위해 지금까지 실행된 코드의 순서를 역순으로 쫓아가보았습니다.
#### 3-2-2. `dag: DAG`
`dag`의 실제 값을 알아보기 위한 흐름은 다음과 같습니다.
1. `SerializedDagModel.__init__(dag, ...)`
2. `SerializedDagModel.write_dag(dag, ...)`
3. `DagBag._serialized_dag_capturing_errors(dag, ...)`
4. `DagBag._sync_to_db(dags, ...)`
5. `DagFileProcessor.save_dag_to_db(dags=dagbag.dags)`
6. `dagbag = DagFileProcessor._get_dagbag(cls, file_path)`
위 실행순서를 보았을 때, `dagbag`에 대해서 알아야 이해할 수 있을 것 같습니다.
따라서 6번의 `dagbag`을 생성하기 위해 사용되는 `DagFileProcessor의 _get_dagbag(...)`을 살펴보도록 하겠습니다.
```python
dagbag = DagFileProcessor._get_dagbag(cls, file_path)
```
```python
# airflow/dag_processing/processor.py
class DagFileProcessor
...
@classmethod
def _get_dagbag(cls, file_path: str):
try:
return DagBag(file_path, include_examples=False) # [!]
...
```
`_get_dag()`는 단순히 `DagBag`을 반환하는 함수입니다. 그렇다면 `DagBag`은 어떤 정보들을 가지고 있을까요?
```python
# airflow/models/dagbag.py
class DagBag(LoggingMixin):
"""
A dagbag is a collection of dags, parsed out of a folder tree
and has high level configuration settings.
...
"""
def __init__(
self,
dag_folder: str | Path | None = None,
include_examples: bool = ...,
...
):
self.dags: dict[str, DAG] = {} # dag_id를 key로 하는 DAG 객체 저장소
self.dag_folder = dag_folder
...
self.collect_dags(dag_folder, ...) # 생성 시 자동으로 DAG 수집
def collect_dags(self, dag_folder, ...):
"""지정된 폴더에서 DAG 파일들을 찾아 파싱"""
# process_file()을 통해 각 .py 파일을 파싱하고
# 발견된 DAG 객체들을 self.dags에 추가
```
`DagBag`은 결국 파일 경로를 받아 해당 경로의 Python 파일들을 파싱하고, 그 안에 정의된 DAG 객체들을 `self.dags` 딕셔너리에 수집하는 역할을 수행합니다.
따라서 `dagbag.dags`는 `{dag_id: DAG}` 형태의 딕셔너리이며, 이것이 `save_dag_to_db(dags=dagbag.dags)`로 전달되어 최종적으로 Meta Database에 저장됩니다.
<br>