AWS CloudformationでMLFlowサーバを構築した

AWS CloudformationでMLFlowサーバを構築した。MLFlowサーバのバックエンドには、S3とRDSを使用している。

AWS Cloudformationで以下のYAMLファイルでStackを構築すれば、VPCやSubnet、ロール、MLFlowがインストールされたEC2インスタンスなどが自動的に構築される。VPCのCIDRがかぶっていたりすると構築に失敗するので、その場合は変数をいい感じに編集して使用する。

AWSTemplateFormatVersion: '2010-09-09'
Parameters:
  VpcCidrBlock:
    Type: String
    Description: CIDR block for VPC
    Default: 192.168.0.0/16
  PublicSubnetCidrBlock1:
    Type: String
    Description: CIDR block for subnet
    Default: 192.168.1.0/24
  PublicSubnetCidrBlock2:
    Type: String
    Description: CIDR block for subnet
    Default: 192.168.2.0/24
  LocalSubnetCidrBlock1:
    Type: String
    Description: CIDR block for subnet
    Default: 192.168.11.0/24
  LocalSubnetCidrBlock2:
    Type: String
    Description: CIDR block for subnet
    Default: 192.168.12.0/24
  MLFlowPrivateIpAddress:
    Type: String
    Description: MLFlow Private IP Address
    Default: 192.168.1.10

  KeyName:
    Type: AWS::EC2::KeyPair::KeyName
    Description: EC2 Key Pair for SSH access
    Default: MyKeyPair
  InstanceType:
    Type: String
    Description: EC2 instance type
    Default: t2.micro
  RDSInstanceClass:
    Type: String
    Description: RDS instance class
    Default: db.t2.micro
  DBName:
    Type: String
    Description: RDS database name
    Default: mlflowdb
  S3Name:
    Type: String
    Description: S3 name
    Default: mlflows300
  MasterUsername:
    Type: String
    Description: RDS master username
    Default: mlflowuser
  MasterUserPassword:
    Type: String
    Description: RDS master user password
    NoEcho: true
    Default: mlflowpassword

