From 5902083f4d5a18684dc981f67cdee8cc222ffb13 Mon Sep 17 00:00:00 2001 From: Christian Date: Tue, 26 May 2026 13:02:19 -0500 Subject: [PATCH 1/3] Instrument KV operation timing logs --- crates/edgezero-core/src/key_value_store.rs | 131 ++++++++++++++++++-- docs/guide/kv.md | 6 + 2 files changed, 126 insertions(+), 11 deletions(-) diff --git a/crates/edgezero-core/src/key_value_store.rs b/crates/edgezero-core/src/key_value_store.rs index 5169606..6f7f8a6 100644 --- a/crates/edgezero-core/src/key_value_store.rs +++ b/crates/edgezero-core/src/key_value_store.rs @@ -57,6 +57,7 @@ use async_trait::async_trait; use bytes::Bytes; use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; +use web_time::Instant; use crate::error::EdgeError; @@ -415,7 +416,12 @@ impl KvHandle { #[inline] pub async fn delete(&self, key: &str) -> Result<(), KvError> { Self::validate_key(key)?; - self.store.delete(key).await + let started_at = Self::kv_timing_start(); + let result = self.store.delete(key).await; + Self::kv_timing_log(started_at, "delete", &result, || { + format!("key_len={}", key.len()) + }); + result } fn encode_list_cursor(prefix: &str, cursor: Option) -> Result, KvError> { @@ -437,7 +443,16 @@ impl KvHandle { #[inline] pub async fn exists(&self, key: &str) -> Result { Self::validate_key(key)?; - self.store.exists(key).await + let started_at = Self::kv_timing_start(); + let result = self.store.exists(key).await; + Self::kv_timing_log(started_at, "exists", &result, || { + let exists = result + .as_ref() + .map(bool::to_string) + .unwrap_or_else(|_err| "unknown".to_owned()); + format!("key_len={} exists={exists}", key.len()) + }); + result } /// Get a value by key, deserializing from JSON. @@ -449,7 +464,13 @@ impl KvHandle { #[inline] pub async fn get(&self, key: &str) -> Result, KvError> { Self::validate_key(key)?; - match self.store.get_bytes(key).await? { + let started_at = Self::kv_timing_start(); + let result = self.store.get_bytes(key).await; + Self::kv_timing_log(started_at, "get", &result, || { + format!("key_len={} {}", key.len(), Self::kv_hit_metadata(&result)) + }); + + match result? { Some(bytes) => { let val = serde_json::from_slice(&bytes)?; Ok(Some(val)) @@ -465,7 +486,12 @@ impl KvHandle { #[inline] pub async fn get_bytes(&self, key: &str) -> Result, KvError> { Self::validate_key(key)?; - self.store.get_bytes(key).await + let started_at = Self::kv_timing_start(); + let result = self.store.get_bytes(key).await; + Self::kv_timing_log(started_at, "get_bytes", &result, || { + format!("key_len={} {}", key.len(), Self::kv_hit_metadata(&result)) + }); + result } /// Get a value by key, returning `default` if the key does not exist. @@ -477,6 +503,46 @@ impl KvHandle { Ok(self.get(key).await?.unwrap_or(default)) } + fn kv_hit_metadata(result: &Result, KvError>) -> String { + match result.as_ref() { + Ok(Some(bytes)) => format!("hit=true bytes={}", bytes.len()), + Ok(None) => "hit=false bytes=0".to_owned(), + Err(_err) => "hit=unknown bytes=unknown".to_owned(), + } + } + + fn kv_timing_log( + started_at: Option, + operation: &str, + result: &Result, + metadata: Metadata, + ) where + Metadata: FnOnce() -> String, + { + if let Some(started_at) = started_at { + let status = if result.is_ok() { "ok" } else { "error" }; + log::debug!( + "kv operation={operation} elapsed_ms={} status={status} {}", + started_at.elapsed().as_millis(), + metadata() + ); + } + } + + fn kv_timing_start() -> Option { + log::log_enabled!(log::Level::Debug).then(Instant::now) + } + + fn kv_write_metadata(key_len: usize, bytes_len: usize, ttl: Option) -> String { + match ttl { + Some(ttl) => format!( + "key_len={key_len} bytes={bytes_len} ttl_secs={}", + ttl.as_secs() + ), + None => format!("key_len={key_len} bytes={bytes_len}"), + } + } + /// List keys in a bounded, paginated fashion. /// /// The cursor is opaque, prefix-bound, and should be passed back unchanged @@ -496,10 +562,28 @@ impl KvHandle { Self::validate_prefix(prefix)?; Self::validate_list_limit(limit)?; let decoded_cursor = Self::decode_list_cursor(prefix, cursor)?; - let page = self + let started_at = Self::kv_timing_start(); + let result = self .store .list_keys_page(prefix, decoded_cursor.as_deref(), limit) - .await?; + .await; + Self::kv_timing_log(started_at, "list_keys_page", &result, || { + let (count, next_cursor) = result + .as_ref() + .map(|page| { + ( + page.keys.len().to_string(), + page.cursor.is_some().to_string(), + ) + }) + .unwrap_or_else(|_err| ("unknown".to_owned(), "unknown".to_owned())); + format!( + "prefix_len={} cursor_present={} limit={limit} count={count} next_cursor_present={next_cursor}", + prefix.len(), + cursor.is_some() + ) + }); + let page = result?; Ok(KvPage { cursor: Self::encode_list_cursor(prefix, page.cursor)?, @@ -522,7 +606,13 @@ impl KvHandle { Self::validate_key(key)?; let bytes = serde_json::to_vec(value)?; Self::validate_value(&bytes)?; - self.store.put_bytes(key, Bytes::from(bytes)).await + let bytes_len = bytes.len(); + let started_at = Self::kv_timing_start(); + let result = self.store.put_bytes(key, Bytes::from(bytes)).await; + Self::kv_timing_log(started_at, "put", &result, || { + Self::kv_write_metadata(key.len(), bytes_len, None) + }); + result } /// Put raw bytes for a key. @@ -533,7 +623,13 @@ impl KvHandle { pub async fn put_bytes(&self, key: &str, value: Bytes) -> Result<(), KvError> { Self::validate_key(key)?; Self::validate_value(&value)?; - self.store.put_bytes(key, value).await + let bytes_len = value.len(); + let started_at = Self::kv_timing_start(); + let result = self.store.put_bytes(key, value).await; + Self::kv_timing_log(started_at, "put_bytes", &result, || { + Self::kv_write_metadata(key.len(), bytes_len, None) + }); + result } /// Put raw bytes with a TTL. @@ -550,7 +646,13 @@ impl KvHandle { Self::validate_key(key)?; Self::validate_ttl(ttl)?; Self::validate_value(&value)?; - self.store.put_bytes_with_ttl(key, value, ttl).await + let bytes_len = value.len(); + let started_at = Self::kv_timing_start(); + let result = self.store.put_bytes_with_ttl(key, value, ttl).await; + Self::kv_timing_log(started_at, "put_bytes_with_ttl", &result, || { + Self::kv_write_metadata(key.len(), bytes_len, Some(ttl)) + }); + result } /// Put a value with a TTL, serializing it to JSON. @@ -568,9 +670,16 @@ impl KvHandle { Self::validate_ttl(ttl)?; let bytes = serde_json::to_vec(value)?; Self::validate_value(&bytes)?; - self.store + let bytes_len = bytes.len(); + let started_at = Self::kv_timing_start(); + let result = self + .store .put_bytes_with_ttl(key, Bytes::from(bytes), ttl) - .await + .await; + Self::kv_timing_log(started_at, "put_with_ttl", &result, || { + Self::kv_write_metadata(key.len(), bytes_len, Some(ttl)) + }); + result } /// Read-modify-write: get the current value (or `default`), diff --git a/docs/guide/kv.md b/docs/guide/kv.md index c468aac..b82a5e0 100644 --- a/docs/guide/kv.md +++ b/docs/guide/kv.md @@ -109,6 +109,12 @@ For strict correctness, use a transactional data store. Key listing is paginated by design. This avoids buffering an unbounded number of keys in memory and matches the underlying provider APIs. The Spin adapter materialises `Store::get_keys()` and pages client-side; a `max_list_keys` cap (configurable via `EDGEZERO__STORES__KV____MAX_LIST_KEYS`, default `1000`) guards against runaway lists and yields `KvError::LimitExceeded` when exceeded. +## Operation Timing / Observability + +`KvHandle` emits debug-level timing logs for backend KV operations across all adapters. Logs include safe metadata such as operation name, elapsed milliseconds, success/error status, key or prefix length, hit/miss, byte counts, TTL seconds, and list page counts. + +Raw keys, prefixes, cursors, and values are never logged. Typed helper timings measure only the backend call after validation/serialization and before JSON deserialization. `read_modify_write` performs separate read and write calls, so it emits separate operation logs. + ## Platform Specifics ### Local Development From 8bdc491dd329d995856f58f2765736bf526850c0 Mon Sep 17 00:00:00 2001 From: Christian Date: Thu, 11 Jun 2026 12:45:33 -0500 Subject: [PATCH 2/3] Address KV timing review feedback --- crates/edgezero-core/src/key_value_store.rs | 114 ++++++++++++++++---- docs/guide/kv.md | 2 +- 2 files changed, 93 insertions(+), 23 deletions(-) diff --git a/crates/edgezero-core/src/key_value_store.rs b/crates/edgezero-core/src/key_value_store.rs index 6f7f8a6..7479adb 100644 --- a/crates/edgezero-core/src/key_value_store.rs +++ b/crates/edgezero-core/src/key_value_store.rs @@ -446,11 +446,7 @@ impl KvHandle { let started_at = Self::kv_timing_start(); let result = self.store.exists(key).await; Self::kv_timing_log(started_at, "exists", &result, || { - let exists = result - .as_ref() - .map(bool::to_string) - .unwrap_or_else(|_err| "unknown".to_owned()); - format!("key_len={} exists={exists}", key.len()) + Self::kv_exists_metadata(key.len(), &result) }); result } @@ -467,7 +463,7 @@ impl KvHandle { let started_at = Self::kv_timing_start(); let result = self.store.get_bytes(key).await; Self::kv_timing_log(started_at, "get", &result, || { - format!("key_len={} {}", key.len(), Self::kv_hit_metadata(&result)) + Self::kv_read_metadata(key.len(), &result) }); match result? { @@ -489,7 +485,7 @@ impl KvHandle { let started_at = Self::kv_timing_start(); let result = self.store.get_bytes(key).await; Self::kv_timing_log(started_at, "get_bytes", &result, || { - format!("key_len={} {}", key.len(), Self::kv_hit_metadata(&result)) + Self::kv_read_metadata(key.len(), &result) }); result } @@ -503,11 +499,43 @@ impl KvHandle { Ok(self.get(key).await?.unwrap_or(default)) } + fn kv_exists_metadata(key_len: usize, result: &Result) -> String { + match result.as_ref() { + Ok(exists) => format!("key_len={key_len} exists={exists}"), + Err(_err) => format!("key_len={key_len}"), + } + } + fn kv_hit_metadata(result: &Result, KvError>) -> String { match result.as_ref() { Ok(Some(bytes)) => format!("hit=true bytes={}", bytes.len()), Ok(None) => "hit=false bytes=0".to_owned(), - Err(_err) => "hit=unknown bytes=unknown".to_owned(), + Err(_err) => String::new(), + } + } + + fn kv_list_metadata( + prefix_len: usize, + cursor_present: bool, + limit: usize, + result: &Result, + ) -> String { + match result.as_ref() { + Ok(page) => format!( + "prefix_len={prefix_len} cursor_present={cursor_present} limit={limit} count={} next_cursor_present={}", + page.keys.len(), + page.cursor.is_some() + ), + Err(_err) => { + format!("prefix_len={prefix_len} cursor_present={cursor_present} limit={limit}") + } + } + } + + fn kv_read_metadata(key_len: usize, result: &Result, KvError>) -> String { + match result { + Ok(_value) => format!("key_len={key_len} {}", Self::kv_hit_metadata(result)), + Err(_err) => format!("key_len={key_len}"), } } @@ -568,20 +596,7 @@ impl KvHandle { .list_keys_page(prefix, decoded_cursor.as_deref(), limit) .await; Self::kv_timing_log(started_at, "list_keys_page", &result, || { - let (count, next_cursor) = result - .as_ref() - .map(|page| { - ( - page.keys.len().to_string(), - page.cursor.is_some().to_string(), - ) - }) - .unwrap_or_else(|_err| ("unknown".to_owned(), "unknown".to_owned())); - format!( - "prefix_len={} cursor_present={} limit={limit} count={count} next_cursor_present={next_cursor}", - prefix.len(), - cursor.is_some() - ) + Self::kv_list_metadata(prefix.len(), cursor.is_some(), limit, &result) }); let page = result?; @@ -1104,6 +1119,24 @@ mod tests { }); } + #[test] + fn error_metadata_omits_unknown_result_fields() { + let read_result = Err(KvError::Unavailable); + assert_eq!(KvHandle::kv_read_metadata(18, &read_result), "key_len=18"); + + let exists_result = Err(KvError::Unavailable); + assert_eq!( + KvHandle::kv_exists_metadata(18, &exists_result), + "key_len=18" + ); + + let list_result = Err(KvError::Unavailable); + assert_eq!( + KvHandle::kv_list_metadata(4, true, 100, &list_result), + "prefix_len=4 cursor_present=true limit=100" + ); + } + #[test] fn exists_returns_false_after_delete() { let kv = handle(); @@ -1313,6 +1346,43 @@ mod tests { }); } + #[test] + fn read_metadata_logs_lengths_not_raw_key_or_value() { + let key = "super-secret-token"; + let value = Bytes::from_static(b"super-secret-value"); + let result = Ok(Some(value)); + + let metadata = KvHandle::kv_read_metadata(key.len(), &result); + + assert_eq!(metadata, "key_len=18 hit=true bytes=18"); + assert!(!metadata.contains(key)); + assert!(!metadata.contains("super-secret-value")); + } + + #[test] + fn success_metadata_keeps_stable_field_types() { + let read_result = Ok(Some(Bytes::from_static(b"abc"))); + assert_eq!( + KvHandle::kv_read_metadata(1, &read_result), + "key_len=1 hit=true bytes=3" + ); + + let exists_result = Ok(false); + assert_eq!( + KvHandle::kv_exists_metadata(1, &exists_result), + "key_len=1 exists=false" + ); + + let list_result = Ok(KvPage { + cursor: Some("cursor".to_owned()), + keys: vec!["a".to_owned(), "b".to_owned()], + }); + assert_eq!( + KvHandle::kv_list_metadata(4, false, 100, &list_result), + "prefix_len=4 cursor_present=false limit=100 count=2 next_cursor_present=true" + ); + } + #[test] fn typed_get_bad_json_returns_serialization_error() { let kv = handle(); diff --git a/docs/guide/kv.md b/docs/guide/kv.md index b82a5e0..5f58917 100644 --- a/docs/guide/kv.md +++ b/docs/guide/kv.md @@ -113,7 +113,7 @@ Key listing is paginated by design. This avoids buffering an unbounded number of `KvHandle` emits debug-level timing logs for backend KV operations across all adapters. Logs include safe metadata such as operation name, elapsed milliseconds, success/error status, key or prefix length, hit/miss, byte counts, TTL seconds, and list page counts. -Raw keys, prefixes, cursors, and values are never logged. Typed helper timings measure only the backend call after validation/serialization and before JSON deserialization. `read_modify_write` performs separate read and write calls, so it emits separate operation logs. +Timing logs are limited to derived metadata such as lengths, counts, booleans, and TTLs rather than raw keys, prefixes, cursors, or values. On Cloudflare Workers, `elapsed_ms` should be treated as approximate because the runtime uses a reduced-resolution monotonic clock. Typed helper timings measure only the backend call after validation/serialization and before JSON deserialization. `read_modify_write` performs separate read and write calls, so it emits separate operation logs. ## Platform Specifics From 935c2874d8f564d2aa03bcfd55f89b8b817236db Mon Sep 17 00:00:00 2001 From: Aram Grigoryan <132480+aram356@users.noreply.github.com> Date: Tue, 30 Jun 2026 16:28:17 -0700 Subject: [PATCH 3/3] Fix clippy shadow_reuse lints in kv timing logs --- crates/edgezero-core/src/key_value_store.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/crates/edgezero-core/src/key_value_store.rs b/crates/edgezero-core/src/key_value_store.rs index 7479adb..b879f38 100644 --- a/crates/edgezero-core/src/key_value_store.rs +++ b/crates/edgezero-core/src/key_value_store.rs @@ -547,11 +547,11 @@ impl KvHandle { ) where Metadata: FnOnce() -> String, { - if let Some(started_at) = started_at { + if let Some(start) = started_at { let status = if result.is_ok() { "ok" } else { "error" }; log::debug!( "kv operation={operation} elapsed_ms={} status={status} {}", - started_at.elapsed().as_millis(), + start.elapsed().as_millis(), metadata() ); } @@ -563,9 +563,9 @@ impl KvHandle { fn kv_write_metadata(key_len: usize, bytes_len: usize, ttl: Option) -> String { match ttl { - Some(ttl) => format!( + Some(duration) => format!( "key_len={key_len} bytes={bytes_len} ttl_secs={}", - ttl.as_secs() + duration.as_secs() ), None => format!("key_len={key_len} bytes={bytes_len}"), }