[Airflow] Airflow cluster, celery executor + flower + RabbitMQ 환경 구성하기
이번 포스트에는 AWS EC2 3대로 구성된 airflow cluster를 CeleryExecutor로 설치해보겠습니다.
서버 구성 스펙
OS - Amazon linux2 AMI (HVM) - Kernel 5.10, SSD Volume type
instance type - t2.Large 2vCPUs, 8GiB Memory
storage - 30GB gp2
node 1: postgresql, redis, airflow webserver, airflow scheduler, airflow flower, airflow worker(queue1)
node 2: airflow worker(queue2)
node 3: airflow worker(queue3)
네트워크
SG group
inboud rules
outbound rules
airflow version
v2.5.1(current version)
1. Airflow 구성 요소 확인
- flower
- 환경 모니터링을 위한 flower 애플리케이션입니다. localhost:5555에서 사용할 수 있음
- 클러스터와 그에 속한 노드의 정보를 보여줌
- MySQL
- airflow의 메타DB 역할
- 설치 시 기본값으로 설정되어 있는 SQLite는 Sequential Executor만 사용 가능하므로 다른 RDB를 필수적으로 메타DB로 설정해야 함(실제 프로덕션에서 사용하는 DB 중 하나)
- RabbitMQ
- 스케줄러에서 작업자로 메시지를 전달하는 브로커
- RabbitMQ말고 redis 를 사용하기도 함
- celery
- Python으로 작성된 비동기 태스크 큐/작업 큐, worker의 한 종류
- 병렬처리를 가능하게 해줌
- Worker Process가 Job을 Broker를 통해서 전달하면 하나 이상의 Worker에서 처리하는 구조
- 클러스터여야 사용이 의미가 있음
airflow cluster 구성요소
- webserver
- scheduler
- executor
- sequential executor
- celery executor
- kubernetes executor
- DB
- message queue(message broker)
- redis
- RabbitMQ
RabbitMQ vs Redis
RabbitMQ는 DB보다는 메세징 브로커로 잘 알려져 있습니다. 메시지의 우선순위를 지원하며 크고 복잡한 메시지를 다룰때 적합합니다.
Redis는 NoSQL DB로 잘 알려져 있습니다. In Memeory 방식이며 key-value데이터 구조 스토어 이기 때문에 빠른 Read, Write 성능을 보장합니다.
Airflow의 브로커로 어떤걸 사용할지는 현재 서비스할 비즈니스 프로세스에 따라 판단하면 됩니다.
1. RabbitMQ 설치
1) RabbitMQ 서버 설치
$ sudo amazon-linux-extras install epel
$ sudo yum install epel-release
$ sudo yum install rabbitmq-server
2) RabbitMQ management 대쉬보드 활성화
$ sudo rabbitmq-plugins enable rabbitmq_management
3) 시작 시 RabbitMQ 서버 자동으로 띄우기 설정
아래의 명령어로 시작과 동시에 데몬으로 띄워지게 설정
$ sudo systemctl enable rabbitmq-server
4) 서버 시작/중지/상태 확인
# Start: sudo systemctl start rabbitmq-server
# Stop: sudo systemctl stop rabbitmq-server
# Status: sudo systemctl status rabbitmq-server
5) 유저 생성, 관리자 부여, 권한 부여
# 유저 생성
$ sudo rabbitmqctl add_user admin admin
# 관리자 부여
$ sudo rabbitmqctl set_user_tags admin administrator
# 권한부여
$ sudo rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
6) 대쉬보드 접속
- AWS SG에서 inbound rule 15672 포트 open
- EC2 server의 publicDNS:15672로 접속
- 위에서 생성한 admin/admin으로 로그인
2. Airflow 설치(Celery + flower)
1) 라이브러리 및 기타 패키지 설치
# pip 업그레이드
$ pip3 install --upgrade pip
$ sudo yum install zlib-devel bzip2-devel openssl-devel ncurses-devel \
sqlite-devel readline-devel tk-devel gcc make libffi-devel wget
$ sudo yum install python3-devel
# mysql 연결을 위한 라이브러리 패키지 설치
$ sudo yum install mysql-devel
리눅스에서 dev, devel이 붙은 패키지는 컴파일을 위한 헤더 및 라이브러리 패키지입니다.
(Ubuntu와 같은 데비안 계열 리눅스 : -dev, CentOS와 같은 레드햇 계열 리눅스 : -devel)
일반적으로 *. h, *. so, *. a 확장자를 가진 파일로 구성되어있습니다.
devel 패키지의 주요 역할은 다른 프로그램들을 위한 라이브러리 역할과 소스코드 컴파일
2) MySQL 설치
# yum repository 등록
$ sudo yum install https://dev.mysql.com/get/mysql80-community-release-el7-5.noarch.rpm
# mysql community server 구축
$ sudo yum -y install mysql-community-server
# 안될 경우
$ wget https://dev.mysql.com/get/mysql80-community-release-el7-5.noarch.rpm
$ sudo yum localinstall mysql80-community-release-el7-5.noarch.rpm
$ sudo yum -y install mysql-community-server
MySQL 서비스 실행
$ sudo systemctl enable --now mysqld
MySQL 서비스 상태 확인
$ systemctl status mysqld
임시 비밀번호 확인
$ sudo grep 'temporary password' /var/log/mysqld.log
root 접속 및 비밀번호 변경
$ mysql -u root -p
# 임시 비밀번호 입력 + enter
-- 대소문자+숫자+특수기호로 해야 비밀번호 정책에 만족한다고 나옴
ALTER USER 'root'@'localhost' IDENTIFIED WITH mysql_native_password BY 'QWer12!!';
-- 권한 부여
FLUSH PRIVILEGES;
Airflow DB 생성
$ create database airflow_db;
3) Airflow(Celery + flower),MySQL 패키지 설치
Airflow 설치
$ pip3 install apache-airflow
celery, flower 그리고 airflow와 celery 사이에 필요한 모든 dependencies를 설치
celery 설치
# airflow와 flower과의 버전 호환성이 매우 중요함
$ pip3 install celery==5.2.3
패키지 의존성 버전 문제로 발생한 오류가 있을 땐 아래의 패키지를 설치하여 다운그레이드 시킨다.
$ pip3 install importlib-metadata==4.13.0
flower 설치
# airflow와 flower과의 버전 호환성이 매우 중요함
$ pip3 install flower==1.0.0
mysql connector, client 설치
$ pip3 install mysql-connector-python
$ pip3 install mysqlclient
airflow config 파일 수정
$ cd airlfow
$ nano airflow.cfg
executor = CeleryExecutor
sql_alchemy_conn = mysql+mysqldb://root:QWer12!!@localhost:3306/airflow_db
broker_url = amqp://admin:admin@localhost:5672/
result_backend = db+mysql+mysqlconnector://root:QWer12!!@localhost:3306/airflow_db
default_queue = airflow.queue
# airflow example dags를 제거하려면 아래의 설정 값 조정
# load_examples = True(기본값) 를 False로 바꿔준다.
3) Airflow DB 초기화 및 확인
$ airflow db init
mysql -u root -p 를 통해 mysql prompt로 접속하여 초기화된 데이터베이스, 테이블확인
4) Airflow 사용자 생성
$ airflow users create --role Admin --username admin \
--email admin@admin.com --firstname admin --lastname admin --password admin
5) Celery Worker
$ airflow celery worker -D
# 데몬(백그라운드)으로 worker process 띄우기
6) Celery Flower
$ airflow celery flower -D
# 데몬(백그라운드)으로 flower process 띄우기
7) Airflow scheduler
scheduler 확인 필요 with celery
$ airflow scheduler -D
# 데몬(백그라운드)으로 scheduler process 띄우기
8) Airflow webserver
$ airflow webserver -D
# 데몬(백그라운드)으로 flower process 띄우기
3. multi node(Cluster) 설정
worker node에는 아래의 순서대로 필요한 것만 설치 합니다.
(MySQL 설치만 진행, master node에 airflow 설치한것과 동일한 환경으로 설치)
worker node
1) 라이브러리 및 기타 패키지 설치
# pip 업그레이드
$ pip3 install --upgrade pip
$ sudo yum install zlib-devel bzip2-devel openssl-devel ncurses-devel \
sqlite-devel readline-devel tk-devel gcc make libffi-devel wget
$ sudo yum install python3-devel
# mysql 연결을 위한 라이브러리 패키지 설치
$ sudo yum install mysql-devel
2) MySQL 설치
# yum repository 등록
$ sudo yum install https://dev.mysql.com/get/mysql80-community-release-el7-5.noarch.rpm
# mysql community server 구축
$ sudo yum -y install mysql-community-server
MySQL 서비스 실행
$ sudo systemctl enable --now mysqld
MySQL 서비스 상태 확인
$ systemctl status mysqld
mysql connector, client 설치
* 꼭 실행해줘야 오류안남
$ pip install mysql-connector-python
$ pip install mysqlclient
3) Airflow 설치 및 설정
$ pip install apache-airflow
celery 설치
# airflow와 flower과의 버전 호환성이 매우 중요함
$ pip install celery==5.2.3
패키지 의존성 버전 문제로 발생한 오류가 있을 땐 아래의 패키지를 설치하여 다운그레이드 시킨다.
$ pip install importlib-metadata==4.13.0
airflow config 파일 수정
$ cd airflow
$ nano airflow.cfg
executor = CeleryExecutor
sql_alchemy_conn = mysql+mysqldb://root:QWer12!!@<master_host_ip>:3306/airflow_db
# master_host_ip는 private IP DNS로 넣는다.
broker_url = amqp://admin:admin@<master_host_ip>:5672/
result_backend = db+mysql+mysqlconnector://root:QWer12!!@<master_host_ip>:3306/airflow_db
default_queue = airflow.queue
master node
my sql config file에 bind-address 추가
$ sudo nano /etc/my.cnf
# bind-address = 0.0.0.0 추가
worker node mysql user 생성 및 권한 부여
CREATE USER 'airflow_user_slave'@'remote_server_ip' IDENTIFIED BY 'airflow_pass';
-- CREATE USER 'root'@'private_ip' IDENTIFIED BY 'password';
GRANT CREATE, ALTER, DROP, INSERT, UPDATE, DELETE, SELECT, REFERENCES, RELOAD on *.* TO 'root'@'private_ip' WITH GRANT OPTION;
FLUSH PRIVILEGES;
방화벽 다운로드
$ sudo yum install --enablerepo="epel" ufw
방화벽 설정
$ sudo ufw allow from 'private_ip' to any port 3306
# mysql port open
$ sudo ufw allow from 'private_ip' to any port 5672
# RabiitMQ port open
# worker 노드 전부 설정해준다.
방화벽 가동(옵션)
$ sudo ufw enable
# 방화벽을 가동하면 local(내 pc)에서 UI를 못보게 된다. 내 PC가 해당 서버에 대한 포트를 접근할 권한이 없기 때문
# 즉 네이버에서 내 ip주소를 쳐서 나오는 public ip주소를 허용해줘야 한다.
# sudo ufw allow from 'public_ip' to any port 8080
# sudo ufw allow from 'public_ip' to any port 5555
# sudo ufw allow from 'public_ip' to any port 15672
다시 worker node
master node mysql 로 접속해보기
$ mysql -u root -h [master_node_public_ip or master_node_public_dns] -p
# 같은 VPC일 경우 private dns, private ip로도 접근 가능
airflow worker 띄우기
$ airflow celery worker -D
# 데몬(백그라운드)으로 worker process 띄우기
UI 확인
airflow webserver
실행해보면 일도 분산 처리해서 하는 것을 확인 할 수 있음.
RabbitMQ도 3개의 노드 확인 가능
RabbitMQ queue
참조:
https://tkdguq05.github.io/2020/12/13/airflow-on-ec2/
https://mightytedkim.tistory.com/32
https://www.accionlabs.com/how-to-setup-airflow-multinode-cluster-with-celery-rabbitmq
https://dev.to/metaverse/install-rabbitmq-on-amamzon-ec2-amazon-linux-2-3dpd
https://docs.celeryq.dev/en/stable/
https://airflow.apache.org/docs/apache-airflow-providers-celery/stable/index.html#pip-requirements