language/Python

[Python] Boto3 + Airflow로 특정 기간 지난 S3 데이터 삭제하기

스파이디웹 2023. 11. 20. 21:48
728x90

업무를 하면서 개인정보 데이터에 대해서 6개월이 지나면 자동으로 파기가 되는 로직을 구현해야되는 일이 있었습니다.

따라서 하루단위 배치로 6개월이 지났는지 검사하고, 지났으면 데이터를 삭제하는 로직을 구현한 것에 대해 정리해보겠습니다.

 


요구사항, 상세정보 및 코드

  • 사용 라이브러리 : boto3
  • 배치 스케줄링 주기: 매일 00:05
  • 요구사항: 6개월이 지난 개인정보 포함된 데이터는 파기가 되어야 함
  • 해결 방법: s3 uri경로가 s3:bucket/~~/history 혹은 latest로 되어 있고 이후에 stnd_ymd=yyyy-mm-dd 파티션으로 구별 됨 → stnd_ymd 기준 6개월이 지나면 매일매일 검사하여 삭제하는 로직 구현
  • 코드
from airflow import DAG
import boto3
from datetime import datetime,date
import pendulum
from airflow.operators.python import PythonOperator
import re
import logging

aws_access_key_id = '엑세스키'
aws_secret_access_key = '시크릿키'
region_name = 'ap-northeast-2'
Bucket = '버킷 이름'


history_Prefix = ['prefix1','prefix2'] # stnd_ymd 파티션 전까지의 경로
latest_Prefix = ['prefix3','prefix4'] # stnd_ymd 파티션 전까지의 경로

s3_client = boto3.client('s3',aws_access_key_id=aws_access_key_id,aws_secret_access_key=aws_secret_access_key,region_name=region_name)

today = date.today()

task_logger = logging.getLogger("airflow.task")
task_logger.setLevel(logging.INFO)

DEFAULT_ARGS = {
    'owner': 'airflow',
    'depends_on_past': False
}

def history_deletion():
    # HIU 180일 경과 시 폴더 및 데이터 삭제
    for lists in HIU_Prefix:
        objects_to_delete = s3_client.list_objects_v2(Bucket=Bucket, Prefix=lists)

		# S3://bucket 이후의 /directory/~~ 값으로 나옴
        keys_to_delete = [{'Key': obj['Key']} for obj in objects_to_delete['Contents']]

        pattern = r'stnd_ymd=(\d{4}-\d{2}-\d{2})' # stnd_ymd= 뒤의 yyyy-MM-dd를 찾아내는 패턴
        delete_key_list = []

        for i in keys_to_delete: # re 모듈로 매칭해서 찾기
            match = re.search(pattern, list(i.values())[0])
            if match != None:
                stnd_ymd_date = match.group(1)
                if (today - datetime.strptime(stnd_ymd_date,"%Y-%m-%d").date()).days >= 180: # 180일이 지난경우 수행
                    delete_key_list.append(i) # 지워야 되는 특정 경로를 list에 담음

        if delete_key_list == []:
            pass
        else:
            s3_client.delete_objects(Bucket=Bucket, Delete={'Objects': delete_key_list}) # 지우는 객체가 list_objects_v2에서 나오는 형태와 같아야 함
            task_logger.info(f"history data: {delete_key_list} has been deleted!")

def latest_deletion():
    # HI 180일 경과 시 폴더 및 데이터 삭제
    for lists in HI_Prefix:
        objects_to_delete = s3_client.list_objects_v2(Bucket=Bucket, Prefix=lists)
		
        # S3://bucket 이후의 /directory/~~ 값으로 나옴
        keys_to_delete = [{'Key': obj['Key']} for obj in objects_to_delete['Contents']]

        pattern = r'stnd_ymd=(\d{4}-\d{2}-\d{2})' # stnd_ymd= 뒤의 yyyy-MM-dd를 찾아내는 패턴
        delete_key_list = []

        for i in keys_to_delete:  # re 모듈로 매칭해서 찾기
            match = re.search(pattern, list(i.values())[0])
            if match != None:
                stnd_ymd_date = match.group(1)
                if (today - datetime.strptime(stnd_ymd_date, "%Y-%m-%d").date()).days >= 180: # 180일이 지난경우 수행
                    delete_key_list.append(i) # 지워야 되는 특정 경로를 list에 담음

        if delete_key_list == []:
            pass
        else:
            s3_client.delete_objects(Bucket=Bucket, Delete={'Objects': delete_key_list}) # 지우는 객체가 list_objects_v2에서 나오는 형태와 같아야 함
            task_logger.info(f"history data: {delete_key_list} has been deleted!")

# 굳이 history, latest구분 안 둘거면 하나의 def로 통합 해도 됨

with DAG(
        dag_id='DAG_ID',
        default_args=DEFAULT_ARGS,
        dagrun_timeout=timedelta(hours=2),
        start_date=datetime(2023,10,31),
        schedule_interval='5 0 * * *',
        tags=['DAG_TAG'],
        catchup=False
) as dag:
    history_deletion= PythonOperator(
        task_id='history_deletion',
        python_callable=history_deletion
    )

    latest_deletion= PythonOperator(
        task_id='latest_deletion',
        python_callable=latest_deletion
    )

    history_deletion >> latest_deletion

 

728x90