Pythonのloggingについて
基本的な使用方法
import logging
# StreamHandlerが追加される
logging.basicConfig(level=logging.INFO)
# ルートロガーを取得
logger = logging.getLogger()
logger.info("hoge") # コンソールに出力される
logger.debug("fuga") # info未満は出力されない
print(logger.handlers) # [<StreamHandler <stderr> (NOTSET)>]
logging.debug/info/warning/error/criticalなどのグローバル関数を呼び出すと、ルートロガーにハンドラーがない場合、内部でbasicConfigが自動的に呼ばれてデフォルトのハンドラーが設定されるため注意する
import logging logging.error("error") # ルートロガーにハンドラーがない場合、デフォルトのハンドラーが設定される logging.basicConfig(level=logging.INFO) # basicConfigが呼び出されない logger = logging.getLogger() logger.info("info") # forceをTrueにすれば上書きされる logging.basicConfig(level=logging.INFO, force=True)
loggerの親子関係
- getLogger関数はルートロガーまたは名前付きロガーを返す
- 名前付きロガーには親子関係がある
- 名前のドットで区切りで親子関係が作られる
- 子のロガーを呼び出すと、親のロガーのハンドラーも順次呼び出される
ロガーの親子関係
RootLogger
└─ Logger("hoge")
└─ Logger("hoge.fuga")
子のLoggerから親を呼び出したくない場合は、セットアップ時にpropagateプロパティを指定する
from logging import getLogger logger = getLogger(__name__) logger.propagate = False # 親のhandlersを呼び出さない
特定のロガーの設定
- 基本的にはRootロガーに想定する共通のハンドラーをセットアップする(セットアップはアプリケーション側で一度だけ初期化するのが望ましい)
- 各呼び出し元のモジュールで
getLogger(__name__)として名前付きロガーを作成して、モジュール単位で管理するのが良い
from logging import getLogger, StreamHandler, Logger, Formatter def get_console_logger(name: str, level: int) -> Logger: logger = getLogger(name) logger.setLevel(level) # すでにハンドラーが追加されている場合 if logger.handlers: return logger stream_handler = StreamHandler() formatter = Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") stream_handler.setFormatter(formatter) logger.addHandler(stream_handler) return logger
Ruby on Railsを少し触ってみた
Ruby on Railsをあまり触ったことがないので、少しだけ触ってみたのでまとめておく
メモ
使用したRailsのバージョン
rails: 8.0.1 database: postgresql
テスト等で使えそうなライブラリ
- rpec-rails
- factory_bot
- テスト時にモデルファクトリを簡単に定義できる
- faker
- 名前などの様々な架空のデータを生成できる
- gimei
- 日本の人名や住所などを生成でき、カナ名も取得できる(Fakerだとカナ名は無理)
プロジェクトの作成
# APIモードで新規プロジェクトを作成する rails new app --API --database=postgresql
データベースの接続設定
app/config/database.ymlの接続情報を修正する
default: # 共通パラメータ # ... development: # 開発用DB username: postgres password: postgres host: localhost port: 5432 test: # テスト用DB # ... production: # 本番用DB # ...
DB操作コマンド
# データベースの作成 rails db:create # データベースの削除 rails db:drop # migrateで作成したスキーマをデータベースに適用する rails db:schema:load # 最新のマイグレーションの実行(マイグレーションがすでにある場合) rails db:migrate # シードを挿入 rails db:seed # データベースのリセット(drop -> create -> seed) rails db:reset
マイグレーション
スキーマを定義する場合は次のコマンドを生成する
# モデルの作成 ## gはgenerateの省略(generateでも可) rails g model User username:string hashed_password:string
app/db/migrationsにタイムスタンプ付きのマイグレーションファイルが作成されるので、コマンドで生成できない部分を追記する。
ユーザーと投稿モデルをそれぞれ新規マイグレーションした場合のサンプルコードを下記に示す。
# ユーザーモデルの作成 class CreateUser < ActiveRecord::Migration[8.0] def change create_table :users do |t| t.string email, null: false t.string hashed_password, null: false t.string nickname, null: false t.timestamps end add_index :users, :email, unique: true end end # 投稿モデルの作成 class CreatePost < ActiveRecord::Migration[8.0] def change create_table :posts do |t| t.references :users, null: false, foreign_key: true t.string :title, null: false t.string :content, null: false t.timestamps end end end
参照する側ではt.refernces <参照リソース名の複数形>のように宣言し、引数にはnull制約や外部キー制約を設定できる。
モデルの関連付け
アプリケーション側のロジックを管理するクラスはApplicationRecordを継承したモデルクラスapp/models/xxx.rbを修正する。
アプリケーション上で属性として関連モデルを取得する場合にも、マイグレーションで参照を関連付けたうえで、別途このクラス内で関連付けを定義する必要がある。
ユーザーと投稿が1:N(has many)の関係にある場合には、次のような定義を追加する。
class User < ApplicationRecord has_many :posts end class Post < ApplicationRecord belongs_to :user end
こうすることで、Userからは複数のPosts、Postからは一つのUserが対応付けされる。
シードデータの用意
db/seeds.rbを編集してシードデータを用意する。
# hashmapのリストを渡すと一括登録できる User.create!(email: "admin@example.com", nickname: "admin", hashed_password: BCrypt::Password.create("password"))
コントローラの作成
定義したモデルに対して、APIのアクセスポイントを定義するにはコントローラを生成する。
# usersはURLのパス/usersに対応する rails g controller users # api/v1のようなプレフィックスが欲しい場合は、次のようにする rails g controller api/v1/users
次のメソッドを実装することで、それぞれ対応するエンドポイントを作成することができる。 (名前はその慣例に従う必要があるが、カスタムすることもたぶんできる)
class UsersController < ApplicationController before_action :set_user, only: [ :show, :update, :destroy ] # GET /users def index users = User.all render json: users.as_json, status: :ok end # GET /users/{id} def show render json: @user.as_json, status: :ok end # POST /users def create user = User.new(user_params) if user.save render json: user.as_json, status: :created else render json: user.errors.full_messages, :unprocessable_entity end end # PATCH/PUT /users def update if @user.update(user_params) render json: user.as_json, status: :ok else render json: user.errors.full_messages, status: :unprocessable_entity end end # DELETE def destroy @user.destory head :no_content end private # パラメータの必須フィールドを指定する def user_params params.require(:user).permit(:email, :password, nickname) end # before_actionで紐づけることで任意のメソッドの呼び出し時に変数をセットできる def set_user @user = User.find(params[:id]) end end
プライベートメソッド
- before_actionとプライベートメソッドを組み合わせて、必要なパラメータのバリデーションを行うことができる(railsではよく用いられている様子)
レスポンス形式をカスタマイズする
レスポンスフィールドをカスタマイズしたい場合には、いくつか方法があるようだがApplicationRecordのas_jsonメソッドの引数onlyに返したいフィールドを指定すれば良い。 共通化したい場合などは、as_jsonメソッドをオーバーライドする。
class User < ApplicationRecord # ... def as_json super(only: [:id, :email, :nickname, :created_at, :updated_at, :deleted_at]) end end
テスト
RSpecの用意
次のコマンドを打つとspecというディレクトリと設定ファイルが生成される。
rails g rspec:install
FactoryBotでテストデータを生成する
spec/factoriesにUserモデルのファクトリを定義してみる。
FactoryBot.define do factory :users do transient do password { "password" } end email { Faker::Internet.email } nickname { "nickname" } after(:build) do | user, evaluator | user.hashed_password = BCrypt::Password.create(evaluator.password) end end end
テストコード内で次のように呼び出す
# 単体を作成する # フィールドは引数に渡して設定できる user = FactoryBot.create(:user, password: "12345") # 複数作成する users = FactoryBot.create_list(:user, 10, password: "password")
APIのテストを書く
次のコマンドを打って、Usersコントローラのテストファイルを生成する。
spec/requests/users.specが作成される。
rails g rspec:controller users
一覧取得と作成のAPIテストを書いてみる。
RSpec.describe "Users", type: :request do describe "GET /index" do it "ユーザー一覧の取得に成功する" do FactoryBot.create_list(:user, 10) get "/api/v1/users" expect(response).to have_http_stastus(:ok) json = JSON.parse(response.body) expect(json.length).to 10 end end describe "POST /" do valid_params = { nickname: "nickname", email: "test@example.com", password: "password" } expect { post "/api/v1/users", params: { user: valid_params } }.to change(User, :count).by(1) expect(response).to have_http_status(:created) json = JSON.parse(response.body) expect(json[:nicname]).to eq(valid_params[:nickname]) expect(json[:nicname]).to eq(valid_params[:email]) end end
SQLAlchemyのセッションについて
SQLAlchemyのSessionをよくわからないままなんとなく使っていたので整理してみようと思います。
公式ドキュメントでscoped_sessionについて説明している記事があるのでこちらの内容を参考にしました。 docs.sqlalchemy.org
sessionmaker
sessionはsqlalchemyの機能を使ってDBにアクセスするために必要なオブジェクトで、sessionmakerを使うことでセッションを生成するファクトリを取得できます。
from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker, scoped_session engine = create_engine(DATABASE_URL) Session = sessionmaker(bind=engine)
セッションファクトリーはそのまま呼び出すことでグローバルなセッションを取得することができます。
# 実際に使用する場合 with Session() as db: # DBアクセスの処理
また、セッションファクトリはセッションとして振る舞うことができます。
Session.add(user) Session.commit()
FastAPIなどでDIとして注入する場合は次のような関数を定義して使うことが多いと思います。
async def get_db(): with Session() as db: try: yield db db.commit() except: Exception as e: logger.error(e)
注意
他の記事でも触れられていましたが、sessionオブジェクトはクローズdb.close()しても、現在のセッションが破棄されるだけで、オブジェクト自体は有効のようです。つまり、session.close()したあとでもsession.queryなどを呼び出せてしまいます(内部で再びセッションが張られる)。例えば、FastAPIで非同期タスクを実行する際に、Dependsで注入したセッションオブジェクトをタスク側に渡してしまうと、API側でsession.closeを呼び出してもセッションオブジェクトがタスク内で生き続けてしまう可能性があります。このため、タスク内部では別にセッションを取得する必要がありそうです。
zenn.dev
@route.post("/some_task) async some_task(background_tasks: BackgroundTasks, db: Session = Depends(get_db)): background_tasks.add_task(task, db) return { "message": "created task" }
scoped_session
scoped_sessionはスレッドセーフなセッションファクトリを返します。scoped_sessionにより取得したセッションファクトリは、同一スレッド内においてセッションが閉じるまで常に同一のセッションを返し(最初に生成したセッションをキャッシュする)、セッションが閉じた後は新しいセッションが生成されます。
# スコープ化されたセッションを生成するファクトリーオブジェクトを返す。 SessionLocal = scoped_session(sessionmaker(engine=engine)) some_session = SessionLocal() some_other_session = SessionLocal() some_session is some_other_session # True # セッションを破棄する some_session.remove() # 新しいセッションが生成される new_session = Session() new_session is some_session # False
scoped_sessionは内部でthreading.local(スレッドごとに分離したインメモリなストレージオブジェクトを返す)を使っていて、commitやrollbackなどで明示的にセッションが閉じなかった場合は各スレッドが終わるタイミングでGCによりセッションが解放されるので、この点についても、安全にマルチスレッド環境でセッションを利用できるようです。
まとめ
簡単にsessionmakerとscoped_sessionについて整理してみました。FastAPIのような非同期処理をサポートするWebAPI等でSQLAlchemyを利用する場合は、とりあえずscoped_sessionでセッションを生成しておけば問題なさそうです。
FastAPIではuvicornなどの非同期処理が主流のため、scoped_sessionの使用は不要でした(スレッドの影響は考慮する必要がない)。上述の通り、withを使ったリクエスト内でのみ有効なsessionを作る場合は、ライブラリが提供するSessionクラスまたはsessionmakerから生成したファクトリーオブジェクトから生成します。
SessionLocal = sessionmaker(engine=engine) def get_db(): with SessionLocal() with db: ...
Goのcontext周りの話
Goには非同期処理をうまく管理する方法としてcontextライブラリが用いられるが、このcontext.Contextについて勘違いしていた点があったので改めて整理してみる。
contextライブラリ
context.Contextはインターフェースとして、次のように定義されている。
type context.Context interface { // 設定した期日を返す(設定されていない場合はok == false) DeadLine() (deadline time.Time, ok bool) // キャンセルされた場合に値を返す(キャンセルが設定されていない場合はnilを返す) Done() <-chan struct{} // セットされたエラーを返す Err() error // 任意のkeyでセットされた値を返す Value(key any) any }
ここでは、Valueメソッドに関する使い方を整理する。 WithValue関数は、現在のcontextと任意のkeyとvalueを受け取り、値を埋め込んだ新しいContext返す。 例えば、次のように使う。
func Parent(ctx context.Context) { userID := getUserID() ctx = context.WithValue(ctx, "userID", userID) Child(ctx) } func Child(ctx context.Context) { userID, ok := ctx.Value("userID").(string) }
勘違いしていた話
ginやechoなどの代表的なWebフレームワークでは独自の*gin.Contextや*echo.Contextがハンドラーの引数として渡されます。
func (h *UserHandler) CreateUser(c *gin.Context) { // do something }
context.Contextをなんとなく理解していたのでcontext.Contextとは別物だと勘違いしていたのですが、context.Contextはインターフェースなのでインターフェースとしての実装を満たしていれば任意のものを渡すことができるんですよね。
なので、APIのハンドラーから下のサービスレイヤーへ渡す場合などに、*gin.Contextや*echo.Contextをcontext.Contextとしてそのまま渡すことができます。
// ユーザを作成するメソッド func (u *UserService) CreateUser(ctx context.Context, db *gorm.DB, name string, email string) (*models.User, error) { // } // ユーザーを作成するAPIハンドラ func (h *UserHandler) CreateUser(c *gin.Context) { // user, err := userService.CreateUser(c, h.db, name, email) }
uvでpythonのプロジェクトを管理する
最近uvを使い始めたので備忘録として簡単にまとめてみる
uvとは?
使ってみる
# 初期化したいプロジェクトフォルダに移動 cd <project-dir> # プロジェクトを初期化 uv init . # .venvを作成 uv sync # pythonバージョン指定 uv python pin 3.12 # パッケージ追加 uv add <package-name> # 開発時に使用するパッケージを追加 uv add --dev <package-name> # パッケージを削除 uv remove <package-name> # .venvで管理するpythonを起動する uv run python # 仮想環境を有効にする source .venv/bin/activate # 仮想環境を無効にする deactivate
プロジェクト構成について
pythoのプロジェクト構成に関してはflatレイアウトとsrcレイアウトがあるが、
- 外部パッケージとして開発する場合はsrcレイアウト
- アプリケーションとして開発する場合はflatレイアウト で良さそう。 ※外部パッケージを開発する際にflatパッケージだとモジュールを直接参照する可能性があり、ビルドしたパッケージの挙動を正常にテストできなくなることがあるとのこと(テストではうまくいったのに、ビルド後に期待通り動かないといった問題が発生する可能性がある)
パッケージを参照する
uvに限った話ではないが、パッケージをプロジェクトルートのappディレクトリに作成していて、別のディレクトリのモジュールからappモジュールを内部で参照したいときは、PYTHONPATHにappがあるパスを指定すれば良い。
# プロジェクトルートから実行する場合 PYTHONPATH=. uv run python ./scripts/hoge.py
Juliaの非同期タスクを強制終了させる
Juliaの非同期処理は次のようにTaskとして扱われます。
task = @async begin println!("hoge") end
ここで、タスク内でループ実行する場合などに途中で中断したくなった場合、次のようにして中断させることができます。
task = @async begin while true # do something end end # 任意のタイミングで次を実行する schedule(task, InterruptException(), error=true)
VSCodeでnotebookセルを実行したときのauto scrollを無効にする
VSCodeではNotebook形式のファイルも扱えて色々便利なのですが、デフォルト設定では各セルを実行したときに勝手にスクロールされてしまい、 直前にどこを実行したのかわからなくなることがあり、無効にする方法を探してみました。 以下の設定を追加で無効にできました。
settings.jsonに以下を追加する。
{ // これを追加する "notebook.scrolling.revealNextCellOnExecute": "none", }
ソースはこちら github.com