본문 바로가기

Tech/AWS

[S3] boto3 SDK 활용 버킷 확인, 생성, 업로드

s3 클라이언트 세션 생성

def get_s3_session(access_key, secret_access_key):
    """Create an S3 client authenticated with an explicit access key pair.

    Args:
        access_key: Value passed to boto3 as ``aws_access_key_id``.
        secret_access_key: Value passed to boto3 as ``aws_secret_access_key``.

    Returns:
        The object returned by ``boto3.client('s3', ...)``.
    """
    credentials = {
        'aws_access_key_id': access_key,
        'aws_secret_access_key': secret_access_key,
    }
    return boto3.client('s3', **credentials)





버킷 확인

def check_bucket(s3, bucket_name):
    """Report whether ``bucket_name`` appears in the client's bucket listing.

    Args:
        s3: Client object exposing ``list_buckets()``; the response must carry
            a ``'Buckets'`` list whose entries each have a ``'Name'`` key.
        bucket_name: Bucket name to look for.

    Returns:
        True when the name is in the listing, False otherwise (an error line
        is printed in that case).
    """
    listing = s3.list_buckets()

    # Collect only the bucket names out of the listing entries.
    existing_names = []
    for entry in listing['Buckets']:
        existing_names.append(entry['Name'])

    print(f"[INFO] bucket name list: {existing_names}")
    if bucket_name in existing_names:
        return True

    print(f"[ERROR] bucket not exist. check bucket: {bucket_name}")
    return False





버킷 생성

def create_bucket(s3, bucket_name, region='ap-northeast-2'):
    """Create an S3 bucket and report whether the request succeeded.

    Args:
        s3: Client object exposing ``create_bucket(**kwargs)``.
        bucket_name: Name of the bucket to create.
        region: Region to create the bucket in. Defaults to
            ``'ap-northeast-2'``, the value this function used to hard-code.

    Returns:
        True when the create call completed, False when it raised (the cause
        is printed).
    """
    print("[INFO] bucket create start")

    request = {'Bucket': bucket_name}
    # When no location constraint is sent, S3 creates the bucket in us-east-1
    # (not us-west-2), and 'us-east-1' is not accepted as an explicit
    # LocationConstraint, so the configuration is attached only for other
    # regions.
    if region and region != 'us-east-1':
        request['CreateBucketConfiguration'] = {'LocationConstraint': region}

    try:
        s3.create_bucket(**request)
    except Exception as e:
        # Deliberately broad: callers get a bool instead of an exception.
        # The failure is not necessarily a duplicate name, so the real cause
        # is included in the message.
        print(f"[ERROR] bucket create failed. check bucket name: {bucket_name} ({e})")
        return False

    print("[INFO] bucket create done")
    return True





오브젝트 업로드

def _parse_backup_file_name(file_name):
    """Split a backup file name into its S3 key prefix and event id.

    The name is split on "_": field 0 is a "."-separated year.month.day date,
    field 1 is the event id, and field 3 (up to its first ".") is the group
    guid. Adapt this helper to your own naming scheme.

    Returns:
        A ``(key_prefix, event_id)`` tuple where ``key_prefix`` is
        ``"<group_guid>/<year>/<month>/<day>/<event_id>"``.

    Raises:
        ValueError: If the name has fewer than four "_" fields or the date
            field has fewer than three "." parts.
    """
    fields = file_name.split("_")
    if len(fields) < 4:
        raise ValueError("expected at least 4 '_'-separated fields")

    date_parts = fields[0].split(".")
    if len(date_parts) < 3:
        raise ValueError("expected a 'year.month.day' date in the first field")

    year, month, day = date_parts[:3]
    event_id = fields[1]
    group_guid = fields[3].split(".")[0]
    return f"{group_guid}/{year}/{month}/{day}/{event_id}", event_id


def upload_bucket(s3, bucket_name, file_path, file_names: list):
    """Upload each listed file to the bucket under a key derived from its name.

    Note: uploading to a key that already exists overwrites the stored object.

    Args:
        s3: Client object exposing ``upload_file(src, bucket, key)``.
        bucket_name: Destination bucket name.
        file_path: Directory that contains the files to upload.
        file_names: File names (relative to ``file_path``) to upload.

    Returns:
        True when every file was uploaded; False as soon as a file name cannot
        be parsed or the upload raises ``ParamValidationError``. Any other
        exception raised by ``s3.upload_file`` propagates to the caller.
    """
    print("[INFO] file upload start")

    for file_name in file_names:
        src_file = f"{file_path}/{file_name}"

        # Parse before touching S3 so a malformed name is reported like every
        # other failure instead of surfacing as a bare IndexError.
        try:
            key, event_id = _parse_backup_file_name(file_name)
        except ValueError as err:
            print(f"[ERROR] unexpected file name format: {file_name} ({err})")
            return False

        object_key = f"{key}/{event_id}.{PARQUET_EXTENSION}"

        try:
            # Arguments: local source path, bucket name, object key in S3.
            s3.upload_file(src_file, bucket_name, object_key)
        except ParamValidationError:
            print(f"[ERROR] bucket or key is not exist.\n"
                  f"check bucket: {bucket_name},\n"
                  f"check key: {key}")
            return False
        else:
            print(f"[INFO] upload file: {object_key}")

    print("[INFO] file upload done")
    return True





