> [!WARNING] 작성중입니다. 이전 글인 [[DAG 파일이 Meta Database에 등록되는 과정]]에서 이어지는 내용입니다. 아직 이전 글을 보지 않으셨다면 꼭 보고 들어오세요! - 선수 지식: - Apache Airflow가 사용하는 ORM인 [SQLAlchemy](https://github.com/sqlalchemy/sqlalchemy?tab=readme-ov-file#sqlalchemy) #### 3-2-1. `new_serialized_dag` 더 자세히 알아보기 `SerializedDagModel.write_dag()`의 맨 마지막은 `session.merge(new_serialized_dag)`를 실행하는 것입니다. ```python class SerializedDagModel(Base): ... @classmethod @provide_session def write_dag( cls, dag: DAG, min_update_interval: int | None = None, processor_subdir: str | None = None, session: Session = NEW_SESSION, ) -> bool: ... new_serialized_dag = cls(dag, processor_subdir) # [!blue] ... session.merge(new_serialized_dag) # [!green] ``` `session.merge()`에 사용되는 new_serialized_dag가 어떤 것인지 알아야, 비로소 DAG가 Meta Database에 저장되는 과정을 이해할 수 있을 것 같습니다. 그렇다면, `new_serialized_dag`는 어떤 값을 가지고 있을까요? 한번 찾아가보겠습니다. 우선 `cls()`에 대해 알아보겠습니다. 이 함수는 상위 함수인 `write_dag(cls, ...)`를 경유하여 전달된 값입니다. 쉽게 설명하면, 함수가 속한 class 객체에 접근할 수 있도록 해주는 기능입니다. 따라서 아래의 코드와 같이 동작합니다. ```python new_serialized_dag = SerializedDagModel(dag, processor_subdir) ``` `SerializedDagModel`의 `__init__()`은 다음과 같이 정의되어 있습니다. ```python class SerializedDagModel(Base): ... def __init__( self, dag: DAG, processor_subdir: str | None = None ) -> None: self.dag_id = dag.dag_id self.fileloc = dag.fileloc self.fileloc_hash = DagCode.dag_fileloc_hash(self.fileloc) self.last_updated = timezone.utcnow() self.processor_subdir = processor_subdir dag_data = SerializedDAG.to_dict(dag) dag_data_json = json.dumps(dag_data, sort_keys=True).encode("utf-8") self.dag_hash = md5(dag_data_json).hexdigest() if COMPRESS_SERIALIZED_DAGS: self._data = None self._data_compressed = zlib.compress(dag_data_json) else: self._data = dag_data self._data_compressed = None self.__data_cache = dag_data ``` `__init__(dag, processor_dir)`에 전달된 파라미터와 이를 `__init__()`의 내부에서 어떻게 활용하는지 분석해보아야 합니다. 이 파라미터들은 어디에서 선언되어 넘어오는 것일까요? 이를 확인하기 위해 지금까지 실행된 코드의 순서를 역순으로 쫓아가보았습니다. #### 3-2-2. `dag: DAG` `dag`의 실제 값을 알아보기 위한 흐름은 다음과 같습니다. 1. `SerializedDagModel.__init__(dag, ...)` 2. `SerializedDagModel.write_dag(dag, ...)` 3. `DagBag._serialized_dag_capturing_errors(dag, ...)` 4. `DagBag._sync_to_db(dags, ...)` 5. `DagFileProcessor.save_dag_to_db(dags=dagbag.dags)` 6. `dagbag = DagFileProcessor._get_dagbag(cls, file_path)` 위 실행순서를 보았을 때, `dagbag`에 대해서 알아야 이해할 수 있을 것 같습니다. 따라서 6번의 `dagbag`을 생성하기 위해 사용되는 `DagFileProcessor의 _get_dagbag(...)`을 살펴보도록 하겠습니다. ```python dagbag = DagFileProcessor._get_dagbag(cls, file_path) ``` ```python # airflow/dag_processing/processor.py class DagFileProcessor ... @classmethod def _get_dagbag(cls, file_path: str): try: return DagBag(file_path, include_examples=False) # [!] ... ``` `_get_dag()`는 단순히 `DagBag`을 반환하는 함수입니다. 그렇다면 `DagBag`은 어떤 정보들을 가지고 있을까요? ```python # airflow/models/dagbag.py class DagBag(LoggingMixin): """ A dagbag is a collection of dags, parsed out of a folder tree and has high level configuration settings. ... """ def __init__( self, dag_folder: str | Path | None = None, include_examples: bool = ..., ... ): self.dags: dict[str, DAG] = {} # dag_id를 key로 하는 DAG 객체 저장소 self.dag_folder = dag_folder ... self.collect_dags(dag_folder, ...) # 생성 시 자동으로 DAG 수집 def collect_dags(self, dag_folder, ...): """지정된 폴더에서 DAG 파일들을 찾아 파싱""" # process_file()을 통해 각 .py 파일을 파싱하고 # 발견된 DAG 객체들을 self.dags에 추가 ``` `DagBag`은 결국 파일 경로를 받아 해당 경로의 Python 파일들을 파싱하고, 그 안에 정의된 DAG 객체들을 `self.dags` 딕셔너리에 수집하는 역할을 수행합니다. 따라서 `dagbag.dags`는 `{dag_id: DAG}` 형태의 딕셔너리이며, 이것이 `save_dag_to_db(dags=dagbag.dags)`로 전달되어 최종적으로 Meta Database에 저장됩니다. <br>