Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
201 changes: 190 additions & 11 deletions crates/edgezero-core/src/key_value_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
use bytes::Bytes;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use web_time::Instant;

use crate::error::EdgeError;

Expand Down Expand Up @@ -415,7 +416,12 @@
#[inline]
pub async fn delete(&self, key: &str) -> Result<(), KvError> {
Self::validate_key(key)?;
self.store.delete(key).await
let started_at = Self::kv_timing_start();
let result = self.store.delete(key).await;
Self::kv_timing_log(started_at, "delete", &result, || {
format!("key_len={}", key.len())
});
result
}

fn encode_list_cursor(prefix: &str, cursor: Option<String>) -> Result<Option<String>, KvError> {
Expand All @@ -437,7 +443,12 @@
#[inline]
pub async fn exists(&self, key: &str) -> Result<bool, KvError> {
Self::validate_key(key)?;
self.store.exists(key).await
let started_at = Self::kv_timing_start();
let result = self.store.exists(key).await;
Self::kv_timing_log(started_at, "exists", &result, || {
Self::kv_exists_metadata(key.len(), &result)
});
result
}

/// Get a value by key, deserializing from JSON.
Expand All @@ -449,7 +460,13 @@
#[inline]
pub async fn get<T: DeserializeOwned>(&self, key: &str) -> Result<Option<T>, KvError> {
Self::validate_key(key)?;
match self.store.get_bytes(key).await? {
let started_at = Self::kv_timing_start();
let result = self.store.get_bytes(key).await;
Self::kv_timing_log(started_at, "get", &result, || {
Self::kv_read_metadata(key.len(), &result)
});

match result? {
Some(bytes) => {
let val = serde_json::from_slice(&bytes)?;
Ok(Some(val))
Expand All @@ -465,7 +482,12 @@
#[inline]
pub async fn get_bytes(&self, key: &str) -> Result<Option<Bytes>, KvError> {
Self::validate_key(key)?;
self.store.get_bytes(key).await
let started_at = Self::kv_timing_start();
let result = self.store.get_bytes(key).await;
Self::kv_timing_log(started_at, "get_bytes", &result, || {
Self::kv_read_metadata(key.len(), &result)
});
result
}

/// Get a value by key, returning `default` if the key does not exist.
Expand All @@ -477,6 +499,78 @@
Ok(self.get(key).await?.unwrap_or(default))
}

fn kv_exists_metadata(key_len: usize, result: &Result<bool, KvError>) -> String {
match result.as_ref() {
Ok(exists) => format!("key_len={key_len} exists={exists}"),
Err(_err) => format!("key_len={key_len}"),
}
}

fn kv_hit_metadata(result: &Result<Option<Bytes>, KvError>) -> String {
match result.as_ref() {
Ok(Some(bytes)) => format!("hit=true bytes={}", bytes.len()),
Ok(None) => "hit=false bytes=0".to_owned(),
Err(_err) => String::new(),
}
}

fn kv_list_metadata(
prefix_len: usize,
cursor_present: bool,
limit: usize,
result: &Result<KvPage, KvError>,
) -> String {
match result.as_ref() {
Ok(page) => format!(
"prefix_len={prefix_len} cursor_present={cursor_present} limit={limit} count={} next_cursor_present={}",
page.keys.len(),
page.cursor.is_some()
),
Err(_err) => {
format!("prefix_len={prefix_len} cursor_present={cursor_present} limit={limit}")
}
}
}

fn kv_read_metadata(key_len: usize, result: &Result<Option<Bytes>, KvError>) -> String {
match result {
Ok(_value) => format!("key_len={key_len} {}", Self::kv_hit_metadata(result)),
Err(_err) => format!("key_len={key_len}"),
}
}

fn kv_timing_log<ResultValue, Metadata>(
started_at: Option<Instant>,
operation: &str,
result: &Result<ResultValue, KvError>,
metadata: Metadata,
) where
Metadata: FnOnce() -> String,
{
if let Some(started_at) = started_at {

Check failure on line 550 in crates/edgezero-core/src/key_value_store.rs

View workflow job for this annotation

GitHub Actions / spin wasm clippy

`started_at` is shadowed

Check failure on line 550 in crates/edgezero-core/src/key_value_store.rs

View workflow job for this annotation

GitHub Actions / fastly wasm clippy

`started_at` is shadowed

Check failure on line 550 in crates/edgezero-core/src/key_value_store.rs

View workflow job for this annotation

GitHub Actions / cloudflare wasm clippy

`started_at` is shadowed

Check failure on line 550 in crates/edgezero-core/src/key_value_store.rs

View workflow job for this annotation

GitHub Actions / cargo fmt

`started_at` is shadowed
let status = if result.is_ok() { "ok" } else { "error" };
log::debug!(
"kv operation={operation} elapsed_ms={} status={status} {}",
started_at.elapsed().as_millis(),
metadata()
);
}
}

fn kv_timing_start() -> Option<Instant> {
log::log_enabled!(log::Level::Debug).then(Instant::now)
}

fn kv_write_metadata(key_len: usize, bytes_len: usize, ttl: Option<Duration>) -> String {
match ttl {
Some(ttl) => format!(

Check failure on line 566 in crates/edgezero-core/src/key_value_store.rs

View workflow job for this annotation

GitHub Actions / spin wasm clippy

`ttl` is shadowed

Check failure on line 566 in crates/edgezero-core/src/key_value_store.rs

View workflow job for this annotation

GitHub Actions / fastly wasm clippy

`ttl` is shadowed

Check failure on line 566 in crates/edgezero-core/src/key_value_store.rs

View workflow job for this annotation

GitHub Actions / cloudflare wasm clippy

`ttl` is shadowed

Check failure on line 566 in crates/edgezero-core/src/key_value_store.rs

View workflow job for this annotation

GitHub Actions / cargo fmt

`ttl` is shadowed
"key_len={key_len} bytes={bytes_len} ttl_secs={}",
ttl.as_secs()
),
None => format!("key_len={key_len} bytes={bytes_len}"),
}
}

/// List keys in a bounded, paginated fashion.
///
/// The cursor is opaque, prefix-bound, and should be passed back unchanged
Expand All @@ -496,10 +590,15 @@
Self::validate_prefix(prefix)?;
Self::validate_list_limit(limit)?;
let decoded_cursor = Self::decode_list_cursor(prefix, cursor)?;
let page = self
let started_at = Self::kv_timing_start();
let result = self
.store
.list_keys_page(prefix, decoded_cursor.as_deref(), limit)
.await?;
.await;
Self::kv_timing_log(started_at, "list_keys_page", &result, || {
Self::kv_list_metadata(prefix.len(), cursor.is_some(), limit, &result)
});
let page = result?;

Ok(KvPage {
cursor: Self::encode_list_cursor(prefix, page.cursor)?,
Expand All @@ -522,7 +621,13 @@
Self::validate_key(key)?;
let bytes = serde_json::to_vec(value)?;
Self::validate_value(&bytes)?;
self.store.put_bytes(key, Bytes::from(bytes)).await
let bytes_len = bytes.len();
let started_at = Self::kv_timing_start();
let result = self.store.put_bytes(key, Bytes::from(bytes)).await;
Self::kv_timing_log(started_at, "put", &result, || {
Self::kv_write_metadata(key.len(), bytes_len, None)
});
result
}

/// Put raw bytes for a key.
Expand All @@ -533,7 +638,13 @@
pub async fn put_bytes(&self, key: &str, value: Bytes) -> Result<(), KvError> {
Self::validate_key(key)?;
Self::validate_value(&value)?;
self.store.put_bytes(key, value).await
let bytes_len = value.len();
let started_at = Self::kv_timing_start();
let result = self.store.put_bytes(key, value).await;
Self::kv_timing_log(started_at, "put_bytes", &result, || {
Self::kv_write_metadata(key.len(), bytes_len, None)
});
result
}

/// Put raw bytes with a TTL.
Expand All @@ -550,7 +661,13 @@
Self::validate_key(key)?;
Self::validate_ttl(ttl)?;
Self::validate_value(&value)?;
self.store.put_bytes_with_ttl(key, value, ttl).await
let bytes_len = value.len();
let started_at = Self::kv_timing_start();
let result = self.store.put_bytes_with_ttl(key, value, ttl).await;
Self::kv_timing_log(started_at, "put_bytes_with_ttl", &result, || {
Self::kv_write_metadata(key.len(), bytes_len, Some(ttl))
});
result
}

/// Put a value with a TTL, serializing it to JSON.
Expand All @@ -568,9 +685,16 @@
Self::validate_ttl(ttl)?;
let bytes = serde_json::to_vec(value)?;
Self::validate_value(&bytes)?;
self.store
let bytes_len = bytes.len();
let started_at = Self::kv_timing_start();
let result = self
.store
.put_bytes_with_ttl(key, Bytes::from(bytes), ttl)
.await
.await;
Self::kv_timing_log(started_at, "put_with_ttl", &result, || {
Self::kv_write_metadata(key.len(), bytes_len, Some(ttl))
});
result
}

/// Read-modify-write: get the current value (or `default`),
Expand Down Expand Up @@ -995,6 +1119,24 @@
});
}

