s3 클라이언트 세션 생성
def get_s3_session(access_key, secret_access_key):
    """Build a boto3 S3 client authenticated with the given key pair.

    Args:
        access_key: Value passed to the client as ``aws_access_key_id``.
        secret_access_key: Value passed to the client as
            ``aws_secret_access_key``.

    Returns:
        The object returned by ``boto3.client('s3', ...)``.
    """
    credentials = {
        'aws_access_key_id': access_key,
        'aws_secret_access_key': secret_access_key,
    }
    return boto3.client('s3', **credentials)
버킷 확인
def check_bucket(s3, bucket_name):
    """Report whether ``bucket_name`` appears in the client's bucket listing.

    Args:
        s3: Client exposing ``list_buckets()`` (e.g. a boto3 S3 client).
        bucket_name: Bucket name to look for.

    Returns:
        True when the name is present in the listing, False otherwise.
    """
    listing = s3.list_buckets()

    # Collect just the names out of the listing entries.
    buckets = []
    for entry in listing['Buckets']:
        buckets.append(entry['Name'])
    print(f"[INFO] bucket name list: {buckets}")

    if bucket_name in buckets:
        return True

    print(f"[ERROR] bucket not exist. check bucket: {bucket_name}")
    return False
버킷 생성
def create_bucket(s3, bucket_name, region='ap-northeast-2'):
    """Create an S3 bucket and report the outcome as a boolean.

    Args:
        s3: Client exposing ``create_bucket(**kwargs)`` (e.g. a boto3 S3
            client).
        bucket_name: Name of the bucket to create.
        region: Region to create the bucket in. Defaults to
            ``'ap-northeast-2'``. Pass ``None`` or ``'us-east-1'`` to send no
            location constraint.

    Returns:
        True if the create call completed, False if it raised.
    """
    print("[INFO] bucket create start")

    request = {'Bucket': bucket_name}
    # When no location constraint is sent, S3 creates the bucket in us-east-1
    # (not us-west-2), and 'us-east-1' is not an accepted LocationConstraint
    # value, so the configuration is attached only for other regions.
    if region and region != 'us-east-1':
        request['CreateBucketConfiguration'] = {'LocationConstraint': region}

    try:
        s3.create_bucket(**request)
    except Exception as e:
        # Kept broad on purpose so callers keep getting a boolean instead of
        # an exception, but the real cause is reported: a name clash is only
        # one of several ways this call can fail.
        print(f"[ERROR] bucket create failed. "
              f"check bucket name: {bucket_name} (cause: {e})")
        return False

    print("[INFO] bucket create done")
    return True
오브젝트 업로드
def upload_bucket(s3, bucket_name, file_path, file_names: list):
    """Upload local files to a bucket under keys derived from their names.

    Each file name is expected to look like
    ``<year>.<month>.<day>_<event_id>_<any>_<group_guid>[.<ext>]`` and is
    stored under the key
    ``<group_guid>/<year>/<month>/<day>/<event_id>/<event_id>.<PARQUET_EXTENSION>``.
    Uploading a file that maps to an existing key overwrites that object.

    Args:
        s3: Client exposing ``upload_file(src, bucket, key)`` (e.g. a boto3
            S3 client).
        bucket_name: Destination bucket name.
        file_path: Directory that contains the source files.
        file_names: Names of the files inside ``file_path`` to upload.

    Returns:
        True when every file was uploaded (also for an empty list). False as
        soon as a file name does not match the expected layout or the upload
        raises ``ParamValidationError``; remaining files are not processed.
    """
    print("[INFO] file upload start")

    # The name-parsing scheme below is specific to this example; adapt it to
    # your own naming convention.
    for file_name in file_names:
        name_parts = file_name.split("_")
        date_parts = name_parts[0].split('.')
        # Validate before indexing so a malformed name yields False rather
        # than an IndexError that aborts the batch with a traceback.
        if len(name_parts) < 4 or len(date_parts) < 3:
            print(f"[ERROR] unexpected file name format. "
                  f"check file name: {file_name}")
            return False

        year, month, day = date_parts[:3]
        event_id = name_parts[1]
        group_guid = name_parts[3].split('.')[0]

        src_file = f"{file_path}/{file_name}"
        key = f"{group_guid}/{year}/{month}/{day}/{event_id}"
        object_key = f"{key}/{event_id}.{PARQUET_EXTENSION}"

        try:
            # Arguments: source file path, bucket name, destination key.
            s3.upload_file(src_file, bucket_name, object_key)
        except ParamValidationError:
            print(f"[ERROR] bucket or key is not exist.\n"
                  f"check bucket: {bucket_name} \n,"
                  f"check key: {key}")
            return False
        print(f"[INFO] upload file: {object_key}")

    print("[INFO] file upload done")
    return True
