Data Analysis Agent

The narrative documentation for the data analysis package — covering the Data Analysis Agent and the two analysis algorithms (anomaly detection and Adtributor drill-down) — lives in the package README, included below:

Data Analysis Package (openchatbi.analysis)

This package provides OpenChatBI’s advanced data analysis capabilities: a specialized Data Analysis Agent that orchestrates analysis tools into multi-step workflows, plus the two analysis algorithms it relies on — time series anomaly detection and multi-dimensional anomaly drill-down (Adtributor).

Layout

openchatbi/analysis/
├── agent.py             # Data Analysis Agent + the `data_analysis` delegation tool
├── anomaly_detection.py # Anomaly detection scoring algorithm (core)
├── adtributor.py        # Adtributor root-cause / drill-down algorithm (core)
└── models.py            # Pydantic output models for the Adtributor algorithm

openchatbi/tool/
├── anomaly_detection.py # `anomaly_detection` tool wrapper (LLM-facing)
└── adtributor_tool.py   # `adtributor_drilldown` tool wrapper (LLM-facing)

openchatbi/prompts/
└── data_analysis_prompt.md  # System prompt with the 5 workflow playbooks

The convention is: algorithm core lives in analysis/, the LLM-facing tool wrapper lives in tool/, and the agent that orchestrates the tools lives in analysis/agent.py.


1. Data Analysis Agent (agent.py)

A specialized sub-agent (built on the deepagents framework) that the main agent delegates complex analysis to via the data_analysis tool. Keeping the orchestration inside a sub-agent isolates the analysis context and prevents the main agent’s context from ballooning with intermediate tool output.

Public API

  • build_data_analysis_agent(sql_graph, sync_mode=False, llm_provider=None, checkpointer=None, memory_store=None) — assembles the tool set and compiles the agent graph.

  • get_data_analysis_tool(...) — wraps the agent in a data_analysis StructuredTool (sync or async) that the main agent registers.

Tool set

Tool

Source

Notes

text2sql

main SQL generation subgraph

reused

timeseries_forecast

openchatbi/tool/timeseries_forecast.py

requires forecast service

anomaly_detection

openchatbi/tool/anomaly_detection.py

requires forecast service

adtributor_drilldown

openchatbi/tool/adtributor_tool.py

no external service

run_python_code

openchatbi/tool/run_python_code.py

no external service

timeseries_forecast and anomaly_detection share the forecasting service health check and are registered together only when the service is healthy.

Supported scenarios (prompt-driven workflows)

Scenario

Typical flow

Single-metric trend forecasting

text2sqltimeseries_forecast → interpret

Single-metric anomaly detection

text2sqlanomaly_detection → interpret

Single-metric anomaly drill-down

text2sql (1D melted table) → adtributor_drilldown → interpret

Multi-metric correlation

text2sqlrun_python_code → interpret

Business combination analysis

text2sqlrun_python_code → interpret

LLM selection

The agent uses the optional analysis_llm configuration when present, otherwise it falls back to default_llm (see openchatbi.llm.llm.get_analysis_llm). This lets you point analysis at a stronger reasoning model without changing the main agent’s model.

Sub-agent isolation

The agent is a separately compiled graph that may share the main agent’s checkpointer. The data_analysis tool therefore derives an isolated child config (_build_sub_agent_config):

  • a deterministic child thread_id ("{parent_thread_id}:data_analysis") so it satisfies LangGraph’s checkpointer requirement, keeps interrupt/resume stable, and never clobbers the main agent’s checkpoint thread;

  • inherited checkpoint_ns / checkpoint_id are cleared so the sub-agent starts from a clean namespace;

  • other config (callbacks, tags, metadata) is propagated unchanged.

GraphInterrupt raised inside the sub-agent is re-raised (not swallowed) so human-in-the-loop interrupts can bubble up to the main graph. Final output is normalized to a string via _extract_final_content, including the case where the model returns multimodal content blocks.


2. Anomaly Detection (anomaly_detection.py)

