AWSの無料枠でBitCoinの自動売買プログラム組んでみた

AWSの勉強として、BitFlyerAPIを使用して、BitCoinの自動売買プログラムを組んでみた。今回の主目的はAWSの勉強なので、実際儲かるかは不明。

※2023/12/30 追記 以下の構成だとCloudWatchの料金が0.1USD/Dayかかるので、完全無料ではないです。

準備するもの

システム構成

こんな感じ。(書き方あってる?)EventBridgeから5分ごとにトリガーを出し、そのトリガーでLambdaがBitCoinの価格を取得、DynamoDBに保存、ゴールデンクロスの場合、Bitcoinを成り行きで購入し、0.4%を上乗せして指値で売り注文を出す。(0.4% は、BitFlyerの手数料が0.15%なので)

実装

DynamoDBの準備

AWSの上部の検索欄に「DynamoDB」と入力し、DynamoDBへ移動。

リージョンに「東京」を選択。(どこでもいいが、DynamoDBやLambdaのリージョンは同じである必要あり)

新しいテーブルを追加。

以下のように入力して作成。

Lambdaの実装

AWSの上部の検索欄に「Lambda」と入力し、Lambdaへ移動。

関数の作成から、関数名、ランタイムを選択し、作成。

必要なライブラリの準備。Layerを選択。

レイヤーの追加から、以下を追加する。

  • arn:aws:lambda:ap-northeast-1:770693421928:layer:Klayers-p39-requests:18
  • arn:aws:lambda:ap-northeast-1:770693421928:layer:Klayers-p39-pandas:20
  • arn:aws:lambda:ap-northeast-1:770693421928:layer:Klayers-p39-boto3:21

参考↓

qiita.com

github.com

ロールの権限を変更し、DynamoDBを読み書きできるようにする。 Lambdaの設定→一般設定の以下のリンクをクリック。

許可を追加。DynamoDBを選択し、以下にチェックを入れる。

  • Query
  • Scan
  • BatchWriteItem

Lambdaの設定→一般設定から、タイムアウト時間を10秒にしておく。

コード

以下のコードを張り付ける。

import json
import requests
import time
import pandas as pd
import traceback
import hashlib
import hmac
import json
from datetime import datetime
import os

import boto3

from boto3.dynamodb.conditions import Key, Attr

# ビットフライヤーのAPIキーとシークレット
API_KEY = os.environ["API_KEY"]
API_SECRET = os.environ["API_SECRET"]

# APIエンドポイント
API_URL = 'https://api.bitflyer.jp'

# 注文情報
order_quantity = float(os.environ["ODER_QUANTITY"])  # 購入および売却するBTCの数量
product_code = 'BTC_JPY'  # トレード対象の通貨ペア

# 移動平均の期間設定
short_window = 10
long_window = 20

df = None

table_name = os.environ["TABLE_NAME"]

def get_market_price():
    # 現在の市場価格を取得
    ticker_url = f'{API_URL}/v1/ticker?product_code={product_code}'
    response = requests.get(ticker_url)
    data = response.json()
    print(data)
    return float(data['ltp'])

def get_moving_averages():
    global df
    # 移動平均を計算
    historical_data_url = f'{API_URL}/v1/getticker?product_code={product_code}'
    response = requests.get(historical_data_url)
    data = response.json()
    new_data = {
        'timestamp': data['timestamp'], 
        'best_ask': [float(data['best_ask'])],
        'best_bid': [float(data['best_bid'])],
        }
    new_data = pd.DataFrame(new_data)
    new_data['timestamp'] = pd.to_datetime(new_data['timestamp'])
    df = pd.concat([df, new_data])
    try:
        short_ma = df['best_bid'].rolling(window=short_window).mean()#.iloc[-1]
        long_ma = df['best_bid'].rolling(window=long_window).mean()#.iloc[-1]
    except:
        short_ma = 0
        long_ma = 0
        pass
    return short_ma, long_ma

def place_market_order(side, quantity):
    # 成行注文を出す
    endpoint = '/v1/me/sendchildorder'
    order_url = API_URL + endpoint

    order_data = {
        'product_code': product_code,
        'child_order_type': 'MARKET',
        'side': side,
        'size': quantity,
    }
    body = json.dumps(order_data)
    headers = header('POST', endpoint=endpoint, body=body)

    response = requests.post(order_url, headers=headers, data=body)
    return response.json()

def place_limit_order(side, price, quantity):
    # 指値注文を出す
    endpoint = '/v1/me/sendchildorder'
    order_url = API_URL + endpoint

    order_data = {
        'product_code': product_code,
        'child_order_type': 'LIMIT',
        'side': side,
        'price': price,
        'size': quantity,
    }
    body = json.dumps(order_data)
    headers = header('POST', endpoint=endpoint, body=body)

    response = requests.post(order_url, headers=headers, data=body)
    return response.json()


