BitflyerのAPIを使用して、Bitcoin自動売買プログラムを作成した

Bitコインの自動売買プログラムを作った。注文のトリガーには移動平均を用いて、長期の移動平均が、短期の移動平均を上回ったとき、指値で購入する。購入に成功したら、同時に0.4%上乗せして売り注文を出す。

  • 参考にしたサイト

zero-cheese.com

import requests
import time
import pandas as pd
import traceback
import hashlib
import hmac
import json

# ビットフライヤーのAPIキーとシークレット
API_KEY = '*****'
API_SECRET = '*****'

# APIエンドポイント
API_URL = 'https://api.bitflyer.jp'

# 注文情報
order_quantity = 0.01  # 購入および売却するBTCの数量
product_code = 'BTC_JPY'  # トレード対象の通貨ペア

# 移動平均の期間設定
short_window = 10
long_window = 20

file_name = "data.csv"

try:
    df = pd.read_csv("data.csv")
except:
    print("failed read csv")
    df = pd.DataFrame(columns=['timestamp', 'best_bid', 'best_ask'])

def get_market_price():
    # 現在の市場価格を取得
    ticker_url = f'{API_URL}/v1/ticker?product_code={product_code}'
    response = requests.get(ticker_url)
    data = response.json()
    print(data)
    return float(data['ltp'])

def get_moving_averages():
    global df
    # 移動平均を計算
    historical_data_url = f'{API_URL}/v1/getticker?product_code={product_code}'
    response = requests.get(historical_data_url)
    data = response.json()
    new_data = {
        'timestamp': data['timestamp'], 
        'best_ask': [float(data['best_ask'])],
        'best_bid': [float(data['best_bid'])],
        }
    print(data)
    new_data = pd.DataFrame(new_data)
    new_data['timestamp'] = pd.to_datetime(new_data['timestamp'])
    df = pd.concat([df, new_data])
    try:
        short_ma = df['best_bid'].rolling(window=short_window).mean().iloc[-1]
        long_ma = df['best_bid'].rolling(window=long_window).mean().iloc[-1]
    except:
        short_ma = 0
        long_ma = 0
        pass
    print(short_ma, long_ma)
    return short_ma, long_ma

def place_market_order(side, quantity):
    # 成行注文を出す
    endpoint = '/v1/me/sendchildorder'
    order_url = API_URL + endpoint

    order_data = {
        'product_code': product_code,
        'child_order_type': 'MARKET',
        'side': side,
        'size': quantity,
    }
    body = json.dumps(order_data)
    headers = header('POST', endpoint=endpoint, body=body)

    response = requests.post(order_url, headers=headers, data=body)
    return response.json()

def place_limit_order(side, price, quantity):
    # 指値注文を出す
    endpoint = '/v1/me/sendchildorder'
    order_url = API_URL + endpoint

    order_data = {
        'product_code': product_code,
        'child_order_type': 'LIMIT',
        'side': side,
        'price': price,
        'size': quantity,
    }
    body = json.dumps(order_data)
    headers = header('POST', endpoint=endpoint, body=body)

    response = requests.post(order_url, headers=headers, data=body)
    return response.json()


def header(method: str, endpoint: str, body: str) -> dict:
    timestamp = str(time.time())
    if body == '':
        message = timestamp + method + endpoint
    else:
        message = timestamp + method + endpoint + body
    signature = hmac.new(API_SECRET.encode('utf-8'), message.encode('utf-8'),
                         digestmod=hashlib.sha256).hexdigest()
    headers = {
        'Content-Type': 'application/json',
        'ACCESS-KEY': API_KEY,
        'ACCESS-TIMESTAMP': timestamp,
        'ACCESS-SIGN': signature
    }
    return headers


def get_open_orders():
    # 出ている注文一覧を取得
    endpoint = '/v1/me/getchildorders'
    
    params = {
        'product_code': 'BTC_JPY',
        'child_order_state': 'ACTIVE',  # 出ている注文だけを取得
    }
    endpoint_for_header = endpoint + '?'
    for k, v in params.items():
        endpoint_for_header += k + '=' + v
        endpoint_for_header += '&'
    endpoint_for_header = endpoint_for_header[:-1]
    
    headers = header('GET', endpoint=endpoint_for_header, body="")

    response = requests.get(API_URL + endpoint, headers=headers, params=params)
    orders = response.json()
    return orders

while True:
    try:
        # 現在の市場価格と移動平均を取得
        current_price = get_market_price()
        short_ma, long_ma = get_moving_averages()
        df.to_csv(file_name, index=False)

        # 注文が出されていない場合に短期移動平均が長期移動平均を上回った場合に購入
        orders = get_open_orders()
        print(len(orders))
        if (short_ma > long_ma) and len(orders) == 0:
            print("短期移動平均が長期移動平均を上回りました。購入注文を出します。")
            order_response = place_market_order('BUY', order_quantity)
            print("注文結果:", order_response)

            # 注文が成功したら約定価格に0.4%加算して売り注文を出す
            if 'child_order_acceptance_id' in order_response:
                executed_price = int(current_price * 1.004)  # 約定価格に0.4%加算
                print(f"約定価格: {executed_price}")
                sell_response = place_limit_order('SELL', executed_price, order_quantity)
                print("売り注文結果:", sell_response)

                # 買い注文が出されたフラグを立てる
                buy_order_placed = True

        # 売り注文が出されている場合は、一定の間隔で確認
        elif buy_order_placed:
            # ここに必要な確認処理を追加
            pass

        # 一定の間隔でトレードを実行
        time.sleep(60)  # 60秒ごとに実行
    except Exception as e:
        print(f"エラーが発生しました: {e}")
        print(traceback.format_exc())
        time.sleep(60)  # エラーが発生した場合も60秒待ってから再試行