AWS CloudformationでMLFlowサーバを構築した
AWS CloudformationでMLFlowサーバを構築した。MLFlowサーバのバックエンドには、S3とRDSを使用している。
AWS Cloudformationで以下のYAMLファイルでStackを構築すれば、VPCやSubnet、ロール、MLFlowがインストールされたEC2インスタンスなどが自動的に構築される。VPCのCIDRがかぶっていたりすると構築に失敗するので、その場合は変数をいい感じに編集して使用する。
AWSTemplateFormatVersion: '2010-09-09' Parameters: VpcCidrBlock: Type: String Description: CIDR block for VPC Default: 192.168.0.0/16 PublicSubnetCidrBlock1: Type: String Description: CIDR block for subnet Default: 192.168.1.0/24 PublicSubnetCidrBlock2: Type: String Description: CIDR block for subnet Default: 192.168.2.0/24 LocalSubnetCidrBlock1: Type: String Description: CIDR block for subnet Default: 192.168.11.0/24 LocalSubnetCidrBlock2: Type: String Description: CIDR block for subnet Default: 192.168.12.0/24 MLFlowPrivateIpAddress: Type: String Description: MLFlow Private IP Address Default: 192.168.1.10 KeyName: Type: AWS::EC2::KeyPair::KeyName Description: EC2 Key Pair for SSH access Default: MyKeyPair InstanceType: Type: String Description: EC2 instance type Default: t2.micro RDSInstanceClass: Type: String Description: RDS instance class Default: db.t2.micro DBName: Type: String Description: RDS database name Default: mlflowdb S3Name: Type: String Description: S3 name Default: mlflows300 MasterUsername: Type: String Description: RDS master username Default: mlflowuser MasterUserPassword: Type: String Description: RDS master user password NoEcho: true Default: mlflowpassword Resources: VPC: Type: 'AWS::EC2::VPC' Properties: EnableDnsSupport: "true" EnableDnsHostnames: "true" CidrBlock: !Ref VpcCidrBlock InternetGateway: Type: 'AWS::EC2::InternetGateway' Properties: Tags: - Key: Name Value: !Sub ${AWS::StackName}-InternetGateway AttachGateway: Type: 'AWS::EC2::VPCGatewayAttachment' Properties: VpcId: !Ref VPC InternetGatewayId: !Ref InternetGateway PublicSubnet1: Type: 'AWS::EC2::Subnet' Properties: VpcId: !Ref VPC CidrBlock: !Ref PublicSubnetCidrBlock1 AvailabilityZoneId: "apne1-az1" MapPublicIpOnLaunch: true PublicSubnet2: Type: 'AWS::EC2::Subnet' Properties: VpcId: !Ref VPC CidrBlock: !Ref PublicSubnetCidrBlock2 AvailabilityZoneId: "apne1-az2" MapPublicIpOnLaunch: true LocalSubnet1: Type: 'AWS::EC2::Subnet' Properties: VpcId: !Ref VPC CidrBlock: !Ref LocalSubnetCidrBlock1 AvailabilityZoneId: "apne1-az1" LocalSubnet2: Type: 'AWS::EC2::Subnet' Properties: VpcId: !Ref VPC CidrBlock: !Ref LocalSubnetCidrBlock2 AvailabilityZoneId: "apne1-az2" DBSubnetGroup: Type: AWS::RDS::DBSubnetGroup Properties: DBSubnetGroupDescription: Subnet group SubnetIds: - !Ref LocalSubnet1 - !Ref LocalSubnet2 ############################################################################# # public Subnet ############################################################################# RouteTable1: Type: 'AWS::EC2::RouteTable' Properties: VpcId: !Ref VPC Tags: - Key: Name Value: !Sub ${AWS::StackName}-RouteTable PublicRoute1: Type: 'AWS::EC2::Route' Properties: RouteTableId: !Ref RouteTable1 DestinationCidrBlock: 0.0.0.0/0 GatewayId: !Ref InternetGateway SubnetRouteTableAssociation1: Type: 'AWS::EC2::SubnetRouteTableAssociation' Properties: SubnetId: !Ref PublicSubnet1 RouteTableId: !Ref RouteTable1 RouteTable2: Type: 'AWS::EC2::RouteTable' Properties: VpcId: !Ref VPC Tags: - Key: Name Value: !Sub ${AWS::StackName}-RouteTable PublicRoute2: Type: 'AWS::EC2::Route' Properties: RouteTableId: !Ref RouteTable2 DestinationCidrBlock: 0.0.0.0/0 GatewayId: !Ref InternetGateway SubnetRouteTableAssociation2: Type: 'AWS::EC2::SubnetRouteTableAssociation' Properties: SubnetId: !Ref PublicSubnet2 RouteTableId: !Ref RouteTable2 ############################################################################# # MLflow RESOURCES SECTION # This section contains all the MLflow related resources ############################################################################# MLflowIAMRole: Type: 'AWS::IAM::Role' Properties: RoleName: TestManager AssumeRolePolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Principal: Service: ec2.amazonaws.com Action: 'sts:AssumeRole' ManagedPolicyArns: - arn:aws:iam::aws:policy/AmazonS3FullAccess - arn:aws:iam::aws:policy/AmazonRDSFullAccess MLflowInstanceProfile: Type: AWS::IAM::InstanceProfile Properties: Path: "/" Roles: - !Ref MLflowIAMRole MLflowEC2Instance: Type: AWS::EC2::Instance DependsOn: - MLflowRDSInstance - MLflowS3Bucket01 - MLflowSecurityGroup Properties: ImageId: ami-0dfa284c9d7b2adad # Specify the appropriate AMI ID for your region and instance type InstanceType: !Ref InstanceType IamInstanceProfile: !Ref MLflowInstanceProfile KeyName: !Ref KeyName UserData: Fn::Base64: !Sub - | #!/bin/bash FLAG_FILE=/var/log/first_run_completed # only first run if [ ! -f $FLAG_FILE ]; then sudo yum update -y sudo yum install -y python3 python3-pip sudo yum clean all sudo su ec2-user -c 'pip3 install -U pip' sudo su ec2-user -c 'pip3 install mlflow boto3 pymysql' sudo su ec2-user -c 'pip3 cache purge' export DB_ENDPOINT=`aws rds describe-db-instances --query "DBInstances[?contains(DBInstanceIdentifier, 'mlflow')].Endpoint.Address | [0]" --output text` echo "python3 -m mlflow server \ --host 0.0.0.0 \ --port 5000 \ --backend-store-uri mysql+pymysql://${USR}:${PW}@${!DB_ENDPOINT}:3306/${DB_NAME} \ --default-artifact-root s3://${S3_NAME}/mlflow-artifacts/" > /etc/start_mlflow.sh chmod 777 /etc/start_mlflow.sh fi # start mlflow server nohup sudo su ec2-user -c "/etc/start_mlflow.sh" & - { USR: !Ref MasterUsername, PW: !Ref MasterUserPassword, DB_NAME: !Ref DBName, S3_NAME: !Ref S3Name } NetworkInterfaces: - AssociatePublicIpAddress: "true" DeviceIndex: "0" GroupSet: - !Ref MLflowSecurityGroup SubnetId: !Ref PublicSubnet1 PrivateIpAddress: !Ref MLFlowPrivateIpAddress MLflowElasticIp: Type: 'AWS::EC2::EIP' MLflowInstanceAssociation: Type: 'AWS::EC2::EIPAssociation' Properties: AllocationId: !GetAtt MLflowElasticIp.AllocationId InstanceId: !Ref MLflowEC2Instance MLflowSecurityGroup: Type: 'AWS::EC2::SecurityGroup' Properties: GroupDescription: Security group for MLflow server VpcId: !Ref VPC SecurityGroupIngress: - IpProtocol: tcp FromPort: 5000 ToPort: 5000 CidrIp: 0.0.0.0/0 - IpProtocol: tcp FromPort: 22 ToPort: 22 CidrIp: 0.0.0.0/0 DbSecurityByEC2SecurityGroup: Type: AWS::RDS::DBSecurityGroup DependsOn: - MLflowSecurityGroup Properties: GroupDescription: Ingress for Amazon EC2 security group EC2VpcId: !Ref VPC DBSecurityGroupIngress: - EC2SecurityGroupId: !Ref MLflowSecurityGroup MLflowRDSInstance: Type: 'AWS::RDS::DBInstance' Properties: DBSecurityGroups: - !Ref DbSecurityByEC2SecurityGroup DBSubnetGroupName: !Ref DBSubnetGroup AllocatedStorage: 5 DBInstanceClass: !Ref RDSInstanceClass Engine: mysql # Choose the appropriate RDS engine MasterUsername: !Ref MasterUsername MasterUserPassword: !Ref MasterUserPassword DBName: !Ref DBName MLflowS3Bucket01: Type: 'AWS::S3::Bucket' Properties: BucketName: !Ref S3Name ############################################################################# # Model developer ############################################################################# ModelDeveloperIAMRole: Type: 'AWS::IAM::Role' Properties: RoleName: ModelDeveloper AssumeRolePolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Principal: Service: ec2.amazonaws.com Action: 'sts:AssumeRole' ManagedPolicyArns: - arn:aws:iam::aws:policy/AmazonS3FullAccess ModelDeveloperInstanceProfile: Type: AWS::IAM::InstanceProfile Properties: Path: "/" Roles: - !Ref ModelDeveloperIAMRole ModelDeveloperEC2Instance: Type: AWS::EC2::Instance Properties: ImageId: ami-07c589821f2b353aa # Specify the appropriate AMI ID for your region and instance type InstanceType: !Ref InstanceType IamInstanceProfile: !Ref ModelDeveloperInstanceProfile KeyName: !Ref KeyName UserData: Fn::Base64: !Sub - | #!/bin/bash sudo apt update -y sudo apt install -y python3 python3-pip su ec2-user <<-EOF pip3 install -U pip pip3 install boto3 EOF - {} NetworkInterfaces: - AssociatePublicIpAddress: "true" DeviceIndex: "0" GroupSet: - !Ref ModelDeveloperSecurityGroup SubnetId: !Ref PublicSubnet1 ModelDeveloperSecurityGroup: Type: 'AWS::EC2::SecurityGroup' Properties: GroupDescription: Security group for Model Trainig server VpcId: !Ref VPC SecurityGroupIngress: - IpProtocol: tcp FromPort: 22 ToPort: 22 CidrIp: 0.0.0.0/0 Outputs: # Emit values needed for deployment status (e.g., where to SSH to) MLflowEC2IPAddress: Description: "MLflow EC2 Instance Public IP Address" Value: !GetAtt MLflowEC2Instance.PublicIp
300 USD分のAWS無料クレジットを申請して、もらった
AWSを開始するとすぐに、300USDほしくないですか?というメールがくる。ほかの方の記事を見てみると受け取れたり受け取れなかったりしているみたいなので、参考までに自分はどうやって申請したら受け取れたかを書いておく。
申請内容
あまり詳細は覚えていないのだが、住所や氏名、電話番号といった内容は普通に自分のことを記載。会社名は、個人的な勉強のために使用しているので、「Freelance」として、会社情報も適当に合わせて記載しておいた。個人的にはここは適当でも問題ないと思う。
AWSの利用状況
AWSは、以下の記事にある、5分ごとに実行されるBotを24時間稼働している。請求状況はCloudWatchから、0.3USDほど請求が来ていた。個人的にはこのような、AWSを使っているという実績が必要なのではないか?と思う。
AWSの無料枠でBitCoinの自動売買プログラム組んでみた - ぼうびろく
申請後
申請してから1週間程度で、300UDSあげたよ、というメールがくる。
【合格体験記】エンベデッドシステムスペシャリスト試験(令和5年度秋期) (午後Ⅰ 95点)
エンデベッドシステムスペシャリスト合格しました。思ったよりも高得点だったので合格体験記を書こうと思います。
過去に合格した試験
受験理由
業務が組み込みに近いので、自主学習がてら受けました。
勉強時間
- 午前II試験:2か月ほど前から、毎日通勤時間にスマホアプリで過去問解いてました。
- 午後I試験:2か月ほど前から、週8時間ほど参考書か過去問を解いていました。過去問は週2問くらいをじっくりといていました。
- 午後II試験:やってません。(小論文に変わったの知らなくてめちゃくちゃ焦った。。)
参考書
使ったのは以下の2つ。ESは覚えることが少ないので、参考書は必須ではないかもだけど、1冊はあると便利。
重要だと思ったこと
ESを受けるうえでの考え方として重要だと思ったのは以下。
各コンポーネントの役割を意識する
無意味なコンポーネントはないので、各タスクやHWが何のために存在しているのか意識して読むようにしていると、自然と組み込みの考え方が身につく
ユースケースを考えて、頭の中で動かしてみる
動かしてみると、各タスクの相互作用やデータの流れがわかってくるので、半分くらい読めたらユースケースを考えてみて、実際に動かしてみるといい。
異常系を考える
組み込みで難しいのは異常系だと思う。例えばHWが予期しない動作をした場合にどうするかや、事故や故障が起きた場合に安全に停止できるか、それをユーザが検知できるかや、天候によって性能が左右されないかなど、考えるときりがない。しかし異常系を予期しておくというのは非常に大切で、よく問題にも出るので、常に考える訓練をしておくといい。
結論ファースト(小論文)
小論文で必要なのは、相手に伝えること。なので結論から書いて、あとから肉付けをしていく。例えば、○○の機能仕様をかけ、みたいな出題であれば、「機能仕様は○○である。○○とは~~」みたいな文章がいい。ただし、この結論部分に事前説明が必要な固有名詞は入れないほうがいい。なぜなら、その分説明を記載する必要があり、文章が長くなり、結果としてわかりづらい内容となってしまう。 最も、固有名詞は文章を難読にするので、キーワード的に1,2個入れるのはいいと思うが、あまり入れないほうがいい。
公式サイトで試験内容の確認をする
記述問題かと思ったら小論文だった、みたいなことにならないように。気を付けましょう。(自戒)
PythonでDynamoDBをローカルにバックアップするコード
DynamoDBをダウンロードしてローカルにCSVで保存する。 1回のScanでは1MBしか取れないので、最後までループして取得している。
データはtimestamでソートしている。
import boto3 import pandas as pd aws_access_key_id = 'xxx' aws_secret_access_key = 'yyyy' region_name = 'ap-northeast-1' table_name = 'zzz' def scan_dynamodb_table(table_name): dynamodb = boto3.resource('dynamodb', region_name=region_name, aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key) table = dynamodb.Table(table_name) items = [] last_evaluated_key = None while True: # ページネーションでScanを行います response = table.scan(ExclusiveStartKey=last_evaluated_key) if last_evaluated_key else table.scan() items.extend(response['Items']) # LastEvaluatedKeyがあれば次のページがあるということです if 'LastEvaluatedKey' in response: last_evaluated_key = response['LastEvaluatedKey'] else: break return items def export_to_csv(items, csv_file_path): df = pd.DataFrame(items) # timestamp列をIndexに設定 df.set_index('timestamp', inplace=True) # Indexでソート df.sort_index(inplace=True) # ファイル名に使用する最初と最後のタイムスタンプを取得 start_timestamp = str(df.index.min()) end_timestamp = str(df.index.max()) # ファイル名を作成 file_name = f"data_{start_timestamp}_{end_timestamp}.csv" # DataFrameをCSVファイルに保存 df.to_csv(file_name) if __name__ == "__main__": # DynamoDBテーブルをスキャンしてデータを取得します items = scan_dynamodb_table(table_name) # 取得したデータをCSVにエクスポートします export_to_csv(items)
OpenCVで重なりのある物体の領域分割
OpenCVで、重なりのある物体の領域分割をやってみた。 以下の関数では領域分割を行い、その領域をクロッピングして返却する。 領域分割がうまくいかない場合は、dist_factorを調整する。これは重なりが大きい場合は小さく、重なりが小さい場合は大きい値にする。
参考↓
def watershed_segmentation_with_bounding_boxes(image, dist_factor=0.35): mask = image.copy().astype(np.uint8) # 画像をグレースケールに変換 gray = cv2.cvtColor(mask, cv2.COLOR_BGR2GRAY) # 画像の前処理(例:ノイズ除去やコントラスト調整) blurred = cv2.GaussianBlur(gray, (5, 5), 0) # 画像の閾値処理を適用して、物体と背景を区別 _, thresh = cv2.threshold(blurred, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU) # 領域分割を行う前に、距離変換を計算 dist_transform = cv2.distanceTransform(thresh, cv2.DIST_L2, 5) _, sure_fg = cv2.threshold(dist_transform, dist_factor * dist_transform.max(), 255, 0) # sure_fgをもとにマーカーを作成 sure_fg = np.uint8(sure_fg) unknown = cv2.subtract(thresh, sure_fg) # マーカーにラベルを付ける _, markers = cv2.connectedComponents(sure_fg) # マーカーの背景を1に設定 markers = markers + 1 markers[unknown == 255] = 0 # Watershedアルゴリズムを適用 cv2.watershed(mask, markers) #show_img(markers) labels = np.unique(markers) contours_all = [] # 各領域の輪郭を検出 for label in labels[2:]: target = np.where(markers == label, 255, 0).astype(np.uint8) contours, _ = cv2.findContours(target, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) for c in contours: contours_all.append(c) # 輪郭を囲むバウンディングボックスを取得 bounding_boxes = [cv2.boundingRect(contour) for contour in contours_all] # バウンディングボックスを描画 #for x, y, w, h in bounding_boxes: # cv2.rectangle(image, (x, y), (x + w, y + h), (0, 255, 0), 2) # 緑のバウンディングボックスを描画 croped = [] for (x, y, w, h), contour in zip(bounding_boxes, contours_all): mask = np.zeros(image.shape) mask = cv2.drawContours(mask, [contour], -1, (1, 1, 1), -1).astype(np.uint8) area = w*h if area > 4000 and area < 13000: # あるしきい値より大きな領域のみを描画 croped.append(cv2.resize((np.multiply(image.copy(), mask))[y:y+h, x:x+w, :], (16, 16))) return bounding_boxes, croped
Vision Transformerによるクラス分類を実装してみた
Vision Transformer (ViT)によるクラス分類を実装したのでメモしておく。
- 参考にしたサイト様
以下のコードは、Google colabで実行する。
必要なライブラリのインストール
!pip install timm seaborn linformer tqdm pandas
import (全部はいらないかも)
from __future__ import print_function import glob from itertools import chain import os import random import zipfile import matplotlib.pyplot as plt import numpy as np import pandas as pd import torch import torch.nn as nn import torch.nn.functional as F import torch.optim as optim from linformer import Linformer from PIL import Image from sklearn.model_selection import train_test_split from torch.optim.lr_scheduler import StepLR from torch.utils.data import DataLoader, Dataset from torchvision import datasets, transforms from tqdm.notebook import tqdm from sklearn.metrics import average_precision_score #from vit_pytorch.efficient import ViT import seaborn as sns import timm from pprint import pprint model_names = timm.list_models(pretrained=True) pprint([ x for x in model_names if "vit" in x])
ここで使えるViTのモデル一覧が表示されるので、使いたいものを選択する。
ハイパーパラメータの設定
# Training settings model_name = 'vit_medium_patch16_gap_384.sw_in12k_ft_in1k' batch_size = 48 epochs = 20 lr = 3e-5 gamma = 0.7 seed = 42 img_size = 384 weight_decay = 1e-3 device = 'cuda' def seed_everything(seed): random.seed(seed) os.environ['PYTHONHASHSEED'] = str(seed) np.random.seed(seed) torch.manual_seed(seed) torch.cuda.manual_seed(seed) torch.cuda.manual_seed_all(seed) torch.backends.cudnn.deterministic = True seed_everything(seed)
学習データのロード
data_path = "/content/drive/MyDrive/data" train_dir = data_path + '/train' test_dir = data_path + '/test' train_list = sorted(glob.glob(os.path.join(train_dir,'*.png'))) test_list = sorted(glob.glob(os.path.join(test_dir, '*.png'))) print(f"Train Data: {len(train_list)}") print(f"Test Data: {len(test_list)}") train_labels = np.loadtxt(data_path + "/train.csv", delimiter=',', dtype='int64', skiprows=1, usecols=[1]) test_labels = np.loadtxt(data_path + "/train.csv", delimiter=',', dtype='int64', skiprows=1, usecols=[1]) test_labels = np.zeros((len(test_list),)) train_df = pd.DataFrame({ 'data_path': train_list, 'label': train_labels }) test_df = pd.DataFrame({ 'data_path': test_list, 'label': test_labels }) random_idx = np.random.randint(1, len(train_df.data_path), size=9) fig, axes = plt.subplots(3, 3, figsize=(16, 12)) for idx, ax in enumerate(axes.ravel()): img = Image.open(train_df.data_path[idx]) ax.set_title(train_df.label[idx]) ax.imshow(img)
データセットを分割。
train_df, valid_df = train_test_split(train_df, test_size=0.1, stratify=train_df.label, random_state=seed) print(f"Train Data: {len(train_df)}") print(f"Validation Data: {len(valid_df)}") print(f"Test Data: {len(test_df)}")
前処理の設定。
train_transforms = transforms.Compose( [ transforms.Resize((img_size, img_size)), transforms.RandomResizedCrop(img_size), transforms.RandomRotation(degrees = 180), #transforms.ColorJitter(brightness=0.5, contrast=0.5, saturation=0.5, hue=0.5), transforms.ToTensor(), transforms.RandomErasing(p=0.5), ] ) val_transforms = transforms.Compose( [ transforms.Resize(img_size), transforms.ToTensor(), ] ) test_transforms = transforms.Compose( [ transforms.Resize(img_size), transforms.ToTensor(), ] )
import numpy as np class MyDataset(Dataset): def __init__(self, df, transform=None, return_filepath=False): self.file_list = list(df.data_path) self.transform = transform self.label = list(df.label) self.return_filepath = return_filepath def __len__(self): self.filelength = len(self.file_list) return self.filelength def __getitem__(self, idx): img_path = self.file_list[idx] img = Image.open(img_path) img_transformed = self.transform(img) if not self.return_filepath: return img_transformed, self.label[idx] else: return img_transformed, os.path.basename(img_path)
train_data = MyDataset(train_df, transform=train_transforms) valid_data = MyDataset(valid_df, transform=test_transforms) test_data = MyDataset(test_df, transform=test_transforms, return_filepath=True) train_loader = DataLoader(dataset = train_data, batch_size=batch_size, shuffle=True ) valid_loader = DataLoader(dataset = valid_data, batch_size=batch_size, shuffle=True) test_loader = DataLoader(dataset = test_data, batch_size=1, shuffle=False) print(len(train_data), len(train_loader))
モデルの準備
model = timm.create_model(model_name, pretrained=True, num_classes=2).to(device) #model = torch.load(data_path + '/model_weight.pth').to(device)
# loss function criterion = nn.CrossEntropyLoss() # optimizer optimizer = optim.Adam(model.parameters(), lr=lr, weight_decay=weight_decay) # scheduler scheduler = StepLR(optimizer, step_size=1, gamma=gamma)
学習の実行
train_acc_list = [] val_acc_list = [] train_loss_list = [] val_loss_list = [] ############## epochs = 20 m = nn.Softmax(dim=1) for epoch in range(epochs): epoch_loss = 0 epoch_accuracy = 0 for data, label in tqdm(train_loader): data = data.to(device) label = label.to(device) output = m(model(data)) loss = criterion(output, label) optimizer.zero_grad() loss.backward() optimizer.step() acc = (output.argmax(dim=1) == label).float().mean() #tmp = output.detach().cpu().numpy() #result = tmp[:, 1] #acc = average_precision_score(label.detach().cpu().numpy(), result, average='samples') epoch_accuracy += acc / len(train_loader) epoch_loss += loss / len(train_loader) with torch.no_grad(): epoch_val_accuracy = 0 epoch_val_loss = 0 for data, label in valid_loader: data = data.to(device) label = label.to(device) val_output = model(data) val_loss = criterion(val_output, label) acc = (val_output.argmax(dim=1) == label).float().mean() #tmp = val_output.detach().cpu().numpy() #result = tmp[:, 1] #acc = average_precision_score(label.detach().cpu().numpy(), result, average='samples') epoch_val_accuracy += acc / len(valid_loader) epoch_val_loss += val_loss / len(valid_loader) print( f"Epoch : {epoch+1} - loss : {epoch_loss:.4f} - acc: {epoch_accuracy:.4f} - val_loss : {epoch_val_loss:.4f} - val_acc: {epoch_val_accuracy:.4f}\n" ) torch.save(model, data_path + '/model_weight.pth') train_acc_list.append(epoch_accuracy) val_acc_list.append(epoch_val_accuracy) train_loss_list.append(epoch_loss) val_loss_list.append(epoch_val_loss)
学習結果のプロット
device2 = torch.device('cpu') train_acc = [] train_loss = [] val_acc = [] val_loss = [] for i in range(epochs): train_acc2 = train_acc_list[i].to(device2) train_acc3 = train_acc2.clone().numpy() train_acc.append(train_acc3) train_loss2 = train_loss_list[i].to(device2) train_loss3 = train_loss2.clone().detach().numpy() train_loss.append(train_loss3) val_acc2 = val_acc_list[i].to(device2) val_acc3 = val_acc2.clone().numpy() val_acc.append(val_acc3) val_loss2 = val_loss_list[i].to(device2) val_loss3 = val_loss2.clone().numpy() val_loss.append(val_loss3) #取得したデータをグラフ化する sns.set() num_epochs = epochs fig = plt.subplots(figsize=(12, 4), dpi=80) ax1 = plt.subplot(1,2,1) ax1.plot(range(num_epochs), train_acc, c='b', label='train acc') ax1.plot(range(num_epochs), val_acc, c='r', label='val acc') ax1.set_xlabel('epoch', fontsize='12') ax1.set_ylabel('accuracy', fontsize='12') ax1.set_title('training and val acc', fontsize='14') ax1.legend(fontsize='12') ax2 = plt.subplot(1,2,2) ax2.plot(range(num_epochs), train_loss, c='b', label='train loss') ax2.plot(range(num_epochs), val_loss, c='r', label='val loss') ax2.set_xlabel('epoch', fontsize='12') ax2.set_ylabel('loss', fontsize='12') ax2.set_title('training and val loss', fontsize='14') ax2.legend(fontsize='12') plt.show()
AWSの無料枠でBitCoinの自動売買プログラム組んでみた
AWSの勉強として、BitFlyerのAPIを使用して、BitCoinの自動売買プログラムを組んでみた。今回の主目的はAWSの勉強なので、実際儲かるかは不明。
※2023/12/30 追記 以下の構成だとCloudWatchの料金が0.1USD/Dayかかるので、完全無料ではないです。
準備するもの
システム構成
こんな感じ。(書き方あってる?)EventBridgeから5分ごとにトリガーを出し、そのトリガーでLambdaがBitCoinの価格を取得、DynamoDBに保存、ゴールデンクロスの場合、Bitcoinを成り行きで購入し、0.4%を上乗せして指値で売り注文を出す。(0.4% は、BitFlyerの手数料が0.15%なので)
実装
DynamoDBの準備
AWSの上部の検索欄に「DynamoDB」と入力し、DynamoDBへ移動。
リージョンに「東京」を選択。(どこでもいいが、DynamoDBやLambdaのリージョンは同じである必要あり)
新しいテーブルを追加。
以下のように入力して作成。
Lambdaの実装
AWSの上部の検索欄に「Lambda」と入力し、Lambdaへ移動。
関数の作成から、関数名、ランタイムを選択し、作成。
必要なライブラリの準備。Layerを選択。
レイヤーの追加から、以下を追加する。
- arn:aws:lambda:ap-northeast-1:770693421928:layer:Klayers-p39-requests:18
- arn:aws:lambda:ap-northeast-1:770693421928:layer:Klayers-p39-pandas:20
- arn:aws:lambda:ap-northeast-1:770693421928:layer:Klayers-p39-boto3:21
参考↓
ロールの権限を変更し、DynamoDBを読み書きできるようにする。 Lambdaの設定→一般設定の以下のリンクをクリック。
許可を追加。DynamoDBを選択し、以下にチェックを入れる。
- Query
- Scan
- BatchWriteItem
Lambdaの設定→一般設定から、タイムアウト時間を10秒にしておく。
コード
以下のコードを張り付ける。
import json import requests import time import pandas as pd import traceback import hashlib import hmac import json from datetime import datetime import os import boto3 from boto3.dynamodb.conditions import Key, Attr # ビットフライヤーのAPIキーとシークレット API_KEY = os.environ["API_KEY"] API_SECRET = os.environ["API_SECRET"] # APIエンドポイント API_URL = 'https://api.bitflyer.jp' # 注文情報 order_quantity = float(os.environ["ODER_QUANTITY"]) # 購入および売却するBTCの数量 product_code = 'BTC_JPY' # トレード対象の通貨ペア # 移動平均の期間設定 short_window = 10 long_window = 20 df = None table_name = os.environ["TABLE_NAME"] def get_market_price(): # 現在の市場価格を取得 ticker_url = f'{API_URL}/v1/ticker?product_code={product_code}' response = requests.get(ticker_url) data = response.json() print(data) return float(data['ltp']) def get_moving_averages(): global df # 移動平均を計算 historical_data_url = f'{API_URL}/v1/getticker?product_code={product_code}' response = requests.get(historical_data_url) data = response.json() new_data = { 'timestamp': data['timestamp'], 'best_ask': [float(data['best_ask'])], 'best_bid': [float(data['best_bid'])], } new_data = pd.DataFrame(new_data) new_data['timestamp'] = pd.to_datetime(new_data['timestamp']) df = pd.concat([df, new_data]) try: short_ma = df['best_bid'].rolling(window=short_window).mean()#.iloc[-1] long_ma = df['best_bid'].rolling(window=long_window).mean()#.iloc[-1] except: short_ma = 0 long_ma = 0 pass return short_ma, long_ma def place_market_order(side, quantity): # 成行注文を出す endpoint = '/v1/me/sendchildorder' order_url = API_URL + endpoint order_data = { 'product_code': product_code, 'child_order_type': 'MARKET', 'side': side, 'size': quantity, } body = json.dumps(order_data) headers = header('POST', endpoint=endpoint, body=body) response = requests.post(order_url, headers=headers, data=body) return response.json() def place_limit_order(side, price, quantity): # 指値注文を出す endpoint = '/v1/me/sendchildorder' order_url = API_URL + endpoint order_data = { 'product_code': product_code, 'child_order_type': 'LIMIT', 'side': side, 'price': price, 'size': quantity, } body = json.dumps(order_data) headers = header('POST', endpoint=endpoint, body=body) response = requests.post(order_url, headers=headers, data=body) return response.json() def header(method: str, endpoint: str, body: str) -> dict: timestamp = str(time.time()) if body == '': message = timestamp + method + endpoint else: message = timestamp + method + endpoint + body signature = hmac.new(API_SECRET.encode('utf-8'), message.encode('utf-8'), digestmod=hashlib.sha256).hexdigest() headers = { 'Content-Type': 'application/json', 'ACCESS-KEY': API_KEY, 'ACCESS-TIMESTAMP': timestamp, 'ACCESS-SIGN': signature } return headers def get_open_orders(): # 出ている注文一覧を取得 endpoint = '/v1/me/getchildorders' params = { 'product_code': 'BTC_JPY', 'child_order_state': 'ACTIVE', # 出ている注文だけを取得 } endpoint_for_header = endpoint + '?' for k, v in params.items(): endpoint_for_header += k + '=' + v endpoint_for_header += '&' endpoint_for_header = endpoint_for_header[:-1] headers = header('GET', endpoint=endpoint_for_header, body="") response = requests.get(API_URL + endpoint, headers=headers, params=params) orders = response.json() return orders def lambda_handler(event, context): global df dynamodb = boto3.resource('dynamodb') table = dynamodb.Table(table_name) ret = table.scan() df = pd.DataFrame(data=ret['Items']) try: df = df.sort_values('timestamp') except: print("sort error") pass try: # 現在の市場価格と移動平均を取得 current_price = get_market_price() short_ma, long_ma = get_moving_averages() with table.batch_writer() as batch: tmp = df.iloc[-1].to_dict() for k in tmp.keys(): if not k == "timestamp": tmp[k] = int(tmp[k]) else: tmp[k] = tmp[k].strftime('%Y%m%d%H%M%S') tmp[k] = int(tmp[k]) data = tmp batch.put_item(Item=data) # 注文が出されていない場合に短期移動平均が長期移動平均を上回った場合に購入 orders = get_open_orders() print(len(orders)) if os.environ["BUYSELL"] == "1": if (short_ma.iloc[-1] > long_ma.iloc[-1]) and (short_ma.iloc[-2] < long_ma.iloc[-2]) and \ len(orders) < int(os.environ["ODER_NUM"]): print("短期移動平均が長期移動平均を上回りました。購入注文を出します。") order_response = place_market_order('BUY', order_quantity) print("注文結果:", order_response) # 注文が成功したら約定価格に0.4%加算して売り注文を出す if 'child_order_acceptance_id' in order_response: executed_price = int(current_price * 1.004) # 約定価格に0.4%加算 print(f"約定価格: {executed_price}") sell_response = place_limit_order('SELL', executed_price, order_quantity) print("売り注文結果:", sell_response) # 売り注文が出されている場合は、一定の間隔で確認 elif len(orders) > 0: # ここに必要な確認処理を追加 pass except Exception as e: print(f"エラーが発生しました: {e}") print(traceback.format_exc()) return { 'statusCode': 200, 'body': json.dumps('') }
以下のDeployを押す。コードを変更したら、毎回Deploy押す必要あり。
環境変数を変更する。
設定する環境変数は以下の通り。自分の環境に合わせて変更する。
- API_KEY:BitFlyerのAPI_KEY
- API_SECRET:BitFlyerのAPI_SECRET
- BUYSELL:売買を行うかどうか。1の場合、売買を行う。
- ODER_NUM:注文数の上限。3など。
- ODER_QUANTITY:1回の注文で購入するBitCoin。0.003など。
- TABLE_NAME:DynamoDBのテーブル名。test
以下のTestを押すと関数が実行される。初回はテストの設定が必要だが、デフォルトのままでOK。
成功した場合には以下のようなログになる。タイムアウトする場合は、タイムアウト時間を延ばす。
DynamoDBに保存されているかの確認
DynamoDBへ行き、テーブルアイテムの検索を押す。
スキャンを行い、以下のように表示されればOK。
トリガーの追加
以下のトリガーを追加をクリック。
以下のように入力し、トリガーを作成。間隔は自由に変更可。
トリガーが作成されると、Lambdaが自動で実行されるようになる。