전체 코드


def get_s3_session(access_key, secret_access_key):
    """Create an S3 client authenticated with an explicit access key pair.

    Args:
        access_key: Value passed to boto3 as ``aws_access_key_id``.
        secret_access_key: Value passed to boto3 as ``aws_secret_access_key``.

    Returns:
        The object returned by ``boto3.client('s3', ...)``.
    """
    credentials = {
        'aws_access_key_id': access_key,
        'aws_secret_access_key': secret_access_key,
    }
    return boto3.client('s3', **credentials)


def check_bucket(s3, bucket_name):
    """Report whether ``bucket_name`` appears in the client's bucket listing.

    Args:
        s3: Client object exposing ``list_buckets()``; the response must carry
            a ``'Buckets'`` list whose entries each have a ``'Name'`` key.
        bucket_name: Bucket name to look for.

    Returns:
        True when the name is in the listing, False otherwise (an error line
        is printed in that case).
    """
    listing = s3.list_buckets()

    # Collect only the bucket names out of the listing entries.
    existing_names = []
    for entry in listing['Buckets']:
        existing_names.append(entry['Name'])

    print(f"[INFO] bucket name list: {existing_names}")
    if bucket_name in existing_names:
        return True

    print(f"[ERROR] bucket not exist. check bucket: {bucket_name}")
    return False


def create_bucket(s3, bucket_name, region='ap-northeast-2'):
    """Create an S3 bucket and report whether the request succeeded.

    Args:
        s3: Client object exposing ``create_bucket(**kwargs)``.
        bucket_name: Name of the bucket to create.
        region: Region to create the bucket in. Defaults to
            ``'ap-northeast-2'``, the value this function used to hard-code.

    Returns:
        True when the create call completed, False when it raised (the cause
        is printed).
    """
    print("[INFO] bucket create start")

    request = {'Bucket': bucket_name}
    # When no location constraint is sent, S3 creates the bucket in us-east-1
    # (not us-west-2), and 'us-east-1' is not accepted as an explicit
    # LocationConstraint, so the configuration is attached only for other
    # regions.
    if region and region != 'us-east-1':
        request['CreateBucketConfiguration'] = {'LocationConstraint': region}

    try:
        s3.create_bucket(**request)
    except Exception as e:
        # Deliberately broad: callers get a bool instead of an exception.
        # The failure is not necessarily a duplicate name, so the real cause
        # is included in the message.
        print(f"[ERROR] bucket create failed. check bucket name: {bucket_name} ({e})")
        return False

    print("[INFO] bucket create done")
    return True


def _parse_backup_file_name(file_name):
    """Split a backup file name into its S3 key prefix and event id.

    The name is split on "_": field 0 is a "."-separated year.month.day date,
    field 1 is the event id, and field 3 (up to its first ".") is the group
    guid. Adapt this helper to your own naming scheme.

    Returns:
        A ``(key_prefix, event_id)`` tuple where ``key_prefix`` is
        ``"<group_guid>/<year>/<month>/<day>/<event_id>"``.

    Raises:
        ValueError: If the name has fewer than four "_" fields or the date
            field has fewer than three "." parts.
    """
    fields = file_name.split("_")
    if len(fields) < 4:
        raise ValueError("expected at least 4 '_'-separated fields")

    date_parts = fields[0].split(".")
    if len(date_parts) < 3:
        raise ValueError("expected a 'year.month.day' date in the first field")

    year, month, day = date_parts[:3]
    event_id = fields[1]
    group_guid = fields[3].split(".")[0]
    return f"{group_guid}/{year}/{month}/{day}/{event_id}", event_id


def upload_bucket(s3, bucket_name, file_path, file_names: list):
    """Upload each listed file to the bucket under a key derived from its name.

    Note: uploading to a key that already exists overwrites the stored object.

    Args:
        s3: Client object exposing ``upload_file(src, bucket, key)``.
        bucket_name: Destination bucket name.
        file_path: Directory that contains the files to upload.
        file_names: File names (relative to ``file_path``) to upload.

    Returns:
        True when every file was uploaded; False as soon as a file name cannot
        be parsed or the upload raises ``ParamValidationError``. Any other
        exception raised by ``s3.upload_file`` propagates to the caller.
    """
    print("[INFO] file upload start")

    for file_name in file_names:
        src_file = f"{file_path}/{file_name}"

        # Parse before touching S3 so a malformed name is reported like every
        # other failure instead of surfacing as a bare IndexError.
        try:
            key, event_id = _parse_backup_file_name(file_name)
        except ValueError as err:
            print(f"[ERROR] unexpected file name format: {file_name} ({err})")
            return False

        object_key = f"{key}/{event_id}.{PARQUET_EXTENSION}"

        try:
            # Arguments: local source path, bucket name, object key in S3.
            s3.upload_file(src_file, bucket_name, object_key)
        except ParamValidationError:
            print(f"[ERROR] bucket or key is not exist.\n"
                  f"check bucket: {bucket_name},\n"
                  f"check key: {key}")
            return False
        else:
            print(f"[INFO] upload file: {object_key}")

    print("[INFO] file upload done")
    return True
반응형