我正在使用 Pyathena 运行基本查询:
from pyathena import connect as pyathena_connect #to distinguish from other connect methods
import pandas as pd
class AthenaDataConnection():
def __init__(self, S3_STAGING_DIR, SEP=';', REGION='us-east-1', ACCESS_KEY=None, S_KEY=None):
self.S3_STAGING_DIR = S3_STAGING_DIR
self.REGION = REGION
self.SEP = SEP
if ACCESS_KEY and S_KEY:
self.athena_conn = pyathena_connect(s3_staging_dir=self.S3_STAGING_DIR, region_name=self.REGION,
aws_access_key_id=ACCESS_KEY, aws_secret_access_key=S_KEY)
else:
self.athena_conn = pyathena_connect(s3_staging_dir=self.S3_STAGING_DIR, region_name=self.REGION)
def get_athena_data(self, sql_dict):
print(f"Athena connection established; starting to query data using pd-sql integration")
sql_results = {}
for filename, sql in sql_dict.items():
try:
load_data = pd.read_sql(sql,self.athena_conn)
print(f"{filename} data fetched from Athena but not saved (returned in dict only).")
sql_results[filename] = load_data
except:
print(f"Reading {filename} failed")
return sql_results
athena = AthenaDataConnection('s3://athena-staging/',ACCESS_KEY=ACCESS_KEY, S_KEY=S_KEY)
sql_dict = {'foobar':"select * from foo.bar where foo='bar'"}
df_dict = athena.get_athena_data(sql_dict)
df = df_dict.get('foobar')
#assume this is the end of the script; i.e., I did NOT save the query results myself
因此,当查询执行时,一个文件会出现在暂存文件夹中,例如:
s3://athena-staging/abc123_45678_91011.csv
我希望我的代码能够捕获该文件名并将其保存用于其他目的。但如何呢?我在 Pyathena 文档中找不到任何内容。更新- 我刚刚了解到文件名是查询 ID + .csv!所以我现在正在寻找一种获取 Athena 查询 ID 的方法。
千万里不及你
相关分类