#[test]
fn error_metadata_omits_unknown_result_fields() {
let read_result = Err(KvError::Unavailable);
assert_eq!(KvHandle::kv_read_metadata(18, &read_result), "key_len=18");

let exists_result = Err(KvError::Unavailable);
assert_eq!(
KvHandle::kv_exists_metadata(18, &exists_result),
"key_len=18"
);

let list_result = Err(KvError::Unavailable);
assert_eq!(
KvHandle::kv_list_metadata(4, true, 100, &list_result),
"prefix_len=4 cursor_present=true limit=100"
);
}

#[test]
fn exists_returns_false_after_delete() {
let kv = handle();
Expand Down Expand Up @@ -1204,6 +1346,43 @@
});
}

#[test]
fn read_metadata_logs_lengths_not_raw_key_or_value() {
let key = "super-secret-token";
let value = Bytes::from_static(b"super-secret-value");
let result = Ok(Some(value));

let metadata = KvHandle::kv_read_metadata(key.len(), &result);

assert_eq!(metadata, "key_len=18 hit=true bytes=18");
assert!(!metadata.contains(key));
assert!(!metadata.contains("super-secret-value"));
}

#[test]
fn success_metadata_keeps_stable_field_types() {
let read_result = Ok(Some(Bytes::from_static(b"abc")));
assert_eq!(
KvHandle::kv_read_metadata(1, &read_result),
"key_len=1 hit=true bytes=3"
);

let exists_result = Ok(false);
assert_eq!(
KvHandle::kv_exists_metadata(1, &exists_result),
"key_len=1 exists=false"
);

let list_result = Ok(KvPage {
cursor: Some("cursor".to_owned()),
keys: vec!["a".to_owned(), "b".to_owned()],
});
assert_eq!(
KvHandle::kv_list_metadata(4, false, 100, &list_result),
"prefix_len=4 cursor_present=false limit=100 count=2 next_cursor_present=true"
);
}