Resources:
  VPC:
    Type: 'AWS::EC2::VPC'
    Properties:
      EnableDnsSupport: "true"
      EnableDnsHostnames: "true"
      CidrBlock: !Ref VpcCidrBlock
  InternetGateway:
    Type: 'AWS::EC2::InternetGateway'
    Properties:
      Tags:
        - Key: Name
          Value: !Sub ${AWS::StackName}-InternetGateway
  AttachGateway:
    Type: 'AWS::EC2::VPCGatewayAttachment'
    Properties:
      VpcId: !Ref VPC
      InternetGatewayId: !Ref InternetGateway

  PublicSubnet1:
    Type: 'AWS::EC2::Subnet'
    Properties:
      VpcId: !Ref VPC
      CidrBlock: !Ref PublicSubnetCidrBlock1
      AvailabilityZoneId: "apne1-az1"
      MapPublicIpOnLaunch: true
  PublicSubnet2:
    Type: 'AWS::EC2::Subnet'
    Properties:
      VpcId: !Ref VPC
      CidrBlock: !Ref PublicSubnetCidrBlock2
      AvailabilityZoneId: "apne1-az2"
      MapPublicIpOnLaunch: true
  LocalSubnet1:
    Type: 'AWS::EC2::Subnet'
    Properties:
      VpcId: !Ref VPC
      CidrBlock: !Ref LocalSubnetCidrBlock1
      AvailabilityZoneId: "apne1-az1"
  LocalSubnet2:
    Type: 'AWS::EC2::Subnet'
    Properties:
      VpcId: !Ref VPC
      CidrBlock: !Ref LocalSubnetCidrBlock2
      AvailabilityZoneId: "apne1-az2"
  DBSubnetGroup:
    Type: AWS::RDS::DBSubnetGroup
    Properties:
      DBSubnetGroupDescription: Subnet group
      SubnetIds: 
        - !Ref LocalSubnet1
        - !Ref LocalSubnet2

  #############################################################################
  # public Subnet
  #############################################################################   
  RouteTable1:
    Type: 'AWS::EC2::RouteTable'
    Properties:
      VpcId: !Ref VPC
      Tags:
        - Key: Name
          Value: !Sub ${AWS::StackName}-RouteTable
  PublicRoute1:
    Type: 'AWS::EC2::Route'
    Properties:
      RouteTableId: !Ref RouteTable1
      DestinationCidrBlock: 0.0.0.0/0
      GatewayId: !Ref InternetGateway
  SubnetRouteTableAssociation1:
    Type: 'AWS::EC2::SubnetRouteTableAssociation'
    Properties:
      SubnetId: !Ref PublicSubnet1
      RouteTableId: !Ref RouteTable1

  RouteTable2:
    Type: 'AWS::EC2::RouteTable'
    Properties:
      VpcId: !Ref VPC
      Tags:
        - Key: Name
          Value: !Sub ${AWS::StackName}-RouteTable
  PublicRoute2:
    Type: 'AWS::EC2::Route'
    Properties:
      RouteTableId: !Ref RouteTable2
      DestinationCidrBlock: 0.0.0.0/0
      GatewayId: !Ref InternetGateway
  SubnetRouteTableAssociation2:
    Type: 'AWS::EC2::SubnetRouteTableAssociation'
    Properties:
      SubnetId: !Ref PublicSubnet2
      RouteTableId: !Ref RouteTable2

 
  #############################################################################
  # MLflow RESOURCES SECTION
  # This section contains all the MLflow related resources
  #############################################################################       
  MLflowIAMRole:
    Type: 'AWS::IAM::Role'
    Properties:
      RoleName: TestManager
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: ec2.amazonaws.com
            Action: 'sts:AssumeRole'
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/AmazonS3FullAccess
        - arn:aws:iam::aws:policy/AmazonRDSFullAccess
  MLflowInstanceProfile:
    Type: AWS::IAM::InstanceProfile
    Properties:
      Path: "/"
      Roles:
      - !Ref MLflowIAMRole

  MLflowEC2Instance:
    Type: AWS::EC2::Instance
    DependsOn:
      - MLflowRDSInstance
      - MLflowS3Bucket01
      - MLflowSecurityGroup
    Properties:
      ImageId: ami-0dfa284c9d7b2adad  # Specify the appropriate AMI ID for your region and instance type
      InstanceType: !Ref InstanceType
      IamInstanceProfile: !Ref MLflowInstanceProfile
      KeyName: !Ref KeyName
      UserData: 
        Fn::Base64: !Sub
        - |
          #!/bin/bash
          FLAG_FILE=/var/log/first_run_completed

          # only first run
          if [ ! -f $FLAG_FILE ]; then
          sudo yum update -y
          sudo yum install -y python3 python3-pip
          sudo yum clean all
          sudo su ec2-user -c 'pip3 install -U pip'
          sudo su ec2-user -c 'pip3 install mlflow boto3 pymysql'
          sudo su ec2-user -c 'pip3 cache purge'
          export DB_ENDPOINT=`aws rds describe-db-instances --query "DBInstances[?contains(DBInstanceIdentifier, 'mlflow')].Endpoint.Address | [0]" --output text`
          echo "python3 -m mlflow server \
            --host 0.0.0.0 \
            --port 5000 \
            --backend-store-uri mysql+pymysql://${USR}:${PW}@${!DB_ENDPOINT}:3306/${DB_NAME} \
            --default-artifact-root s3://${S3_NAME}/mlflow-artifacts/" > /etc/start_mlflow.sh
          chmod 777 /etc/start_mlflow.sh
          fi
          # start mlflow server
          nohup sudo su ec2-user -c "/etc/start_mlflow.sh" & 
        - {
            USR: !Ref MasterUsername,
            PW: !Ref MasterUserPassword,
            DB_NAME: !Ref DBName,
            S3_NAME: !Ref S3Name
          }
      NetworkInterfaces: 
        - AssociatePublicIpAddress: "true"
          DeviceIndex: "0"
          GroupSet: 
            - !Ref MLflowSecurityGroup
          SubnetId: !Ref PublicSubnet1
          PrivateIpAddress: !Ref MLFlowPrivateIpAddress
  MLflowElasticIp:
    Type: 'AWS::EC2::EIP'
  MLflowInstanceAssociation:
    Type: 'AWS::EC2::EIPAssociation'
    Properties:
      AllocationId: !GetAtt MLflowElasticIp.AllocationId
      InstanceId: !Ref MLflowEC2Instance

  MLflowSecurityGroup:
    Type: 'AWS::EC2::SecurityGroup'
    Properties:
      GroupDescription: Security group for MLflow server
      VpcId: !Ref VPC
      SecurityGroupIngress:
        - IpProtocol: tcp
          FromPort: 5000
          ToPort: 5000
          CidrIp: 0.0.0.0/0
        - IpProtocol: tcp
          FromPort: 22
          ToPort: 22
          CidrIp: 0.0.0.0/0

  DbSecurityByEC2SecurityGroup:
    Type: AWS::RDS::DBSecurityGroup
    DependsOn:
      - MLflowSecurityGroup
    Properties:
      GroupDescription: Ingress for Amazon EC2 security group
      EC2VpcId: !Ref VPC
      DBSecurityGroupIngress:
      - EC2SecurityGroupId: !Ref MLflowSecurityGroup

  MLflowRDSInstance:
    Type: 'AWS::RDS::DBInstance'
    Properties:
      DBSecurityGroups:
        - !Ref DbSecurityByEC2SecurityGroup
      DBSubnetGroupName: !Ref DBSubnetGroup
      AllocatedStorage: 5
      DBInstanceClass: !Ref RDSInstanceClass
      Engine: mysql  # Choose the appropriate RDS engine
      MasterUsername: !Ref MasterUsername
      MasterUserPassword: !Ref MasterUserPassword
      DBName: !Ref DBName

  MLflowS3Bucket01:
    Type: 'AWS::S3::Bucket'
    Properties:
      BucketName: !Ref S3Name

  #############################################################################
  # Model developer
  #############################################################################
  ModelDeveloperIAMRole:
    Type: 'AWS::IAM::Role'
    Properties:
      RoleName: ModelDeveloper
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: ec2.amazonaws.com
            Action: 'sts:AssumeRole'
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/AmazonS3FullAccess
  ModelDeveloperInstanceProfile:
    Type: AWS::IAM::InstanceProfile
    Properties:
      Path: "/"
      Roles:
      - !Ref ModelDeveloperIAMRole
  ModelDeveloperEC2Instance:
    Type: AWS::EC2::Instance
    Properties:
      ImageId: ami-07c589821f2b353aa  # Specify the appropriate AMI ID for your region and instance type
      InstanceType: !Ref InstanceType
      IamInstanceProfile: !Ref ModelDeveloperInstanceProfile
      KeyName: !Ref KeyName
      UserData: 
        Fn::Base64: !Sub
        - |
          #!/bin/bash
          sudo apt update -y
          sudo apt install -y python3 python3-pip
          su ec2-user <<-EOF
          pip3 install -U pip 
          pip3 install boto3
          EOF
        - {}
      NetworkInterfaces: 
        - AssociatePublicIpAddress: "true"
          DeviceIndex: "0"
          GroupSet: 
            - !Ref ModelDeveloperSecurityGroup
          SubnetId: !Ref PublicSubnet1

  ModelDeveloperSecurityGroup:
    Type: 'AWS::EC2::SecurityGroup'
    Properties:
      GroupDescription: Security group for Model Trainig server
      VpcId: !Ref VPC
      SecurityGroupIngress:
        - IpProtocol: tcp
          FromPort: 22
          ToPort: 22
          CidrIp: 0.0.0.0/0

