본문 바로가기
728x90

airflow22

[Airflow] Metastore version, RDBMS 종류에 따른 차이 정리 최근에 Airflow Metastore에 있는 데이터 이관을 하면서 Metastore 이관 관점에서 Metastore에 관련된 테이블, 컬럼 정보들을 정리해봤습니다.1. Airflow version별 metastore의 구성 차이2.1.3(PostgreSQL)1) 테이블 수28개 테이블ab_permissionab_permission_viewab_permission_view_roleab_register_userab_userab_user_roleab_view_menualemberic_versionconnectiondagdag_codedag_pickledag_rundag_tagimport_errorjoblogrendered_task_instance_fieldssens.. 2024. 4. 27.
[Airflow] Trigger Rule을 통해 반드시 실행 시켜야 할 Task 다루기 이번 포스트에는 Daily Batch 중에 비용절감을 위해 수정한 사항에 대해 정리해보겠습니다. 1. 수정이 필요한 이유 Daily Batch 실패 난 건 중에, ML server(GPU 장비 탑재된 EC2)에서 script가 돌아가는 job이 있었습니다. EC2가 켜지는 Task는 성공했지만, script task 단계에서 에러가 났었고, EC2는 stop되지 않았습니다. 물론 후속작업들도 전부 upstream_failed에 의해 failed처리 되었습니다. GPU 장비가 붙은 EC2는 비용이 어마어마하게 많이 나오기 때문에, 몇 시간만 켜져 있더라도 무시 하지 못할 비용이 나오게 됩니다. 따라서 script가 실패하더라도, 반드시 stop되게 DAG를 구성하는 것이 필요했습니다. 2. 기존 코드 기존.. 2024. 2. 26.
[Airflow] Airflow로 ETL 파이프라인 V2 만들기 (Variables + TaskGroup을 활용한 Task Factory 구현) 이번 포스트에는 저번에 작성했었던 각 작업들을 통합하여 만든 DAG인 ETL 파이프라인 구축하기 V1에서 중복된 태스크는 줄이고, 병렬실행을 도입시켰으며, task를 정의한 Variables 변수에 따라 Dynamic하게 갯수와 이름이 변하게 되게 끔 구성하였습니다. 저번에 작성한 포스트를 보고 싶으시다면 아래의 링크를 확인해주세요. https://spidyweb.tistory.com/506 [Airflow] Airflow로 ETL 파이프라인 만들기(python, EMR, glue crawler, Email, Slack, DB반영) 이번 포스트에는 회사에서 수동으로 처리하고 있던 일회성 ETL작업을 Airflow DAG으로 묶어서 파이프라인을 만든 경험에대해서 소개해드리겠습니다. 기존 업무 처리방식과 A.. 2024. 2. 9.
[회고] 2023년을 마무리하며 회고 엄청 오랜만에 글을 포스팅하는 느낌인데.. 거의 1달만인 것 같습니다. 최근까지도 회사에서 하는 프로젝트며, 대학원 준비 같은 업무들로 바쁜 시간을 보내느라 글 포스팅하는 것이 후순위가 되어버렸습니다. 이번에도 회고록이기 때문에, 생각나는대로 한 해를 정리하면서 생각한 것들에 대해서 포스팅하겠습니다. 1. 생각나는 것들 1) 경조사 이번년도는 유독 경조사가 많았던 한 해였던 것 같습니다. 올해로 3년차, 이제 내년에 4년차가 되는데 이 시기쯤 되니 친구들 무리에서도 처음 결혼하는 친구가 나오고, 여기저기서 소식들이 많이 들려왔던 것 같습니다. 2024년도 예정된 결혼소식들이 많이 들려옵니다. 제 주변에 행복한 일들이 더 많이 생겼으면 좋겠습니다. 그러면서 생각되는게 역시 건강을 최우선 목표로 두어야 행복.. 2023. 12. 29.
[Python] Boto3 + Airflow로 특정 기간 지난 S3 데이터 삭제하기 업무를 하면서 개인정보 데이터에 대해서 6개월이 지나면 자동으로 파기가 되는 로직을 구현해야되는 일이 있었습니다. 따라서 하루단위 배치로 6개월이 지났는지 검사하고, 지났으면 데이터를 삭제하는 로직을 구현한 것에 대해 정리해보겠습니다. 요구사항, 상세정보 및 코드 사용 라이브러리 : boto3 배치 스케줄링 주기: 매일 00:05 요구사항: 6개월이 지난 개인정보 포함된 데이터는 파기가 되어야 함 해결 방법: s3 uri경로가 s3:bucket/~~/history 혹은 latest로 되어 있고 이후에 stnd_ymd=yyyy-mm-dd 파티션으로 구별 됨 → stnd_ymd 기준 6개월이 지나면 매일매일 검사하여 삭제하는 로직 구현 코드 from airflow import DAG import boto3 .. 2023. 11. 20.
[Python] Logging 라이브러리 정리, Airflow에서 Logging사용하기 Airflow에서 PythonOperator를 실행할 때 로그가 나오지 않아서 테스크가 잘 진행되고 있는지 확인하기가 어려웠던 상황이 있었습니다. logging 라이브러리를 사용해도 airflow UI에서 확인 할 수가 없었는데 관련해서 정리를 해보고자 합니다. 1. Logging The only time that print is a better option than logging is when the goal is to display a help statement for a command line application. Shell과 같은 커맨드 인터페이스에서 --help 옵션을 받아 사용법 도움말을 출력할 때 이외에는 항상 logging이 print보다 낫다는 뜻으로, 사실상 웬만한 상황에서는 prin.. 2023. 11. 20.
[Database] Redshift Procedure를 airflow로 제출하기 (error 정리) 에러 airflow에서 redshift를 pyscogp2를 이용하여 연결하고, procedure를 제출했는데, 다음과 같은 에러가 있었다. psycopg2.errors.InvalidTransactionTermination: ROLLBACK cannot be invoked from a procedure that is executing in an atomic context. HINT: Try calling the procedure as a top-level call i.e. not from within an explicit transaction block. Or, if this procedure (or one of its ancestors in the call chain) was created with SE.. 2023. 10. 20.
[Airflow] Airflow DAGs 이상감지, 알림받기, 결과전송 (EmailOperator, Slack) 이번 포스트에는 Airflow DAGs이 success 및 failed 또는 Task중에 보내고 싶은 결과가 있는 경우 전송하는 방법에 대해 정리해보겠습니다. 방법으로는 EmailOperator와 Slack을 사용하는 방법으로 크게 2가지가 있습니다. EmailOperator 1. 준비 사항 1) GMAIL 계정 생성 EmailOperator를 사용하기 위해서는 stmp로 설정할 host가 필요합니다. 저는 gmail을 사용하기로 했고 새로운 계정을 하나 만들었습니다. 2) IMAP 켜기 GMAIL → 설정 → 모든 설정 보기 전달 및 POP/IMAP → IMAP 사용 3) 보안 설정 구글 계정 관리 → 보안 → 2단계 인증 앱 비밀번호 클릭 메일, 기기 선택 16자리 비밀번호가 생성 되는데, 보관했다가.. 2023. 8. 30.
[Spark] EMR Serverless + Airflow로 spark job 제출해보기 (EmrServerlessStartJobOperator, boto3 + PythonOperator) 이번 포스트는 EMR Serverless로 전환하면서 생긴 꿀팁들과 Airflow로 EMR Serverless에 Spark job을 제출하는 것을 포스팅하려고 합니다. 이번 포스트의 목차 EMR Serverless란? EMR Serverless로 전환 이유 EMR Serverless로 전환 대상 EMR Serverless 생성 방법 EMR에 Airflow로 Spark job 제출하는 방법(EmrServerlessStartJobOperator) EMR에 Airflow로 Spark job 제출하는 방법(boto3 + PythonOperator) 전환 시 얻은 효과(성능, 비용) EMR Serverless란? EMR(Elastic Map Reduce) 서비스를 인프라 관리할 필요 없이 Serverless로서 .. 2023. 8. 27.
728x90