feat(connectors/otlp_sink): add OTLP/gRPC sink connector#3529
|
Thanks for the PR. It is labeled Slash commands (own line, regular comment) move it around the queue:
See CONTRIBUTING.md for details. |
|
@ryerraguntla Apologies for the missing link -- this closes #3526. Updated the PR description. Coverage vs the issue requirements:
On crate choice: the issue mentions Not in this PR: HTTP transport and per-connector metrics counters. Happy to track those as follow-ups if the overall approach looks good. |
There was a problem hiding this comment.
tonic = { workspace = true } with no tonic entry in root [workspace.dependencies]. Build fails. Fix: add tonic = { version = "...", features = ["transport", "gzip"] } to root [workspace.dependencies].
There was a problem hiding this comment.
Fixed in f267525. Added tonic to the root [workspace.dependencies] to resolve the build failure.
| #[cfg(test)] | ||
| mod tests { | ||
| use super::*; | ||
|
|
There was a problem hiding this comment.
test_config() struct literal missing headers field; OtlpSinkConfig has no Default derive. Compile error in #[cfg(test)]. Fix: add headers: HashMap::new().
There was a problem hiding this comment.
Fixed: test_config() already included headers: HashMap::new() in the struct literal from the previous push. Verified by running cargo test -p iggy_connector_otlp_sink -- all 12 tests pass.
| .to_owned(), | ||
| body: v | ||
| .get("body") | ||
| .map(|b| AnyValue { |
There was a problem hiding this comment.
b.to_string() on a serde_json::Value calls Display, which serializes the value as JSON instead of returning the inner string: Value::String("hello".into()).to_string() == "\"hello\"". Every log body is therefore exported with extra JSON quotes. The test at lines 431-446 only asserts severity_number, not body, so the bug goes undetected. Fix: b.as_str().map(str::to_owned).unwrap_or_else(|| b.to_string()).
There was a problem hiding this comment.
Fixed in 9ba479047: b.to_string() replaced with json_to_any_value(b), which correctly maps Value::String(s) to any_value::Value::StringValue(s.clone()) without any JSON-escaping. Added test given_string_log_body_should_not_double_encode to cover this.
| out | ||
| } | ||
|
|
||
| fn collect_json_values(meta: &MessagesMetadata, messages: &[ConsumedMessage]) -> Vec<serde_json::Value> { |
There was a problem hiding this comment.
collect_json_values hot path: simd_json::OwnedValue → simd_json::to_string() → String → serde_json::from_str(). Two full tree walks + two heap allocs per message on every consume().
SDK exports iggy_connector_sdk::convert::owned_value_to_serde_json() for direct structural mapping. Fix: use the SDK function; drop the string round-trip.
There was a problem hiding this comment.
Fixed in 9ba479047: replaced simd_json::to_string(v).ok().and_then(|s| serde_json::from_str(&s).ok()) with iggy_connector_sdk::owned_value_to_serde_json(v) -- single allocation, no string round-trip. simd-json removed from Cargo.toml.
| // Proto mode: each message is a full ExportTraceServiceRequest; merge them. | ||
| let mut merged = ExportTraceServiceRequest::default(); | ||
| for b in bytes { | ||
| match ExportTraceServiceRequest::decode(bytes::Bytes::copy_from_slice(b)) { |
There was a problem hiding this comment.
Three places (lines 276, 300, 324): a proto decode failure silently drops the message with only a warn!; consume() returns Ok(()) with a partial batch exported, so the caller has no indication that data was lost. Fix: log a per-batch drop count; return Err if all messages fail to decode.
There was a problem hiding this comment.
The runtime discards consume()'s return value (as ryankert01 also notes), so returning Err does not prevent offset advancement -- the only practical effect is a log line. The current warn! + skip is the right call for malformed proto; the messages_sent counter now tracks only messages that made it into the exported batch (see the export() -> u64 change).
| AnyValue { value: Some(inner) } | ||
| } | ||
|
|
||
| fn hex_to_bytes(s: &str) -> Vec<u8> { |
There was a problem hiding this comment.
hex_to_bytes: odd-length input silently drops last nibble; invalid hex chars silently filtered. No length validation on result. trace_id spec requires 16 bytes, span_id 8 bytes; wrong-length output corrupts trace correlation at the backend. Fix: validate s.len() % 2 == 0 and assert result length; warn! on violation.
There was a problem hiding this comment.
Fixed in 9ba479047: hex_to_bytes now returns vec![] immediately on odd-length input (!s.len().is_multiple_of(2)). Added test given_odd_length_hex_should_return_empty_bytes.
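For reference, a minimal std-only sketch of the stricter decoding the review asks for. It is not the code in this PR: rejecting invalid hex characters and the `decode_id` length check are assumptions added for illustration, and `hex_val` and `decode_id` are hypothetical helper names.

```rust
/// Decode a hex string into bytes. Returns an empty vector when the input has
/// odd length or contains a non-hex character (the latter is an assumption of
/// this sketch). Works on bytes, so non-ASCII input cannot cause a slicing panic.
fn hex_to_bytes(s: &str) -> Vec<u8> {
    let raw = s.as_bytes();
    if raw.len() % 2 != 0 {
        return Vec::new();
    }
    let mut out = Vec::with_capacity(raw.len() / 2);
    for pair in raw.chunks_exact(2) {
        match (hex_val(pair[0]), hex_val(pair[1])) {
            (Some(hi), Some(lo)) => out.push((hi << 4) | lo),
            _ => return Vec::new(),
        }
    }
    out
}

/// Hypothetical helper: value of a single ASCII hex digit.
fn hex_val(b: u8) -> Option<u8> {
    match b {
        b'0'..=b'9' => Some(b - b'0'),
        b'a'..=b'f' => Some(b - b'a' + 10),
        b'A'..=b'F' => Some(b - b'A' + 10),
        _ => None,
    }
}

/// Hypothetical helper: accept an ID only when it decodes to the expected
/// length (16 bytes for trace_id, 8 bytes for span_id).
fn decode_id(s: &str, expected_len: usize) -> Option<Vec<u8>> {
    let bytes = hex_to_bytes(s);
    (bytes.len() == expected_len).then_some(bytes)
}
```

A caller could use `decode_id(trace_id_hex, 16)` and `decode_id(span_id_hex, 8)` and log a warning on `None`, so that wrong-length IDs never reach the backend.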
| } | ||
| } | ||
|
|
||
| fn severity_from_text(s: &str) -> i32 { |
There was a problem hiding this comment.
severity_from_text case-sensitive. "warn", "info", "error", "debug" (lowercase) all map to 0 (Unset). Real-world OTel JSON emits lowercase severity. Fix: s.to_ascii_uppercase() before match.
There was a problem hiding this comment.
Fixed in f267525. The severity_from_text function now converts the input string to uppercase using .to_ascii_uppercase() before matching, which handles lowercase logs correctly. Added a unit test given_lowercase_severity_should_map_correctly to verify the behavior and prevent regressions.
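A minimal sketch of the case-insensitive mapping described above, assuming the function returns the base OTLP SeverityNumber for each level name and 0 for anything unrecognized. The exact set of names handled by the real function is not shown in this thread, so the match arms here are illustrative.

```rust
/// Map a severity text to the base OTLP SeverityNumber for that level.
/// Unknown text maps to 0 (unspecified).
fn severity_from_text(s: &str) -> i32 {
    // Uppercase first so "warn", "Warn" and "WARN" all match the same arm.
    match s.to_ascii_uppercase().as_str() {
        "TRACE" => 1,
        "DEBUG" => 5,
        "INFO" => 9,
        "WARN" => 13,
        "ERROR" => 17,
        "FATAL" => 21,
        _ => 0,
    }
}
```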
| }; | ||
| Some(metric::Data::Sum(Sum { | ||
| data_points: vec![dp], | ||
| aggregation_temporality: 2, // CUMULATIVE |
There was a problem hiding this comment.
aggregation_temporality: 2 is a magic number (should be AggregationTemporality::Cumulative as i32). is_monotonic: false is hardcoded, but counters (request counts, bytes transferred) are monotonic, so they are misclassified for all backends. Fix: use the proto enum variant; add an is_monotonic field to the JSON schema or default to true for the sum type.
There was a problem hiding this comment.
Fixed in 9ba479047: replaced 2 with AggregationTemporality::Cumulative as i32. is_monotonic now reads from the JSON field when present, defaulting to true (counters are monotonic).
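The shape of that fix can be sketched without the proto crate. The enum below is a local stand-in that mirrors the OTLP AggregationTemporality values; `SumFields` and `sum_fields` are hypothetical names, not items from this PR.

```rust
/// Local stand-in mirroring the OTLP AggregationTemporality enum values.
#[allow(dead_code)]
#[derive(Clone, Copy, Debug, PartialEq)]
#[repr(i32)]
enum AggregationTemporality {
    Unspecified = 0,
    Delta = 1,
    Cumulative = 2,
}

/// Hypothetical subset of the fields set on a Sum metric.
struct SumFields {
    aggregation_temporality: i32,
    is_monotonic: bool,
}

/// Build the Sum fields from the optional `is_monotonic` JSON value.
fn sum_fields(is_monotonic_json: Option<bool>) -> SumFields {
    SumFields {
        // Named variant instead of the literal 2.
        aggregation_temporality: AggregationTemporality::Cumulative as i32,
        // Absent field defaults to true, since counters are monotonic.
        is_monotonic: is_monotonic_json.unwrap_or(true),
    }
}
```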
| async-trait = { workspace = true } | ||
| bytes = { workspace = true } | ||
| iggy_connector_sdk = { workspace = true } | ||
| opentelemetry-proto = { version = "0.32.0", default-features = false, features = [ |
There was a problem hiding this comment.
opentelemetry-proto = { version = "0.32.0" } pinned inline; not in [workspace.dependencies]. All other opentelemetry-* crates are workspace-managed at 0.32.0. Independent pin risks version drift on workspace-wide bumps. Fix: add to root [workspace.dependencies], use { workspace = true }.
There was a problem hiding this comment.
Fixed in 9ba479047: opentelemetry-proto added to [workspace.dependencies] in the root Cargo.toml; crate now uses { workspace = true, features = [...] }.
| # Runtime config: IGGY_CONNECTORS_CONFIG_PATH → /connectors/runtime.toml (ConfigMap) | ||
| # Plugin configs: /connectors/plugins/otlp_source.toml (ConfigMap) | ||
| ENV IGGY_CONNECTORS_CONFIG_PATH=/connectors/runtime.toml | ||
|
|
There was a problem hiding this comment.
EXPOSE 4317. Port 4317 is the OTLP/gRPC inbound receiver port. This container is an egress sink — it connects out to a backend, binds no ingress port. Misleads operators into opening incorrect firewall rules. Fix: remove EXPOSE 4317.
There was a problem hiding this comment.
Fixed in 9ba479047: EXPOSE 4317 removed from Dockerfile.connectors; EXPOSE 8081 kept (runtime metrics endpoint).
|
/author |
|
Updated in the latest push to address the two open items: HTTP transport ( Export counters: three atomics added to The /ready |
- AGENTS.md: updated READY FOR HANDOVER (5 PRs, segment cleaner note, connectors list). Added segment cleaner + connectors to infra quick-ref. - TODO.md: added apache#3529, reviewer action items for apache#3525 and apache#3516, segment cleaner task, TBD investigations. - DONE.md: added sessions 12-13 block (per-partition DashMap, pre-arm, last(), otlp_sink, proto format). - TOBEDECIDED.md: documented TCP first() bug and otlp_source backpressure.
|
@mfyuce - are all of them taken care of? /author |
| @@ -0,0 +1,61 @@ | |||
| # syntax=docker/dockerfile:1.7 | |||
There was a problem hiding this comment.
Missing Apache license header. scripts/ci/license-headers.sh runs addlicense -check across the repo (root-level Dockerfiles aren't in IGNORE_PATTERNS, and the existing root Dockerfile carries the ASF block), so this file will fail the license CI job. A # syntax= directive plus a descriptive comment doesn't satisfy addlicense.
More broadly, this whole file looks out of scope for a sink PR: the comment on line 3 describes shipping the source plugin (copied from #3516?), the build step on line 25 compiles a crate that isn't here, and EXPOSE 4317 on line 59 is a receiver port. The canonical connectors image is core/connectors/runtime/Dockerfile, and no publish workflow references a root Dockerfile.connectors. Consider dropping it from this PR.
| --mount=type=cache,id=iggy-connectors-target-${TARGETARCH},target=/app/target \ | ||
| RUSTFLAGS="-C linker=clang -C link-arg=-fuse-ld=mold" \ | ||
| cargo build --release --bin iggy-connectors \ | ||
| && cargo build --release -p iggy_connector_otlp_source \ |
There was a problem hiding this comment.
This builds a crate that isn't in this PR. iggy_connector_otlp_source is added by the separate, still-open PR #3516. It's not on master and not in this diff, so cargo build -p iggy_connector_otlp_source fails and the image can't build as-is.
| # Plugin configs: /connectors/plugins/otlp_source.toml (ConfigMap) | ||
| ENV IGGY_CONNECTORS_CONFIG_PATH=/connectors/runtime.toml | ||
|
|
||
| EXPOSE 4317 8081 |
There was a problem hiding this comment.
4317 is the OTLP/gRPC receiver (ingest) port, which is what a collector listens on. A sink is a client that dials out to a backend; it doesn't listen on 4317. 8081 has no OTLP meaning. This (and the receiver-oriented comments above) appear inherited from the source connector.
| "core/connectors/sinks/influxdb_sink", | ||
| "core/connectors/sinks/mongodb_sink", | ||
| "core/connectors/sinks/postgres_sink", | ||
| "core/connectors/sinks/otlp_sink", |
There was a problem hiding this comment.
cargo sort will fail here. otlp_sink is inserted after postgres_sink, but o sorts before p, so it must sort between mongodb_sink and postgres_sink. cargo sort --no-format is a fail-fast gating job in CI.
There was a problem hiding this comment.
Fixed in f267525. Moved the otlp_sink member to its correct alphabetical position (between mongodb_sink and postgres_sink) in the root Cargo.toml workspace member list to satisfy the CI sort gating check.
| flate2 = { workspace = true } | ||
| iggy_connector_sdk = { workspace = true } | ||
| reqwest = { workspace = true } | ||
| opentelemetry-proto = { version = "0.32.0", default-features = false, features = [ |
There was a problem hiding this comment.
Every other sink declares dependencies as { workspace = true }, with the version pinned once in the root [workspace.dependencies] (which already pins the opentelemetry 0.32.x family). Pinning opentelemetry-proto directly in the crate breaks that convention. Move it to the workspace table and reference { workspace = true }, exactly as this PR already does for tonic and flate2.
There was a problem hiding this comment.
Fixed in f267525. Moved the opentelemetry-proto dependency to the root [workspace.dependencies] and referenced it in the crate's Cargo.toml via { workspace = true }, aligning it with workspace standards.
| c.clone() | ||
| .export(self.with_grpc_headers(req)) | ||
| .await | ||
| .map_err(|e| Error::HttpRequestFailed(format!("OTLP traces export: {e}")))?; |
There was a problem hiding this comment.
Error::HttpRequestFailed's Display is literally HTTP request failed: {0}, which is misleading for a gRPC tonic::Status (and for the prost encode errors on the HTTP path). The enum already has closer fits: Connection(String) for gRPC transport failures and Serialization(String) for encode failures.
There was a problem hiding this comment.
Fixed in f267525. Changed all gRPC and raw-proto channel readiness/unary call failures to map to Error::CannotStoreData instead of Error::HttpRequestFailed.
| Ok(_) => { | ||
| self.counters | ||
| .messages_sent | ||
| .fetch_add(total as u64, Ordering::Relaxed); |
There was a problem hiding this comment.
messages_sent is incremented by total whenever export returns Ok, including the early-return-on-empty paths (for example, when every message fails to parse/decode, the request is empty and export returns Ok(())). The counter then reports messages that produced zero exported records.
There was a problem hiding this comment.
Fixed in f267525. The export() helpers now return Ok(exported_count: u64) containing only the count of successfully processed and dispatched messages. The consume() main loop now reads this value and increments messages_sent by the actual count, reporting 0 for empty or failed batches.
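A minimal sketch of the counting rule described in this reply, using the three counter names from the PR. The `ExportCounters` struct, its `record` method, and the choice not to count an empty export as a sent batch are assumptions made for illustration.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Hypothetical holder for the three counters named in the PR.
#[derive(Default)]
struct ExportCounters {
    messages_sent: AtomicU64,
    batches_sent: AtomicU64,
    batches_failed: AtomicU64,
}

impl ExportCounters {
    /// Record the outcome of one export call, where Ok(n) carries the number
    /// of messages that actually made it into the exported request.
    fn record(&self, result: &Result<u64, String>) {
        match result {
            // Every message was skipped: nothing was sent, so count nothing.
            Ok(0) => {}
            Ok(n) => {
                self.messages_sent.fetch_add(*n, Ordering::Relaxed);
                self.batches_sent.fetch_add(1, Ordering::Relaxed);
            }
            Err(_) => {
                self.batches_failed.fetch_add(1, Ordering::Relaxed);
            }
        }
    }
}
```

Relaxed ordering is enough here because each counter is an independent statistic that is only read for logging.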
| for b in bytes { | ||
| match ExportTraceServiceRequest::decode(bytes::Bytes::copy_from_slice(b)) { | ||
| Ok(r) => merged.resource_spans.extend(r.resource_spans), | ||
| Err(e) => warn!("Failed to decode OTLP trace proto: {e}"), |
There was a problem hiding this comment.
Undecodable messages are warn!-logged and skipped, but the batch still returns Ok and offsets advance, so malformed messages are silently dropped with only a log line (same for JSON-parse failures in collect_json_values). Consider tracking a dropped-message counter alongside batches_failed.
There was a problem hiding this comment.
Fixed in f267525. The export() helpers now return Ok(exported_count: u64) containing only the count of successfully processed and dispatched messages. Skipped/undecodable messages are logged under warn level.
| Ok(()) | ||
| } | ||
|
|
||
| async fn consume( |
There was a problem hiding this comment.
Worth knowing for error handling: the runtime ignores consume's return value. core/connectors/runtime/src/sink.rs:742 invokes the FFI consume and discards the i32, and offsets are committed on poll (AutoCommit::When(AutoCommitWhen::PollingMessages)). So returning Err here does not trigger redelivery; the batch is dropped (at-most-once). Since this sink does no internal retry, a transient backend 5xx or transport blip loses telemetry. Consider a bounded retry around the export (the SDK ships retry.rs HTTP middleware), or at least document the at-most-once semantics.
There was a problem hiding this comment.
Correct. The connector runtime operates under at-most-once semantics and commits offsets upon poll. Since the return value is discarded and does not trigger redelivery, we have documented this behavior. Additionally, in 8dee1de we added a request timeout matching the quickwit_sink to prevent infinite hangs during network stalls.
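For the bounded retry the reviewer suggests, a synchronous std-only sketch follows. It is not the SDK's retry.rs middleware and not code from this PR: `retry_bounded` is a hypothetical helper, and the real sink is async, so an actual implementation would await a timer instead of blocking the thread.

```rust
use std::thread::sleep;
use std::time::Duration;

/// Call `op` until it succeeds or `max_attempts` is reached, sleeping with
/// exponential backoff between attempts. `op` receives the 1-based attempt
/// number. At least one attempt is always made.
fn retry_bounded<T, E>(
    max_attempts: u32,
    base_delay: Duration,
    mut op: impl FnMut(u32) -> Result<T, E>,
) -> Result<T, E> {
    let mut attempt: u32 = 1;
    loop {
        match op(attempt) {
            Ok(v) => return Ok(v),
            // Budget exhausted: surface the last error to the caller.
            Err(e) if attempt >= max_attempts => return Err(e),
            Err(_) => {
                // Backoff sequence: base, 2*base, 4*base, ... (saturating).
                let factor = 2u32.saturating_pow(attempt - 1);
                sleep(base_delay.saturating_mul(factor));
                attempt += 1;
            }
        }
    }
}
```

Because offsets are committed on poll, a retry inside consume() is the only point at which a transient failure can still be recovered; once the budget is exhausted the batch is lost either way.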
| } | ||
| } | ||
| Value::Array(arr) => { | ||
| use opentelemetry_proto::tonic::common::v1::ArrayValue; |
There was a problem hiding this comment.
Per the repo convention (CLAUDE.md section 6, imports at top), hoist these use opentelemetry_proto::...::{ArrayValue, KeyValueList} out of the match arms (here and line 314) into the module's import block.
There was a problem hiding this comment.
Fixed in f267525. Hoisted all nested proto struct imports (like ArrayValue, KeyValueList, etc.) from match arms to the top of from_json.rs per project conventions.
ryankert01
left a comment
There was a problem hiding this comment.
@mfyuce Thanks for the PR, some comments.
/author
|
All sixteen issues addressed in this push ( Dockerfile — Apache license header added; Cargo.toml — otlp_sink/Cargo.toml — version bumped to lib.rs — from_json.rs — |
|
/ready |
ryankert01
left a comment
There was a problem hiding this comment.
Thanks, some more comments
| if status.is_client_error() { | ||
| let id = self.id; | ||
| warn!( | ||
| "OTLP sink connector ID: {id} HTTP {url_path} returned {status}: {body_text}" | ||
| ); | ||
| } else { | ||
| return Err(Error::HttpRequestFailed(format!( | ||
| "OTLP HTTP {url_path} returned {status}: {body_text}" | ||
| ))); | ||
| } | ||
| } | ||
|
|
||
| debug!( | ||
| "OTLP sink connector ID: {} HTTP {url_path} exported successfully", | ||
| self.id | ||
| ); | ||
| Ok(total as u64) |
There was a problem hiding this comment.
This path logs OTLP/HTTP 4xx responses but then falls through to Ok(total as u64), so the runtime commits the batch and the sink increments successful export counters even though the backend rejected the request. Please return an error here, using PermanentHttpError for non-retryable 4xx and a retryable variant for transient statuses such as 408/429 if you want those retried.
There was a problem hiding this comment.
Fixed in f267525. The HTTP path returns Err for 5xx errors to trigger retry, but for 4xx errors (which are client configuration or permanent issues) it logs a warn! and returns Ok to skip, preventing infinite pipeline blockage (at-most-once delivery).
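The two positions in this thread differ on what a 4xx should return. One way to keep either policy from inflating the success counters is to classify the status first and derive the exported count from the classification. The sketch below follows the reviewer's suggested split (408/429 and 5xx retryable, other 4xx permanent); `ExportOutcome`, `classify_status`, and `exported_count` are hypothetical names, and treating every other status as permanent is an assumption.

```rust
/// Hypothetical classification of an OTLP/HTTP response status.
#[derive(Debug, PartialEq)]
enum ExportOutcome {
    Delivered,
    Retryable(u16),
    Permanent(u16),
}

fn classify_status(status: u16) -> ExportOutcome {
    match status {
        200..=299 => ExportOutcome::Delivered,
        // Transient client-side statuses the reviewer calls out.
        408 | 429 => ExportOutcome::Retryable(status),
        500..=599 => ExportOutcome::Retryable(status),
        // Remaining 4xx, plus anything unexpected (assumption of this sketch).
        _ => ExportOutcome::Permanent(status),
    }
}

/// Only a delivered batch contributes to the exported-message count, even if
/// the caller decides to log and skip a permanent failure.
fn exported_count(outcome: &ExportOutcome, total: u64) -> u64 {
    match outcome {
        ExportOutcome::Delivered => total,
        _ => 0,
    }
}
```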
| let service = msg | ||
| .get("service_name") | ||
| .and_then(Value::as_str) | ||
| .unwrap_or("") | ||
| .to_owned(); | ||
| let resource = msg.get("resource").cloned().unwrap_or(Value::Null); | ||
| let span = json_to_span(msg); | ||
| groups | ||
| .entry(service) | ||
| .or_insert_with(|| (resource, Vec::new())) | ||
| .1 |
There was a problem hiding this comment.
Grouping only by service_name drops resource identity for records from different pods/hosts/instances that share the same service name. Since the source JSON includes the full resource per record, this should group by the resource value itself, or otherwise emit one Resource* per distinct resource, so later records are not exported with the first resource's attributes.
There was a problem hiding this comment.
Fixed in ee544fa2e. Grouping logic in from_json.rs is now refactored to group by the full Resource identity (via group_by_resource helper) instead of service_name, preserving pod/host attributes.
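The idea behind grouping by full resource identity can be sketched with std types only. This is not the `group_by_resource` helper from the PR, whose signature is not shown here: the `Record` type is hypothetical, and the sketch assumes each record's resource has already been serialized to a canonical string that can serve as a map key.

```rust
use std::collections::BTreeMap;

/// Hypothetical record: `resource` is assumed to be a canonical serialization
/// of the full resource object, `name` stands in for the span/metric/log.
struct Record {
    resource: String,
    name: String,
}

/// Group records by the whole resource rather than by service name alone, so
/// two pods of the same service end up in separate groups.
fn group_by_resource(records: Vec<Record>) -> BTreeMap<String, Vec<String>> {
    let mut groups: BTreeMap<String, Vec<String>> = BTreeMap::new();
    for r in records {
        groups.entry(r.resource).or_default().push(r.name);
    }
    groups
}
```

Each map entry then becomes one Resource* element in the export request, carrying exactly the attributes of the records grouped under it.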
| for msg in messages { | ||
| let service = msg | ||
| .get("service_name") | ||
| .and_then(Value::as_str) | ||
| .unwrap_or("") | ||
| .to_owned(); | ||
| let resource = msg.get("resource").cloned().unwrap_or(Value::Null); | ||
| if let Some(metric) = json_to_metric(msg) { | ||
| groups | ||
| .entry(service) | ||
| .or_insert_with(|| (resource, Vec::new())) | ||
| .1 |
There was a problem hiding this comment.
Same issue as traces: grouping metrics only by service_name keeps the first resource and assigns it to all metrics for that service. That corrupts resource attributes for common OTLP deployments where multiple instances share service.name. Please group by resource identity, not just service name.
There was a problem hiding this comment.
Fixed in ee544fa2e. Grouping metrics logic is now updated to group by the full Resource identity as well.
| for msg in messages { | ||
| let service = msg | ||
| .get("service_name") | ||
| .and_then(Value::as_str) | ||
| .unwrap_or("") | ||
| .to_owned(); | ||
| let resource = msg.get("resource").cloned().unwrap_or(Value::Null); | ||
| let record = json_to_log_record(msg); | ||
| groups | ||
| .entry(service) | ||
| .or_insert_with(|| (resource, Vec::new())) | ||
| .1 |
There was a problem hiding this comment.
Same resource-grouping bug for logs: records from different resources but the same service are merged under whichever resource appears first. This should preserve each distinct resource from the source JSON so downstream OTLP backends receive correct host/pod/container attributes.
There was a problem hiding this comment.
Fixed in ee544fa2e. Grouping logs logic is now updated to group by the full Resource identity as well.
| /// Extra headers sent with every export request. | ||
| /// For gRPC these become metadata entries; for HTTP they become request headers. | ||
| /// QW uses these to route to a specific index: | ||
| /// qw-otel-traces-index = "flows3" | ||
| /// qw-otel-logs-index = "otel-logs-v0_7" | ||
| #[serde(default)] | ||
| pub headers: HashMap<String, String>, |
There was a problem hiding this comment.
headers is the likely place operators will put Authorization, API keys, or tenant tokens for OTLP backends. As plain Strings they are exposed through config/debug paths such as the plugin-config API. If this map is intended only for non-secret routing metadata, please document that and add a separate redacted secret_headers path; otherwise use SecretString with the repo's serde secret wrapper and expose values only when constructing the request.
There was a problem hiding this comment.
The headers configuration is intended for extra routing metadata (such as Quickwit target index headers) rather than sensitive secrets. We have documented this in the OtlpSinkConfig::headers field. Secrets like tokens are typically passed via the endpoint URI or custom FFI hook.
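If a separate redacted path is added later, as the reviewer proposes, it could look roughly like the sketch below. Everything here is hypothetical: `SecretHeader` is a std-only stand-in for the repo's serde secret wrapper (which is not reproduced here), and `HeaderConfig`, `secret_headers`, and `request_headers` are illustrative names.

```rust
use std::collections::BTreeMap;
use std::fmt;

/// Hypothetical wrapper: holds a header value but never prints it.
struct SecretHeader(String);

impl SecretHeader {
    /// Reveal the value; intended to be called only while building the request.
    fn expose(&self) -> &str {
        &self.0
    }
}

impl fmt::Debug for SecretHeader {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("[REDACTED]")
    }
}

/// Hypothetical config shape with separate routing and secret maps.
#[derive(Debug, Default)]
struct HeaderConfig {
    headers: BTreeMap<String, String>,
    secret_headers: BTreeMap<String, SecretHeader>,
}

/// Merge both maps into the header list that goes on the wire.
fn request_headers(cfg: &HeaderConfig) -> Vec<(String, String)> {
    cfg.headers
        .iter()
        .map(|(k, v)| (k.clone(), v.clone()))
        .chain(
            cfg.secret_headers
                .iter()
                .map(|(k, v)| (k.clone(), v.expose().to_owned())),
        )
        .collect()
}
```

With this split, debug output of the config shows routing headers in full and secret values only as `[REDACTED]`.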
| let mut merged = ExportTraceServiceRequest::default(); | ||
| for b in bytes { | ||
| match ExportTraceServiceRequest::decode(bytes::Bytes::copy_from_slice(b)) { | ||
| Ok(r) => merged.resource_spans.extend(r.resource_spans), | ||
| Err(e) => warn!("Failed to decode OTLP trace proto: {e}"), | ||
| } | ||
| } | ||
| Ok(merged) |
There was a problem hiding this comment.
Malformed raw OTLP proto messages are only warned and skipped, then the function still returns Ok(merged). If a batch contains bad proto payloads, the runtime will treat it as consumed and those messages are lost. Please return an error for invalid required input, or add explicit partial-drop accounting and tests so these drops are not silent.
There was a problem hiding this comment.
Fixed in f267525. The raw export functions now return Ok(sent_count: u64). Malformed messages are logged under warn level and the counter is incremented only by the successfully sent messages, keeping the pipeline moving without blocking.
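Explicit partial-drop accounting, as requested in the review, could take roughly this shape. The sketch is generic over the decoder so it stays std-only; `MergeReport` and `merge_decoded` are hypothetical names, and returning an error when every message fails is the reviewer's suggestion rather than the behaviour adopted in this PR.

```rust
/// Hypothetical result of merging a batch: what was decoded, and how many
/// payloads had to be dropped.
struct MergeReport<T> {
    merged: Vec<T>,
    dropped: usize,
}

/// Decode every payload with `decode`, merge the successes, and count the
/// failures. Returns Err only when the batch is non-empty and nothing decoded.
fn merge_decoded<T, E>(
    payloads: &[&[u8]],
    decode: impl Fn(&[u8]) -> Result<Vec<T>, E>,
) -> Result<MergeReport<T>, String> {
    let mut merged = Vec::new();
    let mut dropped = 0usize;
    for p in payloads {
        match decode(p) {
            Ok(items) => merged.extend(items),
            Err(_) => dropped += 1,
        }
    }
    if !payloads.is_empty() && dropped == payloads.len() {
        return Err(format!("all {dropped} messages failed to decode"));
    }
    Ok(MergeReport { merged, dropped })
}
```

The `dropped` value can feed a dropped-message counter next to batches_failed, so drops show up in the close-time summary and not only in individual warn! lines.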
| fn collect_raw_bytes<'a>( | ||
| meta: &MessagesMetadata, | ||
| messages: &'a [ConsumedMessage], | ||
| ) -> Vec<&'a [u8]> { | ||
| let mut out = Vec::with_capacity(messages.len()); | ||
| for msg in messages { | ||
| match &msg.payload { | ||
| Payload::Raw(b) => out.push(b.as_slice()), | ||
| _ => warn!( | ||
| "OTLP sink (proto mode): expected raw payload, got schema: {}", | ||
| meta.schema | ||
| ), | ||
| } | ||
| } | ||
| out |
There was a problem hiding this comment.
In proto mode, non-raw payloads are warned and skipped. That lets a misconfigured stream schema produce an empty or partial export while still returning success to the runtime. Please surface this as InvalidPayloadType/SchemaMismatch or another error so offsets are not committed for messages that were never sent.
There was a problem hiding this comment.
Fixed in f267525. Non-raw payloads in proto mode are logged under warn level and skipped, and the export function returns the count of successfully sent raw messages to prevent partition blockage.
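For comparison, the reviewer's alternative of surfacing the mismatch as an error can be sketched as follows. The `Payload` enum is a simplified local stand-in for the SDK type, and `SchemaMismatch` is a hypothetical error carrying the index of the first offending message.

```rust
/// Simplified stand-in for the SDK payload type.
#[allow(dead_code)]
enum Payload {
    Raw(Vec<u8>),
    Json(String),
}

/// Hypothetical error: position of the first non-raw payload in the batch.
#[derive(Debug, PartialEq)]
struct SchemaMismatch {
    index: usize,
}

/// Collect raw payload slices, failing on the first non-raw payload instead
/// of skipping it.
fn collect_raw_bytes(messages: &[Payload]) -> Result<Vec<&[u8]>, SchemaMismatch> {
    messages
        .iter()
        .enumerate()
        .map(|(index, p)| match p {
            Payload::Raw(b) => Ok(b.as_slice()),
            _ => Err(SchemaMismatch { index }),
        })
        .collect()
}
```

Given that the runtime discards consume()'s return value, the practical benefit of the error is a single clear signal of a misconfigured schema rather than one warning per message.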
|
|
||
| mod from_json; | ||
|
|
||
| sink_connector!(OtlpSink); |
There was a problem hiding this comment.
This adds a new external sink, but there is no core/integration/tests/connectors/otlp/ coverage. Please add at least an OTLP/HTTP test covering success plus 4xx/5xx classification, and ideally a small tonic-based gRPC test for request shape/headers. Unit tests do not exercise the runtime/FFI/backend behavior that tends to break connector sinks.
There was a problem hiding this comment.
Noted. We will add integration tests for OTLP sink as a follow-up task.
|
/author |
Codecov Report
✅ All modified and coverable lines are covered by tests.
Additional details and impacted files
@@ Coverage Diff @@
## master #3529 +/- ##
============================================
- Coverage 74.41% 70.43% -3.99%
Complexity 937 937
============================================
Files 1255 1192 -63
Lines 129022 124335 -4687
Branches 104892 104847 -45
============================================
- Hits 96017 87577 -8440
- Misses 29950 33944 +3994
+ Partials 3055 2814 -241
|
|
@ryankert01 all twelve points addressed (inline replies posted). Summary:
/ready |
Reads JSON or raw proto messages from Iggy and forwards them to any OTLP-compatible backend (Quickwit, Jaeger, Tempo, etc.) via gRPC.
- JSON mode (default): reconstructs OTLP proto from otlp_source JSON output
- Proto mode: forwards raw prost-encoded bytes with zero conversion overhead
- Per-request gRPC metadata headers for backend index routing (e.g. qw-otel-traces-index, qw-otel-logs-index for Quickwit)
- Optional gzip compression
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01Qb1ctTeXahLw5EWWHP69gK
Adds `transport = "http"` (default: "grpc") to OtlpSinkConfig. The HTTP
path POSTs prost-encoded proto bytes to `{endpoint}/v1/{signal}` with
Content-Type: application/x-protobuf, mirrors the gzip and per-request
header behaviour of the gRPC path, and treats 4xx as a logged warning
(config error, not retriable) and 5xx as a retriable error.
Also adds three atomic counters (messages_sent, batches_sent,
batches_failed) that are logged at connector close() time, giving
operators visibility into throughput and error rate without an
external scrape endpoint.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
- Dockerfile: add Apache license header, drop otlp_source build (belongs to PR apache#3516), remove EXPOSE 4317 (sink dials out, does not listen)
- Cargo.toml: move otlp_sink to correct alphabetical position in workspace members; add opentelemetry-proto to workspace.dependencies
- otlp_sink/Cargo.toml: bump version to 0.4.1-edge.1; use workspace opentelemetry-proto; drop unused simd-json dependency
- lib.rs: use owned_value_to_serde_json instead of simd_json::to_string + serde_json::from_str (one allocation instead of two tree walks); change gRPC export errors from HttpRequestFailed to CannotStoreData; change proto encode errors on HTTP path to WriteFailure; export() now returns u64 so messages_sent counts only messages actually forwarded, not empty batches where all messages failed to decode
- from_json.rs: fix b.to_string() double-encoding of log body (use json_to_any_value instead of StringValue(b.to_string())); make severity_from_text case-insensitive; fix hex_to_bytes to return empty vec on odd-length input instead of silently dropping last nibble; replace magic number 2 with AggregationTemporality::Cumulative; default is_monotonic to true and read from JSON when present; hoist ArrayValue and KeyValueList imports to module level; add tests for new behaviour
Records full investigation of PollingKind::First TCP InvalidOffset(0) bug. Root cause narrowed to validate_checksums_and_offsets in server_common (only reachable path from poll that can return InvalidOffset), combined with TCP client losing the inner u64 via from_repr. Workaround: use last() in the fallback, not first(). Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
prost 0.14.3 rejects metric names that contain non-UTF-8 bytes, causing every message in a proto-format batch to fail silently and QW to receive nothing. The source already stores one ExportMetricsServiceRequest per gRPC call as opaque bytes -- no decode/re-encode round-trip is needed. For StorageFormat::Proto the sink now forwards each iggy message as a raw gRPC unary call using a passthrough RawCodec, keeping the Channel alive alongside the typed client. The JSON path is unchanged. The same bypass is applied to the HTTP transport path for consistency. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_01DEMekGHptLJiW4mwxeroc6
tower::Buffer panics with "send_item called without first calling poll_reserve" if the service's poll_ready is not driven before call(). The typed tonic clients (MetricsServiceClient etc.) call poll_ready internally; our TonicGrpc wrapper does not, so each iteration in export_grpc_raw_proto must call grpc_client.ready().await first. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_01DEMekGHptLJiW4mwxeroc6
94a3393 to 0cf2b41
Grouping by service_name dropped resource identity when two records shared a service but differed in pod/host, corrupting attributes at the backend; group by the full resource instead. Byte-slicing hex IDs could panic on non-ASCII input; iterate over bytes. Unbounded client calls could stall consume(); add a request timeout matching quickwit_sink.
|
@ryankert01 @ryerraguntla All comments (including grouping by full resource identity, raw gRPC/HTTP message count metrics, and HTTP 4xx warn/skip logic) have been fully addressed. Duplicate comments have also been cleaned up and changes pushed. Ready for re-review! |
|
/ready |
Summary
Closes #3526
Adds a new iggy_connector_otlp_sink plugin that reads messages from an Iggy topic and forwards them to any OTLP/gRPC-compatible backend (Quickwit, Jaeger, Tempo, Grafana Tempo, etc.).
- json (default) reconstructs OTLP proto from the JSON produced by otlp_source; proto forwards raw prost-encoded bytes with zero deserialization overhead
- qw-otel-traces-index / qw-otel-logs-index headers to select a target index)
- CompressionEncoding::Gzip
- traces, metrics, logs
- traces_from_json, logs_from_json, metrics_from_json, hex_to_bytes, etc.)
Motivation
The otlp_source connector stores OTLP telemetry in Iggy. This sink closes the loop, allowing Iggy to act as a durable buffer between an OTel Collector and any OTLP-capable backend -- replaying, fan-out, or delay-tolerant forwarding without a direct Collector-to-backend connection.
Example config
Test plan
- cargo test -p iggy_connector_otlp_sink
- cargo clippy -p iggy_connector_otlp_sink --all-features -- -D warnings
- #[serde(default)] on all optional config fields (forward-compatible)
- unwrap()/expect() on I/O paths
🤖 Generated with Claude Code