diff --git a/crates/edgezero-core/src/key_value_store.rs b/crates/edgezero-core/src/key_value_store.rs index 5169606..b879f38 100644 --- a/crates/edgezero-core/src/key_value_store.rs +++ b/crates/edgezero-core/src/key_value_store.rs @@ -57,6 +57,7 @@ use async_trait::async_trait; use bytes::Bytes; use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; +use web_time::Instant; use crate::error::EdgeError; @@ -415,7 +416,12 @@ impl KvHandle { #[inline] pub async fn delete(&self, key: &str) -> Result<(), KvError> { Self::validate_key(key)?; - self.store.delete(key).await + let started_at = Self::kv_timing_start(); + let result = self.store.delete(key).await; + Self::kv_timing_log(started_at, "delete", &result, || { + format!("key_len={}", key.len()) + }); + result } fn encode_list_cursor(prefix: &str, cursor: Option) -> Result, KvError> { @@ -437,7 +443,12 @@ impl KvHandle { #[inline] pub async fn exists(&self, key: &str) -> Result { Self::validate_key(key)?; - self.store.exists(key).await + let started_at = Self::kv_timing_start(); + let result = self.store.exists(key).await; + Self::kv_timing_log(started_at, "exists", &result, || { + Self::kv_exists_metadata(key.len(), &result) + }); + result } /// Get a value by key, deserializing from JSON. @@ -449,7 +460,13 @@ impl KvHandle { #[inline] pub async fn get(&self, key: &str) -> Result, KvError> { Self::validate_key(key)?; - match self.store.get_bytes(key).await? { + let started_at = Self::kv_timing_start(); + let result = self.store.get_bytes(key).await; + Self::kv_timing_log(started_at, "get", &result, || { + Self::kv_read_metadata(key.len(), &result) + }); + + match result? { Some(bytes) => { let val = serde_json::from_slice(&bytes)?; Ok(Some(val)) @@ -465,7 +482,12 @@ impl KvHandle { #[inline] pub async fn get_bytes(&self, key: &str) -> Result, KvError> { Self::validate_key(key)?; - self.store.get_bytes(key).await + let started_at = Self::kv_timing_start(); + let result = self.store.get_bytes(key).await; + Self::kv_timing_log(started_at, "get_bytes", &result, || { + Self::kv_read_metadata(key.len(), &result) + }); + result } /// Get a value by key, returning `default` if the key does not exist. @@ -477,6 +499,78 @@ impl KvHandle { Ok(self.get(key).await?.unwrap_or(default)) } + fn kv_exists_metadata(key_len: usize, result: &Result) -> String { + match result.as_ref() { + Ok(exists) => format!("key_len={key_len} exists={exists}"), + Err(_err) => format!("key_len={key_len}"), + } + } + + fn kv_hit_metadata(result: &Result, KvError>) -> String { + match result.as_ref() { + Ok(Some(bytes)) => format!("hit=true bytes={}", bytes.len()), + Ok(None) => "hit=false bytes=0".to_owned(), + Err(_err) => String::new(), + } + } + + fn kv_list_metadata( + prefix_len: usize, + cursor_present: bool, + limit: usize, + result: &Result, + ) -> String { + match result.as_ref() { + Ok(page) => format!( + "prefix_len={prefix_len} cursor_present={cursor_present} limit={limit} count={} next_cursor_present={}", + page.keys.len(), + page.cursor.is_some() + ), + Err(_err) => { + format!("prefix_len={prefix_len} cursor_present={cursor_present} limit={limit}") + } + } + } + + fn kv_read_metadata(key_len: usize, result: &Result, KvError>) -> String { + match result { + Ok(_value) => format!("key_len={key_len} {}", Self::kv_hit_metadata(result)), + Err(_err) => format!("key_len={key_len}"), + } + } + + fn kv_timing_log( + started_at: Option, + operation: &str, + result: &Result, + metadata: Metadata, + ) where + Metadata: FnOnce() -> String, + { + if let Some(start) = started_at { + let status = if result.is_ok() { "ok" } else { "error" }; + log::debug!( + "kv operation={operation} elapsed_ms={} status={status} {}", + start.elapsed().as_millis(), + metadata() + ); + } + } + + fn kv_timing_start() -> Option { + log::log_enabled!(log::Level::Debug).then(Instant::now) + } + + fn kv_write_metadata(key_len: usize, bytes_len: usize, ttl: Option) -> String { + match ttl { + Some(duration) => format!( + "key_len={key_len} bytes={bytes_len} ttl_secs={}", + duration.as_secs() + ), + None => format!("key_len={key_len} bytes={bytes_len}"), + } + } + /// List keys in a bounded, paginated fashion. /// /// The cursor is opaque, prefix-bound, and should be passed back unchanged @@ -496,10 +590,15 @@ impl KvHandle { Self::validate_prefix(prefix)?; Self::validate_list_limit(limit)?; let decoded_cursor = Self::decode_list_cursor(prefix, cursor)?; - let page = self + let started_at = Self::kv_timing_start(); + let result = self .store .list_keys_page(prefix, decoded_cursor.as_deref(), limit) - .await?; + .await; + Self::kv_timing_log(started_at, "list_keys_page", &result, || { + Self::kv_list_metadata(prefix.len(), cursor.is_some(), limit, &result) + }); + let page = result?; Ok(KvPage { cursor: Self::encode_list_cursor(prefix, page.cursor)?, @@ -522,7 +621,13 @@ impl KvHandle { Self::validate_key(key)?; let bytes = serde_json::to_vec(value)?; Self::validate_value(&bytes)?; - self.store.put_bytes(key, Bytes::from(bytes)).await + let bytes_len = bytes.len(); + let started_at = Self::kv_timing_start(); + let result = self.store.put_bytes(key, Bytes::from(bytes)).await; + Self::kv_timing_log(started_at, "put", &result, || { + Self::kv_write_metadata(key.len(), bytes_len, None) + }); + result } /// Put raw bytes for a key. @@ -533,7 +638,13 @@ impl KvHandle { pub async fn put_bytes(&self, key: &str, value: Bytes) -> Result<(), KvError> { Self::validate_key(key)?; Self::validate_value(&value)?; - self.store.put_bytes(key, value).await + let bytes_len = value.len(); + let started_at = Self::kv_timing_start(); + let result = self.store.put_bytes(key, value).await; + Self::kv_timing_log(started_at, "put_bytes", &result, || { + Self::kv_write_metadata(key.len(), bytes_len, None) + }); + result } /// Put raw bytes with a TTL. @@ -550,7 +661,13 @@ impl KvHandle { Self::validate_key(key)?; Self::validate_ttl(ttl)?; Self::validate_value(&value)?; - self.store.put_bytes_with_ttl(key, value, ttl).await + let bytes_len = value.len(); + let started_at = Self::kv_timing_start(); + let result = self.store.put_bytes_with_ttl(key, value, ttl).await; + Self::kv_timing_log(started_at, "put_bytes_with_ttl", &result, || { + Self::kv_write_metadata(key.len(), bytes_len, Some(ttl)) + }); + result } /// Put a value with a TTL, serializing it to JSON. @@ -568,9 +685,16 @@ impl KvHandle { Self::validate_ttl(ttl)?; let bytes = serde_json::to_vec(value)?; Self::validate_value(&bytes)?; - self.store + let bytes_len = bytes.len(); + let started_at = Self::kv_timing_start(); + let result = self + .store .put_bytes_with_ttl(key, Bytes::from(bytes), ttl) - .await + .await; + Self::kv_timing_log(started_at, "put_with_ttl", &result, || { + Self::kv_write_metadata(key.len(), bytes_len, Some(ttl)) + }); + result } /// Read-modify-write: get the current value (or `default`), @@ -995,6 +1119,24 @@ mod tests { }); } + #[test] + fn error_metadata_omits_unknown_result_fields() { + let read_result = Err(KvError::Unavailable); + assert_eq!(KvHandle::kv_read_metadata(18, &read_result), "key_len=18"); + + let exists_result = Err(KvError::Unavailable); + assert_eq!( + KvHandle::kv_exists_metadata(18, &exists_result), + "key_len=18" + ); + + let list_result = Err(KvError::Unavailable); + assert_eq!( + KvHandle::kv_list_metadata(4, true, 100, &list_result), + "prefix_len=4 cursor_present=true limit=100" + ); + } + #[test] fn exists_returns_false_after_delete() { let kv = handle(); @@ -1204,6 +1346,43 @@ mod tests { }); } + #[test] + fn read_metadata_logs_lengths_not_raw_key_or_value() { + let key = "super-secret-token"; + let value = Bytes::from_static(b"super-secret-value"); + let result = Ok(Some(value)); + + let metadata = KvHandle::kv_read_metadata(key.len(), &result); + + assert_eq!(metadata, "key_len=18 hit=true bytes=18"); + assert!(!metadata.contains(key)); + assert!(!metadata.contains("super-secret-value")); + } + + #[test] + fn success_metadata_keeps_stable_field_types() { + let read_result = Ok(Some(Bytes::from_static(b"abc"))); + assert_eq!( + KvHandle::kv_read_metadata(1, &read_result), + "key_len=1 hit=true bytes=3" + ); + + let exists_result = Ok(false); + assert_eq!( + KvHandle::kv_exists_metadata(1, &exists_result), + "key_len=1 exists=false" + ); + + let list_result = Ok(KvPage { + cursor: Some("cursor".to_owned()), + keys: vec!["a".to_owned(), "b".to_owned()], + }); + assert_eq!( + KvHandle::kv_list_metadata(4, false, 100, &list_result), + "prefix_len=4 cursor_present=false limit=100 count=2 next_cursor_present=true" + ); + } + #[test] fn typed_get_bad_json_returns_serialization_error() { let kv = handle(); diff --git a/docs/guide/kv.md b/docs/guide/kv.md index c468aac..5f58917 100644 --- a/docs/guide/kv.md +++ b/docs/guide/kv.md @@ -109,6 +109,12 @@ For strict correctness, use a transactional data store. Key listing is paginated by design. This avoids buffering an unbounded number of keys in memory and matches the underlying provider APIs. The Spin adapter materialises `Store::get_keys()` and pages client-side; a `max_list_keys` cap (configurable via `EDGEZERO__STORES__KV____MAX_LIST_KEYS`, default `1000`) guards against runaway lists and yields `KvError::LimitExceeded` when exceeded. +## Operation Timing / Observability + +`KvHandle` emits debug-level timing logs for backend KV operations across all adapters. Logs include safe metadata such as operation name, elapsed milliseconds, success/error status, key or prefix length, hit/miss, byte counts, TTL seconds, and list page counts. + +Timing logs are limited to derived metadata such as lengths, counts, booleans, and TTLs rather than raw keys, prefixes, cursors, or values. On Cloudflare Workers, `elapsed_ms` should be treated as approximate because the runtime uses a reduced-resolution monotonic clock. Typed helper timings measure only the backend call after validation/serialization and before JSON deserialization. `read_modify_write` performs separate read and write calls, so it emits separate operation logs. + ## Platform Specifics ### Local Development