본문 바로가기

Tech/AWS

[Athena] AWS SDK boto3 활용 쿼리

엑세스키, 시크릿 엑세스키 발급

boto3를 활용해서 AWS client에 접근하기 위해서는 access key, secret access key가 필요하다.
어떻게 엏는지는 아래 링크를 참고하자

https://assaeunji.github.io/aws/2020-04-02-boto3/



boto3를 활용한 Athena Query

import boto3
import time

#
# https://gist.github.com/chrisdpa-tvx/96ad6099da868bf83579fcb0d8caa00c
# https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/athena.html
#

athena_handler = boto3.client(
    'athena',
    aws_access_key_id = '',
    aws_secret_access_key='',
    region_name='ap-northeast-2'
)


def create_athena_table():
    #
    # 아테나 테이블 생성
    # 

    with open('table/billing_data.ddl') as ddl:
        athena_handler.start_query_execution(
            QueryString=ddl.read(),
            ResultConfiguration={'OutputLocation': 's3://test-athena-result/'}
        )
    print("create table")


def partitioning_athena_table():
    with open('partition/test1.ddl') as ddl:
        athena_handler.start_query_execution(
            QueryString=ddl.read(),
            ResultConfiguration={'OutputLocation': 's3://test-athena-result/'}
        )
    print("partitioning success")


def query_athena(query_file):
    #
    # AWS SDK를 사용해서 아테나에 쿼리하면, 
    # 쿼리 결과를 가져오기(출력)하기 위해서는 쿼리가 완료될때까지 상태를 계속해서 확인해야한다
    #
    with open(query_file) as query:
        query_start = athena_handler.start_query_execution(
            QueryString=query.read(),
            ResultConfiguration={'OutputLocation': 's3://test-athena-result/'}
        )

        while True:
            query_status = athena_handler.get_query_execution(
                QueryExecutionId=query_start['QueryExecutionId'])
            query_execution_status = query_status['QueryExecution']['Status'][
                'State']

            if query_execution_status == 'SUCCEEDED':
                result = athena_handler.get_query_results(
                    QueryExecutionId=query_start['QueryExecutionId'])
                break

            print("sleep")
            time.sleep(1)

        print(result)



if __name__ == "__main__":
    #
    # create Table은 계속해서 덮어씌우기 형태로 생성된다.
    # partitioning은 이미 파티셔닝이 되어있으면 수행하지않는다.
    #

    create_athena_table()
    partitioning_athena_table()
    query_athena('query/select.sql')


반응형