Scores how anomalous the most recent points of a single-metric time series are, by comparing them against a forecasting baseline. The forecast comes from the time series forecasting service; if it is unavailable, detection fails fast with an error in the details dict.

Output

evaluate_anomalies(...) returns (score, details) where score is in [0, 1] (closer to 1 = more significant / higher-impact anomaly). format_anomaly_report turns it into a human-readable report (used by the tool wrapper). Severity bands: >0.8 Critical, >0.6 High, >0.4 Medium, else Low.

Scoring strategy

The score combines a set of intentionally orthogonal factors so the same evidence is not double-counted:

  • Deviation significance — how far the actual value is from the forecast in robust noise-scale (sigma) units. Significance ramps to 0.5 at 3σ and to 1.0 at 6σ. The noise scale is estimated from first differences with a MAD-based estimator (robust to historical spikes; not inflated by seasonality).

  • Direction weighting — drops vs rises can be weighted differently (drop_weight / rise_weight), since an unexpected drop is often more severe.

  • Volume modulation — anomalies on high-traffic moments matter more. A multiplier in [0.6, 1.0] driven by the expected (predicted) level, so a drop-to-zero is never penalized by it.

  • Historical anomaly frequency — intrinsically noisy/jumpy metrics are dampened (fewer false positives).

  • Duration — a run of consecutive anomalous points near the end of the window boosts the score.

Recent points are weighted more heavily (linear weighting). Per-point sigmas from the forecast service (prediction_std) are used when available, otherwise the historical noise-scale estimate is used.

Input contract

input_data is a list of numbers or dicts; the last evaluation_window points are the points to evaluate, the preceding points are the historical context (used for forecast input and for the noise/volume/frequency factors). input_data length must be greater than evaluation_window.


3. Anomaly Drill-Down / Adtributor (adtributor.py, models.py)

Multi-dimensional root-cause analysis based on Microsoft’s Adtributor algorithm. Given baseline (predict) vs actual (real) values broken down by dimension elements, it finds the dimensions and elements that best explain an anomaly.

Core concepts

  • Surprise — a Jensen-Shannon-divergence-style measure of how much an element’s distribution shifted between predict and real.

  • Explanatory Power (EP) — the share of the overall metric change attributable to an element. For derived (ratio) metrics, EP is computed with the ratio decomposition and normalized to sum to 1.

  • Thresholdstep (cumulative EP threshold to accept a dimension’s top elements as root cause) and teep (per-element EP threshold to be considered), plus k (number of top candidate dimensions to return).

  • additional_check — guards against degenerate explanations (e.g. when the candidate elements already account for ~100% of the value).

Absolute vs derived metrics

  • Absolute metrics use predict / real.

  • Derived (ratio) metrics use predict_numerator / predict_denominator / real_numerator / real_denominator (set derived=True).

Output (AdtributorOutput)

  • root_causes{dimension: [elements]} for the top-k explaining dimensions.

  • ranked_dimensions — all analyzed dimensions sorted by total surprise.

  • dimension_details — per-dimension DimensionResult (EP, surprise, elements, reason).

  • status — one of:

    • success — root cause elements identified;

    • no_root_cause — anomaly is systemic / evenly distributed, no element passed the thresholds;

    • no_anomaly_direction — no element matched the requested issue_type (drop / rise).

Input contract (tool layer)

The adtributor_drilldown tool accepts a 1D melted table (list[dict]) with keys dimension_name, element_value, and the relevant predict/real fields. The tool reshapes it into the per-dimension dict[str, DataFrame] the core algorithm expects, runs the algorithm, and attaches a business-friendly narrative to the result. The prompt instructs the agent to prepare this melted table via text2sql before drilling down.


See also

  • API reference: the docs Tools and Utilities page documents agent.py and the two tool wrappers.

  • Prompt / workflows: openchatbi/prompts/data_analysis_prompt.md.

  • Forecasting service: timeseries_forecasting/README.md.

API Reference

Data Analysis Agent implementation.

