From 2c4f966d60ad59ba7185c70ca8e8681bc26fac18 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Timo=20K=C3=B6sters?= Date: Sun, 1 Aug 2021 16:59:52 +0200 Subject: [PATCH] improvement: use sqlite properly --- src/database.rs | 66 +++++++----------------------- src/database/abstraction/sqlite.rs | 48 ++++------------------ 2 files changed, 24 insertions(+), 90 deletions(-) diff --git a/src/database.rs b/src/database.rs index 5e9e025d..baf66b5c 100644 --- a/src/database.rs +++ b/src/database.rs @@ -47,12 +47,8 @@ pub struct Config { db_cache_capacity_mb: f64, #[serde(default = "default_sqlite_read_pool_size")] sqlite_read_pool_size: usize, - #[serde(default = "true_fn")] - sqlite_wal_clean_timer: bool, #[serde(default = "default_sqlite_wal_clean_second_interval")] sqlite_wal_clean_second_interval: u32, - #[serde(default = "default_sqlite_wal_clean_second_timeout")] - sqlite_wal_clean_second_timeout: u32, #[serde(default = "default_sqlite_spillover_reap_fraction")] sqlite_spillover_reap_fraction: f64, #[serde(default = "default_sqlite_spillover_reap_interval_secs")] @@ -120,11 +116,7 @@ fn default_sqlite_read_pool_size() -> usize { } fn default_sqlite_wal_clean_second_interval() -> u32 { - 60 * 60 -} - -fn default_sqlite_wal_clean_second_timeout() -> u32 { - 2 + 15 * 60 // every 15 minutes } fn default_sqlite_spillover_reap_fraction() -> f64 { @@ -465,7 +457,7 @@ impl Database { #[cfg(feature = "sqlite")] { - Self::start_wal_clean_task(&db, &config).await; + Self::start_wal_clean_task(Arc::clone(&db), &config).await; Self::start_spillover_reap_task(builder, &config).await; } @@ -620,24 +612,17 @@ impl Database { } #[cfg(feature = "sqlite")] - #[tracing::instrument(skip(lock, config))] - pub async fn start_wal_clean_task(lock: &Arc>, config: &Config) { - use tokio::time::{interval, timeout}; + #[tracing::instrument(skip(db, config))] + pub async fn start_wal_clean_task(db: Arc>, config: &Config) { + use tokio::time::interval; #[cfg(unix)] use tokio::signal::unix::{signal, SignalKind}; use tracing::info; - use std::{ - sync::Weak, - time::{Duration, Instant}, - }; + use std::time::{Duration, Instant}; - let weak: Weak> = Arc::downgrade(&lock); - - let lock_timeout = Duration::from_secs(config.sqlite_wal_clean_second_timeout as u64); let timer_interval = Duration::from_secs(config.sqlite_wal_clean_second_interval as u64); - let do_timer = config.sqlite_wal_clean_timer; tokio::spawn(async move { let mut i = interval(timer_interval); @@ -647,45 +632,24 @@ impl Database { loop { #[cfg(unix)] tokio::select! { - _ = i.tick(), if do_timer => { - info!(target: "wal-trunc", "Timer ticked") + _ = i.tick() => { + info!("wal-trunc: Timer ticked"); } _ = s.recv() => { - info!(target: "wal-trunc", "Received SIGHUP") + info!("wal-trunc: Received SIGHUP"); } }; #[cfg(not(unix))] - if do_timer { + { i.tick().await; - info!(target: "wal-trunc", "Timer ticked") - } else { - // timer disabled, and there's no concept of signals on windows, bailing... - return; + info!("wal-trunc: Timer ticked") } - if let Some(arc) = Weak::upgrade(&weak) { - info!(target: "wal-trunc", "Rotating sync helpers..."); - // This actually creates a very small race condition between firing this and trying to acquire the subsequent write lock. - // Though it is not a huge deal if the write lock doesn't "catch", as it'll harmlessly time out. - arc.read().await.globals.rotate.fire(); - info!(target: "wal-trunc", "Locking..."); - let guard = { - if let Ok(guard) = timeout(lock_timeout, arc.write()).await { - guard - } else { - info!(target: "wal-trunc", "Lock failed in timeout, canceled."); - continue; - } - }; - info!(target: "wal-trunc", "Locked, flushing..."); - let start = Instant::now(); - if let Err(e) = guard.flush_wal() { - error!(target: "wal-trunc", "Errored: {}", e); - } else { - info!(target: "wal-trunc", "Flushed in {:?}", start.elapsed()); - } + let start = Instant::now(); + if let Err(e) = db.read().await.flush_wal() { + error!("wal-trunc: Errored: {}", e); } else { - break; + info!("wal-trunc: Flushed in {:?}", start.elapsed()); } } }); diff --git a/src/database/abstraction/sqlite.rs b/src/database/abstraction/sqlite.rs index a46d3ada..bbf75083 100644 --- a/src/database/abstraction/sqlite.rs +++ b/src/database/abstraction/sqlite.rs @@ -4,7 +4,7 @@ use crossbeam::channel::{ bounded, unbounded, Receiver as ChannelReceiver, Sender as ChannelSender, TryRecvError, }; use parking_lot::{Mutex, MutexGuard, RwLock}; -use rusqlite::{params, Connection, DatabaseName::Main, OptionalExtension, Params}; +use rusqlite::{Connection, DatabaseName::Main, OptionalExtension, Params}; use std::{ collections::HashMap, future::Future, @@ -122,16 +122,11 @@ impl Pool { fn prepare_conn>(path: P, cache_size: Option) -> Result { let conn = Connection::open(path)?; - conn.pragma_update(Some(Main), "journal_mode", &"WAL".to_owned())?; - - // conn.pragma_update(Some(Main), "wal_autocheckpoint", &250)?; - - // conn.pragma_update(Some(Main), "wal_checkpoint", &"FULL".to_owned())?; - - conn.pragma_update(Some(Main), "synchronous", &"OFF".to_owned())?; + conn.pragma_update(Some(Main), "journal_mode", &"WAL")?; + conn.pragma_update(Some(Main), "synchronous", &"NORMAL")?; if let Some(cache_kib) = cache_size { - conn.pragma_update(Some(Main), "cache_size", &(-Into::::into(cache_kib)))?; + conn.pragma_update(Some(Main), "cache_size", &(-i64::from(cache_kib)))?; } Ok(conn) @@ -193,9 +188,6 @@ impl DatabaseEngine for Engine { config.db_cache_capacity_mb, )?; - pool.write_lock() - .execute("CREATE TABLE IF NOT EXISTS _noop (\"key\" INT)", params![])?; - let arc = Arc::new(Engine { pool, iter_pool: Mutex::new(ThreadPool::new(10)), @@ -205,7 +197,7 @@ impl DatabaseEngine for Engine { } fn open_tree(self: &Arc, name: &str) -> Result> { - self.pool.write_lock().execute(format!("CREATE TABLE IF NOT EXISTS {} ( \"key\" BLOB PRIMARY KEY, \"value\" BLOB NOT NULL )", name).as_str(), [])?; + self.pool.write_lock().execute(&format!("CREATE TABLE IF NOT EXISTS {} ( \"key\" BLOB PRIMARY KEY, \"value\" BLOB NOT NULL )", name), [])?; Ok(Arc::new(SqliteTable { engine: Arc::clone(self), @@ -215,37 +207,15 @@ impl DatabaseEngine for Engine { } fn flush(self: &Arc) -> Result<()> { - self.pool - .write_lock() - .execute_batch( - " - PRAGMA synchronous=FULL; - BEGIN; - DELETE FROM _noop; - INSERT INTO _noop VALUES (1); - COMMIT; - PRAGMA synchronous=OFF; - ", - ) - .map_err(Into::into) + // we enabled PRAGMA synchronous=normal, so this should not be necessary + Ok(()) } } impl Engine { pub fn flush_wal(self: &Arc) -> Result<()> { - self.pool - .write_lock() - .execute_batch( - " - PRAGMA synchronous=FULL; PRAGMA wal_checkpoint=TRUNCATE; - BEGIN; - DELETE FROM _noop; - INSERT INTO _noop VALUES (1); - COMMIT; - PRAGMA wal_checkpoint=PASSIVE; PRAGMA synchronous=OFF; - ", - ) - .map_err(Into::into) + self.pool.write_lock().pragma_update(Some(Main), "wal_checkpoint", &"RESTART")?; + Ok(()) } // Reaps (at most) (.len() * `fraction`) (rounded down, min 1) connections.