SQLiteファイルロックの仕組みと競合対策:開発者が知るべきエラーと解決策
SQLiteのファイルロックの基本
SQLiteは、データベース全体を単一のファイルとしてディスクに保存します。このため、同時アクセス制御は、ファイルシステムレベルのロックメカニズムに依存します。これにより、複数のプロセスが同時に同じデータベースファイルに書き込もうとする場合に、データの整合性が保たれるようになっています。
SQLiteのファイルロックにはいくつかの状態があり、これらを組み合わせて同時実行性を制御しています。
-
EXCLUSIVE (排他ロック):
- データベースファイルへの書き込みを行うために必要なロックです。
- EXCLUSIVEロックは、同時に1つしか存在できません。
- EXCLUSIVEロックがアクティブな間は、他のいかなる種類のロックも許可されません。
- SQLiteは、同時実行性を最大化するために、EXCLUSIVEロックが保持される時間を最小限に抑えるように動作します。
-
PENDING (ペンディングロック):
- データベースへの書き込みを試みるプロセスが、他のSHAREDロックが解除されるのを待っている状態です。
- PENDINGロックが存在すると、新しいSHAREDロックの取得はブロックされます。
- 既存のSHAREDロックはそのまま保持できます。
-
RESERVED (予約ロック):
- プロセスがデータベースファイルに書き込む予定があることを示します。
- RESERVEDロックを保持しているプロセスは、まだ実際に書き込みを開始していません。
- 複数のSHAREDロックと共存できますが、RESERVEDロックは1つしか同時に存在できません。
- これにより、書き込みを行う前に他のリーダーが読み取りを完了するのを待つことができます。
-
SHARED (共有ロック):
- データベースの読み取りは可能ですが、書き込みはできません。
- 複数のプロセスが同時にSHAREDロックを保持できます。つまり、複数のリーダーが同時に存在できます。
- SHAREDロックがアクティブな間は、他のスレッドやプロセスはデータベースファイルに書き込むことができません。
-
UNLOCKED (アンロック):
- データベースファイルに何のロックもかかっていない状態です。
- 他のプロセスは自由に読み書きができます。
- SQLiteがデータベースを開いたときのデフォルトの状態です。
同時実行性 (Concurrency) の仕組み
SQLiteバージョン3の同時実行性モデルは、主に以下の動作に基づいています。
-
読み取り時の動作:
- プロセスがデータベースを読み取る場合、まずSHAREDロックを取得しようとします。
- SHAREDロックは複数のプロセスが同時に取得できるため、複数のプロセスが同時にデータベースを読み取ることができます(「多重読み取り」が可能)。
- もしSHAREDロックを取得できない(例えば、他のプロセスがすでにEXCLUSIVEロックを保持している場合)は、
SQLITE_BUSY
エラーが返されることがあります。
-
書き込み時の動作:
- プロセスがデータベースに書き込む場合、まずRESERVEDロックを取得します。これは、将来的に書き込みを行う意図があることを示します。
- 次に、PENDINGロックを取得し、その後EXCLUSIVEロックを取得します。
- EXCLUSIVEロックは排他ロックであるため、このロックを取得できるのは1つのプロセスのみです。この間、他のすべての読み取りや書き込みはブロックされます。
- 書き込みが完了し、トランザクションがコミットされると、EXCLUSIVEロックは解放されます。
ジャーナルファイルによるACID特性の保証
SQLiteは、ACID (原子性、一貫性、独立性、永続性) 特性を保証するために、ジャーナルファイルを使用します。
-
WAL (Write-Ahead Logging):
- SQLiteバージョン3.7以降で導入された、より高い同時実行性を提供するジャーナリングモードです。
- WALモードでは、書き込みはデータベースファイルに直接行われず、まずWALファイル(Write-Ahead Logファイル)に追記されます。
- これにより、書き込み操作中もリーダーがデータベースファイルから読み取りを続けることができます。つまり、「読み取りと書き込みの並行処理」が可能になります。
- 定期的にWALファイルの内容がデータベースファイルにマージされます(チェックポイント処理)。
- 複数の書き込み要求がある場合でも、WALファイルへの書き込みはシリアル化されますが、データベースファイル全体のロックが短時間で済むため、全体のパフォーマンスが向上します。
-
ロールバックジャーナル (Rollback Journal):
- デフォルトのジャーナリングモードです。
- 書き込みトランザクションが開始される前に、変更される予定のデータベースページの元の内容がジャーナルファイルに書き込まれます。
- トランザクションが成功裏にコミットされると、ジャーナルファイルは削除されます。
- 途中でクラッシュした場合、ジャーナルファイルの情報を使ってデータベースを元の状態にロールバックできます。
- 書き込み操作中はEXCLUSIVEロックが取得されるため、この間は他の読み取りや書き込みはできません。
複数のプロセスやスレッドが同時にデータベースにアクセスしようとすると、ロックの競合が発生し、SQLITE_BUSY
エラーが返されることがあります。これは、特に書き込み操作中に顕著です。
- ビジータイムアウト (Busy Timeout):
- SQLiteでは、
sqlite3_busy_timeout()
関数を使って、ロックが利用可能になるまで待機する時間を設定できます。 - このタイムアウト期間内にロックを取得できない場合、
SQLITE_BUSY
エラーが返されます。 - 適切にタイムアウトを設定することで、アプリケーションがロックのために無限に待機するのを防ぎ、エラー処理を行うことができます。
- SQLiteでは、
SQLiteバージョン3のファイルロックと同時実行性に関する重要なポイントは以下の通りです。
- ロックの競合とエラー処理: ロックの競合が発生した場合、
SQLITE_BUSY
エラーが発生することがあります。busy_timeout
を設定し、適切なエラー処理を実装することが重要です。 - WALモードによる同時実行性の向上: WALモードを使用することで、読み取りと書き込みの並行処理が可能になり、高い同時実行性を実現できます。
- ジャーナルファイルによる整合性: クラッシュリカバリとACID特性はジャーナルファイルによって保証されます。
- 単一のライター: データベースへの書き込みは一度に1つのプロセスしか行えません。
- 複数のリーダーは可能: 複数のプロセスが同時にデータベースを読み取ることができます。
一般的なエラー
SQLiteで同時実行性の問題が発生した場合、最も頻繁に遭遇するエラーは以下のものです。
-
- 原因:
- 別のプロセスまたはスレッドがデータベースファイルに排他ロック(書き込みロック)をかけているため、現在の操作(読み取りまたは書き込み)がブロックされている状態です。
- 読み取り操作中に、別のプロセスが書き込みを開始しようとしている場合。
- 書き込み操作中に、別のプロセスが読み取りまたは書き込みを開始しようとしている場合。
- トランザクションが長時間実行されており、ロックが解放されていない場合。
- ネットワークファイルシステム(NFSなど)上でSQLiteデータベースを使用している場合、ファイルロックの挙動が不安定になることがあります。
- アプリケーションのロジックに問題があり、トランザクションが適切にコミットまたはロールバックされずにロックが保持されたままになっている場合。
- 症状:
- クエリの実行が失敗し、「database is locked」というエラーメッセージが表示される。
- アプリケーションがフリーズしたり、応答しなくなる。
- 書き込み操作がタイムアウトする。
- 原因:
-
SQLITE_LOCKED
- 原因:
SQLITE_BUSY
と似ていますが、より具体的なロックの競合を示します。これは、通常、SQLITE_BUSY
よりも深刻な競合状態が発生した場合に返されます。- 一般的には、ある接続が保留中のトランザクションを持っており、別の接続がそのトランザクションが使用しているリソースをロックしようとしている場合に発生します。
- 症状:
- データベース操作が失敗し、
SQLITE_LOCKED
というエラーコードが返される。
- データベース操作が失敗し、
- 原因:
-
ジャーナルファイルやWALファイルの残留
- 原因:
- アプリケーションのクラッシュ、電源障害、または不適切な終了により、トランザクションが完了する前にプロセスが終了した場合。
- これにより、
.db-journal
または.db-wal
、.db-shm
(WALモードの場合) といった一時ファイルがデータベースファイルの隣に残ることがあります。
- 症状:
- 特に問題なく動作することが多いですが、古いジャーナルファイルが存在することで、データベースの整合性チェックに時間がかかったり、リカバリプロセスが開始されたりすることがあります。
- 極稀に、データベースの破損につながる可能性もあります。
- 原因:
-
データベース破損 (
SQLITE_CORRUPT
)- 原因:
- 不適切なシャットダウン(特にジャーナルモードがWALでない場合)。
- データベースファイルがネットワークドライブやクラウドストレージ上にあり、ファイルロックが正常に機能しない環境。
- SQLiteバージョン間の互換性の問題(稀)。
- ディスクの故障やストレージの問題。
- 症状:
- 「database disk image is malformed」のようなエラーメッセージが表示される。
- 特定のテーブルやデータにアクセスできなくなる。
- アプリケーションがクラッシュする。
- 原因:
これらのエラーに遭遇した場合のトラブルシューティングと解決策は以下の通りです。
-
SQLITE_BUSY
の対処PRAGMA busy_timeout
の設定:- 最も一般的な解決策です。SQLiteは、ロックが利用可能になるまで自動的に待機する時間を設定できます。
PRAGMA busy_timeout = 5000;
(5000ミリ秒 = 5秒) のように設定します。これはデータベース接続ごとに設定する必要があるため、アプリケーションの起動時またはデータベース接続確立時に毎回設定するのが良いでしょう。- これでも解決しない場合、アプリケーション側でリトライロジックを実装し、一定回数リトライ後にエラーを報告するようにします。
- WAL (Write-Ahead Logging) モードの活用:
PRAGMA journal_mode = WAL;
を設定することで、同時実行性を大幅に向上させることができます。- WALモードでは、書き込み中でも複数の読み取りが可能になります(リーダーは古いバージョンのデータを読み取ります)。これにより、
SQLITE_BUSY
エラーの発生頻度が大幅に減少します。 - データベース接続を開いた直後に設定するのが推奨されます。
- ただし、WALモードは、通常モード(Rollback Journal)とは異なるファイル(
.db-wal
と.db-shm
)を使用するため、バックアップ時にはこれらのファイルも含む必要があります。
- トランザクションの短縮化:
- 書き込みトランザクションをできるだけ短く、かつアトミックに保ちます。不要な処理をトランザクション内で行わないようにします。
- 大きなバッチ処理を行う場合は、小さなトランザクションに分割して実行することを検討します。
- 不要なロックの解放:
- トランザクションが完了したら、必ず
COMMIT
またはROLLBACK
を実行し、ロックを解放します。 - プリペアドステートメントを使用している場合、処理が完了したら
sqlite3_finalize()
(またはそれに相当する言語固有のAPI)を呼び出すことで、リソースを解放します。
- トランザクションが完了したら、必ず
- アプリケーションレベルでの排他制御:
- 非常に多数の同時書き込みが予想される場合、アプリケーション側でミューテックスやセマフォなどの同期プリミティブを使用して、データベースへの書き込みアクセスをシリアル化することを検討します。ただし、これは複雑になる可能性があります。
-
ジャーナルファイルやWALファイルの残留の対処
- 通常、これらはSQLite自身が自動的に処理します。手動で削除する必要はほとんどありません。
- ただし、クラッシュ後にこれらのファイルが残っている場合でも、SQLiteは次にデータベースを開いたときに適切にリカバリを試みます。
- もし、これらのファイルが残っていること自体が問題である場合(例えば、ファイルシステムが満杯になるなど)、それは通常、アプリケーションのクラッシュ頻度が高いことを示しており、根本原因(メモリリーク、例外処理の不備など)を特定する必要があります。
-
データベース破損の対処
PRAGMA integrity_check
の実行:- データベースが破損している可能性がある場合、
PRAGMA integrity_check;
を実行して整合性をチェックします。これにより、破損している場所や原因のヒントが得られることがあります。
- データベースが破損している可能性がある場合、
- バックアップからの復元:
- 最も確実な方法です。定期的にデータベースのバックアップを取ることが極めて重要です。破損した場合、健全なバックアップから復元します。
- ダンプと再構築:
- もしデータベースが部分的に読み取り可能であれば、
.dump
コマンド(SQLiteのCLIツールなど)でデータをSQLスクリプトとしてエクスポートし、新しいデータベースファイルにインポートし直すことで復旧できる場合があります。 - 例:
sqlite3 old.db .dump | sqlite3 new.db
- もしデータベースが部分的に読み取り可能であれば、
- WALモードへの切り替え:
- WALモードはクラッシュリカバリの堅牢性が向上するため、破損の発生リスクを低減できます。
- ネットワークファイルシステムでの使用を避ける:
- NFSのようなネットワークファイルシステム上でのSQLiteデータベースの使用は、ファイルロックの動作が不安定になり、破損の原因となることが多いため、可能な限り避けるべきです。共有データベースが必要な場合は、クライアント/サーバー型データベース(PostgreSQL, MySQLなど)を検討してください。
- デバッグログの活用: アプリケーションによっては、SQLiteの内部的な動作(ロックの取得状況など)をデバッグログに出力する機能がある場合があります。これを活用して、問題の発生状況を詳しく調査します。
- 単一のプロセスでデータベースにアクセスする: SQLiteは本来、単一のアプリケーションプロセスがデータベースにアクセスするように設計されています。複数のアプリケーションプロセスが同じSQLiteファイルに同時にアクセスする必要がある場合、注意深い設計とテストが必要です。
- 適切な同期モードの選択:
PRAGMA synchronous
の設定は、パフォーマンスと耐久性のトレードオフに関わります。FULL
(デフォルト): 最も安全ですが、最も遅いです。NORMAL
(WALモードで推奨): 十分な安全性を保ちつつ、パフォーマンスを向上させます。OFF
: 最速ですが、クラッシュ時にデータ損失のリスクがあります。非推奨。
- エラーハンドリングの徹底: SQLiteのAPIがエラーコードを返した場合は、必ずそれを適切にハンドリングし、ログに出力するなどして問題を早期に特定できるようにします。
SQLITE_BUSY エラーと busy_timeout の例
この例では、2つの異なるプロセス(またはスレッド)が同時にデータベースファイルにアクセスしようとしたときに、SQLITE_BUSY
エラーがどのように発生し、busy_timeout
を設定することでどのようにそのエラーを処理できるかを示します。
シナリオ:
- プロセスBが、その書き込み中にデータベースへのアクセスを試みます。
- プロセスAが長時間かかる書き込みトランザクションを開始します。
Pythonの例
import sqlite3
import time
import threading
import os
DB_FILE = "test_busy_timeout.db"
def setup_db(conn):
"""データベースの初期設定"""
cursor = conn.cursor()
cursor.execute("DROP TABLE IF EXISTS users")
cursor.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
conn.commit()
def process_a():
"""長時間書き込みトランザクションを実行するプロセス"""
print("プロセスA: データベースに接続中...")
conn = sqlite3.connect(DB_FILE)
conn.isolation_level = None # オートコミットモード (各ステートメントが個別のトランザクション)
# 明示的なBEGIN/COMMITを使用する場合: conn.isolation_level = 'DEFERRED'
conn.execute("PRAGMA journal_mode = DELETE;") # デフォルト(ロールバックジャーナル)を使用
print("プロセスA: 長時間かかる書き込みトランザクションを開始します...")
try:
conn.execute("BEGIN IMMEDIATE;") # 即時ロックを取得
conn.execute("INSERT INTO users (name) VALUES ('Alice')")
print("プロセスA: 挿入実行中。5秒間待機します...")
time.sleep(5) # 長時間処理をシミュレート
conn.execute("INSERT INTO users (name) VALUES ('Bob')")
conn.execute("COMMIT;")
print("プロセスA: トランザクションをコミットしました。")
except sqlite3.OperationalError as e:
print(f"プロセスA: エラーが発生しました: {e}")
conn.rollback() # エラーが発生してもロールバックを試みる
finally:
conn.close()
print("プロセスA: 接続を閉じました。")
def process_b_no_timeout():
"""busy_timeout なしでデータベースにアクセスするプロセス"""
print("\nプロセスB (busy_timeoutなし): データベースに接続中...")
conn = sqlite3.connect(DB_FILE)
conn.isolation_level = None # オートコミット
try:
print("プロセスB (busy_timeoutなし): ユーザーの読み取りを試みます...")
# PRAGMA busy_timeout を設定しない
cursor = conn.cursor()
cursor.execute("SELECT * FROM users")
rows = cursor.fetchall()
print(f"プロセスB (busy_timeoutなし): ユーザーを読み取りました: {rows}")
except sqlite3.OperationalError as e:
print(f"プロセスB (busy_timeoutなし): エラーが発生しました: {e}")
finally:
conn.close()
print("プロセスB (busy_timeoutなし): 接続を閉じました。")
def process_b_with_timeout():
"""busy_timeout ありでデータベースにアクセスするプロセス"""
print("\nプロセスB (busy_timeoutあり): データベースに接続中...")
conn = sqlite3.connect(DB_FILE)
conn.isolation_level = None # オートコミット
try:
# busy_timeout を設定 (例: 10秒待機)
conn.execute("PRAGMA busy_timeout = 10000;")
print("プロセスB (busy_timeoutあり): PRAGMA busy_timeout = 10000 を設定しました。")
print("プロセスB (busy_timeoutあり): ユーザーの読み取りを試みます...")
cursor = conn.cursor()
cursor.execute("SELECT * FROM users")
rows = cursor.fetchall()
print(f"プロセスB (busy_timeoutあり): ユーザーを読み取りました: {rows}")
except sqlite3.OperationalError as e:
print(f"プロセスB (busy_timeoutあり): エラーが発生しました: {e}")
finally:
conn.close()
print("プロセスB (busy_timeoutあり): 接続を閉じました。")
if __name__ == "__main__":
if os.path.exists(DB_FILE):
os.remove(DB_FILE)
# データベースをセットアップ
conn_setup = sqlite3.connect(DB_FILE)
setup_db(conn_setup)
conn_setup.close()
# プロセスAをスレッドで開始
thread_a = threading.Thread(target=process_a)
thread_a.start()
# プロセスAが書き込みを開始するまで少し待つ
time.sleep(1)
# busy_timeout なしのプロセスBを開始
thread_b_no_timeout = threading.Thread(target=process_b_no_timeout)
thread_b_no_timeout.start()
thread_b_no_timeout.join() # 完了を待つ
# プロセスAの完了を待つ
thread_a.join()
# DBを再セットアップ (walモードのテストのためにクリーンな状態にする)
if os.path.exists(DB_FILE):
os.remove(DB_FILE)
conn_setup = sqlite3.connect(DB_FILE)
setup_db(conn_setup)
conn_setup.close()
# プロセスAを再度開始
thread_a_2 = threading.Thread(target=process_a)
thread_a_2.start()
# プロセスAが書き込みを開始するまで少し待つ
time.sleep(1)
# busy_timeout ありのプロセスBを開始
thread_b_with_timeout = threading.Thread(target=process_b_with_timeout)
thread_b_with_timeout.start()
thread_b_with_timeout.join() # 完了を待つ
thread_a_2.join()
print("\n--- プログラム終了 ---")
if os.path.exists(DB_FILE):
os.remove(DB_FILE)
if os.path.exists(DB_FILE + "-journal"):
os.remove(DB_FILE + "-journal")
実行結果の概観:
process_b_with_timeout
は、PRAGMA busy_timeout = 10000;
が設定されているため、process_a
がロックを解放するまで最大10秒間待機します。process_a
のスリープが5秒であるため、process_b_with_timeout
は待機後に読み取りに成功するはずです。process_b_no_timeout
は、process_a
がロックを保持している間にデータベースにアクセスしようとするため、すぐにsqlite3.OperationalError: database is locked
エラーが発生するはずです。
C++の例
#include <iostream>
#include <sqlite3.h>
#include <thread>
#include <chrono>
#include <cstdio> // for remove
const char* DB_FILE = "test_busy_timeout_cpp.db";
// エラーチェックマクロ
#define CHECK_SQLITE(rc, msg) \
if (rc != SQLITE_OK) { \
std::cerr << msg << ": " << sqlite3_errstr(rc) << std::endl; \
if (db) sqlite3_close(db); \
return; \
}
void setup_db(sqlite3* db) {
char* err_msg = 0;
int rc;
rc = sqlite3_exec(db, "DROP TABLE IF EXISTS users;", 0, 0, &err_msg);
CHECK_SQLITE(rc, "テーブル削除失敗");
sqlite3_free(err_msg);
rc = sqlite3_exec(db, "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT);", 0, 0, &err_msg);
CHECK_SQLITE(rc, "テーブル作成失敗");
sqlite3_free(err_msg);
std::cout << "データベースセットアップ完了" << std::endl;
}
void process_a() {
sqlite3* db;
char* err_msg = 0;
int rc;
std::cout << "プロセスA: データベースに接続中..." << std::endl;
rc = sqlite3_open(DB_FILE, &db);
CHECK_SQLITE(rc, "プロセスA: データベースオープン失敗");
// ロールバックジャーナルモードを使用 (デフォルト)
rc = sqlite3_exec(db, "PRAGMA journal_mode = DELETE;", 0, 0, &err_msg);
CHECK_SQLITE(rc, "プロセスA: PRAGMA journal_mode 失敗");
sqlite3_free(err_msg);
std::cout << "プロセスA: 長時間かかる書き込みトランザクションを開始します..." << std::endl;
try {
rc = sqlite3_exec(db, "BEGIN IMMEDIATE;", 0, 0, &err_msg); // 即時ロックを取得
CHECK_SQLITE(rc, "プロセスA: BEGIN IMMEDIATE 失敗");
sqlite3_free(err_msg);
rc = sqlite3_exec(db, "INSERT INTO users (name) VALUES ('Alice');", 0, 0, &err_msg);
CHECK_SQLITE(rc, "プロセスA: INSERT Alice 失敗");
sqlite3_free(err_msg);
std::cout << "プロセスA: 挿入実行中。5秒間待機します..." << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(5)); // 長時間処理をシミュレート
rc = sqlite3_exec(db, "INSERT INTO users (name) VALUES ('Bob');", 0, 0, &err_msg);
CHECK_SQLITE(rc, "プロセスA: INSERT Bob 失敗");
sqlite3_free(err_msg);
rc = sqlite3_exec(db, "COMMIT;", 0, 0, &err_msg);
CHECK_SQLITE(rc, "プロセスA: COMMIT 失敗");
sqlite3_free(err_msg);
std::cout << "プロセスA: トランザクションをコミットしました。" << std::endl;
} catch (...) {
std::cerr << "プロセスA: 例外が発生しました。" << std::endl;
sqlite3_exec(db, "ROLLBACK;", 0, 0, &err_msg); // エラーが発生してもロールバックを試みる
sqlite3_free(err_msg);
}
sqlite3_close(db);
std::cout << "プロセスA: 接続を閉じました。" << std::endl;
}
void process_b_no_timeout() {
sqlite3* db;
int rc;
std::cout << "\nプロセスB (busy_timeoutなし): データベースに接続中..." << std::endl;
rc = sqlite3_open(DB_FILE, &db);
CHECK_SQLITE(rc, "プロセスB (busy_timeoutなし): データベースオープン失敗");
std::cout << "プロセスB (busy_timeoutなし): ユーザーの読み取りを試みます..." << std::endl;
sqlite3_stmt* stmt;
rc = sqlite3_prepare_v2(db, "SELECT * FROM users;", -1, &stmt, 0);
if (rc == SQLITE_BUSY) {
std::cerr << "プロセスB (busy_timeoutなし): エラーが発生しました: " << sqlite3_errstr(rc) << std::endl;
} else {
CHECK_SQLITE(rc, "プロセスB (busy_timeoutなし): SQL準備失敗");
while ((rc = sqlite3_step(stmt)) == SQLITE_ROW) {
std::cout << "プロセスB (busy_timeoutなし): ID=" << sqlite3_column_int(stmt, 0)
<< ", Name=" << sqlite3_column_text(stmt, 1) << std::endl;
}
if (rc != SQLITE_DONE) {
std::cerr << "プロセスB (busy_timeoutなし): SELECT失敗: " << sqlite3_errstr(rc) << std::endl;
}
sqlite3_finalize(stmt);
}
sqlite3_close(db);
std::cout << "プロセスB (busy_timeoutなし): 接続を閉じました。" << std::endl;
}
void process_b_with_timeout() {
sqlite3* db;
int rc;
std::cout << "\nプロセスB (busy_timeoutあり): データベースに接続中..." << std::endl;
rc = sqlite3_open(DB_FILE, &db);
CHECK_SQLITE(rc, "プロセスB (busy_timeoutあり): データベースオープン失敗");
// busy_timeout を設定 (例: 10秒待機)
sqlite3_busy_timeout(db, 10000); // 10000ミリ秒 = 10秒
std::cout << "プロセスB (busy_timeoutあり): busy_timeout = 10000 を設定しました。" << std::endl;
std::cout << "プロセスB (busy_timeoutあり): ユーザーの読み取りを試みます..." << std::endl;
sqlite3_stmt* stmt;
rc = sqlite3_prepare_v2(db, "SELECT * FROM users;", -1, &stmt, 0);
if (rc == SQLITE_BUSY) { // busy_timeoutが効いていれば、このエラーは返されないはず
std::cerr << "プロセスB (busy_timeoutあり): エラーが発生しました: " << sqlite3_errstr(rc) << std::endl;
} else {
CHECK_SQLITE(rc, "プロセスB (busy_timeoutあり): SQL準備失敗");
while ((rc = sqlite3_step(stmt)) == SQLITE_ROW) {
std::cout << "プロセスB (busy_timeoutあり): ID=" << sqlite3_column_int(stmt, 0)
<< ", Name=" << sqlite3_column_text(stmt, 1) << std::endl;
}
if (rc != SQLITE_DONE) {
std::cerr << "プロセスB (busy_timeoutあり): SELECT失敗: " << sqlite3_errstr(rc) << std::endl;
}
sqlite3_finalize(stmt);
}
sqlite3_close(db);
std::cout << "プロセスB (busy_timeoutあり): 接続を閉じました。" << std::endl;
}
int main() {
std::remove(DB_FILE); // 既存のDBファイルを削除
std::remove((std::string(DB_FILE) + "-journal").c_str());
sqlite3* db_setup;
int rc = sqlite3_open(DB_FILE, &db_setup);
CHECK_SQLITE(rc, "初期DBセットアップ失敗");
setup_db(db_setup);
sqlite3_close(db_setup);
// プロセスAをスレッドで開始
std::thread thread_a(process_a);
std::this_thread::sleep_for(std::chrono::seconds(1)); // プロセスAが書き込みを開始するまで少し待つ
// busy_timeout なしのプロセスBを開始
std::thread thread_b_no_timeout(process_b_no_timeout);
thread_b_no_timeout.join(); // 完了を待つ
// プロセスAの完了を待つ
thread_a.join();
// DBを再セットアップ (WALモードのテストのためにクリーンな状態にする)
std::remove(DB_FILE);
std::remove((std::string(DB_FILE) + "-journal").c_str());
rc = sqlite3_open(DB_FILE, &db_setup);
CHECK_SQLITE(rc, "再セットアップDBオープン失敗");
setup_db(db_setup);
sqlite3_close(db_setup);
// プロセスAを再度開始
std::thread thread_a_2(process_a);
std::this_thread::sleep_for(std::chrono::seconds(1)); // プロセスAが書き込みを開始するまで少し待つ
// busy_timeout ありのプロセスBを開始
std::thread thread_b_with_timeout(process_b_with_timeout);
thread_b_with_timeout.join(); // 完了を待つ
thread_a_2.join();
std::cout << "\n--- プログラム終了 ---" << std::endl;
std::remove(DB_FILE);
std::remove((std::string(DB_FILE) + "-journal").c_str());
return 0;
}
WAL (Write-Ahead Logging) モードの例
WALモードは、書き込み操作中に読み取りを許可することで、同時実行性を大幅に向上させます。
シナリオ:
- プロセスBが、その書き込み中にデータベースから読み取りを試みます。
- プロセスAが書き込みを頻繁に行います。
import sqlite3
import time
import threading
import os
DB_FILE_WAL = "test_wal.db"
def setup_db_wal(conn):
"""WALモードでデータベースの初期設定"""
cursor = conn.cursor()
cursor.execute("PRAGMA journal_mode = WAL;") # ここでWALモードを有効化
print(f"ジャーナルモード: {conn.execute('PRAGMA journal_mode;').fetchone()[0]}")
cursor.execute("DROP TABLE IF EXISTS messages")
cursor.execute("CREATE TABLE messages (id INTEGER PRIMARY KEY, content TEXT, timestamp DATETIME DEFAULT CURRENT_TIMESTAMP)")
conn.commit()
def writer_process_wal():
"""WALモードでデータベースに書き込むプロセス"""
print("\nライタープロセス (WAL): データベースに接続中...")
conn = sqlite3.connect(DB_FILE_WAL)
conn.isolation_level = None # オートコミット
conn.execute("PRAGMA journal_mode = WAL;") # ここでも設定 (接続ごとに必要)
print(f"ライタープロセス (WAL): ジャーナルモード: {conn.execute('PRAGMA journal_mode;').fetchone()[0]}")
for i in range(5):
try:
message = f"メッセージ {i+1} from Writer"
conn.execute("INSERT INTO messages (content) VALUES (?)", (message,))
print(f"ライタープロセス (WAL): 挿入: '{message}'")
time.sleep(0.5) # 少し待機
except sqlite3.OperationalError as e:
print(f"ライタープロセス (WAL): エラー発生 (挿入): {e}")
break
conn.close()
print("ライタープロセス (WAL): 接続を閉じました。")
def reader_process_wal():
"""WALモードでデータベースから読み取るプロセス"""
print("\nリーダープロセス (WAL): データベースに接続中...")
conn = sqlite3.connect(DB_FILE_WAL)
conn.isolation_level = None # オートコミット
conn.execute("PRAGMA journal_mode = WAL;") # ここでも設定
print(f"リーダープロセス (WAL): ジャーナルモード: {conn.execute('PRAGMA journal_mode;').fetchone()[0]}")
conn.execute("PRAGMA busy_timeout = 5000;") # 念のため設定
for _ in range(5):
try:
cursor = conn.cursor()
cursor.execute("SELECT * FROM messages")
rows = cursor.fetchall()
print(f"リーダープロセス (WAL): 読み取り: {len(rows)}件のメッセージ")
time.sleep(0.7) # 少し待機
except sqlite3.OperationalError as e:
print(f"リーダープロセス (WAL): エラー発生 (読み取り): {e}")
break
conn.close()
print("リーダープロセス (WAL): 接続を閉じました。")
if __name__ == "__main__":
if os.path.exists(DB_FILE_WAL):
os.remove(DB_FILE_WAL)
if os.path.exists(DB_FILE_WAL + "-wal"):
os.remove(DB_FILE_WAL + "-wal")
if os.path.exists(DB_FILE_WAL + "-shm"):
os.remove(DB_FILE_WAL + "-shm")
# データベースをWALモードでセットアップ
conn_setup_wal = sqlite3.connect(DB_FILE_WAL)
setup_db_wal(conn_setup_wal)
conn_setup_wal.close()
writer_thread = threading.Thread(target=writer_process_wal)
reader_thread = threading.Thread(target=reader_process_wal)
writer_thread.start()
reader_thread.start()
writer_thread.join()
reader_thread.join()
print("\n--- WALプログラム終了 ---")
if os.path.exists(DB_FILE_WAL):
os.remove(DB_FILE_WAL)
if os.path.exists(DB_FILE_WAL + "-wal"):
os.remove(DB_FILE_WAL + "-wal")
if os.path.exists(DB_FILE_WAL + "-shm"):
os.remove(DB_FILE_WAL + "-shm")
実行結果の概観:
- ライタープロセスが書き込みを行っている間でも、リーダープロセスは「database is locked」エラーなしで読み取りを続行できるはずです。これは、WALモードが読み取りと書き込みの並行処理を可能にしているためです。
#include <iostream>
#include <sqlite3.h>
#include <thread>
#include <chrono>
#include <cstdio> // for remove
const char* DB_FILE_WAL_CPP = "test_wal_cpp.db";
// エラーチェックマクロ (再利用)
#define CHECK_SQLITE(rc, msg) \
if (rc != SQLITE_OK) { \
std::cerr << msg << ": " << sqlite3_errstr(rc) << std::endl; \
if (db) sqlite3_close(db); \
return; \
}
void setup_db_wal_cpp(sqlite3* db) {
char* err_msg = 0;
int rc;
rc = sqlite3_exec(db, "PRAGMA journal_mode = WAL;", 0, 0, &err_msg);
CHECK_SQLITE(rc, "PRAGMA journal_mode = WAL 失敗");
sqlite3_free(err_msg);
char** result = nullptr;
int rows, cols;
sqlite3_get_table(db, "PRAGMA journal_mode;", &result, &rows, &cols, &err_msg);
if (rows > 0 && cols > 0) {
std::cout << "ジャーナルモード: " << result[1] << std::endl;
}
sqlite3_free_table(result);
sqlite3_free(err_msg);
rc = sqlite3_exec(db, "DROP TABLE IF EXISTS messages;", 0, 0, &err_msg);
CHECK_SQLITE(rc, "テーブル削除失敗");
sqlite3_free(err_msg);
rc = sqlite3_exec(db, "CREATE TABLE messages (id INTEGER PRIMARY KEY, content TEXT, timestamp DATETIME DEFAULT CURRENT_TIMESTAMP);", 0, 0, &err_msg);
CHECK_SQLITE(rc, "テーブル作成失敗");
sqlite3_free(err_msg);
std::cout << "WALモードでデータベースセットアップ完了" << std::endl;
}
void writer_process_wal_cpp() {
sqlite3* db;
int rc;
char* err_msg = 0;
std::cout << "\nライタープロセス (WAL): データベースに接続中..." << std::endl;
rc = sqlite3_open(DB_FILE_WAL_CPP, &db);
CHECK_SQLITE(rc, "ライタープロセス (WAL): データベースオープン失敗");
rc = sqlite3_exec(db, "PRAGMA journal_mode = WAL;", 0, 0, &err_msg); // 接続ごとに必要
CHECK_SQLITE(rc, "ライタープロセス (WAL): PRAGMA journal_mode = WAL 失敗");
sqlite3_free(err_msg);
char** result = nullptr;
int rows, cols;
sqlite3_get_table(db, "PRAGMA journal_mode;", &result, &rows, &cols, &err_msg);
if (rows > 0 && cols > 0) {
std::cout << "ライタープロセス (WAL): ジャーナルモード: " << result[1] << std::endl;
}
sqlite3_free_table(result);
sqlite3_free(err_msg);
sqlite3_stmt* stmt;
rc = sqlite3_prepare_v2(db, "INSERT INTO messages (content) VALUES (?);", -1, &stmt, 0);
CHECK_SQLITE(rc, "ライタープロセス (WAL): SQL準備失敗");
for (int i = 0; i < 5; ++i) {
std::string message = "メッセージ " + std::to_string(i + 1) + " from Writer";
sqlite3_bind_text(stmt, 1, message.c_str(), -1, SQLITE_TRANSIENT);
rc = sqlite3_step(stmt);
if (rc == SQLITE_DONE) {
std::cout << "ライタープロセス (WAL): 挿入: '" << message << "'" << std::endl;
} else if (rc == SQLITE_BUSY) {
std::cerr << "ライタープロセス (WAL): エラー発生 (挿入 - BUSY): " << sqlite3_errstr(rc) << std::endl;
sqlite3_reset(stmt); // ステートメントをリセットして再利用可能にする
continue; // 再試行のためにループを続行
} else {
std::cerr << "ライタープロセス (WAL): エラー発生 (挿入): " << sqlite3_errstr(rc) << std::endl;
break;
}
sqlite3_reset(stmt); // ステートメントをリセットして再利用可能にする
std::this_thread::sleep_for(std::chrono::milliseconds(500)); // 少し待機
}
sqlite3_finalize(stmt);
sqlite3_close(db);
std::cout << "ライタープロセス (WAL): 接続を閉じました。" << std::endl;
}
void reader_process_wal_cpp() {
sqlite3* db;
int rc;
std::cout << "\nリーダープロセス (WAL): データベースに接続中..." << std::endl;
rc = sqlite3_open(DB_FILE_WAL_CPP, &db);
CHECK_SQLITE(rc, "リーダープロセス (WAL): データベースオープン失敗");
rc = sqlite3_exec(db, "PRAGMA journal_mode = WAL;", 0, 0, 0); // 接続ごとに必要
CHECK_SQLITE(rc, "リーダープロセス (WAL): PRAGMA journal_mode = WAL 失敗");
char** result = nullptr;
int rows_pragma, cols_pragma;
sqlite3_get_table(db, "PRAGMA journal_mode;", &result, &rows_pragma, &cols_pragma, 0);
if (rows_pragma > 0 && cols_pragma > 0) {
std::cout << "リーダープロセス (WAL): ジャーナルモード: " << result[1] << std::endl;
}
sqlite3_free_table(result);
sqlite3_busy_timeout(db, 5000); // 念のため設定 (WALではあまり発生しないが、排他操作で必要)
sqlite3_stmt* stmt;
rc = sqlite3_prepare_v2(db, "SELECT * FROM messages;", -1, &stmt, 0);
CHECK_SQLITE(rc, "リーダープロセス (WAL): SQL準備失敗");
for (int i = 0; i < 5; ++i) {
int row_count = 0;
rc = sqlite3_step(stmt); // 最初のステップ
if (rc == SQLITE_BUSY) {
std::cerr << "リーダープロセス (WAL): エラー発生 (読み取り - BUSY): " << sqlite3_errstr(rc) << std::endl;
} else {
while (rc == SQLITE_ROW) {
row_count++;
rc = sqlite3_step(stmt); // 次の行へ
}
if (rc != SQLITE_DONE) {
std::cerr << "リーダープロセス (WAL): SELECT失敗: " << sqlite3_errstr(rc) << std::endl;
}
std::cout << "リーダープロセス (WAL): 読み取り: " << row_count << "件のメッセージ" << std::endl;
}
sqlite3_reset(stmt); // ステートメントをリセットして再利用可能にする
std::this_thread::sleep_for(std::chrono::milliseconds(700)); // 少し待機
}
sqlite3_finalize(stmt);
sqlite3_close(db);
std::cout << "リーダープロセス (WAL): 接続を閉じました。" << std::endl;
}
int main() {
std::remove(DB_FILE_WAL_CPP); // 既存のDBファイルを削除
std::remove((std::string(DB_FILE_WAL_CPP) + "-wal").c_str());
std::remove((std::string(DB_FILE_WAL_CPP) + "-shm").c_str());
sqlite3* db_setup;
int rc = sqlite3_open(DB_FILE_WAL_CPP, &db_setup);
CHECK_SQLITE(rc, "初期DBセットアップ失敗");
setup_db_wal_cpp(db_setup);
sqlite3_close(db_setup);
std::thread writer_thread(writer_process_wal_cpp);
std::thread reader_thread(reader_process_wal_cpp);
writer_thread.join();
reader_thread.join();
std::cout << "\n--- WALプログラム終了 ---" << std::endl;
std::remove(DB_FILE_WAL_CPP);
std::remove((std::string(DB_FILE_WAL_CPP) + "-wal").c_str());
std::remove((std::string(DB_FILE_WAL_CPP) + "-shm").c_str());
return 0;
}
コードの解説とポイント
sqlite3_prepare_v2()
/sqlite3_step()
/sqlite3_finalize()
(C++): C++では、SQLステートメントを準備(prepare_v2
)、実行(step
)、解放(finalize
)する一連のAPIを正しく使用することが重要です。sqlite3_step()
がSQLITE_BUSY
を返す可能性があるため、その場合のリトライロジックも検討できます。- トランザクション:
BEGIN IMMEDIATE;
は、トランザクション開始時に即座にRESERVEDロック(書き込み予約ロック)を取得しようとします。これにより、他のリーダーが読み取りを完了するのを待つことができますが、新しいリーダーの読み取りをブロックします。BEGIN DEFERRED;
(デフォルト) は、最初の書き込み操作が行われるまでロックの取得を遅らせます。BEGIN EXCLUSIVE;
は、トランザクション開始時に即座にEXCLUSIVEロック(排他ロック)を取得しようとします。
sqlite3_busy_timeout(db, milliseconds)
(C++): C言語APIでbusy_timeout
を設定するための関数です。PRAGMA journal_mode = WAL;
:- データベースのジャーナリングモードをWALに設定します。これにより、読み取りと書き込みの同時実行性が向上します。
- この設定も、データベース接続ごとに実行する必要がありますが、一度データベースファイルをWALモードに設定すると、次回以降は自動的にWALモードで開かれます(ただし、接続時に明示的に設定しないと、他の接続が古いモードで開く可能性があるので、すべての接続で設定することを推奨します)。
PRAGMA busy_timeout = milliseconds;
:- ロックが利用可能になるまで待機する時間をミリ秒単位で設定します。このタイムアウト期間中、SQLiteは内部的にリトライを繰り返します。
- この設定は、データベース接続ごとに必要です。
SQLITE_BUSY
エラーは、設定されたタイムアウト期間内にロックを取得できなかった場合に返されます。
conn.isolation_level = None
(Python) / オートコミット: Pythonのsqlite3
モジュールでは、isolation_level
をNone
に設定するとオートコミットモードになります。これにより、各SQLステートメントが自動的にコミットされます。明示的なトランザクション(BEGIN
,COMMIT
,ROLLBACK
)を制御したい場合は、isolation_level
を'DEFERRED'
などに設定します。C++では、明示的にBEGIN
/COMMIT
を呼び出す必要があります。sqlite3.connect(DB_FILE)
/sqlite3_open(DB_FILE, &db)
: データベースファイルへの接続を開きます。SQLiteは、存在しない場合は自動的にファイルを作成します。
これらの例は、SQLiteで同時実行性の問題を管理するための基本的な手法を示しています。実際のアプリケーションでは、より複雑なエラーハンドリング、リトライメカニズム、そしてトランザクション管理が必要になる場合があります。
SQLiteのファイルロックと同時実行性に関するプログラミング例を、PythonとC++で示します。これらの例は、SQLITE_BUSY
エラーをどのように処理するか、そしてWAL (Write-Ahead Logging) モードをどのように設定するかを示します。
SQLITE_BUSY の処理と busy_timeout の設定 (Python)
Pythonの sqlite3
モジュールでは、timeout
パラメータを使用して busy_timeout
を設定できます。
import sqlite3
import time
import threading
import os
DATABASE_FILE = 'example.db'
# 既存のデータベースファイルを削除(テスト用)
if os.path.exists(DATABASE_FILE):
os.remove(DATABASE_FILE)
if os.path.exists(DATABASE_FILE + '-wal'):
os.remove(DATABASE_FILE + '-wal')
if os.path.exists(DATABASE_FILE + '-shm'):
os.remove(DATABASE_FILE + '-shm')
def init_db(conn):
"""データベースの初期化とテーブル作成"""
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS data (
id INTEGER PRIMARY KEY,
value TEXT
)
''')
conn.commit()
def writer_thread(thread_id, num_writes):
"""データベースに書き込みを行うスレッド"""
# busy_timeout を設定して、ロック解放まで待機する
conn = sqlite3.connect(DATABASE_FILE, timeout=5) # 5秒まで待機
init_db(conn) # 各スレッドが独自の接続を持つ場合、初期化は一度だけ実行
# WALモードに設定(推奨)
conn.execute("PRAGMA journal_mode = WAL;")
conn.execute("PRAGMA synchronous = NORMAL;")
conn.commit() # PRAGMA の適用をコミット
print(f"Writer {thread_id}: データベース接続開始")
for i in range(num_writes):
try:
value = f"Thread {thread_id} - Value {i}"
conn.execute("INSERT INTO data (value) VALUES (?)", (value,))
conn.commit()
print(f"Writer {thread_id}: データを挿入しました: {value}")
time.sleep(0.01) # ロックを解放して他のスレッドに機会を与える
except sqlite3.OperationalError as e:
if "database is locked" in str(e):
print(f"Writer {thread_id}: データベースがロックされています。リトライします...")
time.sleep(0.1) # 少し待ってからリトライ
else:
print(f"Writer {thread_id}: エラーが発生しました: {e}")
break
except Exception as e:
print(f"Writer {thread_id}: 予期せぬエラー: {e}")
break
conn.close()
print(f"Writer {thread_id}: データベース接続終了")
def reader_thread(thread_id, num_reads):
"""データベースから読み取りを行うスレッド"""
conn = sqlite3.connect(DATABASE_FILE, timeout=5) # 5秒まで待機
# WALモードに設定(推奨)
conn.execute("PRAGMA journal_mode = WAL;")
conn.execute("PRAGMA synchronous = NORMAL;")
conn.commit() # PRAGMA の適用をコミット
print(f"Reader {thread_id}: データベース接続開始")
for i in range(num_reads):
try:
cursor = conn.cursor()
cursor.execute("SELECT COUNT(*) FROM data")
count = cursor.fetchone()[0]
print(f"Reader {thread_id}: 現在のレコード数: {count}")
time.sleep(0.05) # 少し待機
except sqlite3.OperationalError as e:
if "database is locked" in str(e):
print(f"Reader {thread_id}: データベースがロックされています。リトライします...")
time.sleep(0.1) # 少し待ってからリトライ
else:
print(f"Reader {thread_id}: エラーが発生しました: {e}")
break
except Exception as e:
print(f"Reader {thread_id}: 予期せぬエラー: {e}")
break
conn.close()
print(f"Reader {thread_id}: データベース接続終了")
if __name__ == "__main__":
print("----- SQLite Concurrent Access Example -----")
# メインスレッドでデータベースを初期化
main_conn = sqlite3.connect(DATABASE_FILE)
init_db(main_conn)
main_conn.close()
threads = []
# 複数の書き込みスレッド
for i in range(2):
writer = threading.Thread(target=writer_thread, args=(i + 1, 5))
threads.append(writer)
# 複数の読み取りスレッド
for i in range(2):
reader = threading.Thread(target=reader_thread, args=(i + 1, 10))
threads.append(reader)
# スレッドを開始
for t in threads:
t.start()
# すべてのスレッドの終了を待つ
for t in threads:
t.join()
print("----- すべてのスレッドが終了しました -----")
# 最終的なデータを確認
conn = sqlite3.connect(DATABASE_FILE)
cursor = conn.cursor()
cursor.execute("SELECT COUNT(*) FROM data")
final_count = cursor.fetchone()[0]
print(f"最終的なレコード総数: {final_count}")
conn.close()
このコードのポイント
- エラーハンドリング:
try-except sqlite3.OperationalError
ブロックでdatabase is locked
エラーを捕捉し、リトライロジックを実装することで、一時的なロック競合に対応できます。 - 各スレッドが独自の接続を持つ: SQLiteは、通常、異なるスレッドで同じデータベース接続オブジェクトを共有することを推奨していません(
check_same_thread=False
を設定しない限り)。各スレッドが独自のsqlite3.Connection
オブジェクトを作成することで、この問題を回避できます。 PRAGMA synchronous = NORMAL;
: WALモードと組み合わせることで、データの耐久性を確保しつつパフォーマンスを向上させます。PRAGMA journal_mode = WAL;
: これはWALモードを有効にするための最も重要な設定です。これにより、読み取りと書き込みの同時実行性が大幅に向上します。読み取り側は書き込み中に古いスナップショットを見ることができます。sqlite3.connect(DATABASE_FILE, timeout=5)
:timeout
パラメータは、データベースがロックされている場合にOperationalError
が発生するまでの待機時間を秒単位で指定します。これにより、すぐにエラーになるのではなく、ある程度の時間ロックが解放されるのを待つことができます。
SQLITE_BUSY の処理と sqlite3_busy_timeout の設定 (C++)
C++では、SQLite C APIを直接使用します。sqlite3_busy_timeout
関数を呼び出すことで、ビジーハンドラを設定できます。
#include <iostream>
#include <sqlite3.h>
#include <string>
#include <thread>
#include <chrono>
#include <vector>
#include <filesystem> // C++17以降
// データベースファイル名
const std::string DATABASE_FILE = "example_cpp.db";
// エラーチェックマクロ
#define CHECK_SQLITE(rc, db, msg) \
if (rc != SQLITE_OK) { \
std::cerr << msg << ": " << sqlite3_errmsg(db) << std::endl; \
/* エラー処理 */ \
}
// データベースの初期化
void init_db(sqlite3* db) {
char* err_msg = nullptr;
int rc = sqlite3_exec(db,
"CREATE TABLE IF NOT EXISTS data (id INTEGER PRIMARY KEY, value TEXT);",
nullptr, nullptr, &err_msg);
CHECK_SQLITE(rc, db, "Failed to create table");
if (err_msg) {
sqlite3_free(err_msg);
}
// WALモードの設定
rc = sqlite3_exec(db, "PRAGMA journal_mode = WAL;", nullptr, nullptr, &err_msg);
CHECK_SQLITE(rc, db, "Failed to set WAL journal mode");
if (err_msg) {
sqlite3_free(err_msg);
}
// synchronousモードの設定
rc = sqlite3_exec(db, "PRAGMA synchronous = NORMAL;", nullptr, nullptr, &err_msg);
CHECK_SQLITE(rc, db, "Failed to set synchronous mode");
if (err_msg) {
sqlite3_free(err_msg);
}
}
// 書き込みスレッド
void writer_thread(int thread_id, int num_writes) {
sqlite3* db;
int rc = sqlite3_open(DATABASE_FILE.c_str(), &db);
CHECK_SQLITE(rc, db, "Failed to open database for writer");
// busy_timeout の設定 (5000ミリ秒 = 5秒)
sqlite3_busy_timeout(db, 5000);
std::cout << "Writer " << thread_id << ": データベース接続開始" << std::endl;
init_db(db); // 各スレッドが独自の接続を持つ場合、初期化は一度だけ実行
for (int i = 0; i < num_writes; ++i) {
std::string value = "Thread " + std::to_string(thread_id) + " - Value " + std::to_string(i);
std::string sql = "INSERT INTO data (value) VALUES ('" + value + "');";
char* err_msg = nullptr;
while (true) {
rc = sqlite3_exec(db, sql.c_str(), nullptr, nullptr, &err_msg);
if (rc == SQLITE_OK) {
std::cout << "Writer " << thread_id << ": データを挿入しました: " << value << std::endl;
if (err_msg) sqlite3_free(err_msg);
std::this_thread::sleep_for(std::chrono::milliseconds(10)); // ロックを解放
break;
} else if (rc == SQLITE_BUSY || rc == SQLITE_LOCKED) {
std::cerr << "Writer " << thread_id << ": データベースがロックされています (" << sqlite3_errcode(db) << "). リトライします..." << std::endl;
if (err_msg) sqlite3_free(err_msg);
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 少し待ってからリトライ
} else {
std::cerr << "Writer " << thread_id << ": エラーが発生しました: " << sqlite3_errmsg(db) << std::endl;
if (err_msg) sqlite3_free(err_msg);
break; // 致命的なエラー
}
}
}
sqlite3_close(db);
std::cout << "Writer " << thread_id << ": データベース接続終了" << std::endl;
}
// 読み取りスレッド
void reader_thread(int thread_id, int num_reads) {
sqlite3* db;
int rc = sqlite3_open(DATABASE_FILE.c_str(), &db);
CHECK_SQLITE(rc, db, "Failed to open database for reader");
// busy_timeout の設定
sqlite3_busy_timeout(db, 5000);
std::cout << "Reader " << thread_id << ": データベース接続開始" << std::endl;
init_db(db); // 各スレッドが独自の接続を持つ場合、初期化は一度だけ実行
for (int i = 0; i < num_reads; ++i) {
std::string sql = "SELECT COUNT(*) FROM data;";
sqlite3_stmt* stmt;
while (true) {
rc = sqlite3_prepare_v2(db, sql.c_str(), -1, &stmt, nullptr);
if (rc != SQLITE_OK) {
std::cerr << "Reader " << thread_id << ": SQL準備エラー: " << sqlite3_errmsg(db) << std::endl;
break;
}
rc = sqlite3_step(stmt);
if (rc == SQLITE_ROW) {
int count = sqlite3_column_int(stmt, 0);
std::cout << "Reader " << thread_id << ": 現在のレコード数: " << count << std::endl;
sqlite3_finalize(stmt);
std::this_thread::sleep_for(std::chrono::milliseconds(50));
break;
} else if (rc == SQLITE_BUSY || rc == SQLITE_LOCKED) {
std::cerr << "Reader " << thread_id << ": データベースがロックされています (" << sqlite3_errcode(db) << "). リトライします..." << std::endl;
sqlite3_finalize(stmt); // ステートメントを解放してからリトライ
std::this_thread::sleep_for(std::chrono::milliseconds(100));
} else {
std::cerr << "Reader " << thread_id << ": データ読み取りエラー: " << sqlite3_errmsg(db) << std::endl;
sqlite3_finalize(stmt);
break;
}
}
}
sqlite3_close(db);
std::cout << "Reader " << thread_id << ": データベース接続終了" << std::endl;
}
int main() {
std::cout << "----- SQLite Concurrent Access Example (C++) -----" << std::endl;
// 既存のデータベースファイルを削除(テスト用)
if (std::filesystem::exists(DATABASE_FILE)) {
std::filesystem::remove(DATABASE_FILE);
}
if (std::filesystem::exists(DATABASE_FILE + "-wal")) {
std::filesystem::remove(DATABASE_FILE + "-wal");
}
if (std::filesystem::exists(DATABASE_FILE + "-shm")) {
std::filesystem::remove(DATABASE_FILE + "-shm");
}
// メインスレッドでデータベースを初期化
sqlite3* main_db;
int rc = sqlite3_open(DATABASE_FILE.c_str(), &main_db);
CHECK_SQLITE(rc, main_db, "Failed to open database for main init");
init_db(main_db);
sqlite3_close(main_db);
std::vector<std::thread> threads;
// 複数の書き込みスレッド
for (int i = 0; i < 2; ++i) {
threads.emplace_back(writer_thread, i + 1, 5);
}
// 複数の読み取りスレッド
for (int i = 0; i < 2; ++i) {
threads.emplace_back(reader_thread, i + 1, 10);
}
// スレッドを開始(すでに開始している)
// すべてのスレッドの終了を待つ
for (auto& t : threads) {
t.join();
}
std::cout << "----- すべてのスレッドが終了しました -----" << std::endl;
// 最終的なデータを確認
sqlite3* final_db;
rc = sqlite3_open(DATABASE_FILE.c_str(), &final_db);
CHECK_SQLITE(rc, final_db, "Failed to open database for final check");
sqlite3_stmt* stmt;
rc = sqlite3_prepare_v2(final_db, "SELECT COUNT(*) FROM data;", -1, &stmt, nullptr);
CHECK_SQLITE(rc, final_db, "Failed to prepare final count statement");
if (sqlite3_step(stmt) == SQLITE_ROW) {
int final_count = sqlite3_column_int(stmt, 0);
std::cout << "最終的なレコード総数: " << final_count << std::endl;
}
sqlite3_finalize(stmt);
sqlite3_close(final_db);
return 0;
}
このコードのポイント
- 各スレッドが独自の接続を持つ: C++でも同様に、各スレッドが独自の
sqlite3*
ポインタ(データベース接続)を持つことが推奨されます。 sqlite3_finalize()
: プリペアドステートメントの使用後は、必ずsqlite3_finalize()
を呼び出してリソースを解放します。- エラー処理 (
SQLITE_BUSY
,SQLITE_LOCKED
):sqlite3_exec
やsqlite3_step
の戻り値をチェックし、SQLITE_BUSY
またはSQLITE_LOCKED
の場合は、一定時間待機してから操作をリトライするループを実装します。 PRAGMA journal_mode = WAL;
とPRAGMA synchronous = NORMAL;
: Pythonの例と同様に、これらのPRAGMAは、より高い同時実行性と堅牢なデータ整合性のために設定されます。sqlite3_busy_timeout(db, 5000)
:sqlite3_busy_timeout
関数は、SQLITE_BUSY
エラーが返される前に、SQLiteが内部的にロックが解放されるのを待つ時間をミリ秒単位で設定します。sqlite3_open()
: データベースファイルを開きます。
コンパイルと実行 (C++)
このC++コードをコンパイルするには、SQLiteライブラリをリンクする必要があります。
Linux/macOS
g++ your_program.cpp -o your_program -lsqlite3 -std=c++17 -pthread
g++ your_program.cpp -o your_program -lsqlite3 -std=c++17 -static -static-libgcc -static-libstdc++ -pthread
- 各接続は独立: 複数のスレッドやプロセスからSQLiteデータベースにアクセスする場合、各スレッド/プロセスが独自のデータベース接続(
sqlite3.Connection
またはsqlite3*
)を持つべきです。これにより、内部的なロックメカニズムが正しく機能し、データの破損を防ぐことができます。 - リトライロジック:
SQLITE_BUSY
は一時的な競合を示すことが多いため、アプリケーション側でリトライロジックを実装することが、堅牢なシステムを構築するために重要です。 - トランザクションは短く: 特に書き込みトランザクションは可能な限り短く保ち、ロックが保持される時間を最小限に抑えるようにします。
- WALモードは非常に有効:
PRAGMA journal_mode = WAL;
を使用することで、読み取りと書き込みの競合が大幅に減少し、アプリケーションの同時実行パフォーマンスが向上します。 busy_timeout
は必須: 複数のプロセス/スレッドが同じSQLiteデータベースにアクセスする場合、busy_timeout
を設定することは、SQLITE_BUSY
エラーを管理し、アプリケーションの安定性を向上させるために不可欠です。
しかし、特定の状況下でSQLiteの同時実行性を最大限に引き出すための代替的なプログラミング手法や、あるいはSQLiteでは限界がある場合に検討すべき選択肢があります。
SQLiteの制限内での代替手法
SQLiteの基本的な同時実行モデル(複数リーダー、単一ライター)を前提としつつ、パフォーマンスを向上させるためのプログラミング手法です。
-
WAL (Write-Ahead Logging) モードの活用:
- 説明: 最も効果的な方法であり、前述の例でも示しました。WALモードでは、書き込みは直接データベースファイルに行われず、まず専用のログファイル(WALファイル)に追記されます。これにより、書き込み中でもリーダーは古いデータベースファイルからデータを読み取ることが可能になり、読み取りと書き込みの同時実行性が大幅に向上します。
- プログラミング:
データベース接続を開いた直後にこれらのPRAGMAを実行します。PRAGMA journal_mode = WAL; PRAGMA synchronous = NORMAL; -- WALモードではNORMALが推奨
-
busy_timeout
とリトライロジックの実装:- 説明:
SQLITE_BUSY
エラーは、一時的なロックの競合によって発生することがほとんどです。すぐに諦めるのではなく、一定時間待機してから操作をリトライするロジックをアプリケーション側に実装します。 - プログラミング:
sqlite3_busy_timeout()
(C/C++) または接続時のtimeout
パラメータ (Pythonsqlite3
モジュール) を使用して、SQLiteが内部的に待機する時間を設定します。- それでもエラーになる場合は、アプリケーション側で指数バックオフ(例えば、初回は10ms、次は20ms、次は40ms…と待機時間を増やしていく)を伴うリトライループを実装します。
- 説明:
-
書き込みを一元化する (Write Serialization):
- 説明: 複数のスレッド/プロセスが同時に書き込みを行うのではなく、アプリケーション内で1つの専任のスレッド/プロセス(またはキュー)がすべての書き込み要求を受け取り、順番にSQLiteデータベースに書き込むようにします。
- プログラミング:
- メッセージキュー/チャネル: 書き込み要求をメッセージキュー(例: RabbitMQ, Kafka、あるいはシンプルなスレッドセーフなキュー)に渡し、専用のワーカーがキューから要求を取り出してSQLiteに書き込みます。
- シングルライタースレッド: アプリケーション内で、データベース書き込み専用の1つのスレッドを作成し、他のスレッドからの書き込み要求をこのスレッドに転送します。このスレッドは、受け取った要求を順次処理します。
- 利点:
SQLITE_BUSY
エラーの発生頻度が劇的に減り、ロック競合の管理がアプリケーション層で一元化されるため、予測可能な動作になります。 - 欠点: ボトルネックになる可能性があり、システム全体の書き込みスループットは、その単一ライタースレッドの処理能力に依存します。
-
複数のSQLiteデータベースファイルを使用する:
- 説明: アプリケーションのデータが論理的に分割可能であれば、複数のSQLiteデータベースファイルにデータを分散させ、異なるプロセス/スレッドが異なるファイルに同時に書き込めるようにします。
- プログラミング:
- 例えば、ユーザーごとに個別のSQLiteデータベースファイルを持つ、あるいはデータの種類ごとに異なるファイルを持つといった設計が考えられます。
- 必要に応じて、
ATTACH DATABASE
コマンドを使用して、1つのSQLite接続から複数のデータベースファイルを操作することも可能です。
- 利点: 異なるファイルへの書き込みは相互にロックし合わないため、同時書き込み性能が向上します。
- 欠点: データの一貫性やトランザクション管理が複雑になる可能性があります。全データベースにまたがるJOINなどが非効率になる場合もあります。
SQLiteのファイルロックモデルがアプリケーションの要件に合致しない場合(例: 非常に高い同時書き込みスループット、分散環境での利用など)、他のデータベースシステムへの移行を検討する必要があります。
-
クライアント/サーバー型RDBMS (PostgreSQL, MySQLなど):
- 説明: これらのデータベースは、専用のサーバープロセスが動作し、内部的に高度な同時実行制御メカニズム(MVCC: Multi-Version Concurrency Controlなど)を備えています。これにより、行レベルロックや異なる分離レベルを提供し、複数のクライアントからの同時書き込みを効率的に処理できます。
- 利点: 高い同時書き込み性能、ネットワーク経由でのアクセス、より堅牢なデータ整合性、豊富な機能(レプリケーション、クラスタリングなど)。
- 欠点: セットアップと管理がSQLiteよりも複雑、専用サーバーが必要、軽量な組み込み用途には不向き。
-
埋め込み型KVS (Key-Value Store):
- 説明: SQLiteが提供するSQLインターフェースが不要で、単にキーと値のペアを保存できれば良い場合、BadgerDB, BoltDB (Go言語), RocksDB, LevelDBなどの組み込み型KVSを検討できます。これらは多くの場合、SQLiteよりも高い書き込みスループットを提供し、異なる同時実行モデルを持つことがあります。
- 利点: 非常に高速な書き込み、シンプルなAPI、多くの場合、組み込み型。
- 欠点: SQLのような柔軟なクエリ機能がない、リレーショナルなデータ構造には不向き。
-
分散型データベース:
- 説明: 複数のマシンにデータを分散させ、高可用性とスケーラビリティを実現したい場合、Cassandra, MongoDB, CockroachDB, TiDB などの分散型データベースが選択肢となります。
- 利点: 非常に高いスケーラビリティ、耐障害性。
- 欠点: 運用が非常に複雑、SQLiteとは全く異なる設計アプローチが必要。
SQLiteの「ファイルロックと同時実行性」は、その設計上の特徴と制約です。
- それでも要件を満たせない場合は、PostgreSQLやMySQLのようなクライアント/サーバー型データベース、あるいは特定のユースケースに特化したKVSや分散型データベースへの移行を真剣に検討すべきです。
- 多数の同時書き込み、分散環境 のシナリオでは、SQLiteの限界に達するため、書き込みの一元化や複数のファイルへの分割などのアプリケーションレベルでの工夫が必要になります。
- 単一プロセス、読み取り中心、限定的な同時書き込み のシナリオでは、WALモードの活用と
busy_timeout
によるリトライロジックの実装で十分な性能を発揮します。