openchatbi.analysis.agent.build_data_analysis_agent(sql_graph: CompiledStateGraph, sync_mode: bool = False, llm_provider: str | None = None, checkpointer: BaseCheckpointSaver | None = None, memory_store: BaseStore | None = None) CompiledStateGraph[source]

Build the data analysis agent.

Parameters:
  • sql_graph – Compiled SQL generation graph to use for text2sql tool.

  • sync_mode – Whether to use synchronous mode.

  • llm_provider – LLM provider to use.

  • checkpointer – Checkpointer for state persistence.

  • memory_store – Store for long-term memory.

Returns:

The compiled data analysis agent.

Return type:

CompiledStateGraph

class openchatbi.analysis.agent.DataAnalysisInput(*, reasoning: str, task: str)[source]

Bases: BaseModel

Input schema for data analysis tool.

reasoning: str
task: str
model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

openchatbi.analysis.agent.get_data_analysis_tool(sql_graph: CompiledStateGraph, sync_mode: bool = False, llm_provider: str | None = None, checkpointer: BaseCheckpointSaver | None = None, memory_store: BaseStore | None = None) StructuredTool[source]

Create the data analysis tool that delegates to the data analysis agent.

Parameters:
  • sql_graph – Compiled SQL generation graph.

  • sync_mode – Whether to use synchronous mode.

  • llm_provider – LLM provider to use.

  • checkpointer – Checkpointer for state persistence.

  • memory_store – Store for long-term memory.

Returns:

The data analysis tool.

Return type:

StructuredTool

Core algorithms for time series anomaly detection.

The scoring strategy follows a rule/strategy-based design where each factor is intentionally orthogonal so that they do not double-count the same evidence:

  • Deviation significance: how statistically far the actual value is from the forecast, expressed in robust noise-scale (sigma) units. This is the single “how abnormal” signal and replaces the previously redundant reconstruction-error + bound-violation pair.

  • Direction: drops vs rises can be weighted differently because, for most business metrics, an unexpected drop is more severe than a rise.

  • Volume modulation: anomalies on high-traffic moments matter more than the same relative deviation on low-traffic moments. This is a multiplier driven by the expected (predicted) level, NOT by the anomalous actual value, so a drop-to-zero is never penalised by it.

  • Historical anomaly frequency: noisy/jumpy metrics that violate their own bounds frequently are dampened, reducing false positives.

  • Duration: a run of consecutive anomalous points near the end of the window boosts the score.

The final score is in the range [0, 1]; values closer to 1 indicate a more significant / higher-impact anomaly.

openchatbi.analysis.anomaly_detection.evaluate_anomalies(input_data: list[float | int | dict[str, Any]], evaluation_window: int = 3, frequency: str = 'hourly', target_column: str = 'value', input_length: int | None = None, drop_weight: float = 1.0, rise_weight: float = 1.0, smoothing: float = 0.5) tuple[float, dict[str, Any]][source]

Forecast the evaluation window and score it for anomalies.

The last evaluation_window points of input_data are treated as the points to evaluate; the preceding points are used both as the forecast input and as the historical context for the noise scale, volume and frequency factors.

Parameters:
  • input_data – Time series as numbers or dicts; evaluation points are at the end.

  • evaluation_window – Number of trailing points to evaluate (must be < len(input_data)).

  • frequency – Time series frequency (e.g. ‘hourly’, ‘daily’).

  • target_column – Column to read from structured (dict) data.

  • input_length – Target historical input length for the service; if the supplied history is shorter, the service left-pads the earliest points with zeros to reach this length.

  • drop_weight – Severity multiplier applied to downward deviations.

  • rise_weight – Severity multiplier applied to upward deviations.

Returns:

Tuple of (anomaly score in [0, 1], details dict). On any failure the score is 0.0 and details contains an "error" key.