Outputs:
  # Emit values needed for deployment status (e.g., where to SSH to)
  MLflowEC2IPAddress:
    Description: "MLflow EC2 Instance Public IP Address"
    Value: !GetAtt MLflowEC2Instance.PublicIp

300 USD分のAWS無料クレジットを申請して、もらった

AWSを開始するとすぐに、300USDほしくないですか?というメールがくる。ほかの方の記事を見てみると受け取れたり受け取れなかったりしているみたいなので、参考までに自分はどうやって申請したら受け取れたかを書いておく。

申請内容

あまり詳細は覚えていないのだが、住所や氏名、電話番号といった内容は普通に自分のことを記載。会社名は、個人的な勉強のために使用しているので、「Freelance」として、会社情報も適当に合わせて記載しておいた。個人的にはここは適当でも問題ないと思う。

AWSの利用状況

AWSは、以下の記事にある、5分ごとに実行されるBotを24時間稼働している。請求状況はCloudWatchから、0.3USDほど請求が来ていた。個人的にはこのような、AWSを使っているという実績が必要なのではないか?と思う。

AWSの無料枠でBitCoinの自動売買プログラム組んでみた - ぼうびろく

申請後

申請してから1週間程度で、300UDSあげたよ、というメールがくる。

【合格体験記】エンベデッドシステムスペシャリスト試験(令和5年度秋期) (午後Ⅰ 95点)

エンデベッドシステムスペシャリスト合格しました。思ったよりも高得点だったので合格体験記を書こうと思います。

過去に合格した試験

受験理由

業務が組み込みに近いので、自主学習がてら受けました。

勉強時間

  • 午前II試験:2か月ほど前から、毎日通勤時間にスマホアプリで過去問解いてました。
  • 午後I試験:2か月ほど前から、週8時間ほど参考書か過去問を解いていました。過去問は週2問くらいをじっくりといていました。
  • 午後II試験:やってません。(小論文に変わったの知らなくてめちゃくちゃ焦った。。)

参考書

使ったのは以下の2つ。ESは覚えることが少ないので、参考書は必須ではないかもだけど、1冊はあると便利。

hb.afl.rakuten.co.jp

hb.afl.rakuten.co.jp

重要だと思ったこと

ESを受けるうえでの考え方として重要だと思ったのは以下。

コンポーネントの役割を意識する

無意味なコンポーネントはないので、各タスクやHWが何のために存在しているのか意識して読むようにしていると、自然と組み込みの考え方が身につく

ユースケースを考えて、頭の中で動かしてみる