전체 코드
def get_s3_session(access_key, secret_access_key):
    """Build a boto3 S3 client authenticated with the given key pair.

    Args:
        access_key: Value passed to the client as ``aws_access_key_id``.
        secret_access_key: Value passed to the client as
            ``aws_secret_access_key``.

    Returns:
        The object returned by ``boto3.client('s3', ...)``.
    """
    credentials = {
        'aws_access_key_id': access_key,
        'aws_secret_access_key': secret_access_key,
    }
    return boto3.client('s3', **credentials)
def check_bucket(s3, bucket_name):
    """Report whether ``bucket_name`` appears in the client's bucket listing.

    Args:
        s3: Client exposing ``list_buckets()`` (e.g. a boto3 S3 client).
        bucket_name: Bucket name to look for.

    Returns:
        True when the name is present in the listing, False otherwise.
    """
    listing = s3.list_buckets()

    # Collect just the names out of the listing entries.
    buckets = []
    for entry in listing['Buckets']:
        buckets.append(entry['Name'])
    print(f"[INFO] bucket name list: {buckets}")

    if bucket_name in buckets:
        return True

    print(f"[ERROR] bucket not exist. check bucket: {bucket_name}")
    return False
def create_bucket(s3, bucket_name, region='ap-northeast-2'):
    """Create an S3 bucket and report the outcome as a boolean.

    Args:
        s3: Client exposing ``create_bucket(**kwargs)`` (e.g. a boto3 S3
            client).
        bucket_name: Name of the bucket to create.
        region: Region to create the bucket in. Defaults to
            ``'ap-northeast-2'``. Pass ``None`` or ``'us-east-1'`` to send no
            location constraint.

    Returns:
        True if the create call completed, False if it raised.
    """
    print("[INFO] bucket create start")

    request = {'Bucket': bucket_name}
    # When no location constraint is sent, S3 creates the bucket in us-east-1
    # (not us-west-2), and 'us-east-1' is not an accepted LocationConstraint
    # value, so the configuration is attached only for other regions.
    if region and region != 'us-east-1':
        request['CreateBucketConfiguration'] = {'LocationConstraint': region}

    try:
        s3.create_bucket(**request)
    except Exception as e:
        # Kept broad on purpose so callers keep getting a boolean instead of
        # an exception, but the real cause is reported: a name clash is only
        # one of several ways this call can fail.
        print(f"[ERROR] bucket create failed. "
              f"check bucket name: {bucket_name} (cause: {e})")
        return False

    print("[INFO] bucket create done")
    return True
def upload_bucket(s3, bucket_name, file_path, file_names: list):
    """Upload local files to a bucket under keys derived from their names.

    Each file name is expected to look like
    ``<year>.<month>.<day>_<event_id>_<any>_<group_guid>[.<ext>]`` and is
    stored under the key
    ``<group_guid>/<year>/<month>/<day>/<event_id>/<event_id>.<PARQUET_EXTENSION>``.
    Uploading a file that maps to an existing key overwrites that object.

    Args:
        s3: Client exposing ``upload_file(src, bucket, key)`` (e.g. a boto3
            S3 client).
        bucket_name: Destination bucket name.
        file_path: Directory that contains the source files.
        file_names: Names of the files inside ``file_path`` to upload.

    Returns:
        True when every file was uploaded (also for an empty list). False as
        soon as a file name does not match the expected layout or the upload
        raises ``ParamValidationError``; remaining files are not processed.
    """
    print("[INFO] file upload start")

    # The name-parsing scheme below is specific to this example; adapt it to
    # your own naming convention.
    for file_name in file_names:
        name_parts = file_name.split("_")
        date_parts = name_parts[0].split('.')
        # Validate before indexing so a malformed name yields False rather
        # than an IndexError that aborts the batch with a traceback.
        if len(name_parts) < 4 or len(date_parts) < 3:
            print(f"[ERROR] unexpected file name format. "
                  f"check file name: {file_name}")
            return False

        year, month, day = date_parts[:3]
        event_id = name_parts[1]
        group_guid = name_parts[3].split('.')[0]

        src_file = f"{file_path}/{file_name}"
        key = f"{group_guid}/{year}/{month}/{day}/{event_id}"
        object_key = f"{key}/{event_id}.{PARQUET_EXTENSION}"

        try:
            # Arguments: source file path, bucket name, destination key.
            s3.upload_file(src_file, bucket_name, object_key)
        except ParamValidationError:
            print(f"[ERROR] bucket or key is not exist.\n"
                  f"check bucket: {bucket_name} \n,"
                  f"check key: {key}")
            return False
        print(f"[INFO] upload file: {object_key}")

    print("[INFO] file upload done")
    return True
반응형
'Tech > AWS' 카테고리의 다른 글
AWS Window Server 인스턴스 생성 시 초기 route table 상태 (0) | 2023.03.26 |
---|---|
[S3] boto3 SDK 활용 Key 리스트 확인, 오브젝트 Copy, Move (2) | 2021.12.07 |
[Athena] AWS SDK boto3 활용 쿼리 (0) | 2021.12.03 |
[Athena] 아테나를 사용하면서 필요했던 AWS IAM 정책 (0) | 2021.12.03 |