AWSの勉強として、BitFlyerのAPIを使用して、BitCoinの自動売買プログラムを組んでみた。今回の主目的はAWSの勉強なので、実際儲かるかは不明。
※2023/12/30 追記 以下の構成だとCloudWatchの料金が0.1USD/Dayかかるので、完全無料ではないです。
準備するもの
システム構成
こんな感じ。(書き方あってる?)EventBridgeから5分ごとにトリガーを出し、そのトリガーでLambdaがBitCoinの価格を取得、DynamoDBに保存、ゴールデンクロスの場合、Bitcoinを成り行きで購入し、0.4%を上乗せして指値で売り注文を出す。(0.4% は、BitFlyerの手数料が0.15%なので)
実装
DynamoDBの準備
AWSの上部の検索欄に「DynamoDB」と入力し、DynamoDBへ移動。
リージョンに「東京」を選択。(どこでもいいが、DynamoDBやLambdaのリージョンは同じである必要あり)
新しいテーブルを追加。
以下のように入力して作成。
Lambdaの実装
AWSの上部の検索欄に「Lambda」と入力し、Lambdaへ移動。
関数の作成から、関数名、ランタイムを選択し、作成。
必要なライブラリの準備。Layerを選択。
レイヤーの追加から、以下を追加する。
- arn:aws:lambda:ap-northeast-1:770693421928:layer:Klayers-p39-requests:18
- arn:aws:lambda:ap-northeast-1:770693421928:layer:Klayers-p39-pandas:20
- arn:aws:lambda:ap-northeast-1:770693421928:layer:Klayers-p39-boto3:21
参考↓
ロールの権限を変更し、DynamoDBを読み書きできるようにする。 Lambdaの設定→一般設定の以下のリンクをクリック。
許可を追加。DynamoDBを選択し、以下にチェックを入れる。
- Query
- Scan
- BatchWriteItem
Lambdaの設定→一般設定から、タイムアウト時間を10秒にしておく。
コード
以下のコードを張り付ける。
import json import requests import time import pandas as pd import traceback import hashlib import hmac import json from datetime import datetime import os import boto3 from boto3.dynamodb.conditions import Key, Attr # ビットフライヤーのAPIキーとシークレット API_KEY = os.environ["API_KEY"] API_SECRET = os.environ["API_SECRET"] # APIエンドポイント API_URL = 'https://api.bitflyer.jp' # 注文情報 order_quantity = float(os.environ["ODER_QUANTITY"]) # 購入および売却するBTCの数量 product_code = 'BTC_JPY' # トレード対象の通貨ペア # 移動平均の期間設定 short_window = 10 long_window = 20 df = None table_name = os.environ["TABLE_NAME"] def get_market_price(): # 現在の市場価格を取得 ticker_url = f'{API_URL}/v1/ticker?product_code={product_code}' response = requests.get(ticker_url) data = response.json() print(data) return float(data['ltp']) def get_moving_averages(): global df # 移動平均を計算 historical_data_url = f'{API_URL}/v1/getticker?product_code={product_code}' response = requests.get(historical_data_url) data = response.json() new_data = { 'timestamp': data['timestamp'], 'best_ask': [float(data['best_ask'])], 'best_bid': [float(data['best_bid'])], } new_data = pd.DataFrame(new_data) new_data['timestamp'] = pd.to_datetime(new_data['timestamp']) df = pd.concat([df, new_data]) try: short_ma = df['best_bid'].rolling(window=short_window).mean()#.iloc[-1] long_ma = df['best_bid'].rolling(window=long_window).mean()#.iloc[-1] except: short_ma = 0 long_ma = 0 pass return short_ma, long_ma def place_market_order(side, quantity): # 成行注文を出す endpoint = '/v1/me/sendchildorder' order_url = API_URL + endpoint order_data = { 'product_code': product_code, 'child_order_type': 'MARKET', 'side': side, 'size': quantity, } body = json.dumps(order_data) headers = header('POST', endpoint=endpoint, body=body) response = requests.post(order_url, headers=headers, data=body) return response.json() def place_limit_order(side, price, quantity): # 指値注文を出す endpoint = '/v1/me/sendchildorder' order_url = API_URL + endpoint order_data = { 'product_code': product_code, 'child_order_type': 'LIMIT', 'side': side, 'price': price, 'size': quantity, } body = json.dumps(order_data) headers = header('POST', endpoint=endpoint, body=body) response = requests.post(order_url, headers=headers, data=body) return response.json() def header(method: str, endpoint: str, body: str) -> dict: timestamp = str(time.time()) if body == '': message = timestamp + method + endpoint else: message = timestamp + method + endpoint + body signature = hmac.new(API_SECRET.encode('utf-8'), message.encode('utf-8'), digestmod=hashlib.sha256).hexdigest() headers = { 'Content-Type': 'application/json', 'ACCESS-KEY': API_KEY, 'ACCESS-TIMESTAMP': timestamp, 'ACCESS-SIGN': signature } return headers def get_open_orders(): # 出ている注文一覧を取得 endpoint = '/v1/me/getchildorders' params = { 'product_code': 'BTC_JPY', 'child_order_state': 'ACTIVE', # 出ている注文だけを取得 } endpoint_for_header = endpoint + '?' for k, v in params.items(): endpoint_for_header += k + '=' + v endpoint_for_header += '&' endpoint_for_header = endpoint_for_header[:-1] headers = header('GET', endpoint=endpoint_for_header, body="") response = requests.get(API_URL + endpoint, headers=headers, params=params) orders = response.json() return orders def lambda_handler(event, context): global df dynamodb = boto3.resource('dynamodb') table = dynamodb.Table(table_name) ret = table.scan() df = pd.DataFrame(data=ret['Items']) try: df = df.sort_values('timestamp') except: print("sort error") pass try: # 現在の市場価格と移動平均を取得 current_price = get_market_price() short_ma, long_ma = get_moving_averages() with table.batch_writer() as batch: tmp = df.iloc[-1].to_dict() for k in tmp.keys(): if not k == "timestamp": tmp[k] = int(tmp[k]) else: tmp[k] = tmp[k].strftime('%Y%m%d%H%M%S') tmp[k] = int(tmp[k]) data = tmp batch.put_item(Item=data) # 注文が出されていない場合に短期移動平均が長期移動平均を上回った場合に購入 orders = get_open_orders() print(len(orders)) if os.environ["BUYSELL"] == "1": if (short_ma.iloc[-1] > long_ma.iloc[-1]) and (short_ma.iloc[-2] < long_ma.iloc[-2]) and \ len(orders) < int(os.environ["ODER_NUM"]): print("短期移動平均が長期移動平均を上回りました。購入注文を出します。") order_response = place_market_order('BUY', order_quantity) print("注文結果:", order_response) # 注文が成功したら約定価格に0.4%加算して売り注文を出す if 'child_order_acceptance_id' in order_response: executed_price = int(current_price * 1.004) # 約定価格に0.4%加算 print(f"約定価格: {executed_price}") sell_response = place_limit_order('SELL', executed_price, order_quantity) print("売り注文結果:", sell_response) # 売り注文が出されている場合は、一定の間隔で確認 elif len(orders) > 0: # ここに必要な確認処理を追加 pass except Exception as e: print(f"エラーが発生しました: {e}") print(traceback.format_exc()) return { 'statusCode': 200, 'body': json.dumps('') }
以下のDeployを押す。コードを変更したら、毎回Deploy押す必要あり。
環境変数を変更する。
設定する環境変数は以下の通り。自分の環境に合わせて変更する。
- API_KEY:BitFlyerのAPI_KEY
- API_SECRET:BitFlyerのAPI_SECRET
- BUYSELL:売買を行うかどうか。1の場合、売買を行う。
- ODER_NUM:注文数の上限。3など。
- ODER_QUANTITY:1回の注文で購入するBitCoin。0.003など。
- TABLE_NAME:DynamoDBのテーブル名。test
以下のTestを押すと関数が実行される。初回はテストの設定が必要だが、デフォルトのままでOK。
成功した場合には以下のようなログになる。タイムアウトする場合は、タイムアウト時間を延ばす。
DynamoDBに保存されているかの確認
DynamoDBへ行き、テーブルアイテムの検索を押す。
スキャンを行い、以下のように表示されればOK。
トリガーの追加
以下のトリガーを追加をクリック。
以下のように入力し、トリガーを作成。間隔は自由に変更可。
トリガーが作成されると、Lambdaが自動で実行されるようになる。