動かしてみると、各タスクの相互作用やデータの流れがわかってくるので、半分くらい読めたらユースケースを考えてみて、実際に動かしてみるといい。

異常系を考える

組み込みで難しいのは異常系だと思う。例えばHWが予期しない動作をした場合にどうするかや、事故や故障が起きた場合に安全に停止できるか、それをユーザが検知できるかや、天候によって性能が左右されないかなど、考えるときりがない。しかし異常系を予期しておくというのは非常に大切で、よく問題にも出るので、常に考える訓練をしておくといい。

結論ファースト(小論文)

小論文で必要なのは、相手に伝えること。なので結論から書いて、あとから肉付けをしていく。例えば、○○の機能仕様をかけ、みたいな出題であれば、「機能仕様は○○である。○○とは~~」みたいな文章がいい。ただし、この結論部分に事前説明が必要な固有名詞は入れないほうがいい。なぜなら、その分説明を記載する必要があり、文章が長くなり、結果としてわかりづらい内容となってしまう。 最も、固有名詞は文章を難読にするので、キーワード的に1,2個入れるのはいいと思うが、あまり入れないほうがいい。

公式サイトで試験内容の確認をする

記述問題かと思ったら小論文だった、みたいなことにならないように。気を付けましょう。(自戒)

PythonでDynamoDBをローカルにバックアップするコード

DynamoDBをダウンロードしてローカルにCSVで保存する。 1回のScanでは1MBしか取れないので、最後までループして取得している。

データはtimestamでソートしている。

import boto3
import pandas as pd

aws_access_key_id = 'xxx'
aws_secret_access_key = 'yyyy'
region_name = 'ap-northeast-1'
table_name = 'zzz'

def scan_dynamodb_table(table_name):
    dynamodb = boto3.resource('dynamodb', region_name=region_name,
                          aws_access_key_id=aws_access_key_id,
                          aws_secret_access_key=aws_secret_access_key)

    table = dynamodb.Table(table_name)

    items = []
    last_evaluated_key = None

    while True:
        # ページネーションでScanを行います
        response = table.scan(ExclusiveStartKey=last_evaluated_key) if last_evaluated_key else table.scan()
        items.extend(response['Items'])

        # LastEvaluatedKeyがあれば次のページがあるということです
        if 'LastEvaluatedKey' in response:
            last_evaluated_key = response['LastEvaluatedKey']
        else:
            break

    return items

def export_to_csv(items, csv_file_path):
    df = pd.DataFrame(items)
    # timestamp列をIndexに設定
    df.set_index('timestamp', inplace=True)

    # Indexでソート
    df.sort_index(inplace=True)

    # ファイル名に使用する最初と最後のタイムスタンプを取得
    start_timestamp = str(df.index.min())
    end_timestamp = str(df.index.max())

    # ファイル名を作成
    file_name = f"data_{start_timestamp}_{end_timestamp}.csv"

    # DataFrameをCSVファイルに保存
    df.to_csv(file_name)

if __name__ == "__main__":
    # DynamoDBテーブルをスキャンしてデータを取得します
    items = scan_dynamodb_table(table_name)

    # 取得したデータをCSVにエクスポートします
    export_to_csv(items)

OpenCVで重なりのある物体の領域分割

OpenCVで、重なりのある物体の領域分割をやってみた。 以下の関数では領域分割を行い、その領域をクロッピングして返却する。 領域分割がうまくいかない場合は、dist_factorを調整する。これは重なりが大きい場合は小さく、重なりが小さい場合は大きい値にする。

参考↓

labs.eecs.tottori-u.ac.jp