openchatbi.analysis.anomaly_detection.detect_anomaly_range(input_data: list[float | int | dict[str, Any]], detection_range: int, frequency: str = 'hourly', target_column: str = 'value', input_length: int | None = None, stride: int | None = None, evaluation_window: int | None = None, drop_weight: float = 1.0, rise_weight: float = 1.0, smoothing: float = 0.5, t_high: float = 0.6, t_low: float = 0.4, merge_gap: int = 1, area_min: float = 1.0, peak_high: float = 0.8) tuple[float, dict[str, Any]][source]

Scan a long range for anomalies and summarise it into anomalous intervals.

The trailing detection_range points are scanned; everything before them is used as forecast/context history. The range is forecast in stride blocks (block forecasting), the lower window slides over it to produce a per-point severity curve, and the curve is segmented (hysteresis) and filtered (area / peak) into zero or more anomalous intervals.

Parameters:
  • input_data – Continuous (gap-free) time series; the scan range is at the end.

  • detection_range – Number of trailing points to scan (must be < len(input_data)).

  • frequency – Time series frequency; drives the evaluation_window and stride defaults.

  • target_column – Column to read from structured (dict) data.

  • input_length – Target historical input length for the forecast service per block.

  • stride – Block-forecast horizon; defaults from frequency when None.

  • evaluation_window – Lower-window size; defaults from frequency when None.

  • drop_weight – Severity multiplier applied to downward deviations.

  • rise_weight – Severity multiplier applied to upward deviations.

  • smoothing – Lower-window smoothing strength in [0, 1].

  • t_high – Hysteresis upper threshold to open an interval.

  • t_low – Hysteresis lower threshold to close an interval.

  • merge_gap – Merge intervals separated by <= this many points.

  • area_min – Minimum interval area (sum of severity) to keep it.

  • peak_high – Peak severity that keeps an interval regardless of its area.

Returns:

Tuple of (overall score in [0, 1] = max interval peak, details dict). On any failure the score is 0.0 and details contains an "error" key.

openchatbi.analysis.anomaly_detection.format_anomaly_report(score: float, details: dict[str, Any]) str[source]

Format the anomaly detection result into a human-readable report.

openchatbi.analysis.anomaly_detection.format_anomaly_range_report(score: float, details: dict[str, Any]) str[source]

Format a range (sliding) anomaly detection result into a readable report.

openchatbi.analysis.adtributor.additional_check(dimension: str, df: DataFrame, attr_list: list[Any]) tuple[bool, str][source]
  1. if rc real value proportion ~= 100%, skip this dimension

  2. if all attrs evenly drop, skip this check for now

openchatbi.analysis.adtributor.add_surprise(df: DataFrame, derived: bool, merged_divide: int = 1) DataFrame[source]

Computes the surprise for all elements in the dataframe.

openchatbi.analysis.adtributor.add_explanatory_power(df: DataFrame, derived: bool, issue_type: str = 'drop') DataFrame[source]

Computes the explanatory power for all elements in the dataframe.

openchatbi.analysis.adtributor.merge_dimensions(df: DataFrame, derived: bool) DataFrame[source]
openchatbi.analysis.adtributor.adtributor(derived: bool, df_dict: dict[str, DataFrame], dimension_weights: dict[str, float] | None = None, tep: float = 0.7, teep: float = 0.02, k: int = 1, issue_type: str = 'drop') AdtributorOutput[source]

Analyzes the input data and identifies candidate dimensions for drill-down analysis.

class openchatbi.analysis.models.DimensionResult(*, explanatory_power: float | None = None, total_surprise: float, elements: list[Any] | None = None, surprise: float | None = None, reason: str)[source]

Bases: BaseModel

explanatory_power: float | None
total_surprise: float
elements: list[Any] | None
surprise: float | None
reason: str
model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class openchatbi.analysis.models.AdtributorOutput(*, root_causes: dict[str, list[Any]], ranked_dimensions: list[str], dimension_details: dict[str, DimensionResult], status: str, reason_flag: str = '')[source]

Bases: BaseModel

root_causes: dict[str, list[Any]]
ranked_dimensions: list[str]
dimension_details: dict[str, DimensionResult]
status: str
reason_flag: str
model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].