본문 바로가기
728x90

분류 전체보기313

[Airflow] Airflow로 ETL 파이프라인 만들기(python, EMR, glue crawler, Email, Slack, DB반영) 이번 포스트에는 회사에서 수동으로 처리하고 있던 일회성 ETL작업을 Airflow DAG으로 묶어서 파이프라인을 만든 경험에대해서 소개해드리겠습니다. 기존 업무 처리방식과 Airflow DAG의 필요성 ETL 방식 우리가 처리하는 ETL방식은 ETL요청이 들어왔을 때, 최초 적재 → 증분값 daily하게 배치로 적재 하는 개념이였습니다. ETL 최초 적재 세부 과정 위의 과정을 거쳐 1개의 테이블에 대한 ETL과정이 끝납니다. (정확히는 우리는 data lake형태로 사용하고 있어, ELT입니다.) 언급 된대로 위의 과정을 일일히 서버를 키고, 서버에 명령어 날리고 zeppelin에 들어가서 코드 날리고 그러한 과정들이 모두 수동으로 이뤄지다보니, 되게 간단한 작업임에도 불구하고, 일처리하는데 있어서는 .. 2023. 8. 7.
[Airflow] EMR create + Step 제출(Spark job) + StepSensor Dag 구성하기(feat. ETL) 아직까지도 많은 기업에서는 EMR을 원하는 시간대에 띄워서 batch job을 airflow schedule에 맞게 실행시키고 종료시키는 ETL 형태를 많이 사용하고 있습니다. 그래서 이번 포스트에는 Airflow로 EMR을 띄우고, spark job을 제출하고 job이 끝나는 대로 EMR을 종료시키는 DAG를 구성해보겠습니다. Airflow는 미리 구성되어 있다고 가정하고 시작하겠습니다. Airflow 구성부터 해보고 싶으시면 아래의 링크를 참조해주세요. https://spidyweb.tistory.com/449 [Airflow] Airflow cluster, celery executor + flower + RabbitMQ 환경 구성하기 이번 포스트에는 AWS EC2 3대로 구성된 airflow clu.. 2023. 7. 30.
[Trouble shooting] CSV파일 MariaDB에 적재시키기 업무를 하다보니 HTML태그가 포함된 엄청난 길이의 HTML태그가 포함된 값과 여러 텍스트값들을 RDS Mariadb로 적재해달라는 요청이 있었습니다. 사실 MongoDB같은 NoSQL로 적재시키면 간단할거라 생각은 되었지만, 쿼리적으로 편하게 쓰고 싶다하셔서 MariaDB로의 이전을 부탁을 하셨습니다. 여기서 여러 에러를 만났는데, 생긴 이슈와 해결방법에 대해 정리해보겠습니다. [데이터 길이 이슈] 이정도면 되겠지? 문제 해결 테이블의 각 컬럼 크기를 너무 크게 잡지 않기위해 최적의 길이로 varchar를 설정하려다보니, import과정에서 데이터 길이가 크다는 에러를 많이 만남 대부분은 varchar를 조금씩 늘려가며 테스트했지만, HTML태그가 들어간 정말 텍스트성으로 보이는 컬럼은 데이터타입을 t.. 2023. 7. 15.
[Spark] spark dataframe -> RDB로 적재하기 오랜만에 글을 씁니다. 이직하고 나서 적응하느라 글쓸 기회가 없었는데, 그동안 어떤 공부를 해야될지 그리고 어떤 글부터 쓸지 순서를 정하느라 뜸했었습니다. 우선은 일을하면서 생긴 이슈와 겪은 업무들 위주로 정리하려고 합니다. 1. 배경 API 서빙을 위해 parquet파일을 RDB로 적재시켜야 했고, 실시간 API서빙이 가능했어야 하는 상황이였습니다. 따라서 spark을 이용하여 truncate없이 RDB로 적재시켜야 했습니다. 1) test db, table 생성 create table if not exists sparktordb.rdbtable( id int(10),rdbtable name varchar(30), create_dt timestamp, update_dt timestamp, PRIMARY.. 2023. 7. 10.
[Spark] Scala Spark 앱 만들기 (feat. Intellij) 이직을 하게 되면서 python이 아닌, scala 언어로 Spark을 돌리고 있었습니다. 언젠가는 scala를 배워봐야지 생각만 했었는데, 생각보다 기회가 앞당겨진 것 같아, 요즘 열심히 scala 언어를 공부하고 있는 중입니다. 그래서 이번 포스트에는 scala언어로 spark앱을 만들고, jar파일로 만들어 보겠습니다. 1. 프로젝트 생성 스칼라 새 프로젝트 생성 build 시스템은 sbt로 지정 settings에서 scala plugins생성 . ├── build.sbt # MergeStrategy.discard case x if x.endsWith("modlue-info.class") => MergeStrategy.discard case x => val oldStrategy = (assembly.. 2023. 6. 17.
[Scala] 스칼라 배우기 7. 스칼라 기본 문법6(반복문, 정렬, 그룹핑, 필터링) 반복문 for to는 이하의 리스트를 생성하고, until은 미만의 시퀀스를 생성 // 0에서 3이하의 시퀀스 for (num "v5") for ((k, v) List(1, 2, 3), "B" -> List(4, 5, 6), "C" -> List(7, 8, 9)) maps.mapValues(_.sum).foreach({ case (k, v) => printf("key: %s, value: %s\n", k, v) }) // 결과 key: A, value: 6 key: B, value: 15 key: C, value: 24 정렬 sort 정렬은 sorted, sortWith, sortBy 세가지 메소드를 이용 // sorted 사용방법 val list = List( 4, 6, 1, 6, 0) val l_so.. 2023. 6. 11.
[Scala] 스칼라 배우기 6. 스칼라 기본 문법5(collection,배열,리스트,튜플,맵) Collection 배열(array) 길이가 고정된 고정된 자료구조(값을 변경 할 수 있는 순서가 정해진 시퀀스) val array1 = Array(1, 2, 3) // 배열의 데이터 접근 scala> array1(0) res0: Int = 1 // 배열의 데이터 변경 scala> array1(1) = 10 scala> array1(1) res5: Int = 10 val array2 = Array(3, 4, 5) // 배열 연결하기 ++ val array3 = array1 ++ array2 // 배열의 앞에 데이터 추가 val array4 = 0 +: array3 // 배열의 뒤에 데이터 추가 val array5 = array3 :+ 100 리스트(list) 가변적인 길이의 데이터를 저장하기 위한 자료구.. 2023. 6. 11.
[Scala] 스칼라 배우기 5. 스칼라 기본 문법4(trait, singleton object) Trait(트레잇)트레잇(trait)은 자바의 인터페이스와 유사메소드를 정의만 해놓을 수도 있고, 기본 구현을 할 수도 있음추상 클래스와 달리 생성자 파라미터는 가질 수 없음트레잇에서는 가변 변수, 불변 변수 모두 선언 가능트레잇을 구현하는 클래스에서 가변 변수는 수정이 가능하지만, 불변 변수는 수정할 수 없음트레잇의 기본 메소드는 상속되고, override 키워드를 이용하여 메소드를 재정의 할 수 있음트레잇은 extends를 사용하여 상속가능하고, 여러개의 트레잇을 with 키워드로 동시에 구현 가능멤버변수를 가질 수는 없음추상클래스는 하나만 상속할 수 있지만, 트레잇은 여러개를 상속 할 수 있음생성자 멤버변수가 필요하면 추상클래스를 이용하는 것이 좋고, 멤버 변수가 필요 없다면 트레잇을 이용하는 것이.. 2023. 6. 11.
[Scala] 스칼라 배우기 4. 스칼라 기본 문법3(클래스) 클래스(class) 클래스는 class를 이용하여 생성 // 클래스 선언 class Person(name:String, age:Int) // 클래스 생성 val p = new Person("David", 30) // 멤버 변수 생략 가능 class A 클래스 멤버 변수 가변 변수는 컴파일러가 클래스 내부에 자동으로 getter, setter 메소드를 생성 가변 변수로 선언된 값은 읽고, 쓰는 것이 가능 불변 변수는 컴파일러가 getter만 생성,. 불변 변수로 선언된 값은 읽는 것만 가능 가변 변수, 불변 변수로 선언되지 않은 변수는 getter, setter 가 생성되지 않기 때문에 클래스 내부에서만 사용할 수 있음 // 기본형 class Animal(name: String) { println(s"${.. 2023. 5. 23.
728x90