#[test]
fn typed_get_bad_json_returns_serialization_error() {
let kv = handle();
Expand Down
6 changes: 6 additions & 0 deletions docs/guide/kv.md
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,12 @@ For strict correctness, use a transactional data store.

Key listing is paginated by design. This avoids buffering an unbounded number of keys in memory and matches the underlying provider APIs. The Spin adapter materialises `Store::get_keys()` and pages client-side; a `max_list_keys` cap (configurable via `EDGEZERO__STORES__KV__<ID>__MAX_LIST_KEYS`, default `1000`) guards against runaway lists and yields `KvError::LimitExceeded` when exceeded.

## Operation Timing / Observability

`KvHandle` emits debug-level timing logs for backend KV operations across all adapters. Logs include safe metadata such as operation name, elapsed milliseconds, success/error status, key or prefix length, hit/miss, byte counts, TTL seconds, and list page counts.
Comment thread
ChristianPavilonis marked this conversation as resolved.

Timing logs are limited to derived metadata such as lengths, counts, booleans, and TTLs rather than raw keys, prefixes, cursors, or values. On Cloudflare Workers, `elapsed_ms` should be treated as approximate because the runtime uses a reduced-resolution monotonic clock. Typed helper timings measure only the backend call after validation/serialization and before JSON deserialization. `read_modify_write` performs separate read and write calls, so it emits separate operation logs.

## Platform Specifics

### Local Development
Expand Down
Loading