rust_snuba/src/processors/eap_items.rs (28 changes: 9 additions & 19 deletions)
@@ -15,10 +15,6 @@ use crate::config::ProcessorConfig;
use crate::processors::utils::enforce_retention;
use crate::types::{InsertBatch, ItemTypeMetrics, KafkaMessageMetadata};

use crate::runtime_config::get_str_config;

const INSERT_ARRAYS_CONFIG: &str = "eap_items_consumer_insert_arrays";

pub fn process_message(
msg: KafkaPayload,
_metadata: KafkaMessageMetadata,
@@ -106,16 +102,7 @@ impl TryFrom<TraceItem> for EAPItem {
Some(Value::DoubleValue(double)) => eap_item.attributes.insert_float(key, double),
Some(Value::IntValue(int)) => eap_item.attributes.insert_int(key, int),
Some(Value::BoolValue(bool)) => eap_item.attributes.insert_bool(key, bool),
Some(Value::ArrayValue(array)) => {
if get_str_config(INSERT_ARRAYS_CONFIG)
.ok()
.flatten()
.unwrap_or("0".to_string())
== "1"
{
eap_item.attributes.insert_array(key, array)
}
}
Some(Value::ArrayValue(array)) => eap_item.attributes.insert_array(key, array),
Some(Value::BytesValue(_)) => (),
Some(Value::KvlistValue(_)) => (),
None => (),
@@ -177,17 +164,22 @@ enum EAPValue {
seq_attrs! {
#[derive(Debug, Default, Serialize)]
struct AttributeMap {
#[serde(skip_serializing_if = "HashMap::is_empty")]
attributes_bool: HashMap<String, bool>,

#[serde(skip_serializing_if = "HashMap::is_empty")]
attributes_int: HashMap<String, i64>,

#[serde(skip_serializing_if = "HashMap::is_empty")]
attributes_array: HashMap<String, Vec<EAPValue>>,

#(
#[serde(skip_serializing_if = "HashMap::is_empty")]
attributes_string_~N: HashMap<String, String>,

#[serde(skip_serializing_if = "HashMap::is_empty")]
attributes_float_~N: HashMap<String, f64>,
)*

attributes_array: HashMap<String, Vec<EAPValue>>,
}
}

@@ -230,6 +222,7 @@ impl AttributeMap {

pub fn insert_array(&mut self, k: String, v: ArrayValue) {
let mut values: Vec<EAPValue> = Vec::default();

for value in v.values {
match value.value {
Some(Value::StringValue(string)) => values.push(EAPValue::String(string)),
@@ -251,7 +244,6 @@
mod tests {
use std::time::SystemTime;

use crate::runtime_config::patch_str_config_for_test;
use prost_types::Timestamp;
use sentry_protos::snuba::v1::any_value::Value;
use sentry_protos::snuba::v1::{AnyValue, ArrayValue, TraceItemType};
@@ -453,8 +445,6 @@ mod tests {
},
);

patch_str_config_for_test(INSERT_ARRAYS_CONFIG, Some("1"));

let eap_item = EAPItem::try_from(trace_item);

assert!(eap_item.is_ok());
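With the eap_items_consumer_insert_arrays gate removed above, the consumer now writes array attributes unconditionally. As a rough sketch of the resulting data (an assumption based on the EAPValue variants and the tags that _transform_array_value decodes in the endpoint change below, not code from this PR), an array attribute lands in the attributes_array map roughly as:

# Hypothetical row fragment; the key, values, and exact layout are illustrative.
# The "String"/"Bool"/"Int" tags mirror serde's externally tagged encoding of
# the EAPValue enum variants.
row_fragment = {
    "attributes_array": {
        "browser.features": [{"String": "webgl"}, {"Bool": True}, {"Int": 2}],
    },
}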
snuba/web/rpc/v1/endpoint_get_trace.py (137 changes: 109 additions & 28 deletions)
@@ -1,3 +1,4 @@
import json
import random
import uuid
from datetime import datetime
@@ -12,7 +13,11 @@
GetTraceResponse,
)
from sentry_protos.snuba.v1.request_common_pb2 import PageToken, TraceItemType
from sentry_protos.snuba.v1.trace_item_attribute_pb2 import AttributeKey, AttributeValue
from sentry_protos.snuba.v1.trace_item_attribute_pb2 import (
Array,
AttributeKey,
AttributeValue,
)
from sentry_protos.snuba.v1.trace_item_filter_pb2 import (
AndFilter,
ComparisonFilter,
@@ -58,6 +63,8 @@
"project_id",
"trace_id",
"sampling_factor",
"attributes_bool",
"attributes_int",
]
APPLY_FINAL_ROLLOUT_PERCENTAGE_CONFIG_KEY = "EndpointGetTrace.apply_final_rollout_percentage"

@@ -204,21 +211,29 @@ def _build_query(
else:
selected_columns += [
SelectedExpression(
name=("attributes_string"),
name="attributes_string",
expression=FunctionCall(
("attributes_string"),
"mapConcat",
tuple(column(f"attributes_string_{i}") for i in range(40)),
),
),
SelectedExpression(
name=("attributes_float"),
name="attributes_float",
expression=FunctionCall(
("attributes_float"),
"mapConcat",
tuple(column(f"attributes_float_{i}") for i in range(40)),
),
),
SelectedExpression(
name="attributes_array",
expression=FunctionCall(
"attributes_array",
"toJSONString",
(column("attributes_array"),),
),
),
]
selected_columns.extend(
map(
@@ -363,46 +378,80 @@ def _build_snuba_request(
)


def convert_to_attribute_value(value: Any) -> AttributeValue:
if isinstance(value, bool):
return AttributeValue(
val_bool=value,
)
elif isinstance(value, int):
return AttributeValue(
val_int=value,
)
elif isinstance(value, float):
return AttributeValue(
val_double=value,
)
elif isinstance(value, str):
return AttributeValue(
val_str=value,
)
elif isinstance(value, list):
return AttributeValue(
val_array=Array(values=[convert_to_attribute_value(v) for v in value])
)
elif isinstance(value, datetime):
return AttributeValue(
val_double=value.timestamp(),
)
else:
raise BadSnubaRPCRequestException(f"data type unknown: {type(value)}")


def _value_to_attribute(key: str, value: Any) -> tuple[AttributeKey, AttributeValue]:
if isinstance(value, int):
if isinstance(value, bool):
return (
AttributeKey(
name=key,
type=AttributeKey.Type.TYPE_INT,
type=AttributeKey.Type.TYPE_BOOLEAN,
),
AttributeValue(
val_int=value,
convert_to_attribute_value(value),
)
elif isinstance(value, int):
return (
AttributeKey(
name=key,
type=AttributeKey.Type.TYPE_INT,
),
convert_to_attribute_value(value),
)
elif isinstance(value, float):
return (
AttributeKey(
name=key,
type=AttributeKey.Type.TYPE_DOUBLE,
),
AttributeValue(
val_double=value,
),
convert_to_attribute_value(value),
)
elif isinstance(value, str):
return (
AttributeKey(
name=key,
type=AttributeKey.Type.TYPE_STRING,
),
AttributeValue(
val_str=value,
),
convert_to_attribute_value(value),
)
elif isinstance(value, list):
return (
AttributeKey(name=key, type=AttributeKey.Type.TYPE_ARRAY),
convert_to_attribute_value(value),
)
elif isinstance(value, datetime):
return (
AttributeKey(
name=key,
type=AttributeKey.Type.TYPE_DOUBLE,
),
AttributeValue(
val_double=value.timestamp(),
),
convert_to_attribute_value(value),
)
else:
raise BadSnubaRPCRequestException(f"data type unknown: {type(value)}")
@@ -418,6 +467,25 @@ def _value_to_attribute(key: str, value: Any) -> tuple[AttributeKey, AttributeValue]:
)


def _transform_array_value(value: dict[str, Any]) -> Any:
for t, v in value.items():
if t == "Int":
return int(v)
if t == "Double":
return float(v)
if t in {"String", "Bool"}:
return v
raise BadSnubaRPCRequestException(f"array value type unknown: {t}")


def _process_arrays(raw: str) -> dict[str, list[Any]]:
parsed = json.loads(raw) or {}
arrays = {}
for key, values in parsed.items():
arrays[key] = [_transform_array_value(v) for v in values]
return arrays


def _process_results(
data: Iterable[Dict[str, Any]],
) -> ProcessedResults:
@@ -433,6 +501,11 @@ def _process_results(
for row in data:
id = row.pop("id")
ts = row.pop("timestamp")
arrays = row.pop("attributes_array", "{}") or "{}"
# We merge these values in afterwards so that they overwrite potential
# floats with the same name.
booleans = row.pop("attributes_bool", {}) or {}
integers = row.pop("attributes_int", {}) or {}
last_seen_timestamp_precise = float(ts)
last_seen_id = id

@@ -441,29 +514,37 @@
# then transform to nanoseconds
timestamp.FromNanoseconds(int(ts * 1e6) * 1000)

attributes: list[GetTraceResponse.Item.Attribute] = []
attributes: dict[str, GetTraceResponse.Item.Attribute] = {}

def add_attribute(key: str, value: Any) -> None:
attribute_key, attribute_value = _value_to_attribute(key, value)
attributes.append(
GetTraceResponse.Item.Attribute(
key=attribute_key,
value=attribute_value,
)
attributes[key] = GetTraceResponse.Item.Attribute(
key=attribute_key,
value=attribute_value,
)

for key, value in row.items():
if isinstance(value, dict):
for k, v in value.items():
add_attribute(k, v)
for row_key, row_value in row.items():
if isinstance(row_value, dict):
for column_key, column_value in row_value.items():
add_attribute(column_key, column_value)
else:
add_attribute(key, value)
add_attribute(row_key, row_value)

attributes_array = _process_arrays(arrays)
for array_key, array_value in attributes_array.items():
add_attribute(array_key, array_value)

for bool_key, bool_value in booleans.items():
add_attribute(bool_key, bool_value)

for int_key, int_value in integers.items():
add_attribute(int_key, int_value)

item = GetTraceResponse.Item(
id=id,
timestamp=timestamp,
attributes=sorted(
attributes,
attributes.values(),
key=attrgetter("key.name"),
),
)
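Taken together, _process_arrays and _transform_array_value above decode the toJSONString(attributes_array) column back into plain Python lists before the values are re-typed as attributes. A self-contained sketch of that round trip (the payload is invented; the tag spellings follow _transform_array_value):

import json

# Invented sample in the tagged format produced by toJSONString.
raw = '{"flags": [{"String": "beta"}, {"Bool": true}], "sizes": [{"Int": "3"}]}'

# Equivalent of _process_arrays(raw): untag each element, coercing Int/Double.
decoded = {
    key: [
        int(v) if t == "Int" else float(v) if t == "Double" else v
        for item in values
        for t, v in item.items()
    ]
    for key, values in (json.loads(raw) or {}).items()
}

assert decoded == {"flags": ["beta", True], "sizes": [3]}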
tests/datasets/configuration/test_storage_loader.py (12 changes: 12 additions & 0 deletions)
@@ -5,6 +5,7 @@
from typing import Any

from snuba.clickhouse.columns import (
JSON,
Array,
Bool,
Column,
@@ -196,6 +197,13 @@ def test_column_parser(self) -> None:
"schema_modifiers": ["nullable"],
},
},
{
"name": "json_col",
"type": "JSON",
"args": {
"max_dynamic_paths": 128,
},
},
]

expected_python_columns = [
@@ -222,6 +230,10 @@
SchemaModifiers(nullable=True, readonly=False),
),
),
Column(
"json_col",
JSON(max_dynamic_paths=128),
),
]

assert parse_columns(serialized_columns) == expected_python_columns
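For reference, a usage sketch of the JSON column support this test exercises; the parse_columns import path is an assumption, and the equality check mirrors the assertion above:

from snuba.clickhouse.columns import JSON, Column
from snuba.datasets.configuration.utils import parse_columns  # assumed import path

[json_column] = parse_columns(
    [{"name": "json_col", "type": "JSON", "args": {"max_dynamic_paths": 128}}]
)
assert json_column == Column("json_col", JSON(max_dynamic_paths=128))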