def header(method: str, endpoint: str, body: str) -> dict:
    timestamp = str(time.time())
    if body == '':
        message = timestamp + method + endpoint
    else:
        message = timestamp + method + endpoint + body
    signature = hmac.new(API_SECRET.encode('utf-8'), message.encode('utf-8'),
                         digestmod=hashlib.sha256).hexdigest()
    headers = {
        'Content-Type': 'application/json',
        'ACCESS-KEY': API_KEY,
        'ACCESS-TIMESTAMP': timestamp,
        'ACCESS-SIGN': signature
    }
    return headers


def get_open_orders():
    # 出ている注文一覧を取得
    endpoint = '/v1/me/getchildorders'
    
    params = {
        'product_code': 'BTC_JPY',
        'child_order_state': 'ACTIVE',  # 出ている注文だけを取得
    }
    endpoint_for_header = endpoint + '?'
    for k, v in params.items():
        endpoint_for_header += k + '=' + v
        endpoint_for_header += '&'
    endpoint_for_header = endpoint_for_header[:-1]
    
    headers = header('GET', endpoint=endpoint_for_header, body="")

    response = requests.get(API_URL + endpoint, headers=headers, params=params)
    orders = response.json()
    return orders

def lambda_handler(event, context):
    global df
    
    dynamodb = boto3.resource('dynamodb')
    table = dynamodb.Table(table_name)
    ret = table.scan()
    df = pd.DataFrame(data=ret['Items'])
    try:
        df = df.sort_values('timestamp')
    except:
        print("sort error")
        pass
    
    try:
        # 現在の市場価格と移動平均を取得
        current_price = get_market_price()
        short_ma, long_ma = get_moving_averages()
        
        with table.batch_writer() as batch:
            tmp = df.iloc[-1].to_dict()
            for k in tmp.keys():
                if not k == "timestamp":
                    tmp[k] = int(tmp[k])
                else:
                    tmp[k] = tmp[k].strftime('%Y%m%d%H%M%S')
                    tmp[k] = int(tmp[k])
            data = tmp
            batch.put_item(Item=data)
    
        # 注文が出されていない場合に短期移動平均が長期移動平均を上回った場合に購入
        orders = get_open_orders()
        print(len(orders))
            
        if os.environ["BUYSELL"] == "1":
            if (short_ma.iloc[-1] > long_ma.iloc[-1]) and (short_ma.iloc[-2] < long_ma.iloc[-2]) and \
                len(orders) < int(os.environ["ODER_NUM"]):
                print("短期移動平均が長期移動平均を上回りました。購入注文を出します。")
                order_response = place_market_order('BUY', order_quantity)
                print("注文結果:", order_response)
    
                # 注文が成功したら約定価格に0.4%加算して売り注文を出す
                if 'child_order_acceptance_id' in order_response:
                    executed_price = int(current_price * 1.004)  # 約定価格に0.4%加算
                    print(f"約定価格: {executed_price}")
                    sell_response = place_limit_order('SELL', executed_price, order_quantity)
                    print("売り注文結果:", sell_response)
    
            # 売り注文が出されている場合は、一定の間隔で確認
            elif len(orders) > 0:
                # ここに必要な確認処理を追加
                pass

    except Exception as e:
        print(f"エラーが発生しました: {e}")
        print(traceback.format_exc())

    return {
        'statusCode': 200,
        'body': json.dumps('')
    }

以下のDeployを押す。コードを変更したら、毎回Deploy押す必要あり。

環境変数を変更する。

設定する環境変数は以下の通り。自分の環境に合わせて変更する。

  • API_KEY:BitFlyerAPI_KEY
  • API_SECRET:BitFlyerAPI_SECRET
  • BUYSELL:売買を行うかどうか。1の場合、売買を行う。
  • ODER_NUM:注文数の上限。3など。
  • ODER_QUANTITY:1回の注文で購入するBitCoin。0.003など。
  • TABLE_NAME:DynamoDBのテーブル名。test

以下のTestを押すと関数が実行される。初回はテストの設定が必要だが、デフォルトのままでOK。

成功した場合には以下のようなログになる。タイムアウトする場合は、タイムアウト時間を延ばす。

DynamoDBに保存されているかの確認

DynamoDBへ行き、テーブルアイテムの検索を押す。

スキャンを行い、以下のように表示されればOK。

トリガーの追加

以下のトリガーを追加をクリック。

以下のように入力し、トリガーを作成。間隔は自由に変更可。

トリガーが作成されると、Lambdaが自動で実行されるようになる。