본문 바로가기
728x90

BigData84

[Airflow] DAG Parsing, DAG Processor 정리 + import와 parsing error 이슈 정리 Airflow DAG Parsingscheduler에 dag_processor라는것이 포함되어 있고, 이것이 dag를 parsing하는 역할을 해줌 1. 이슈 정리Airflow on k8s 이전을 위해 DAG를 개발계의 k8s에 gitsync를 시켜둔 상황import error가 나와 확인 해보니 version up에 따른 method 명칭 변경따라서 명칭을 변경하거나 사용하지 않은 library는 import문에서 제거아무리 기다려도 변경사항 반영이 안된 채로 계속 import에러가 잔류실제 코드는 gitsync를 통해 동기화 됐고, 컨테이너에 들어가서 확인해봐도 동기화 된 상태2. 원인airflow dag 내부에 운영계에 있는 RDB, Redshift와 connect.. 2024. 4. 27.
[Airflow] Airflow 암호화 fernet key 정리 Airflow Metastore를 이관하면서 암호화된 정보를 이관될 metastore에 어떻게 적용하는지, 암호화된 값은 어떻게 확인하는지 정리해봤습니다.1. 정의대칭형 암호화에서 사용되는 키Python 암호화 라이브러리인 cryptography 패키지에서 제공하는 대칭키 암호화 방식 중 하나128비트(16바이트) 또는 256비트(32바이트)의 임의의 바이트 시퀀스로 구성됩니다. 이 키를 사용하여 데이터를 암호화하고 복호화할 수 있습니다. Fernet은 블록 암호 기술 중 하나인 AES를 기반으로 합니다.Fernet은 base64 인코딩을 사용하여 키를 인코딩하고 디코딩Fernet은 키를 생성할 때 32바이트(256비트)의 임의의 바이트 시퀀스를 사용합니다. 이 바이트 시퀀스는 base6.. 2024. 4. 27.
[Airflow] Metastore version, RDBMS 종류에 따른 차이 정리 최근에 Airflow Metastore에 있는 데이터 이관을 하면서 Metastore 이관 관점에서 Metastore에 관련된 테이블, 컬럼 정보들을 정리해봤습니다.1. Airflow version별 metastore의 구성 차이2.1.3(PostgreSQL)1) 테이블 수28개 테이블ab_permissionab_permission_viewab_permission_view_roleab_register_userab_userab_user_roleab_view_menualemberic_versionconnectiondagdag_codedag_pickledag_rundag_tagimport_errorjoblogrendered_task_instance_fieldssens.. 2024. 4. 27.
[Kafka] ubuntu 20.04 lts 위의 docker-compose로 kafka + zookeeper + CMAK 구성하기 이번 포스트에는 개발할 때 사용하기 위해 도커 컴포즈로 빠르고 간단하게 kafka broker와 CMAK() zookeeper를 컨테이너로 띄워보도록 하겠습니다. 1. EC2 생성 t2.medium 스펙(2vCore, 4GB mem)으로 ubuntu 20.04 lts 를 생성합니다. 편하게 접근하기위해 public ip도 부여받습니다.(테스트용) 2. docker 설치 + docker-compose 설치 1) docker 설치 // apt 인덱스 업데이트 $ sudo apt-get update // repository over HTTPS를 사용하기 위한 패키지 설치 $ sudo apt-get install -y apt-transport-https ca-certificates curl gnupg-agent.. 2024. 3. 5.
[Airflow] Trigger Rule을 통해 반드시 실행 시켜야 할 Task 다루기 이번 포스트에는 Daily Batch 중에 비용절감을 위해 수정한 사항에 대해 정리해보겠습니다. 1. 수정이 필요한 이유 Daily Batch 실패 난 건 중에, ML server(GPU 장비 탑재된 EC2)에서 script가 돌아가는 job이 있었습니다. EC2가 켜지는 Task는 성공했지만, script task 단계에서 에러가 났었고, EC2는 stop되지 않았습니다. 물론 후속작업들도 전부 upstream_failed에 의해 failed처리 되었습니다. GPU 장비가 붙은 EC2는 비용이 어마어마하게 많이 나오기 때문에, 몇 시간만 켜져 있더라도 무시 하지 못할 비용이 나오게 됩니다. 따라서 script가 실패하더라도, 반드시 stop되게 DAG를 구성하는 것이 필요했습니다. 2. 기존 코드 기존.. 2024. 2. 26.
[Airflow] Airflow로 ETL 파이프라인 V2 만들기 (Variables + TaskGroup을 활용한 Task Factory 구현) 이번 포스트에는 저번에 작성했었던 각 작업들을 통합하여 만든 DAG인 ETL 파이프라인 구축하기 V1에서 중복된 태스크는 줄이고, 병렬실행을 도입시켰으며, task를 정의한 Variables 변수에 따라 Dynamic하게 갯수와 이름이 변하게 되게 끔 구성하였습니다. 저번에 작성한 포스트를 보고 싶으시다면 아래의 링크를 확인해주세요. https://spidyweb.tistory.com/506 [Airflow] Airflow로 ETL 파이프라인 만들기(python, EMR, glue crawler, Email, Slack, DB반영) 이번 포스트에는 회사에서 수동으로 처리하고 있던 일회성 ETL작업을 Airflow DAG으로 묶어서 파이프라인을 만든 경험에대해서 소개해드리겠습니다. 기존 업무 처리방식과 A.. 2024. 2. 9.
[Spark Tuning] Spark Memory 와 JVM 정리 spark memory를 정리하기 전에 JVM을 알아야 합니다. 왜냐하면 spark는 Java 가상머신(JVM) 기반으로 동작하기 때문이고, 다른 언어(Python,R)로 작성한 spark code도 결국에는 Executor의 JVM에서 실행할 수 있는 Code로 알아서 변환하여 실행되기 때문입니다. JVM(Java Virtual Machine) 자바를 실행하기 위한 가상 기계(컴퓨터) * Java compiler는 JDK를 설치하면 bin 에 존재하는 javac.exe 1. runtime Data Area Runtime Data Area는 JVM이 프로그램을 수행하기 위해 OS로부터 별도로 할당받은 메모리 공간을 말한다. Runtime Data Area는 크게 5가지 영역으로 나눌 수 있습니다. 1) .. 2023. 10. 17.
[Spark] cache() vs persist() 차이점 정리 (feat. storage level) cache() test_df.cache() # test_df를 캐싱 test_df.storageLevel # org.apache.spark.storage.StorageLevel = StorageLevel(disk, memory, deserialized, 1 replicas) persist() import org.apache.spark.storage.StorageLevel # storage 명시를 위한 library import test2_df.persist(StorageLevel.MEMORY_AND_DISK) # test2_df에 persist를 하면서 storage lvl을 명시 test2_df.storageLevel # org.apache.spark.storage.StorageLevel = Stora.. 2023. 9. 19.
[Spark] Spark Cluster mode vs Client mode (feat. 왜 EMR Serverless는 Client mode일까?) EMR Serverless가 client mode로 설정되어 있는 것을 보면서 왜 client 모드 일까? cluster모드는 안되는 걸까? 라는 의문점을 가졌었고, 막상 두 개의 차이점을 설명하려 해봐도 명확히는 설명을 할 수 없어서 이번 기회에 비교하여 정해보겠습니다. Spark Driver 비교해보기에 앞서 driver 개념이 두 개를 비교할 때 필요 하므로, Spark Driver부터 간단하게 짚고 넘어가겠습니다. 프로그램의 main()메소드가 실행되는 프로세스 Spark Context, Spark Session을 생성하고 RDD를 만들고 Transformation, action 등을 실행하는 사용자 코드를 실행 DF, DS, UDF를 생성하고 애플리케이션 정보 유지 관리를 담당 Cluster M.. 2023. 8. 30.
728x90