DynamoDBをダウンロードしてローカルにCSVで保存する。 1回のScanでは1MBしか取れないので、最後までループして取得している。
データはtimestamでソートしている。
import boto3 import pandas as pd aws_access_key_id = 'xxx' aws_secret_access_key = 'yyyy' region_name = 'ap-northeast-1' table_name = 'zzz' def scan_dynamodb_table(table_name): dynamodb = boto3.resource('dynamodb', region_name=region_name, aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key) table = dynamodb.Table(table_name) items = [] last_evaluated_key = None while True: # ページネーションでScanを行います response = table.scan(ExclusiveStartKey=last_evaluated_key) if last_evaluated_key else table.scan() items.extend(response['Items']) # LastEvaluatedKeyがあれば次のページがあるということです if 'LastEvaluatedKey' in response: last_evaluated_key = response['LastEvaluatedKey'] else: break return items def export_to_csv(items, csv_file_path): df = pd.DataFrame(items) # timestamp列をIndexに設定 df.set_index('timestamp', inplace=True) # Indexでソート df.sort_index(inplace=True) # ファイル名に使用する最初と最後のタイムスタンプを取得 start_timestamp = str(df.index.min()) end_timestamp = str(df.index.max()) # ファイル名を作成 file_name = f"data_{start_timestamp}_{end_timestamp}.csv" # DataFrameをCSVファイルに保存 df.to_csv(file_name) if __name__ == "__main__": # DynamoDBテーブルをスキャンしてデータを取得します items = scan_dynamodb_table(table_name) # 取得したデータをCSVにエクスポートします export_to_csv(items)