def watershed_segmentation_with_bounding_boxes(image, dist_factor=0.35):
    mask = image.copy().astype(np.uint8)
    # 画像をグレースケールに変換
    gray = cv2.cvtColor(mask, cv2.COLOR_BGR2GRAY)

    # 画像の前処理(例:ノイズ除去やコントラスト調整)
    blurred = cv2.GaussianBlur(gray, (5, 5), 0)

    # 画像の閾値処理を適用して、物体と背景を区別
    _, thresh = cv2.threshold(blurred, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)

    # 領域分割を行う前に、距離変換を計算
    dist_transform = cv2.distanceTransform(thresh, cv2.DIST_L2, 5)
    _, sure_fg = cv2.threshold(dist_transform, dist_factor * dist_transform.max(), 255, 0)

    # sure_fgをもとにマーカーを作成
    sure_fg = np.uint8(sure_fg)
    unknown = cv2.subtract(thresh, sure_fg)

    # マーカーにラベルを付ける
    _, markers = cv2.connectedComponents(sure_fg)

    # マーカーの背景を1に設定
    markers = markers + 1
    markers[unknown == 255] = 0

    # Watershedアルゴリズムを適用
    cv2.watershed(mask, markers)
    #show_img(markers)

    labels = np.unique(markers)
    contours_all = []
    # 各領域の輪郭を検出
    for label in labels[2:]:
        target = np.where(markers == label, 255, 0).astype(np.uint8)
        contours, _ = cv2.findContours(target, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
        for c in contours:
            contours_all.append(c)

    # 輪郭を囲むバウンディングボックスを取得
    bounding_boxes = [cv2.boundingRect(contour) for contour in contours_all]

    # バウンディングボックスを描画
    #for x, y, w, h in bounding_boxes:
    #    cv2.rectangle(image, (x, y), (x + w, y + h), (0, 255, 0), 2)  # 緑のバウンディングボックスを描画
    croped = []
    for (x, y, w, h), contour in zip(bounding_boxes, contours_all):
        mask = np.zeros(image.shape)
        mask = cv2.drawContours(mask, [contour], -1, (1, 1, 1), -1).astype(np.uint8)

        area = w*h
        if area > 4000 and area < 13000:  # あるしきい値より大きな領域のみを描画
            croped.append(cv2.resize((np.multiply(image.copy(), mask))[y:y+h, x:x+w, :], (16, 16)))
    return bounding_boxes, croped

Vision Transformerによるクラス分類を実装してみた

Vision Transformer (ViT)によるクラス分類を実装したのでメモしておく。

  • 参考にしたサイト様

farml1.com

以下のコードは、Google colabで実行する。

必要なライブラリのインストール

!pip install timm seaborn linformer tqdm pandas

import (全部はいらないかも)

from __future__ import print_function

import glob
from itertools import chain
import os
import random
import zipfile

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from linformer import Linformer
from PIL import Image
from sklearn.model_selection import train_test_split
from torch.optim.lr_scheduler import StepLR
from torch.utils.data import DataLoader, Dataset
from torchvision import datasets, transforms
from tqdm.notebook import tqdm

from sklearn.metrics import average_precision_score

#from vit_pytorch.efficient import ViT
import seaborn as sns
import timm
from pprint import pprint
model_names = timm.list_models(pretrained=True)
pprint([ x for x in model_names if "vit" in x])

ここで使えるViTのモデル一覧が表示されるので、使いたいものを選択する。

ハイパーパラメータの設定

# Training settings
model_name = 'vit_medium_patch16_gap_384.sw_in12k_ft_in1k'
batch_size = 48
epochs = 20
lr = 3e-5
gamma = 0.7
seed = 42
img_size = 384
weight_decay = 1e-3
device = 'cuda'
def seed_everything(seed):
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True

seed_everything(seed)

学習データのロード

data_path = "/content/drive/MyDrive/data"
train_dir = data_path + '/train'
test_dir = data_path + '/test'
train_list = sorted(glob.glob(os.path.join(train_dir,'*.png')))
test_list = sorted(glob.glob(os.path.join(test_dir, '*.png')))
print(f"Train Data: {len(train_list)}")
print(f"Test Data: {len(test_list)}")

train_labels = np.loadtxt(data_path + "/train.csv", delimiter=',', dtype='int64',
               skiprows=1, usecols=[1])
test_labels = np.loadtxt(data_path + "/train.csv", delimiter=',', dtype='int64',
               skiprows=1, usecols=[1])
test_labels = np.zeros((len(test_list),))

train_df = pd.DataFrame({
    'data_path': train_list,
    'label': train_labels
})
test_df = pd.DataFrame({
    'data_path': test_list,
    'label': test_labels
})
random_idx = np.random.randint(1, len(train_df.data_path), size=9)
fig, axes = plt.subplots(3, 3, figsize=(16, 12))

for idx, ax in enumerate(axes.ravel()):
    img = Image.open(train_df.data_path[idx])
    ax.set_title(train_df.label[idx])
    ax.imshow(img)

データセットを分割。

train_df, valid_df = train_test_split(train_df,
                                          test_size=0.1,
                                          stratify=train_df.label,
                                          random_state=seed)
print(f"Train Data: {len(train_df)}")
print(f"Validation Data: {len(valid_df)}")
print(f"Test Data: {len(test_df)}")

前処理の設定。

train_transforms = transforms.Compose(
    [
        transforms.Resize((img_size, img_size)),
        transforms.RandomResizedCrop(img_size),
        transforms.RandomRotation(degrees = 180),
        #transforms.ColorJitter(brightness=0.5, contrast=0.5, saturation=0.5, hue=0.5),
        transforms.ToTensor(),
        transforms.RandomErasing(p=0.5),
    ]
)

val_transforms = transforms.Compose(
    [
        transforms.Resize(img_size),
        transforms.ToTensor(),
    ]
)


test_transforms = transforms.Compose(
    [
        transforms.Resize(img_size),
        transforms.ToTensor(),
    ]
)
import numpy as np
class MyDataset(Dataset):
    def __init__(self, df, transform=None, return_filepath=False):
        self.file_list = list(df.data_path)
        self.transform = transform
        self.label = list(df.label)
        self.return_filepath = return_filepath

    def __len__(self):
        self.filelength = len(self.file_list)
        return self.filelength

    def __getitem__(self, idx):
        img_path = self.file_list[idx]
        img = Image.open(img_path)
        img_transformed = self.transform(img)
        if not self.return_filepath:
          return img_transformed, self.label[idx]
        else:
          return img_transformed, os.path.basename(img_path)
train_data = MyDataset(train_df, transform=train_transforms)
valid_data = MyDataset(valid_df, transform=test_transforms)
test_data = MyDataset(test_df, transform=test_transforms, return_filepath=True)

train_loader = DataLoader(dataset = train_data, batch_size=batch_size, shuffle=True )
valid_loader = DataLoader(dataset = valid_data, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(dataset = test_data, batch_size=1, shuffle=False)

print(len(train_data), len(train_loader))

モデルの準備

model = timm.create_model(model_name, pretrained=True, num_classes=2).to(device)
#model = torch.load(data_path + '/model_weight.pth').to(device)
# loss function
criterion = nn.CrossEntropyLoss()
# optimizer
optimizer = optim.Adam(model.parameters(), lr=lr, weight_decay=weight_decay)
# scheduler
scheduler = StepLR(optimizer, step_size=1, gamma=gamma)

学習の実行

train_acc_list = []
val_acc_list = []
train_loss_list = []
val_loss_list = []
##############

epochs = 20
m = nn.Softmax(dim=1)

for epoch in range(epochs):
    epoch_loss = 0
    epoch_accuracy = 0

    for data, label in tqdm(train_loader):
        data = data.to(device)
        label = label.to(device)

        output = m(model(data))
        loss = criterion(output, label)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        acc = (output.argmax(dim=1) == label).float().mean()
        #tmp = output.detach().cpu().numpy()
        #result = tmp[:, 1]
        #acc = average_precision_score(label.detach().cpu().numpy(), result, average='samples')


        epoch_accuracy += acc / len(train_loader)
        epoch_loss += loss / len(train_loader)

    with torch.no_grad():
        epoch_val_accuracy = 0
        epoch_val_loss = 0
        for data, label in valid_loader:
            data = data.to(device)
            label = label.to(device)

            val_output = model(data)
            val_loss = criterion(val_output, label)

            acc = (val_output.argmax(dim=1) == label).float().mean()
            #tmp = val_output.detach().cpu().numpy()
            #result = tmp[:, 1]
            #acc = average_precision_score(label.detach().cpu().numpy(), result, average='samples')
            epoch_val_accuracy += acc / len(valid_loader)
            epoch_val_loss += val_loss / len(valid_loader)

    print(
        f"Epoch : {epoch+1} - loss : {epoch_loss:.4f} - acc: {epoch_accuracy:.4f} - val_loss : {epoch_val_loss:.4f} - val_acc: {epoch_val_accuracy:.4f}\n"
    )
    torch.save(model, data_path + '/model_weight.pth')

    train_acc_list.append(epoch_accuracy)
    val_acc_list.append(epoch_val_accuracy)
    train_loss_list.append(epoch_loss)
    val_loss_list.append(epoch_val_loss)

学習結果のプロット

device2 = torch.device('cpu')

train_acc = []
train_loss = []
val_acc = []
val_loss = []

for i in range(epochs):
    train_acc2 = train_acc_list[i].to(device2)
    train_acc3 = train_acc2.clone().numpy()
    train_acc.append(train_acc3)

    train_loss2 = train_loss_list[i].to(device2)
    train_loss3 = train_loss2.clone().detach().numpy()
    train_loss.append(train_loss3)

    val_acc2 = val_acc_list[i].to(device2)
    val_acc3 = val_acc2.clone().numpy()
    val_acc.append(val_acc3)

    val_loss2 = val_loss_list[i].to(device2)
    val_loss3 = val_loss2.clone().numpy()
    val_loss.append(val_loss3)

#取得したデータをグラフ化する
sns.set()
num_epochs = epochs

fig = plt.subplots(figsize=(12, 4), dpi=80)

ax1 = plt.subplot(1,2,1)
ax1.plot(range(num_epochs), train_acc, c='b', label='train acc')
ax1.plot(range(num_epochs), val_acc, c='r', label='val acc')
ax1.set_xlabel('epoch', fontsize='12')
ax1.set_ylabel('accuracy', fontsize='12')
ax1.set_title('training and val acc', fontsize='14')
ax1.legend(fontsize='12')

ax2 = plt.subplot(1,2,2)
ax2.plot(range(num_epochs), train_loss, c='b', label='train loss')
ax2.plot(range(num_epochs), val_loss, c='r', label='val loss')
ax2.set_xlabel('epoch', fontsize='12')
ax2.set_ylabel('loss', fontsize='12')
ax2.set_title('training and val loss', fontsize='14')
ax2.legend(fontsize='12')
plt.show()

AWSの無料枠でBitCoinの自動売買プログラム組んでみた

AWSの勉強として、BitFlyerAPIを使用して、BitCoinの自動売買プログラムを組んでみた。今回の主目的はAWSの勉強なので、実際儲かるかは不明。

※2023/12/30 追記 以下の構成だとCloudWatchの料金が0.1USD/Dayかかるので、完全無料ではないです。

準備するもの

システム構成

こんな感じ。(書き方あってる?)EventBridgeから5分ごとにトリガーを出し、そのトリガーでLambdaがBitCoinの価格を取得、DynamoDBに保存、ゴールデンクロスの場合、Bitcoinを成り行きで購入し、0.4%を上乗せして指値で売り注文を出す。(0.4% は、BitFlyerの手数料が0.15%なので)

実装

DynamoDBの準備

AWSの上部の検索欄に「DynamoDB」と入力し、DynamoDBへ移動。

リージョンに「東京」を選択。(どこでもいいが、DynamoDBやLambdaのリージョンは同じである必要あり)

新しいテーブルを追加。

以下のように入力して作成。

Lambdaの実装

AWSの上部の検索欄に「Lambda」と入力し、Lambdaへ移動。

関数の作成から、関数名、ランタイムを選択し、作成。

必要なライブラリの準備。Layerを選択。

レイヤーの追加から、以下を追加する。

  • arn:aws:lambda:ap-northeast-1:770693421928:layer:Klayers-p39-requests:18
  • arn:aws:lambda:ap-northeast-1:770693421928:layer:Klayers-p39-pandas:20
  • arn:aws:lambda:ap-northeast-1:770693421928:layer:Klayers-p39-boto3:21

参考↓

qiita.com

github.com

ロールの権限を変更し、DynamoDBを読み書きできるようにする。 Lambdaの設定→一般設定の以下のリンクをクリック。

許可を追加。DynamoDBを選択し、以下にチェックを入れる。

  • Query
  • Scan
  • BatchWriteItem

Lambdaの設定→一般設定から、タイムアウト時間を10秒にしておく。

コード

以下のコードを張り付ける。

import json
import requests
import time
import pandas as pd
import traceback
import hashlib
import hmac
import json
from datetime import datetime
import os

import boto3

from boto3.dynamodb.conditions import Key, Attr

# ビットフライヤーのAPIキーとシークレット
API_KEY = os.environ["API_KEY"]
API_SECRET = os.environ["API_SECRET"]

# APIエンドポイント
API_URL = 'https://api.bitflyer.jp'

# 注文情報
order_quantity = float(os.environ["ODER_QUANTITY"])  # 購入および売却するBTCの数量
product_code = 'BTC_JPY'  # トレード対象の通貨ペア

# 移動平均の期間設定
short_window = 10
long_window = 20

df = None

table_name = os.environ["TABLE_NAME"]

def get_market_price():
    # 現在の市場価格を取得
    ticker_url = f'{API_URL}/v1/ticker?product_code={product_code}'
    response = requests.get(ticker_url)
    data = response.json()
    print(data)
    return float(data['ltp'])

def get_moving_averages():
    global df
    # 移動平均を計算
    historical_data_url = f'{API_URL}/v1/getticker?product_code={product_code}'
    response = requests.get(historical_data_url)
    data = response.json()
    new_data = {
        'timestamp': data['timestamp'], 
        'best_ask': [float(data['best_ask'])],
        'best_bid': [float(data['best_bid'])],
        }
    new_data = pd.DataFrame(new_data)
    new_data['timestamp'] = pd.to_datetime(new_data['timestamp'])
    df = pd.concat([df, new_data])
    try:
        short_ma = df['best_bid'].rolling(window=short_window).mean()#.iloc[-1]
        long_ma = df['best_bid'].rolling(window=long_window).mean()#.iloc[-1]
    except:
        short_ma = 0
        long_ma = 0
        pass
    return short_ma, long_ma

def place_market_order(side, quantity):
    # 成行注文を出す
    endpoint = '/v1/me/sendchildorder'
    order_url = API_URL + endpoint

    order_data = {
        'product_code': product_code,
        'child_order_type': 'MARKET',
        'side': side,
        'size': quantity,
    }
    body = json.dumps(order_data)
    headers = header('POST', endpoint=endpoint, body=body)

    response = requests.post(order_url, headers=headers, data=body)
    return response.json()

def place_limit_order(side, price, quantity):
    # 指値注文を出す
    endpoint = '/v1/me/sendchildorder'
    order_url = API_URL + endpoint

    order_data = {
        'product_code': product_code,
        'child_order_type': 'LIMIT',
        'side': side,
        'price': price,
        'size': quantity,
    }
    body = json.dumps(order_data)
    headers = header('POST', endpoint=endpoint, body=body)

    response = requests.post(order_url, headers=headers, data=body)
    return response.json()


def header(method: str, endpoint: str, body: str) -> dict:
    timestamp = str(time.time())
    if body == '':
        message = timestamp + method + endpoint
    else:
        message = timestamp + method + endpoint + body
    signature = hmac.new(API_SECRET.encode('utf-8'), message.encode('utf-8'),
                         digestmod=hashlib.sha256).hexdigest()
    headers = {
        'Content-Type': 'application/json',
        'ACCESS-KEY': API_KEY,
        'ACCESS-TIMESTAMP': timestamp,
        'ACCESS-SIGN': signature
    }
    return headers


def get_open_orders():
    # 出ている注文一覧を取得
    endpoint = '/v1/me/getchildorders'
    
    params = {
        'product_code': 'BTC_JPY',
        'child_order_state': 'ACTIVE',  # 出ている注文だけを取得
    }
    endpoint_for_header = endpoint + '?'
    for k, v in params.items():
        endpoint_for_header += k + '=' + v
        endpoint_for_header += '&'
    endpoint_for_header = endpoint_for_header[:-1]
    
    headers = header('GET', endpoint=endpoint_for_header, body="")

    response = requests.get(API_URL + endpoint, headers=headers, params=params)
    orders = response.json()
    return orders

def lambda_handler(event, context):
    global df
    
    dynamodb = boto3.resource('dynamodb')
    table = dynamodb.Table(table_name)
    ret = table.scan()
    df = pd.DataFrame(data=ret['Items'])
    try:
        df = df.sort_values('timestamp')
    except:
        print("sort error")
        pass
    
    try:
        # 現在の市場価格と移動平均を取得
        current_price = get_market_price()
        short_ma, long_ma = get_moving_averages()
        
        with table.batch_writer() as batch:
            tmp = df.iloc[-1].to_dict()
            for k in tmp.keys():
                if not k == "timestamp":
                    tmp[k] = int(tmp[k])
                else:
                    tmp[k] = tmp[k].strftime('%Y%m%d%H%M%S')
                    tmp[k] = int(tmp[k])
            data = tmp
            batch.put_item(Item=data)
    
        # 注文が出されていない場合に短期移動平均が長期移動平均を上回った場合に購入
        orders = get_open_orders()
        print(len(orders))
            
        if os.environ["BUYSELL"] == "1":
            if (short_ma.iloc[-1] > long_ma.iloc[-1]) and (short_ma.iloc[-2] < long_ma.iloc[-2]) and \
                len(orders) < int(os.environ["ODER_NUM"]):
                print("短期移動平均が長期移動平均を上回りました。購入注文を出します。")
                order_response = place_market_order('BUY', order_quantity)
                print("注文結果:", order_response)
    
                # 注文が成功したら約定価格に0.4%加算して売り注文を出す
                if 'child_order_acceptance_id' in order_response:
                    executed_price = int(current_price * 1.004)  # 約定価格に0.4%加算
                    print(f"約定価格: {executed_price}")
                    sell_response = place_limit_order('SELL', executed_price, order_quantity)
                    print("売り注文結果:", sell_response)
    
            # 売り注文が出されている場合は、一定の間隔で確認
            elif len(orders) > 0:
                # ここに必要な確認処理を追加
                pass

    except Exception as e:
        print(f"エラーが発生しました: {e}")
        print(traceback.format_exc())

    return {
        'statusCode': 200,
        'body': json.dumps('')
    }

以下のDeployを押す。コードを変更したら、毎回Deploy押す必要あり。

環境変数を変更する。

設定する環境変数は以下の通り。自分の環境に合わせて変更する。

  • API_KEY:BitFlyerAPI_KEY
  • API_SECRET:BitFlyerAPI_SECRET
  • BUYSELL:売買を行うかどうか。1の場合、売買を行う。
  • ODER_NUM:注文数の上限。3など。
  • ODER_QUANTITY:1回の注文で購入するBitCoin。0.003など。
  • TABLE_NAME:DynamoDBのテーブル名。test

以下のTestを押すと関数が実行される。初回はテストの設定が必要だが、デフォルトのままでOK。

成功した場合には以下のようなログになる。タイムアウトする場合は、タイムアウト時間を延ばす。

DynamoDBに保存されているかの確認

DynamoDBへ行き、テーブルアイテムの検索を押す。

スキャンを行い、以下のように表示されればOK。

トリガーの追加

以下のトリガーを追加をクリック。

以下のように入力し、トリガーを作成。間隔は自由に変更可。

トリガーが作成されると、Lambdaが自動で実行されるようになる。