PythonでDynamoDBをローカルにバックアップするコード

DynamoDBをダウンロードしてローカルにCSVで保存する。 1回のScanでは1MBしか取れないので、最後までループして取得している。

データはtimestamでソートしている。

import boto3
import pandas as pd

aws_access_key_id = 'xxx'
aws_secret_access_key = 'yyyy'
region_name = 'ap-northeast-1'
table_name = 'zzz'

def scan_dynamodb_table(table_name):
    dynamodb = boto3.resource('dynamodb', region_name=region_name,
                          aws_access_key_id=aws_access_key_id,
                          aws_secret_access_key=aws_secret_access_key)

    table = dynamodb.Table(table_name)

    items = []
    last_evaluated_key = None

    while True:
        # ページネーションでScanを行います
        response = table.scan(ExclusiveStartKey=last_evaluated_key) if last_evaluated_key else table.scan()
        items.extend(response['Items'])

        # LastEvaluatedKeyがあれば次のページがあるということです
        if 'LastEvaluatedKey' in response:
            last_evaluated_key = response['LastEvaluatedKey']
        else:
            break

    return items

def export_to_csv(items, csv_file_path):
    df = pd.DataFrame(items)
    # timestamp列をIndexに設定
    df.set_index('timestamp', inplace=True)

    # Indexでソート
    df.sort_index(inplace=True)

    # ファイル名に使用する最初と最後のタイムスタンプを取得
    start_timestamp = str(df.index.min())
    end_timestamp = str(df.index.max())

    # ファイル名を作成
    file_name = f"data_{start_timestamp}_{end_timestamp}.csv"

    # DataFrameをCSVファイルに保存
    df.to_csv(file_name)

if __name__ == "__main__":
    # DynamoDBテーブルをスキャンしてデータを取得します
    items = scan_dynamodb_table(table_name)

    # 取得したデータをCSVにエクスポートします
    export_to_csv(items)