
API Reference

Complete API documentation for StringSight's main functions and classes.

Main Entry Points

explain()

explain(df, method='single_model', system_prompt=None, prompt_builder=None, task_description=None, *, sample_size=None, model_a=None, model_b=None, score_columns=None, prompt_column='prompt', model_column=None, model_response_column=None, question_id_column=None, model_a_column=None, model_b_column=None, model_a_response_column=None, model_b_response_column=None, model_name='gpt-4.1', temperature=0.7, top_p=0.95, max_tokens=16000, max_workers=DEFAULT_MAX_WORKERS, include_scores_in_prompt=False, prompt_expansion=False, expansion_num_traces=5, expansion_model='gpt-4.1', clusterer='hdbscan', min_cluster_size=5, embedding_model='text-embedding-3-large', prettify_labels=False, assign_outliers=False, summary_model='gpt-4.1', cluster_assignment_model='gpt-4.1-mini', metrics_kwargs=None, use_wandb=True, wandb_project=None, include_embeddings=False, verbose=False, output_dir=None, custom_pipeline=None, extraction_cache_dir=None, clustering_cache_dir=None, metrics_cache_dir=None, progress_callback=None, **kwargs)

Explain model behavior patterns from conversation data.

This is the main entry point for StringSight. It takes a DataFrame of conversations and returns the same data with extracted properties and clusters.

Parameters:

Name Type Description Default
df DataFrame

DataFrame with conversation data

required
method str

"side_by_side" or "single_model"

'single_model'
system_prompt str | None

System prompt for property extraction (if None, will be auto-determined)

None
prompt_builder Callable[[Series, str], str] | None

Optional custom prompt builder function

None
task_description str | None

Optional description of the task; when provided with method="single_model" and no explicit system_prompt, a task-aware system prompt is constructed from single_model_system_prompt_custom. If prompt_expansion=True, this description will be expanded using example traces before being used in prompts.

None
sample_size int | None

Optional number of rows to sample from the dataset before processing. If None, uses the entire dataset. For single_model method with balanced datasets (each prompt answered by all models), automatically samples prompts evenly across models. Otherwise falls back to row-level sampling.

None
model_a str | None

For side_by_side method with tidy data, specifies first model to select

None
model_b str | None

For side_by_side method with tidy data, specifies second model to select

None
score_columns List[str] | None

Optional list of column names containing score metrics. Instead of providing scores as a dictionary in a 'score' column, you can specify separate columns for each metric. For single_model: columns should be named like 'accuracy', 'helpfulness'. For side_by_side: columns should be named like 'accuracy_a', 'accuracy_b', 'helpfulness_a', 'helpfulness_b'. If provided, these columns will be converted to the expected score dict format.

None
prompt_column str

Name of the prompt column in your dataframe (default: "prompt")

'prompt'
model_column str | None

Name of the model column for single_model (default: "model")

None
model_response_column str | None

Name of the model response column for single_model (default: "model_response")

None
question_id_column str | None

Name of the question_id column (default: "question_id" if column exists)

None
model_a_column str | None

Name of the model_a column for side_by_side (default: "model_a")

None
model_b_column str | None

Name of the model_b column for side_by_side (default: "model_b")

None
model_a_response_column str | None

Name of the model_a_response column for side_by_side (default: "model_a_response")

None
model_b_response_column str | None

Name of the model_b_response column for side_by_side (default: "model_b_response")

None
model_name str

LLM model for property extraction

'gpt-4.1'
temperature float

Temperature for LLM

0.7
top_p float

Top-p for LLM

0.95
max_tokens int

Max tokens for LLM

16000
max_workers int

Max parallel workers for API calls

DEFAULT_MAX_WORKERS
prompt_expansion bool

If True, expand task_description using example traces before extraction (default: False)

False
expansion_num_traces int

Number of traces to sample for expansion (default: 5)

5
expansion_model str

LLM model to use for expansion (default: "gpt-4.1")

'gpt-4.1'
clusterer Union[str, Any]

Clustering method ("hdbscan", "hdbscan_native") or PipelineStage

'hdbscan'
min_cluster_size int | None

Minimum cluster size

5
embedding_model str

Embedding model ("openai" or sentence-transformer model)

'text-embedding-3-large'
assign_outliers bool

Whether to assign outliers to nearest clusters

False
summary_model str

LLM model for generating cluster summaries (default: "gpt-4.1")

'gpt-4.1'
cluster_assignment_model str

LLM model for assigning outliers to clusters (default: "gpt-4.1-mini")

'gpt-4.1-mini'
metrics_kwargs Dict[str, Any | None] | None

Additional metrics configuration

None
use_wandb bool

Whether to log to Weights & Biases

True
wandb_project str | None

W&B project name

None
include_embeddings bool

Whether to include embeddings in output

False
verbose bool

Whether to print progress

False
output_dir str | None

Directory to save results (optional). If provided, saves:
  • clustered_results.parquet: DataFrame with all results
  • full_dataset.json: Complete PropertyDataset (JSON format)
  • full_dataset.parquet: Complete PropertyDataset (parquet format)
  • model_stats.json: Model statistics and rankings
  • summary.txt: Human-readable summary

None
custom_pipeline Pipeline | None

Custom pipeline to use instead of default

None
**kwargs Any

Additional configuration options

{}

Returns:

Type Description
Tuple[DataFrame, Dict[str, DataFrame]]

Tuple of (clustered_df, model_stats)
  • clustered_df: Original DataFrame with added property and cluster columns
  • model_stats: Dictionary containing three DataFrames:
    • "model_cluster_scores": Per model-cluster metrics (size, proportion, quality, etc.)
    • "cluster_scores": Per cluster aggregated metrics (across all models)
    • "model_scores": Per model aggregated metrics (across all clusters)
Notes on input format
  • For method="single_model": expect columns [question_id, prompt, model, model_response, (optional) score]
  • For method="side_by_side": expect columns [question_id, prompt, model_a, model_b, model_a_response, model_b_response]
  • Alternatively, for method="side_by_side" you may pass tidy single-model-like data (columns [prompt, model, model_response] and optionally question_id) and specify model_a and model_b parameters. The function will select these two models and convert the input to the expected side-by-side schema.
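For instance, a minimal single_model input with a per-metric score column might be built like the hedged sketch below (the model names, prompts, and scores are illustrative only):

import pandas as pd
from stringsight import explain

# One row per (question, model) pair; "accuracy" is supplied via score_columns
df = pd.DataFrame({
    "question_id": ["q1", "q1", "q2", "q2"],
    "prompt": ["What is 2+2?", "What is 2+2?", "Name a prime number.", "Name a prime number."],
    "model": ["model-x", "model-y", "model-x", "model-y"],
    "model_response": ["4", "The answer is four.", "7 is prime.", "2"],
    "accuracy": [1.0, 1.0, 1.0, 1.0],
})

clustered_df, model_stats = explain(
    df,
    method="single_model",
    score_columns=["accuracy"],  # converted to the expected score dict format
)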
Example

import pandas as pd
from stringsight import explain

# Load your conversation data
df = pd.read_csv("conversations.csv")

# Explain model behavior and save results
clustered_df, model_stats = explain(
    df,
    method="side_by_side",
    min_cluster_size=5,
    output_dir="results/"  # Automatically saves results
)

# Explore the results
print(clustered_df.columns)
print(model_stats.keys())
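As noted above, side_by_side can also start from tidy single-model-style data by naming the two models to compare; a hedged sketch (the file and model names are placeholders):

import pandas as pd
from stringsight import explain

# Tidy data: columns prompt, model, model_response, and optionally question_id
tidy_df = pd.read_csv("tidy_conversations.csv")

# The two selected models are converted to the side-by-side schema internally
clustered_df, model_stats = explain(
    tidy_df,
    method="side_by_side",
    model_a="model-x",
    model_b="model-y",
)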

Source code in stringsight/public.py
def explain(
    df: pd.DataFrame,
    method: str = "single_model",
    system_prompt: str | None = None,
    prompt_builder: Callable[[pd.Series, str], str] | None = None,
    task_description: str | None = None,
    *,
    # Data preparation
    sample_size: int | None = None,
    model_a: str | None = None,
    model_b: str | None = None,
    score_columns: List[str] | None = None,
    # Column mapping parameters
    prompt_column: str = "prompt",
    model_column: str | None = None,
    model_response_column: str | None = None,
    question_id_column: str | None = None,
    model_a_column: str | None = None,
    model_b_column: str | None = None,
    model_a_response_column: str | None = None,
    model_b_response_column: str | None = None,
    # Extraction parameters
    model_name: str = "gpt-4.1",
    temperature: float = 0.7,
    top_p: float = 0.95,
    max_tokens: int = 16000,
    max_workers: int = DEFAULT_MAX_WORKERS,
    include_scores_in_prompt: bool = False,
    # Prompt expansion parameters
    prompt_expansion: bool = False,
    expansion_num_traces: int = 5,
    expansion_model: str = "gpt-4.1",
    # Clustering parameters
    clusterer: Union[str, Any] = "hdbscan",
    min_cluster_size: int | None = 5,
    embedding_model: str = "text-embedding-3-large",
    prettify_labels: bool = False,
    assign_outliers: bool = False,
    summary_model: str = "gpt-4.1",
    cluster_assignment_model: str = "gpt-4.1-mini",
    # Metrics parameters
    metrics_kwargs: Dict[str, Any | None] | None = None,
    # Caching & logging
    use_wandb: bool = True,
    wandb_project: str | None = None,
    include_embeddings: bool = False,
    verbose: bool = False,
    # Output parameters
    output_dir: str | None = None,
    # Pipeline configuration
    custom_pipeline: Pipeline | None = None,
    # Cache configuration
    extraction_cache_dir: str | None = None,
    clustering_cache_dir: str | None = None,
    metrics_cache_dir: str | None = None,
    progress_callback: Callable[[float], None] | None = None,
    **kwargs: Any
) -> Tuple[pd.DataFrame, Dict[str, pd.DataFrame]]:
    """
    Explain model behavior patterns from conversation data.

    This is the main entry point for StringSight. It takes a DataFrame of conversations
    and returns the same data with extracted properties and clusters.

    Args:
        df: DataFrame with conversation data
        method: "side_by_side" or "single_model"
        system_prompt: System prompt for property extraction (if None, will be auto-determined)
        prompt_builder: Optional custom prompt builder function
        task_description: Optional description of the task; when provided with
            method="single_model" and no explicit system_prompt, a task-aware
            system prompt is constructed from single_model_system_prompt_custom.
            If prompt_expansion=True, this description will be expanded using
            example traces before being used in prompts.

        # Data preparation
        sample_size: Optional number of rows to sample from the dataset before processing.
                    If None, uses the entire dataset. For single_model method with balanced
                    datasets (each prompt answered by all models), automatically samples prompts
                    evenly across models. Otherwise falls back to row-level sampling.
        model_a: For side_by_side method with tidy data, specifies first model to select
        model_b: For side_by_side method with tidy data, specifies second model to select
        score_columns: Optional list of column names containing score metrics. Instead of
                    providing scores as a dictionary in a 'score' column, you can specify
                    separate columns for each metric. For single_model: columns should be
                    named like 'accuracy', 'helpfulness'. For side_by_side: columns should
                    be named like 'accuracy_a', 'accuracy_b', 'helpfulness_a', 'helpfulness_b'.
                    If provided, these columns will be converted to the expected score dict format.

        # Column mapping parameters
        prompt_column: Name of the prompt column in your dataframe (default: "prompt")
        model_column: Name of the model column for single_model (default: "model")
        model_response_column: Name of the model response column for single_model (default: "model_response")
        question_id_column: Name of the question_id column (default: "question_id" if column exists)
        model_a_column: Name of the model_a column for side_by_side (default: "model_a")
        model_b_column: Name of the model_b column for side_by_side (default: "model_b")
        model_a_response_column: Name of the model_a_response column for side_by_side (default: "model_a_response")
        model_b_response_column: Name of the model_b_response column for side_by_side (default: "model_b_response")

        # Extraction parameters
        model_name: LLM model for property extraction
        temperature: Temperature for LLM
        top_p: Top-p for LLM
        max_tokens: Max tokens for LLM
        max_workers: Max parallel workers for API calls

        # Prompt expansion parameters
        prompt_expansion: If True, expand task_description using example traces
            before extraction (default: False)
        expansion_num_traces: Number of traces to sample for expansion (default: 5)
        expansion_model: LLM model to use for expansion (default: "gpt-4.1")

        # Clustering parameters
        clusterer: Clustering method ("hdbscan", "hdbscan_native") or PipelineStage
        min_cluster_size: Minimum cluster size
        embedding_model: Embedding model ("openai" or sentence-transformer model)
        assign_outliers: Whether to assign outliers to nearest clusters
        summary_model: LLM model for generating cluster summaries (default: "gpt-4.1")
        cluster_assignment_model: LLM model for assigning outliers to clusters (default: "gpt-4.1-mini")

        # Metrics parameters
        metrics_kwargs: Additional metrics configuration

        # Caching & logging
        use_wandb: Whether to log to Weights & Biases
        wandb_project: W&B project name
        include_embeddings: Whether to include embeddings in output
        verbose: Whether to print progress

        # Output parameters
        output_dir: Directory to save results (optional). If provided, saves:
                   - clustered_results.parquet: DataFrame with all results
                   - full_dataset.json: Complete PropertyDataset (JSON format)
                   - full_dataset.parquet: Complete PropertyDataset (parquet format)
                   - model_stats.json: Model statistics and rankings
                   - summary.txt: Human-readable summary

        # Pipeline configuration
        custom_pipeline: Custom pipeline to use instead of default
        **kwargs: Additional configuration options

    Returns:
        Tuple of (clustered_df, model_stats)
        - clustered_df: Original DataFrame with added property and cluster columns
        - model_stats: Dictionary containing three DataFrames:
            - "model_cluster_scores": Per model-cluster metrics (size, proportion, quality, etc.)
            - "cluster_scores": Per cluster aggregated metrics (across all models)
            - "model_scores": Per model aggregated metrics (across all clusters)

    Notes on input format:
        - For method="single_model": expect columns [question_id, prompt, model, model_response, (optional) score]
        - For method="side_by_side": expect columns [question_id, prompt, model_a, model_b, model_a_response, model_b_response]
        - Alternatively, for method="side_by_side" you may pass tidy single-model-like data
          (columns [prompt, model, model_response] and optionally question_id) and specify
          `model_a` and `model_b` parameters. The function will select these two
          models and convert the input to the expected side-by-side schema.

    Example:
        >>> import pandas as pd
        >>> from stringsight import explain
        >>> 
        >>> # Load your conversation data
        >>> df = pd.read_csv("conversations.csv")
        >>> 
        >>> # Explain model behavior and save results
        >>> clustered_df, model_stats = explain(
        ...     df,
        ...     method="side_by_side",
        ...     min_cluster_size=5,
        ...     output_dir="results/"  # Automatically saves results
        ... )
        >>> 
        >>> # Explore the results
        >>> print(clustered_df.columns)
        >>> print(model_stats.keys())
    """

    # Validate OpenAI API key is set if using GPT models
    validate_openai_api_key(
        model_name=model_name,
        embedding_model=embedding_model,
        **kwargs
    )

    # Preprocess data: handle score_columns, sampling, tidy→side_by_side conversion, column mapping
    from .core.preprocessing import validate_and_prepare_dataframe
    df = validate_and_prepare_dataframe(
        df,
        method=method,
        score_columns=score_columns,
        sample_size=sample_size,
        model_a=model_a,
        model_b=model_b,
        prompt_column=prompt_column,
        model_column=model_column,
        model_response_column=model_response_column,
        question_id_column=question_id_column,
        model_a_column=model_a_column,
        model_b_column=model_b_column,
        model_a_response_column=model_a_response_column,
        model_b_response_column=model_b_response_column,
        verbose=verbose,
    )

    # Prompt expansion: if enabled, expand task_description using example traces
    if prompt_expansion and task_description:
        from .prompts.expansion.trace_based import expand_task_description
        from .formatters.traces import format_single_trace_from_row, format_side_by_side_trace_from_row

        if verbose:
            logger.info("Expanding task description using example traces...")

        # Convert dataframe rows to traces
        traces = []
        for idx, row in df.iterrows():
            if method == "single_model":
                trace = format_single_trace_from_row(row)
            else:  # side_by_side
                trace = format_side_by_side_trace_from_row(row)
            traces.append(trace)

        # Expand task description
        expanded_description = expand_task_description(
            task_description=task_description,
            traces=traces,
            model=expansion_model,
            num_traces=expansion_num_traces,
        )

        if verbose:
            logger.info(f"Original task description length: {len(task_description)}")
            logger.info(f"Expanded task description length: {len(expanded_description)}")

        # Use expanded description
        task_description = expanded_description

    # Auto-determine/resolve system prompt with the centralized helper
    system_prompt = get_system_prompt(method, system_prompt, task_description)

    # Print the system prompt for verification
    if verbose:
        logger.info("\n" + "="*80)
        logger.info("SYSTEM PROMPT")
        logger.info("="*80)
        logger.info(system_prompt)
        logger.info("="*80 + "\n")
    if len(system_prompt) < 50:
        raise ValueError("System prompt is too short. Please provide a longer system prompt.")

    print(f"df length: {len(df)}")

    # Create PropertyDataset from input DataFrame
    dataset = PropertyDataset.from_dataframe(df, method=method)

    # Print initial dataset information
    if verbose:
        logger.info(f"\n📋 Initial dataset summary:")
        logger.info(f"   • Conversations: {len(dataset.conversations)}")
        logger.info(f"   • Models: {len(dataset.all_models)}")
        if len(dataset.all_models) <= 20:
            logger.info(f"   • Model names: {', '.join(sorted(dataset.all_models))}")
        logger.info("")

    # 2️⃣  Initialize wandb if enabled (and explicitly disable via env when off)
    # Ensure environment flag aligns with the provided setting to prevent
    # accidental logging by submodules that import wandb directly.
    import os as _os
    if not use_wandb:
        _os.environ["WANDB_DISABLED"] = "true"
    else:
        _os.environ.pop("WANDB_DISABLED", None)

    # 2️⃣  Initialize wandb if enabled
    # Create run name based on input filename if available
    if use_wandb:
        try:
            import wandb
            # import weave
            import os

            # Try to get input filename from the DataFrame or use a default
            input_filename = "unknown_dataset"
            if hasattr(df, 'name') and df.name:
                input_filename = df.name
            elif hasattr(df, '_metadata') and df._metadata and 'filename' in df._metadata:
                input_filename = df._metadata['filename']
            else:
                # Try to infer from the DataFrame source if it has a path attribute
                # This is a fallback for when we can't determine the filename
                input_filename = f"dataset_{len(df)}_rows"

            # Clean the filename for wandb (remove extension, replace spaces/special chars)
            if isinstance(input_filename, str):
                # Remove file extension and clean up the name
                input_filename = os.path.splitext(os.path.basename(input_filename))[0]
                # Replace spaces and special characters with underscores
                input_filename = input_filename.replace(' ', '_').replace('-', '_')
                # Remove any remaining special characters
                import re
                input_filename = re.sub(r'[^a-zA-Z0-9_]', '', input_filename)

            wandb_run_name = os.path.basename(os.path.normpath(output_dir)) if output_dir else f"{input_filename}_{method}"

            wandb.init(
                project=wandb_project or "StringSight",
                name=wandb_run_name,
                config={
                    "method": method,
                    "system_prompt": system_prompt,
                    "model_name": model_name,
                    "temperature": temperature,
                    "top_p": top_p,
                    "max_tokens": max_tokens,
                    "max_workers": max_workers,
                    "clusterer": clusterer,
                    "min_cluster_size": min_cluster_size,
                    "embedding_model": embedding_model,
                    "assign_outliers": assign_outliers,
                    "include_embeddings": include_embeddings,
                    "output_dir": output_dir,
                },
                reinit=False  # Don't reinitialize if already exists
            )
        except (ImportError, TypeError, Exception) as e:
            # wandb not installed, has corrupted package metadata, or initialization failed
            logger.warning(f"Wandb initialization failed: {e}. Disabling wandb tracking.")
            use_wandb = False
            _os.environ["WANDB_DISABLED"] = "true"

    # Use custom pipeline if provided, otherwise build default pipeline
    if custom_pipeline is not None:
        pipeline = custom_pipeline
        # Ensure the custom pipeline uses the same wandb configuration
        if hasattr(pipeline, 'use_wandb'):
            pipeline.use_wandb = use_wandb
            pipeline.wandb_project = wandb_project or "StringSight"
            if use_wandb:
                pipeline._wandb_ok = True  # Mark that wandb is already initialized
    else:
        pipeline = _build_default_pipeline(
            method=method,
            system_prompt=system_prompt,
            prompt_builder=prompt_builder,
            model_name=model_name,
            temperature=temperature,
            top_p=top_p,
            max_tokens=max_tokens,
            max_workers=max_workers,
            include_scores_in_prompt=include_scores_in_prompt,
            clusterer=clusterer,
            min_cluster_size=min_cluster_size,
            embedding_model=embedding_model,
            assign_outliers=assign_outliers,
            prettify_labels=prettify_labels,
            summary_model=summary_model,
            cluster_assignment_model=cluster_assignment_model,
            metrics_kwargs=metrics_kwargs or {},  # type: ignore[arg-type]
            use_wandb=use_wandb,
            wandb_project=wandb_project,
            include_embeddings=include_embeddings,
            verbose=verbose,
            extraction_cache_dir=extraction_cache_dir,
            clustering_cache_dir=clustering_cache_dir,
            metrics_cache_dir=metrics_cache_dir,
            output_dir=output_dir,
            **kwargs
        )

    # 4️⃣  Execute pipeline
    result_dataset = _run_pipeline_smart(pipeline, dataset, progress_callback=progress_callback)

    # Check for 0 properties before attempting to save
    if len([p for p in result_dataset.properties if p.property_description is not None]) == 0:
        raise RuntimeError(
            "\n" + "="*60 + "\n"
            "ERROR: Pipeline completed with 0 valid properties!\n"
            "="*60 + "\n"
            "This indicates that all property extraction attempts failed.\n"
            "Common causes:\n\n"
            "1. JSON PARSING FAILURES:\n"
            "   - LLM returning natural language instead of JSON\n"
            "   - Check logs above for 'Failed to parse JSON' errors\n\n"
            "2. SYSTEM PROMPT MISMATCH:\n"
            "   - Current system_prompt may not suit your data format\n"
            "   - Try a different system_prompt parameter\n\n"
            "3. API/MODEL ISSUES:\n"
            "   - OpenAI API key invalid or quota exceeded\n"
            "   - Model configuration problems\n\n"
            "Cannot save results with 0 properties.\n"
            "="*60
        )

    # Convert back to DataFrame format
    clustered_df = result_dataset.to_dataframe(type="all", method=method)
    model_stats = result_dataset.model_stats

    # Save final summary if output_dir is provided
    if output_dir is not None:
        _save_final_summary(result_dataset, clustered_df, model_stats, output_dir, verbose)

        # Also save the full dataset for backward compatibility with compute_metrics_only and other tools
        import pathlib
        import json

        output_path = pathlib.Path(output_dir)

        # Save full dataset as JSON
        full_dataset_json_path = output_path / "full_dataset.json"
        result_dataset.save(str(full_dataset_json_path))
        if verbose:
            logger.info(f"  ✓ Saved full dataset: {full_dataset_json_path}")

    # Log accumulated summary metrics from pipeline stages
    if use_wandb and hasattr(pipeline, 'log_final_summary'):
        pipeline.log_final_summary()

    # Log final results to wandb if enabled
    if use_wandb:
        try:
            import wandb
            # import weave
            _log_final_results_to_wandb(clustered_df, model_stats)
        except ImportError:
            # wandb not installed or not available
            use_wandb = False

    # Print analysis summary if verbose
    _print_analysis_summary(model_stats, max_behaviors=5)

    return clustered_df, model_stats

label()

label(df, *, taxonomy, sample_size=None, score_columns=None, prompt_column='prompt', model_column=None, model_response_column=None, question_id_column=None, model_name='gpt-4.1', temperature=0.0, top_p=1.0, max_tokens=2048, max_workers=DEFAULT_MAX_WORKERS, metrics_kwargs=None, use_wandb=True, wandb_project=None, include_embeddings=False, verbose=False, output_dir=None, extraction_cache_dir=None, metrics_cache_dir=None, **kwargs)

Run the fixed-taxonomy analysis pipeline. This is just your run-of-the-mill LLM judge with a given rubric.

The user provides a dataframe with a model and its responses, along with a taxonomy.

Unlike explain(), this entry point does not perform clustering; each taxonomy label simply becomes its own cluster. The input df must be in single-model format (columns question_id, prompt, model, model_response, …).

Parameters:

Name Type Description Default
df DataFrame

DataFrame with single-model conversation data

required
taxonomy Dict[str, str]

Dictionary mapping label names to their descriptions

required
sample_size int | None

Optional number of rows to sample from the dataset before processing. If None, uses the entire dataset. For balanced datasets (each prompt answered by all models), automatically samples prompts evenly across models.

None
score_columns List[str] | None

Optional list of column names containing score metrics. Instead of providing scores as a dictionary in a 'score' column, you can specify separate columns for each metric (e.g., ['accuracy', 'helpfulness']). If provided, these columns will be converted to the expected score dict format.

None
prompt_column str

Name of the prompt column in your dataframe (default: "prompt")

'prompt'
model_column str | None

Name of the model column (default: "model")

None
model_response_column str | None

Name of the model response column (default: "model_response")

None
question_id_column str | None

Name of the question_id column (default: "question_id" if column exists)

None
model_name str

LLM model for property extraction (default: "gpt-4.1")

'gpt-4.1'
temperature float

Temperature for LLM (default: 0.0)

0.0
top_p float

Top-p for LLM (default: 1.0)

1.0
max_tokens int

Max tokens for LLM (default: 2048)

2048
max_workers int

Max parallel workers for API calls (default: 16)

DEFAULT_MAX_WORKERS
metrics_kwargs Dict[str, Any | None] | None

Additional metrics configuration

None
use_wandb bool

Whether to log to Weights & Biases (default: True)

True
wandb_project str | None

W&B project name

None
include_embeddings bool

Whether to include embeddings in output (default: False)

False
verbose bool

Whether to print progress (default: False)

False
output_dir str | None

Directory to save results (optional)

None
extraction_cache_dir str | None

Cache directory for extraction results

None
metrics_cache_dir str | None

Cache directory for metrics results

None
**kwargs Any

Additional configuration options

{}

Returns:

Type Description
Tuple[DataFrame, Dict[str, DataFrame]]

Tuple of (clustered_df, model_stats)
  • clustered_df: Original DataFrame with added property and cluster columns
  • model_stats: Dictionary containing three DataFrames:
    • "model_cluster_scores": Per model-cluster metrics (size, proportion, quality, etc.)
    • "cluster_scores": Per cluster aggregated metrics (across all models)
    • "model_scores": Per model aggregated metrics (across all clusters)
Source code in stringsight/public.py
def label(
    df: pd.DataFrame,
    *,
    taxonomy: Dict[str, str],
    sample_size: int | None = None,
    # Column mapping parameters
    score_columns: List[str] | None = None,
    prompt_column: str = "prompt",
    model_column: str | None = None,
    model_response_column: str | None = None,
    question_id_column: str | None = None,
    model_name: str = "gpt-4.1",
    temperature: float = 0.0,
    top_p: float = 1.0,
    max_tokens: int = 2048,
    max_workers: int = DEFAULT_MAX_WORKERS,
    metrics_kwargs: Dict[str, Any | None] | None = None,
    use_wandb: bool = True,
    wandb_project: str | None = None,
    include_embeddings: bool = False,
    verbose: bool = False,
    output_dir: str | None = None,
    extraction_cache_dir: str | None = None,
    metrics_cache_dir: str | None = None,
    **kwargs: Any,
) -> Tuple[pd.DataFrame, Dict[str, pd.DataFrame]]:
    """Run the *fixed-taxonomy* analysis pipeline. This is just you're run of the mill LLM-judge with a given rubric. 

    The user provides a dataframe with a model and its responses alone with a taxonomy.

    Unlike :pyfunc:`explain`, this entry point does **not** perform clustering;
    each taxonomy label simply becomes its own cluster.  The input `df` **must**
    be in *single-model* format (columns `question_id`, `prompt`, `model`, `model_response`, …).

    Args:
        df: DataFrame with single-model conversation data
        taxonomy: Dictionary mapping label names to their descriptions
        sample_size: Optional number of rows to sample from the dataset before processing.
                    If None, uses the entire dataset. For balanced datasets (each prompt answered
                    by all models), automatically samples prompts evenly across models.
        score_columns: Optional list of column names containing score metrics. Instead of
                    providing scores as a dictionary in a 'score' column, you can specify
                    separate columns for each metric (e.g., ['accuracy', 'helpfulness']).
                    If provided, these columns will be converted to the expected score dict format.
        prompt_column: Name of the prompt column in your dataframe (default: "prompt")
        model_column: Name of the model column (default: "model")
        model_response_column: Name of the model response column (default: "model_response")
        question_id_column: Name of the question_id column (default: "question_id" if column exists)
        model_name: LLM model for property extraction (default: "gpt-4.1")
        temperature: Temperature for LLM (default: 0.0)
        top_p: Top-p for LLM (default: 1.0)
        max_tokens: Max tokens for LLM (default: 2048)
        max_workers: Max parallel workers for API calls (default: 16)
        metrics_kwargs: Additional metrics configuration
        use_wandb: Whether to log to Weights & Biases (default: True)
        wandb_project: W&B project name
        include_embeddings: Whether to include embeddings in output (default: False)
        verbose: Whether to print progress (default: False)
        output_dir: Directory to save results (optional)
        extraction_cache_dir: Cache directory for extraction results
        metrics_cache_dir: Cache directory for metrics results
        **kwargs: Additional configuration options

    Returns:
        Tuple of (clustered_df, model_stats)
        - clustered_df: Original DataFrame with added property and cluster columns
        - model_stats: Dictionary containing three DataFrames:
            - "model_cluster_scores": Per model-cluster metrics (size, proportion, quality, etc.)
            - "cluster_scores": Per cluster aggregated metrics (across all models)
            - "model_scores": Per model aggregated metrics (across all clusters)
    """
    t0 = time.perf_counter()
    timings = {}

    method = "single_model"  # hard-coded, we only support single-model here

    # Align environment with wandb toggle early to avoid accidental logging on import
    import os as _os
    if not use_wandb:
        _os.environ["WANDB_DISABLED"] = "true"
    else:
        _os.environ.pop("WANDB_DISABLED", None)
    if "model_b" in df.columns:
        raise ValueError("label() currently supports only single-model data.  Use explain() for side-by-side analyses.")

    # Preprocess data: handle score_columns, sampling, and column mapping
    # For label() mode, use row-level sampling to get exact sample_size
    from .core.preprocessing import validate_and_prepare_dataframe
    df = validate_and_prepare_dataframe(
        df,
        method=method,
        score_columns=score_columns,
        sample_size=sample_size,
        prompt_column=prompt_column,
        model_column=model_column,
        model_response_column=model_response_column,
        question_id_column=question_id_column,
        verbose=verbose,
        use_row_sampling=True,  # Use row-level sampling for label() to get exact count
    )

    timings['preprocessing'] = time.perf_counter() - t0
    logger.info(f"[TIMING] Preprocessing completed in {timings['preprocessing']:.3f}s")

    # ------------------------------------------------------------------
    # Create extractor first to get the system prompt
    # ------------------------------------------------------------------
    from .extractors.fixed_axes_labeler import FixedAxesLabeler

    # Create the extractor to generate the system prompt from taxonomy
    extractor = FixedAxesLabeler(
        taxonomy=taxonomy,
        model=model_name,
        temperature=temperature,
        top_p=top_p,
        max_tokens=max_tokens,
        max_workers=max_workers,
        cache_dir=extraction_cache_dir or ".cache/stringsight",
        output_dir=output_dir,
        verbose=verbose,
        use_wandb=use_wandb,
        wandb_project=wandb_project or "StringSight"
    )

    timings['extractor_init'] = time.perf_counter() - t0
    logger.info(f"[TIMING] Extractor initialization completed in {timings['extractor_init'] - timings['preprocessing']:.3f}s")

    # Print the system prompt for verification
    if verbose:
        logger.info("\n" + "="*80)
        logger.info("SYSTEM PROMPT")
        logger.info("="*80)
        logger.info(extractor.system_prompt)
        logger.info("="*80 + "\n")

    # ------------------------------------------------------------------
    # Build dataset & pipeline
    # ------------------------------------------------------------------
    dataset = PropertyDataset.from_dataframe(df, method=method)

    timings['dataset_creation'] = time.perf_counter() - t0
    logger.info(f"[TIMING] Dataset creation completed in {timings['dataset_creation'] - timings['extractor_init']:.3f}s")

    # Initialize wandb if enabled - short-circuit early to avoid expensive string operations
    if use_wandb:
        try:
            import wandb
            import os
            import re

            # Try to get input filename from the DataFrame or use a default
            input_filename = "unknown_dataset"
            if hasattr(df, 'name') and df.name:
                input_filename = df.name
            elif hasattr(df, '_metadata') and df._metadata and 'filename' in df._metadata:
                input_filename = df._metadata['filename']
            else:
                # Try to infer from the DataFrame source if it has a path attribute
                # This is a fallback for when we can't determine the filename
                input_filename = f"dataset_{len(df)}_rows"

            # Clean the filename for wandb (remove extension, replace spaces/special chars)
            if isinstance(input_filename, str):
                # Remove file extension and clean up the name
                input_filename = os.path.splitext(os.path.basename(input_filename))[0]
                # Replace spaces and special characters with underscores
                input_filename = input_filename.replace(' ', '_').replace('-', '_')
                # Remove any remaining special characters
                input_filename = re.sub(r'[^a-zA-Z0-9_]', '', input_filename)

            wandb_run_name = os.path.basename(os.path.normpath(output_dir)) if output_dir else f"{input_filename}_label"

            wandb.init(
                project=wandb_project or "StringSight",
                name=wandb_run_name,
                config={
                    "method": method,
                    "model_name": model_name,
                    "temperature": temperature,
                    "top_p": top_p,
                    "max_tokens": max_tokens,
                    "max_workers": max_workers,
                    "taxonomy_size": len(taxonomy),
                    "include_embeddings": include_embeddings,
                    "output_dir": output_dir,
                },
                reinit=False  # Don't reinitialize if already exists
            )
        except ImportError:
            # wandb not installed or not available
            use_wandb = False
    # If wandb is disabled, skip all the initialization overhead entirely

    timings['wandb_init'] = time.perf_counter() - t0
    logger.info(f"[TIMING] Wandb initialization completed in {timings['wandb_init'] - timings['dataset_creation']:.3f}s")

    pipeline = _build_fixed_axes_pipeline(
        extractor=extractor,
        taxonomy=taxonomy,
        model_name=model_name,
        temperature=temperature,
        top_p=top_p,
        max_tokens=max_tokens,
        max_workers=max_workers,
        metrics_kwargs=metrics_kwargs,
        use_wandb=use_wandb,
        wandb_project=wandb_project,
        include_embeddings=include_embeddings,
        verbose=verbose,
        output_dir=output_dir,
        extraction_cache_dir=extraction_cache_dir,
        metrics_cache_dir=metrics_cache_dir,
        **kwargs,
    )

    timings['pipeline_build'] = time.perf_counter() - t0
    logger.info(f"[TIMING] Pipeline build completed in {timings['pipeline_build'] - timings['wandb_init']:.3f}s")

    timings['setup_total'] = time.perf_counter() - t0
    logger.info(f"[TIMING] Setup total (before pipeline execution): {timings['setup_total']:.3f}s")
    logger.info(f"[TIMING] Label() setup breakdown: {timings}")

    # ------------------------------------------------------------------
    # Execute
    # ------------------------------------------------------------------
    t_pipeline_start = time.perf_counter()
    result_dataset = _run_pipeline_smart(pipeline, dataset)
    timings['pipeline_execution'] = time.perf_counter() - t_pipeline_start
    logger.info(f"[TIMING] Pipeline execution completed in {timings['pipeline_execution']:.3f}s")

    # Check for 0 properties before attempting to save
    if len([p for p in result_dataset.properties if p.property_description is not None]) == 0:
        raise RuntimeError("Label pipeline completed with 0 valid properties. Check logs for parsing errors or API issues.")

    clustered_df = result_dataset.to_dataframe(type="clusters", method=method)

    # Save final summary and full dataset if output_dir is provided (same as explain() function)
    if output_dir is not None:
        _save_final_summary(result_dataset, clustered_df, result_dataset.model_stats, output_dir, verbose)

        # Also save the full dataset for backward compatibility with compute_metrics_only and other tools
        import pathlib
        import json

        output_path = pathlib.Path(output_dir)

        # Save full dataset as JSON
        full_dataset_json_path = output_path / "full_dataset.json"
        result_dataset.save(str(full_dataset_json_path))
        if verbose:
            logger.info(f"  ✓ Saved full dataset: {full_dataset_json_path}")

    # Print analysis summary if verbose
    _print_analysis_summary(result_dataset.model_stats, max_behaviors=5)

    return clustered_df, result_dataset.model_stats

extract_properties_only()

extract_properties_only(df, *, method='single_model', system_prompt=None, task_description=None, score_columns=None, sample_size=None, model_a=None, model_b=None, prompt_column='prompt', model_column=None, model_response_column=None, question_id_column=None, model_a_column=None, model_b_column=None, model_a_response_column=None, model_b_response_column=None, model_name='gpt-4.1', temperature=0.7, top_p=0.95, max_tokens=16000, max_workers=DEFAULT_MAX_WORKERS, include_scores_in_prompt=False, use_wandb=True, wandb_project=None, verbose=False, output_dir=None, extraction_cache_dir=None, return_debug=False)

Run only the extraction → parsing → validation stages and return a PropertyDataset.

Parameters:

Name Type Description Default
df DataFrame

Input conversations dataframe (single_model or side_by_side format)

required
method str

"single_model" | "side_by_side"

'single_model'
system_prompt str | None

Explicit system prompt text or a short prompt name from stringsight.prompts

None
task_description str | None

Optional task-aware description (used only if the chosen prompt has {task_description})

None
score_columns List[str] | None

Optional list of column names containing score metrics to convert to dict format

None
sample_size int | None

Optional number of rows to sample from the dataset before processing

None
model_a str | None

For side_by_side method with tidy data, specifies first model to select

None
model_b str | None

For side_by_side method with tidy data, specifies second model to select

None
prompt_column str

Name of the prompt column in your dataframe (default: "prompt")

'prompt'
model_column str | None

Name of the model column for single_model (default: "model")

None
model_response_column str | None

Name of the model response column for single_model (default: "model_response")

None
question_id_column str | None

Name of the question_id column (default: "question_id" if column exists)

None
model_a_column str | None

Name of the model_a column for side_by_side (default: "model_a")

None
model_b_column str | None

Name of the model_b column for side_by_side (default: "model_b")

None
model_a_response_column str | None

Name of the model_a_response column for side_by_side (default: "model_a_response")

None
model_b_response_column str | None

Name of the model_b_response column for side_by_side (default: "model_b_response")

None
model_name, temperature, top_p, max_tokens, max_workers

LLM config for extraction

'gpt-4.1', 0.7, 0.95, 16000, DEFAULT_MAX_WORKERS
include_scores_in_prompt bool

Whether to include any provided score fields in the prompt context

False
use_wandb, wandb_project, verbose

Logging configuration

True, None, False
output_dir str | None

If provided, stages will auto-save their artefacts to this directory

None
extraction_cache_dir str | None

Optional cache directory for extractor

None
return_debug bool

If True, also return the parser's parsing-failure records alongside the PropertyDataset

False

Returns:

Type Description
PropertyDataset | tuple[PropertyDataset, list[dict[str, Any]]]

PropertyDataset containing parsed Property objects (no clustering or metrics). If return_debug=True, a tuple of (PropertyDataset, parsing failures) is returned instead.
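A minimal usage sketch (hedged; assumes extract_properties_only is importable from the top-level stringsight package like the other entry points, and the file name is a placeholder):

import pandas as pd
from stringsight import extract_properties_only

df = pd.read_csv("conversations.csv")

# Extraction → parsing → validation only; no clustering or metrics
dataset = extract_properties_only(df, method="single_model")
print(len(dataset.properties))

# With return_debug=True, parsing-failure records are returned as well
dataset, failures = extract_properties_only(df, return_debug=True)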

Source code in stringsight/public.py
def extract_properties_only(
    df: pd.DataFrame,
    *,
    method: str = "single_model",
    system_prompt: str | None = None,
    task_description: str | None = None,
    # Data preparation
    score_columns: List[str] | None = None,
    sample_size: int | None = None,
    model_a: str | None = None,
    model_b: str | None = None,
    # Column mapping parameters
    prompt_column: str = "prompt",
    model_column: str | None = None,
    model_response_column: str | None = None,
    question_id_column: str | None = None,
    model_a_column: str | None = None,
    model_b_column: str | None = None,
    model_a_response_column: str | None = None,
    model_b_response_column: str | None = None,
    # Extraction parameters
    model_name: str = "gpt-4.1",
    temperature: float = 0.7,
    top_p: float = 0.95,
    max_tokens: int = 16000,
    max_workers: int = DEFAULT_MAX_WORKERS,
    include_scores_in_prompt: bool = False,
    # Logging & output
    use_wandb: bool = True,
    wandb_project: str | None = None,
    verbose: bool = False,
    output_dir: str | None = None,
    # Caching
    extraction_cache_dir: str | None = None,
    return_debug: bool = False,
) -> PropertyDataset | tuple[PropertyDataset, list[dict[str, Any]]]:
    """Run only the extraction → parsing → validation stages and return a PropertyDataset.

    Args:
        df: Input conversations dataframe (single_model or side_by_side format)
        method: "single_model" | "side_by_side"
        system_prompt: Explicit system prompt text or a short prompt name from stringsight.prompts
        task_description: Optional task-aware description (used only if the chosen prompt has {task_description})
        score_columns: Optional list of column names containing score metrics to convert to dict format
        sample_size: Optional number of rows to sample from the dataset before processing
        model_a: For side_by_side method with tidy data, specifies first model to select
        model_b: For side_by_side method with tidy data, specifies second model to select
        prompt_column: Name of the prompt column in your dataframe (default: "prompt")
        model_column: Name of the model column for single_model (default: "model")
        model_response_column: Name of the model response column for single_model (default: "model_response")
        question_id_column: Name of the question_id column (default: "question_id" if column exists)
        model_a_column: Name of the model_a column for side_by_side (default: "model_a")
        model_b_column: Name of the model_b column for side_by_side (default: "model_b")
        model_a_response_column: Name of the model_a_response column for side_by_side (default: "model_a_response")
        model_b_response_column: Name of the model_b_response column for side_by_side (default: "model_b_response")
        model_name, temperature, top_p, max_tokens, max_workers: LLM config for extraction
        include_scores_in_prompt: Whether to include any provided score fields in the prompt context
        use_wandb, wandb_project, verbose: Logging configuration
        output_dir: If provided, stages will auto-save their artefacts to this directory
        extraction_cache_dir: Optional cache directory for extractor
        return_debug: If True, also return the parser's parsing-failure records alongside the PropertyDataset

    Returns:
        PropertyDataset containing parsed Property objects (no clustering or metrics).
        If return_debug=True, returns a tuple of (PropertyDataset, parsing failures).
    """
    # Validate OpenAI API key is set if using GPT models
    validate_openai_api_key(
        model_name=model_name
    )

    # Resolve system prompt using centralized resolver
    system_prompt = get_system_prompt(method, system_prompt, task_description)

    if verbose:
        logger.info("\n" + "="*80)
        logger.info("SYSTEM PROMPT")
        logger.info("="*80)
        logger.info(system_prompt)
        logger.info("="*80 + "\n")
    if len(system_prompt) < 50:
        raise ValueError("System prompt is too short. Please provide a longer system prompt.")

    # Preprocess data: handle score_columns, sampling, tidy→side_by_side conversion, column mapping
    from .core.preprocessing import validate_and_prepare_dataframe
    df = validate_and_prepare_dataframe(
        df,
        method=method,
        score_columns=score_columns,
        sample_size=sample_size,
        model_a=model_a,
        model_b=model_b,
        prompt_column=prompt_column,
        model_column=model_column,
        model_response_column=model_response_column,
        question_id_column=question_id_column,
        model_a_column=model_a_column,
        model_b_column=model_b_column,
        model_a_response_column=model_a_response_column,
        model_b_response_column=model_b_response_column,
        verbose=verbose,
    )

    # Prepare dataset
    dataset = PropertyDataset.from_dataframe(df, method=method)

    # Align env with wandb toggle early
    import os as _os
    if not use_wandb:
        _os.environ["WANDB_DISABLED"] = "true"
    else:
        _os.environ.pop("WANDB_DISABLED", None)

    # Build a minimal pipeline: extractor → parser → validator
    from .extractors import get_extractor
    from .postprocess import LLMJsonParser, PropertyValidator

    common_cfg = {"verbose": verbose, "use_wandb": use_wandb, "wandb_project": wandb_project or "StringSight"}

    extractor_kwargs = {
        "model_name": model_name,
        "system_prompt": system_prompt,
        "prompt_builder": None,
        "temperature": temperature,
        "top_p": top_p,
        "max_tokens": max_tokens,
        "max_workers": max_workers,
        "include_scores_in_prompt": include_scores_in_prompt,
        "output_dir": output_dir,
        **({"cache_dir": extraction_cache_dir} if extraction_cache_dir else {}),
        **common_cfg,
    }

    extractor = get_extractor(**extractor_kwargs)  # type: ignore[arg-type]
    # Do not fail the whole run on parsing errors – collect failures and drop those rows
    parser = LLMJsonParser(fail_fast=False, output_dir=output_dir, **common_cfg)  # type: ignore[arg-type]
    validator = PropertyValidator(output_dir=output_dir, **common_cfg)  # type: ignore[arg-type]

    pipeline = PipelineBuilder(name=f"StringSight-extract-{method}") \
        .extract_properties(extractor) \
        .parse_properties(parser) \
        .add_stage(validator) \
        .configure(output_dir=output_dir, **common_cfg) \
        .build()

    result_dataset = _run_pipeline_smart(pipeline, dataset)
    if return_debug:
        try:
            failures = parser.get_parsing_failures()
        except Exception:
            failures = []
        return result_dataset, failures
    return result_dataset

compute_metrics_only()

compute_metrics_only(input_path, method='single_model', output_dir=None, metrics_kwargs=None, use_wandb=True, verbose=False)

Run only the metrics computation stage on existing pipeline results.

This function loads existing pipeline results (from extraction and clustering stages) and runs only the metrics computation stage. Useful for:
  • Recomputing metrics with different parameters
  • Running metrics on results from previous pipeline runs
  • Debugging metrics computation without re-running the full pipeline

Parameters:

Name Type Description Default
input_path str

Path to existing pipeline results (file or directory)

required
method str

"single_model" or "side_by_side"

'single_model'
output_dir str | None

Directory to save metrics results (optional)

None
metrics_kwargs Dict[str, Any | None] | None

Additional arguments for metrics computation

None
use_wandb bool

Whether to enable wandb logging

True
verbose bool

Whether to print verbose output

False

Returns:

Type Description
Tuple[DataFrame, Dict[str, Any]]

Tuple of (clustered_df, model_stats)

Example

from stringsight import compute_metrics_only

# Run metrics on existing pipeline results
clustered_df, model_stats = compute_metrics_only(
    input_path="results/previous_run/full_dataset.json",
    method="single_model",
    output_dir="results/metrics_only"
)

# Or run on a directory containing pipeline outputs
clustered_df, model_stats = compute_metrics_only(
    input_path="results/previous_run/",
    method="side_by_side"
)

Source code in stringsight/public.py
def compute_metrics_only(
    input_path: str,
    method: str = "single_model",
    output_dir: str | None = None,
    metrics_kwargs: Dict[str, Any | None] | None = None,
    use_wandb: bool = True,
    verbose: bool = False
) -> Tuple[pd.DataFrame, Dict[str, Any]]:
    """
    Run only the metrics computation stage on existing pipeline results.

    This function loads existing pipeline results (from extraction and clustering stages)
    and runs only the metrics computation stage. Useful for:
    - Recomputing metrics with different parameters
    - Running metrics on results from previous pipeline runs
    - Debugging metrics computation without re-running the full pipeline

    Args:
        input_path: Path to existing pipeline results (file or directory)
        method: "single_model" or "side_by_side"
        output_dir: Directory to save metrics results (optional)
        metrics_kwargs: Additional arguments for metrics computation
        use_wandb: Whether to enable wandb logging
        verbose: Whether to print verbose output

    Returns:
        Tuple of (clustered_df, model_stats)

    Example:
        >>> from stringsight import compute_metrics_only
        >>> 
        >>> # Run metrics on existing pipeline results
        >>> clustered_df, model_stats = compute_metrics_only(
        ...     input_path="results/previous_run/full_dataset.json",
        ...     method="single_model",
        ...     output_dir="results/metrics_only"
        ... )
        >>> 
        >>> # Or run on a directory containing pipeline outputs
        >>> clustered_df, model_stats = compute_metrics_only(
        ...     input_path="results/previous_run/",
        ...     method="side_by_side"
        ... )
    """
    from pathlib import Path
    from .metrics import get_metrics
    from .pipeline import Pipeline
    import json

    # Align environment with wandb toggle early to avoid accidental logging on import
    import os as _os
    if not use_wandb:
        _os.environ["WANDB_DISABLED"] = "true"
    else:
        _os.environ.pop("WANDB_DISABLED", None)

    path = Path(input_path)

    # Load existing dataset
    if path.is_dir():
        # Try to load from a directory containing pipeline outputs
        possible_files = [
            path / "full_dataset.json",
            path / "full_dataset.parquet",
            path / "clustered_results.parquet",
            path / "dataset.json",
            path / "dataset.parquet"
        ]

        for file_path in possible_files:
            if file_path.exists():
                if verbose:
                    logger.info(f"Loading from: {file_path}")
                dataset = PropertyDataset.load(str(file_path))
                break
        else:
            raise FileNotFoundError(f"No recognizable dataset file found in {path}")

    elif path.is_file():
        # Load from a specific file
        if verbose:
            logger.info(f"Loading from: {path}")
        dataset = PropertyDataset.load(str(path))

    else:
        raise FileNotFoundError(f"Input path does not exist: {path}")

    # Verify we have the required data for metrics
    if not dataset.clusters:
        raise ValueError("No clusters found in the dataset. Metrics computation requires clustered data.")

    if not dataset.properties:
        raise ValueError("No properties found in the dataset. Metrics computation requires extracted properties.")

    if verbose:
        logger.info(f"Loaded dataset with:")
        logger.info(f"  - {len(dataset.conversations)} conversations")
        logger.info(f"  - {len(dataset.properties)} properties")
        logger.info(f"  - {len(dataset.clusters)} clusters")
        logger.info(f"  - Models: {dataset.all_models}")

        # Count unique models from conversations for verification
        unique_models = set()
        for conv in dataset.conversations:
            if isinstance(conv.model, list):
                unique_models.update(conv.model)
            else:
                unique_models.add(conv.model)

        logger.info(f"  - Total unique models: {len(unique_models)}")
        if len(unique_models) <= 20:
            model_list = sorted(list(unique_models))
            logger.info(f"  - Model names: {', '.join(model_list)}")
        logger.info("")

    # Create metrics stage
    metrics_config = {
        'method': method,
        'use_wandb': use_wandb,
        'verbose': verbose,
        **(metrics_kwargs or {})
    }

    # Add output directory if provided
    if output_dir:
        output_path = Path(output_dir)
        output_path.mkdir(parents=True, exist_ok=True)
        metrics_config['output_dir'] = str(output_path)

    # Initialize wandb if enabled
    if use_wandb:
        try:
            import wandb
            # import weave
            import os

            # Try to get input filename from the input path
            input_filename = "unknown_dataset"
            if path.is_file():
                input_filename = path.name
            elif path.is_dir():
                # Try to find a recognizable dataset file in the directory
                possible_files = [
                    path / "full_dataset.json",
                    path / "full_dataset.parquet",
                    path / "clustered_results.parquet",
                    path / "dataset.json",
                    path / "dataset.parquet"
                ]

                for file_path in possible_files:
                    if file_path.exists():
                        input_filename = file_path.name
                        break
                else:
                    # If no recognizable file found, use the directory name
                    input_filename = path.name

            # Clean the filename for wandb (remove extension, replace spaces/special chars)
            if isinstance(input_filename, str):
                # Remove file extension and clean up the name
                input_filename = os.path.splitext(os.path.basename(input_filename))[0]
                # Replace spaces and special characters with underscores
                input_filename = input_filename.replace(' ', '_').replace('-', '_')
                # Remove any remaining special characters
                import re
                input_filename = re.sub(r'[^a-zA-Z0-9_]', '', input_filename)

            wandb_run_name = os.path.basename(os.path.normpath(output_dir)) if output_dir else f"{input_filename}_metrics_only"

            wandb.init(
                project="StringSight",
                name=wandb_run_name,
                config={
                    "method": method,
                    "input_path": str(path),
                    "output_dir": output_dir,
                    "metrics_kwargs": metrics_kwargs,
                },
                reinit=False  # Don't reinitialize if already exists
            )
        except ImportError:
            # wandb not installed or not available
            use_wandb = False

    metrics_stage = get_metrics(method, **{k: v for k, v in metrics_config.items() if k != 'method'})

    # Create a minimal pipeline with just the metrics stage
    pipeline = Pipeline("Metrics-Only", [metrics_stage])

    # Run metrics computation
    if verbose:
        logger.info("\n" + "="*60)
        logger.info("COMPUTING METRICS")
        logger.info("="*60)

    result_dataset = _run_pipeline_smart(pipeline, dataset)

    # Convert back to DataFrame format
    clustered_df = result_dataset.to_dataframe()
    model_stats = result_dataset.model_stats

    # Save results if output_dir is provided
    if output_dir:
        if verbose:
            logger.info(f"\nSaving results to: {output_dir}")

        # Use the same saving mechanism as the full pipeline
        _save_final_summary(
            result_dataset=result_dataset,
            clustered_df=clustered_df,
            model_stats=model_stats,
            output_dir=output_dir,
            verbose=verbose
        )

        # Print summary
        logger.info(f"\n📊 Metrics Summary:")
        logger.info(f"  - Models analyzed: {len(model_stats)}")

        # Handle new functional metrics format
        if model_stats and "functional_metrics" in model_stats:
            functional_metrics = model_stats["functional_metrics"]
            model_scores = functional_metrics.get("model_scores", {})
            cluster_scores = functional_metrics.get("cluster_scores", {})

            logger.info(f"  - Functional metrics computed:")
            logger.info(f"    • Model scores: {len(model_scores)} models")
            logger.info(f"    • Cluster scores: {len(cluster_scores)} clusters")

            # Print model-level summary
            for model_name, model_data in model_scores.items():
                if isinstance(model_data, dict):
                    size = model_data.get("size", 0)
                    quality = model_data.get("quality", {})
                    logger.info(f"    • {model_name}: {size} conversations")
                    if quality:
                        for metric_name, metric_value in quality.items():
                            if isinstance(metric_value, (int, float)):
                                logger.info(f"      - {metric_name}: {metric_value:.3f}")

        # Handle legacy format for backward compatibility
        else:
            for model_name, stats in model_stats.items():
                if "fine" in stats:
                    logger.info(f"  - {model_name}: {len(stats['fine'])} fine clusters")
                if "coarse" in stats:
                    logger.info(f"    {len(stats['coarse'])} coarse clusters")

    return clustered_df, model_stats 

Convenience Functions

explain_side_by_side()

explain_side_by_side(df, system_prompt=None, tidy_side_by_side_models=None, **kwargs)

Convenience function for side-by-side model comparison.

Parameters:

Name Type Description Default
df DataFrame

DataFrame with columns: model_a, model_b, model_a_response, model_b_response, winner

required
system_prompt str | None

System prompt for extraction (if None, will be auto-determined)

None
tidy_side_by_side_models Tuple[str, str] | None

Optional (model_a, model_b) pair selecting which two models to compare when the input is in tidy (long) format

None
**kwargs Any

Additional arguments passed to explain()

{}

Returns:

Type Description
Tuple[DataFrame, Dict[str, DataFrame]]

Tuple of (clustered_df, model_stats)
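
Example

A minimal sketch of the expected input shape, assuming the convenience functions are importable from the top-level package (as compute_metrics_only is above); the rows and model names are illustrative, and a real run needs many more rows than this for clustering to be meaningful.

import pandas as pd
from stringsight import explain_side_by_side

# Two models answering the same prompt, already in side-by-side format
df = pd.DataFrame([
    {
        "prompt": "Explain what a hash map is.",
        "model_a": "model-one",
        "model_b": "model-two",
        "model_a_response": "A hash map stores key-value pairs ...",
        "model_b_response": "It is a dictionary-like structure ...",
        "winner": "model_a",
    },
])

clustered_df, model_stats = explain_side_by_side(df)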

Source code in stringsight/public.py
def explain_side_by_side(
    df: pd.DataFrame,
    system_prompt: str | None = None,
    tidy_side_by_side_models: Tuple[str, str] | None = None,
    **kwargs: Any
) -> Tuple[pd.DataFrame, Dict[str, pd.DataFrame]]:
    """
    Convenience function for side-by-side model comparison.

    Args:
        df: DataFrame with columns: model_a, model_b, model_a_response, model_b_response, winner
        system_prompt: System prompt for extraction (if None, will be auto-determined)
        tidy_side_by_side_models: Optional (model_a, model_b) pair selecting which two models to compare when the input is in tidy (long) format
        **kwargs: Additional arguments passed to explain()

    Returns:
        Tuple of (clustered_df, model_stats)
    """
    return explain(
        df,
        method="side_by_side",
        system_prompt=system_prompt,
        tidy_side_by_side_models=tidy_side_by_side_models,
        **kwargs,
    )

explain_single_model()

explain_single_model(df, system_prompt=None, **kwargs)

Convenience function for single model analysis.

Parameters:

Name Type Description Default
df DataFrame

DataFrame with columns: model, model_response, score

required
system_prompt str | None

System prompt for extraction (if None, will be auto-determined)

None
**kwargs Any

Additional arguments passed to explain()

{}

Returns:

Type Description
Tuple[DataFrame, Dict[str, DataFrame]]

Tuple of (clustered_df, model_stats)
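
Example

A minimal sketch of the expected single-model input, assuming explain_single_model is importable from the top-level package; the rows and scores are illustrative.

import pandas as pd
from stringsight import explain_single_model

# One model's responses with a numeric score per row; a real run
# would use many more rows than this sketch.
df = pd.DataFrame([
    {"prompt": "Summarize the article in two sentences.", "model": "model-one",
     "model_response": "The article argues that ...", "score": 0.8},
    {"prompt": "Write a haiku about rain.", "model": "model-one",
     "model_response": "Rain taps on the roof ...", "score": 0.6},
])

clustered_df, model_stats = explain_single_model(df)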

Source code in stringsight/public.py
def explain_single_model(
    df: pd.DataFrame,
    system_prompt: str | None = None,
    **kwargs: Any
) -> Tuple[pd.DataFrame, Dict[str, pd.DataFrame]]:
    """
    Convenience function for single model analysis.

    Args:
        df: DataFrame with columns: model, model_response, score
        system_prompt: System prompt for extraction (if None, will be auto-determined)
        **kwargs: Additional arguments passed to explain()

    Returns:
        Tuple of (clustered_df, model_stats)
    """
    return explain(df, method="single_model", system_prompt=system_prompt, **kwargs)

explain_with_custom_pipeline()

explain_with_custom_pipeline(df, pipeline, method='single_model')

Explain model behavior using a custom pipeline.

Parameters:

Name Type Description Default
df DataFrame

DataFrame with conversation data

required
pipeline Pipeline

Custom pipeline to use

required
method str

"side_by_side" or "single_model"

'single_model'

Returns:

Type Description
Tuple[DataFrame, Dict[str, Any]]

Tuple of (clustered_df, model_stats)
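
Example

A hedged sketch of how a pipeline object is passed in. It assumes explain_with_custom_pipeline is importable from the top-level package and that Pipeline and get_metrics are importable from stringsight.pipeline and stringsight.metrics (the modules used internally by compute_metrics_only above). Note that a metrics-only pipeline expects data that already contains properties and clusters, so a realistic custom pipeline would usually include extraction and clustering stages as well.

import pandas as pd
from stringsight import explain_with_custom_pipeline
from stringsight.pipeline import Pipeline
from stringsight.metrics import get_metrics

# Build a minimal custom pipeline containing only a metrics stage,
# mirroring the "Metrics-Only" pipeline used by compute_metrics_only().
metrics_stage = get_metrics("single_model", use_wandb=False, verbose=False)
pipeline = Pipeline("Metrics-Only", [metrics_stage])

# Conversation data in single_model format (see explain_single_model above)
df = pd.DataFrame([
    {"prompt": "Explain recursion.", "model": "model-one",
     "model_response": "Recursion is when ...", "score": 1.0},
])

clustered_df, model_stats = explain_with_custom_pipeline(df, pipeline, method="single_model")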

Source code in stringsight/public.py
def explain_with_custom_pipeline(
    df: pd.DataFrame,
    pipeline: Pipeline,
    method: str = "single_model"
) -> Tuple[pd.DataFrame, Dict[str, Any]]:
    """
    Explain model behavior using a custom pipeline.

    Args:
        df: DataFrame with conversation data
        pipeline: Custom pipeline to use
        method: "side_by_side" or "single_model"

    Returns:
        Tuple of (clustered_df, model_stats)
    """
    dataset = PropertyDataset.from_dataframe(df)
    result_dataset = _run_pipeline_smart(pipeline, dataset)
    return result_dataset.to_dataframe(), result_dataset.model_stats

Core Data Structures

PropertyDataset

PropertyDataset dataclass

Container for all data flowing through the pipeline.

This is the single data contract between all pipeline stages.
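
Example

A minimal sketch of the container in use, assuming PropertyDataset is importable from stringsight.core.data_objects (its source module, shown below).

import pandas as pd
from stringsight.core.data_objects import PropertyDataset

df = pd.DataFrame([
    {"question_id": "q1", "prompt": "What is 2 + 2?",
     "model": "model-one", "model_response": "4", "score": 1.0},
])

dataset = PropertyDataset.from_dataframe(df, method="single_model")
print(dataset)             # summary of conversations / properties / clusters
print(dataset.all_models)  # ['model-one']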

Source code in stringsight/core/data_objects.py
@dataclass
class PropertyDataset:
    """
    Container for all data flowing through the pipeline.

    This is the single data contract between all pipeline stages.
    """
    conversations: List[ConversationRecord] = field(default_factory=list)
    all_models: List[str] = field(default_factory=list)
    properties: List[Property] = field(default_factory=list)
    clusters: List[Cluster] = field(default_factory=list)
    model_stats: Dict[str, Any] = field(default_factory=dict)

    def __str__(self) -> str:
        """Return a readable string representation of the PropertyDataset."""
        lines = [
            "PropertyDataset:",
            f"  conversations: List[ConversationRecord] ({len(self.conversations)} items)",
            f"  all_models: List[str] ({len(self.all_models)} items) - {self.all_models}",
            f"  properties: List[Property] ({len(self.properties)} items)",
            f"  clusters: List[Cluster] ({len(self.clusters)} items)",
            f"  model_stats: Dict[str, Any] ({len(self.model_stats)} entries)"
        ]

        return "\n".join(lines)

    @classmethod
    def from_dataframe(cls, df: pd.DataFrame, method: str = "single_model") -> "PropertyDataset":
        """
        Create PropertyDataset from existing DataFrame formats.

        Args:
            df: Input DataFrame with conversation data
            method: "side_by_side" for comparison data, "single_model" for single responses

        Returns:
            PropertyDataset with populated conversations
        """
        conversations: list[dict[str, Any]] = []
        if method == "side_by_side":
            all_models = list(set(df["model_a"].unique().tolist() + df["model_b"].unique().tolist()))
            # Expected columns: question_id, prompt, model_a, model_b,
            # model_a_response, model_b_response, scores_a, scores_b, winner, etc.

            # Convert to list of dicts once - MUCH faster than iterrows()
            rows_list = df.to_dict('records')

            # Parallelize OAI conversions for better performance
            def _process_side_by_side_row(idx_row):
                idx, row = idx_row
                prompt = str(row.get('prompt', row.get('user_prompt', '')))
                model_a_response = row.get('model_a_response', '')
                model_b_response = row.get('model_b_response', '')

                # Convert responses to OAI format if they're strings
                oai_response_a, was_converted_a = check_and_convert_to_oai_format(prompt, model_a_response)
                oai_response_b, was_converted_b = check_and_convert_to_oai_format(prompt, model_b_response)

                return idx, oai_response_a, oai_response_b, row

            # Pre-allocate results list
            oai_results = [None] * len(rows_list)

            # Process conversions in parallel
            with ThreadPoolExecutor(max_workers=min(DEFAULT_MAX_WORKERS, len(rows_list))) as executor:
                futures = {executor.submit(_process_side_by_side_row, (idx, row)): idx
                          for idx, row in enumerate(rows_list)}
                for future in as_completed(futures):
                    idx, oai_response_a, oai_response_b, row = future.result()
                    oai_results[idx] = (oai_response_a, oai_response_b, row)

            # Now build conversations with pre-converted OAI responses
            for idx, result in enumerate(oai_results):
                if result is None:
                    continue
                oai_response_a, oai_response_b, row = result
                prompt = str(row.get('prompt', row.get('user_prompt', '')))

                # Convert score formats to list format [scores_a, scores_b]
                def parse_score_field(score_value):
                    """Parse score field that might be a string, dict, or other type."""
                    if isinstance(score_value, dict):
                        return score_value
                    elif isinstance(score_value, str) and score_value.strip():
                        try:
                            import ast
                            parsed = ast.literal_eval(score_value.strip())
                            return parsed if isinstance(parsed, dict) else {}
                        except (ValueError, SyntaxError):
                            return {}
                    else:
                        return {}

                if 'score_a' in row and 'score_b' in row:
                    # Format: score_a, score_b columns
                    scores_a = parse_score_field(row.get('score_a', {}))
                    scores_b = parse_score_field(row.get('score_b', {}))
                else:
                    # No score data found
                    scores_a, scores_b = {}, {}

                scores = [scores_a, scores_b]

                # Store winner and other metadata
                meta_with_winner = {k: v for k, v in row.items() 
                                  if k not in ['question_id', 'prompt', 'user_prompt', 'model_a', 'model_b', 
                                             'model_a_response', 'model_b_response', 'score', 'score_a', 'score_b']}

                # Add winner to meta if present
                winner = row.get('winner')
                if winner is None and isinstance(row.get('score'), dict):
                    # Fallback to looking in score dict
                    winner = row.get('score').get('winner')

                if winner is not None:
                    meta_with_winner['winner'] = winner

                # Use question_id column if present and not None, else fall back to row index
                qid = row.get('question_id')
                if qid is None:
                    qid = idx
                conversation = ConversationRecord(
                    question_id=str(qid),
                    prompt=prompt,
                    model=[row.get('model_a', 'model_a'), row.get('model_b', 'model_b')],
                    responses=[oai_response_a, oai_response_b],
                    scores=scores,
                    meta=meta_with_winner
                )
                conversations.append(conversation)

        elif method == "single_model":
            all_models = df["model"].unique().tolist()
            # Expected columns: question_id, prompt, model, model_response, score, etc.

            def parse_single_score_field(score_value):
                """Parse single model score field that might be a string, dict, number, or other type."""
                if isinstance(score_value, dict):
                    return score_value
                elif isinstance(score_value, (int, float)):
                    return {'score': score_value}
                elif isinstance(score_value, str) and score_value.strip():
                    try:
                        import ast
                        parsed = ast.literal_eval(score_value.strip())
                        if isinstance(parsed, dict):
                            return parsed
                        elif isinstance(parsed, (int, float)):
                            return {'score': parsed}
                        else:
                            return {'score': 0}
                    except (ValueError, SyntaxError):
                        return {'score': 0}
                else:
                    return {'score': 0}

            # Convert to list of dicts once - MUCH faster than iterrows()
            rows_list = df.to_dict('records')

            # Parallelize OAI conversions for better performance
            def _process_single_model_row(idx_row):
                idx, row = idx_row
                prompt = str(row.get('prompt', row.get('user_prompt', '')))
                response = row.get('model_response', '')

                # Convert response to OAI format if it's a string
                oai_response, was_converted = check_and_convert_to_oai_format(prompt, response)

                return idx, oai_response, row

            # Pre-allocate results list
            oai_results = [None] * len(rows_list)

            # Process conversions in parallel
            with ThreadPoolExecutor(max_workers=min(DEFAULT_MAX_WORKERS, len(rows_list))) as executor:
                futures = {executor.submit(_process_single_model_row, (idx, row)): idx
                          for idx, row in enumerate(rows_list)}
                for future in as_completed(futures):
                    idx, oai_response, row = future.result()
                    oai_results[idx] = (oai_response, row)

            # Now build conversations with pre-converted OAI responses
            for idx, result in enumerate(oai_results):
                if result is None:
                    continue
                oai_response, row = result
                scores = parse_single_score_field(row.get('score'))
                prompt = str(row.get('prompt', row.get('user_prompt', '')))

                # Use question_id column if present and not None, else fall back to row index
                qid = row.get('question_id')
                if qid is None:
                    qid = idx
                conversation = ConversationRecord(
                    question_id=str(qid),
                    prompt=prompt,
                    model=str(row.get('model', 'model')),
                    responses=oai_response,
                    scores=scores,
                    meta={k: v for k, v in row.items()
                          if k not in ['question_id', 'prompt', 'user_prompt', 'model', 'model_response', 'score']}
                )
                conversations.append(conversation)
        else:
            raise ValueError(f"Unknown method: {method}. Must be 'side_by_side' or 'single_model'")

        # Convert dict conversations to ConversationRecord objects
        conversation_records = [
            ConversationRecord(**conv) if isinstance(conv, dict) else conv
            for conv in conversations
        ]
        return cls(conversations=conversation_records, all_models=all_models)

    def to_dataframe(self, type: str = "all", method: str = "side_by_side") -> pd.DataFrame:
        """
        Convert PropertyDataset back to DataFrame format.

        Returns:
            DataFrame with original data plus extracted properties and clusters
        """

        assert type in ["base", "properties", "clusters", "all"], f"Invalid type: {type}. Must be 'base', 'properties', 'clusters', or 'all'"
        # Start with conversation data
        rows = []
        for conv in self.conversations:
            if isinstance(conv.model, str):
                base_row = {
                    'question_id': conv.question_id,
                    'prompt': conv.prompt,
                    'model': conv.model,
                    'model_response': conv.responses,
                    'score': conv.scores,
                    **conv.meta
                }
            elif isinstance(conv.model, list):
                # Side-by-side format: scores stored as [scores_a, scores_b]
                if isinstance(conv.scores, list) and len(conv.scores) == 2:
                    scores_a, scores_b = conv.scores[0], conv.scores[1]
                else:
                    # Fallback if scores isn't properly formatted
                    scores_a, scores_b = {}, {}

                base_row = {
                    'question_id': conv.question_id,
                    'prompt': conv.prompt,
                    'model_a': conv.model[0],
                    'model_b': conv.model[1],
                    'model_a_response': conv.responses[0],
                    'model_b_response': conv.responses[1],
                    'score_a': scores_a,
                    'score_b': scores_b,
                    'winner': conv.meta.get('winner'),  # Winner stored in meta
                    **{k: v for k, v in conv.meta.items() if k != 'winner'}  # Exclude winner from other meta
                }
            else:
                raise ValueError(f"Invalid model type: {type(conv.model)}. Must be str or list.")

            rows.append(base_row)

        df = pd.DataFrame(rows)
        # Ensure question_id is a string
        if not df.empty and "question_id" in df.columns:
            df["question_id"] = df["question_id"].astype(str)
        logger.debug(f"Original unique questions: {df.question_id.nunique()}")

        # Add properties if they exist
        if self.properties and type in ["all", "properties", "clusters"]:
            # Create a mapping from (question_id, model) to properties
            prop_map: Dict[tuple, List[Property]] = {}
            for prop in self.properties:
                key = (prop.question_id, prop.model)
                if key not in prop_map:
                    prop_map[key] = []
                prop_map[key].append(prop)

            # create property df
            prop_df = pd.DataFrame([p.to_dict() for p in self.properties])
            # Ensure question_id is a string in properties df
            if not prop_df.empty and "question_id" in prop_df.columns:
                prop_df["question_id"] = prop_df["question_id"].astype(str)
            logger.debug(f"len of base df {len(df)}")
            if "model_a" in df.columns and "model_b" in df.columns:
                # For side-by-side inputs, merge properties by question_id (both models share the question)
                df = df.merge(prop_df, on=["question_id"], how="left")

                # Handle id collision (id_x=conversation, id_y=property)
                if "id_y" in df.columns:
                    df["property_id"] = df["id_y"]
                    df["id"] = df["id_y"] # Ensure 'id' is property_id for downstream
                elif "id" in df.columns and "property_id" not in df.columns:
                    df["property_id"] = df["id"]

                # Deduplicate by property id when available
                if "property_id" in df.columns:
                    df = df.drop_duplicates(subset="property_id")
            else:
                # CHANGE: Use left join to preserve all conversations, including those without properties
                # Don't drop duplicates to ensure conversations without properties are preserved
                df = df.merge(prop_df, on=["question_id", "model"], how="left")

                # Handle id collision
                if "id_y" in df.columns:
                    df["property_id"] = df["id_y"]
                    df["id"] = df["id_y"]
                elif "id" in df.columns and "property_id" not in df.columns:
                    df["property_id"] = df["id"]
            logger.debug(f"len of df after merge with properties {len(df)}")

            # ------------------------------------------------------------------
            # Ensure `model` column is present (avoid _x / _y duplicates)
            # ------------------------------------------------------------------
            if "model" not in df.columns:
                if "model_y" in df.columns:
                    print(f"df.model_y.value_counts(): {df.model_y.value_counts()}")
                if "model_x" in df.columns:
                    print(f"df.model_x.value_counts(): {df.model_x.value_counts()}")
                if "model_x" in df.columns or "model_y" in df.columns:
                    df["model"] = df.get("model_y").combine_first(df.get("model_x"))
                    df.drop(columns=[c for c in ["model_x", "model_y"] if c in df.columns], inplace=True)

        # Only print model value counts if the column exists
        if "model" in df.columns:
            logger.debug(f"df.model.value_counts() NEW: {df.model.value_counts()}")
        logger.debug(f"total questions: {df.question_id.nunique()}")

        if self.clusters and type in ["all", "clusters"]:
            # If cluster columns already exist (e.g. after reload from parquet)
            # skip the merge to avoid duplicate _x / _y columns.
            if "cluster_id" not in df.columns:
                cluster_df = pd.DataFrame([c.to_dict() for c in self.clusters])
                cluster_df.rename(
                    columns={
                        "id": "cluster_id",
                        "label": "cluster_label",
                        "size": "cluster_size",
                        "property_descriptions": "property_description",
                    },
                    inplace=True,
                )
                # Explode aligned list columns so each row maps to a single property
                # Explode only aligned columns to avoid mismatched element counts
                list_cols = [
                    col for col in [
                        "property_description",
                        "question_ids",
                    ] if col in cluster_df.columns
                ]
                if list_cols:
                    try:
                        cluster_df = cluster_df.explode(list_cols, ignore_index=True)
                    except (TypeError, ValueError):
                        # Fallback: explode sequentially to avoid alignment constraints
                        for col in list_cols:
                            cluster_df = cluster_df.explode(col, ignore_index=True)
                df = df.merge(cluster_df, on=["property_description"], how="left")

        # CHANGE: Handle conversations without properties by creating a "No properties" cluster
        # This ensures all conversations are considered in metrics calculation
        if type in ["all", "clusters"]:
            # Identify rows without properties (no property_description or it's NaN)
            mask_no_properties = df["property_description"].isna() | (df["property_description"].astype(str).str.strip() == "")

            # Only add the synthetic cluster if *all* rows lack a property description.
            # If at least one property exists, we skip to avoid mixing partially
            # processed conversations into a global "No properties" cluster.

            if mask_no_properties.all():
                logger.info("All conversations lack properties – creating 'No properties' cluster")

                # Fill in missing data for conversations without properties
                df.loc[mask_no_properties, "property_description"] = "No properties"
                df.loc[mask_no_properties, "cluster_id"] = -2  # Use -2 since -1 is for outliers
                df.loc[mask_no_properties, "cluster_label"] = "No properties"

                # Handle missing scores for conversations without properties
                mask_no_score = mask_no_properties & (df["score"].isna() | (df["score"] == {}))
                if mask_no_score.any():
                    df.loc[mask_no_score, "score"] = df.loc[mask_no_score, "score"].apply(lambda x: {"score": 0} if pd.isna(x) or x == {} else x)

        return df

    def add_property(self, property: Property):
        """Add a property to the dataset."""
        self.properties.append(property)
        if isinstance(property.model, str) and property.model not in self.all_models:
            self.all_models.append(property.model)
        if isinstance(property.model, list):
            for model in property.model:
                if model not in self.all_models:
                    self.all_models.append(model)

    def get_properties_for_model(self, model: str) -> List[Property]:
        """Get all properties for a specific model."""
        return [p for p in self.properties if p.model == model]

    def get_properties_for_question(self, question_id: str) -> List[Property]:
        """Get all properties for a specific question."""
        return [p for p in self.properties if p.question_id == question_id]

    def _json_safe(self, obj: Any):
        """Recursively convert *obj* into JSON-safe types (lists, dicts, ints, floats, strings, bool, None)."""
        if obj is None:
            return obj
        if isinstance(obj, str):
            return obj
        if isinstance(obj, bool):
            return obj
        if isinstance(obj, (int, float)):
            # Handle NaN and infinity values - convert to None for valid JSON
            if isinstance(obj, float) and (math.isnan(obj) or math.isinf(obj)):
                return None
            return obj
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        if isinstance(obj, np.generic):
            return obj.item()
        if isinstance(obj, (list, tuple, set)):
            return [self._json_safe(o) for o in obj]
        if isinstance(obj, dict):
            # Convert keys to strings if they're not JSON-safe
            json_safe_dict = {}
            for k, v in obj.items():
                # Convert tuple/list keys to string representation
                safe_key: str | int | float | bool | None
                if isinstance(k, (tuple, list)):
                    safe_key = str(k)
                elif isinstance(k, (str, int, float, bool)) or k is None:
                    safe_key = k
                else:
                    safe_key = str(k)
                json_safe_dict[safe_key] = self._json_safe(v)
            return json_safe_dict

        return str(obj)

    def to_serializable_dict(self) -> Dict[str, Any]:
        """Convert the whole dataset into a JSON-serialisable dict."""
        return {
            "conversations": [self._json_safe(asdict(conv)) for conv in self.conversations],
            "properties": [self._json_safe(asdict(prop)) for prop in self.properties],
            "clusters": [self._json_safe(asdict(cluster)) for cluster in self.clusters],
            "model_stats": self._json_safe(self.model_stats),
            "all_models": self.all_models,
        }

    def get_valid_properties(self) -> List[Property]:
        """Get all properties where the property model is unknown, there is no property description, or the property description is empty."""
        if self.properties:
            logger.debug(f"All models: {self.all_models}")
            logger.debug(f"Properties: {self.properties[0].model}")
            logger.debug(f"Property description: {self.properties[0].property_description}")
        return [prop for prop in self.properties if prop.model in self.all_models and prop.property_description is not None and prop.property_description.strip() != ""]

    # ------------------------------------------------------------------
    # 📝 Persistence helpers
    # ------------------------------------------------------------------
    def save(self, path: str, format: str = "json", storage: StorageAdapter | None = None) -> None:
        """Save the dataset to *path* in either ``json``, ``dataframe``, ``parquet`` or ``pickle`` format.

        The JSON variant produces a fully human-readable file while the pickle
        variant preserves the exact Python objects.
        """
        import json, pickle, os

        if storage is None:
            storage = get_storage_adapter()

        fmt = format.lower()

        # Ensure parent directory exists
        parent_dir = os.path.dirname(path)
        if parent_dir:
            storage.ensure_directory(parent_dir)

        if fmt == "json":
            storage.write_json(path, self.to_serializable_dict())
        elif fmt == "dataframe":
            df_content = self.to_dataframe().to_json(orient="records", lines=True)
            storage.write_text(path, df_content)
        elif fmt == "parquet":
            # Parquet requires special handling - write to temp file then upload
            import tempfile
            with tempfile.NamedTemporaryFile(mode='wb', delete=False, suffix='.parquet') as tmp:
                tmp_path = tmp.name
                self.to_dataframe().to_parquet(tmp_path)
            # Read and write via storage
            with open(tmp_path, 'rb') as f:
                content = f.read()
            storage.write_text(path, content.decode('latin1'))  # Binary as text hack
            os.unlink(tmp_path)
        elif fmt in {"pkl", "pickle"}:
            # Pickle requires binary - use temp file approach
            import tempfile
            with tempfile.NamedTemporaryFile(mode='wb', delete=False, suffix='.pkl') as tmp:
                tmp_path = tmp.name
                pickle.dump(self, tmp)
            with open(tmp_path, 'rb') as f:
                content = f.read()
            storage.write_text(path, content.decode('latin1'))  # Binary as text hack
            os.unlink(tmp_path)
        else:
            raise ValueError(f"Unsupported format: {format}. Use 'json' or 'pickle'.")

    @staticmethod
    def get_all_models(conversations: List[ConversationRecord]):
        """Get all models in the dataset."""
        models = set()
        for conv in conversations:
            if isinstance(conv.model, list):
                models.update(conv.model)
            else:
                models.add(conv.model)
        return list(models)

    @classmethod
    def load(cls, path: str, format: str = "json", storage: StorageAdapter | None = None) -> "PropertyDataset":
        """Load a dataset previously saved with :py:meth:`save`."""
        import json, pickle, io

        if storage is None:
            storage = get_storage_adapter()

        fmt = format.lower()
        logger.info(f"Loading dataset from {path} with format {fmt}")
        if fmt == "json":
            logger.info(f"Loading dataset from {path}")
            data = storage.read_json(path)
            logger.debug(f"Data: {data.keys()}")

            # Expected format: dictionary with keys like "conversations", "properties", etc.
            conversations = [ConversationRecord(**conv) for conv in data["conversations"]]
            properties = [Property(**prop) for prop in data.get("properties", [])]

            # Convert cluster data to Cluster objects
            clusters = [Cluster(**cluster) for cluster in data.get("clusters", [])]

            model_stats = data.get("model_stats", {})
            all_models = data.get("all_models", PropertyDataset.get_all_models(conversations))
            return cls(conversations=conversations, properties=properties, clusters=clusters, model_stats=model_stats, all_models=all_models)
        elif fmt == "dataframe":
            # Handle dataframe format - this creates a list of objects when saved
            import pandas as pd
            content = storage.read_text(path)
            try:
                # Try to load as JSON Lines first
                df = pd.read_json(io.StringIO(content), orient="records", lines=True)
            except ValueError:
                # If that fails, try regular JSON
                df = pd.read_json(io.StringIO(content), orient="records")

            # Detect method based on columns
            method = "side_by_side" if {"model_a", "model_b"}.issubset(df.columns) else "single_model"

            return cls.from_dataframe(df, method=method)
        elif fmt in {"pkl", "pickle"}:
            # Pickle requires binary - read as text then decode
            import tempfile
            content_text = storage.read_text(path)
            content_bytes = content_text.encode('latin1')
            obj = pickle.loads(content_bytes)
            if not isinstance(obj, cls):
                raise TypeError("Pickle file does not contain a PropertyDataset object")
            return obj
        elif fmt == "parquet":
            # Load DataFrame and reconstruct minimal PropertyDataset with clusters
            import pandas as pd, tempfile, os
            # Read parquet via storage
            content_text = storage.read_text(path)
            content_bytes = content_text.encode('latin1')
            # Write to temp file for pandas
            with tempfile.NamedTemporaryFile(mode='wb', delete=False, suffix='.parquet') as tmp:
                tmp.write(content_bytes)
                tmp_path = tmp.name
            df = pd.read_parquet(tmp_path)
            os.unlink(tmp_path)

            # Attempt to detect method
            method = "side_by_side" if {"model_a", "model_b"}.issubset(df.columns) else "single_model"

            dataset = cls.from_dataframe(df, method=method)

            # Reconstruct Cluster objects if cluster columns are present
            required_cols = {
                "cluster_id",
                "cluster_label",
                "property_description",
            }
            if required_cols.issubset(df.columns):
                clusters_dict: Dict[Any, Cluster] = {}
                for _, row in df.iterrows():
                    cid = row["cluster_id"]
                    if pd.isna(cid):
                        continue
                    cluster = clusters_dict.setdefault(
                        cid,
                        Cluster(
                            id=int(cid),
                            label=row.get("cluster_label", str(cid)),
                            size=0,
                        ),
                    )
                    cluster.size += 1
                    pd_desc = row.get("property_description")
                    if pd_desc and pd_desc not in cluster.property_descriptions:
                        cluster.property_descriptions.append(pd_desc)
                    cluster.question_ids.append(str(row.get("question_id", "")))

                dataset.clusters = list(clusters_dict.values())

            return dataset
        else:
            raise ValueError(f"Unsupported format: {format}. Use 'json', 'dataframe', 'parquet', or 'pickle'.") 

from_dataframe(df, method='single_model') classmethod

Create PropertyDataset from existing DataFrame formats.

Parameters:

Name Type Description Default
df DataFrame

Input DataFrame with conversation data

required
method str

"side_by_side" for comparison data, "single_model" for single responses

'single_model'

Returns:

Type Description
PropertyDataset

PropertyDataset with populated conversations
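
Example

A minimal sketch of the side_by_side input shape; column values are illustrative.

import pandas as pd
from stringsight.core.data_objects import PropertyDataset

df = pd.DataFrame([
    {"question_id": "q1", "prompt": "Explain recursion.",
     "model_a": "model-one", "model_b": "model-two",
     "model_a_response": "Recursion is when ...",
     "model_b_response": "A function that calls itself ...",
     "score_a": {"helpfulness": 0.9}, "score_b": {"helpfulness": 0.7},
     "winner": "model_a"},
])

dataset = PropertyDataset.from_dataframe(df, method="side_by_side")
# Each ConversationRecord now holds both models, both responses,
# scores as [scores_a, scores_b], and the winner in its meta dict.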

Source code in stringsight/core/data_objects.py
@classmethod
def from_dataframe(cls, df: pd.DataFrame, method: str = "single_model") -> "PropertyDataset":
    """
    Create PropertyDataset from existing DataFrame formats.

    Args:
        df: Input DataFrame with conversation data
        method: "side_by_side" for comparison data, "single_model" for single responses

    Returns:
        PropertyDataset with populated conversations
    """
    conversations: list[dict[str, Any]] = []
    if method == "side_by_side":
        all_models = list(set(df["model_a"].unique().tolist() + df["model_b"].unique().tolist()))
        # Expected columns: question_id, prompt, model_a, model_b,
        # model_a_response, model_b_response, scores_a, scores_b, winner, etc.

        # Convert to list of dicts once - MUCH faster than iterrows()
        rows_list = df.to_dict('records')

        # Parallelize OAI conversions for better performance
        def _process_side_by_side_row(idx_row):
            idx, row = idx_row
            prompt = str(row.get('prompt', row.get('user_prompt', '')))
            model_a_response = row.get('model_a_response', '')
            model_b_response = row.get('model_b_response', '')

            # Convert responses to OAI format if they're strings
            oai_response_a, was_converted_a = check_and_convert_to_oai_format(prompt, model_a_response)
            oai_response_b, was_converted_b = check_and_convert_to_oai_format(prompt, model_b_response)

            return idx, oai_response_a, oai_response_b, row

        # Pre-allocate results list
        oai_results = [None] * len(rows_list)

        # Process conversions in parallel
        with ThreadPoolExecutor(max_workers=min(DEFAULT_MAX_WORKERS, len(rows_list))) as executor:
            futures = {executor.submit(_process_side_by_side_row, (idx, row)): idx
                      for idx, row in enumerate(rows_list)}
            for future in as_completed(futures):
                idx, oai_response_a, oai_response_b, row = future.result()
                oai_results[idx] = (oai_response_a, oai_response_b, row)

        # Now build conversations with pre-converted OAI responses
        for idx, result in enumerate(oai_results):
            if result is None:
                continue
            oai_response_a, oai_response_b, row = result
            prompt = str(row.get('prompt', row.get('user_prompt', '')))

            # Convert score formats to list format [scores_a, scores_b]
            def parse_score_field(score_value):
                """Parse score field that might be a string, dict, or other type."""
                if isinstance(score_value, dict):
                    return score_value
                elif isinstance(score_value, str) and score_value.strip():
                    try:
                        import ast
                        parsed = ast.literal_eval(score_value.strip())
                        return parsed if isinstance(parsed, dict) else {}
                    except (ValueError, SyntaxError):
                        return {}
                else:
                    return {}

            if 'score_a' in row and 'score_b' in row:
                # Format: score_a, score_b columns
                scores_a = parse_score_field(row.get('score_a', {}))
                scores_b = parse_score_field(row.get('score_b', {}))
            else:
                # No score data found
                scores_a, scores_b = {}, {}

            scores = [scores_a, scores_b]

            # Store winner and other metadata
            meta_with_winner = {k: v for k, v in row.items() 
                              if k not in ['question_id', 'prompt', 'user_prompt', 'model_a', 'model_b', 
                                         'model_a_response', 'model_b_response', 'score', 'score_a', 'score_b']}

            # Add winner to meta if present
            winner = row.get('winner')
            if winner is None and isinstance(row.get('score'), dict):
                # Fallback to looking in score dict
                winner = row.get('score').get('winner')

            if winner is not None:
                meta_with_winner['winner'] = winner

            # Use question_id column if present and not None, else fall back to row index
            qid = row.get('question_id')
            if qid is None:
                qid = idx
            conversation = ConversationRecord(
                question_id=str(qid),
                prompt=prompt,
                model=[row.get('model_a', 'model_a'), row.get('model_b', 'model_b')],
                responses=[oai_response_a, oai_response_b],
                scores=scores,
                meta=meta_with_winner
            )
            conversations.append(conversation)

    elif method == "single_model":
        all_models = df["model"].unique().tolist()
        # Expected columns: question_id, prompt, model, model_response, score, etc.

        def parse_single_score_field(score_value):
            """Parse single model score field that might be a string, dict, number, or other type."""
            if isinstance(score_value, dict):
                return score_value
            elif isinstance(score_value, (int, float)):
                return {'score': score_value}
            elif isinstance(score_value, str) and score_value.strip():
                try:
                    import ast
                    parsed = ast.literal_eval(score_value.strip())
                    if isinstance(parsed, dict):
                        return parsed
                    elif isinstance(parsed, (int, float)):
                        return {'score': parsed}
                    else:
                        return {'score': 0}
                except (ValueError, SyntaxError):
                    return {'score': 0}
            else:
                return {'score': 0}

        # Convert to list of dicts once - MUCH faster than iterrows()
        rows_list = df.to_dict('records')

        # Parallelize OAI conversions for better performance
        def _process_single_model_row(idx_row):
            idx, row = idx_row
            prompt = str(row.get('prompt', row.get('user_prompt', '')))
            response = row.get('model_response', '')

            # Convert response to OAI format if it's a string
            oai_response, was_converted = check_and_convert_to_oai_format(prompt, response)

            return idx, oai_response, row

        # Pre-allocate results list
        oai_results = [None] * len(rows_list)

        # Process conversions in parallel
        with ThreadPoolExecutor(max_workers=min(DEFAULT_MAX_WORKERS, len(rows_list))) as executor:
            futures = {executor.submit(_process_single_model_row, (idx, row)): idx
                      for idx, row in enumerate(rows_list)}
            for future in as_completed(futures):
                idx, oai_response, row = future.result()
                oai_results[idx] = (oai_response, row)

        # Now build conversations with pre-converted OAI responses
        for idx, result in enumerate(oai_results):
            if result is None:
                continue
            oai_response, row = result
            scores = parse_single_score_field(row.get('score'))
            prompt = str(row.get('prompt', row.get('user_prompt', '')))

            # Use question_id column if present and not None, else fall back to row index
            qid = row.get('question_id')
            if qid is None:
                qid = idx
            conversation = ConversationRecord(
                question_id=str(qid),
                prompt=prompt,
                model=str(row.get('model', 'model')),
                responses=oai_response,
                scores=scores,
                meta={k: v for k, v in row.items()
                      if k not in ['question_id', 'prompt', 'user_prompt', 'model', 'model_response', 'score']}
            )
            conversations.append(conversation)
    else:
        raise ValueError(f"Unknown method: {method}. Must be 'side_by_side' or 'single_model'")

    # Convert dict conversations to ConversationRecord objects
    conversation_records = [
        ConversationRecord(**conv) if isinstance(conv, dict) else conv
        for conv in conversations
    ]
    return cls(conversations=conversation_records, all_models=all_models)

save(path, format='json', storage=None)

Save the dataset to path in either json, dataframe, parquet or pickle format.

The JSON variant produces a fully human-readable file while the pickle variant preserves the exact Python objects.
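
Example

A minimal sketch of the supported formats, continuing from a dataset built with from_dataframe above; paths are illustrative.

# Human-readable JSON snapshot of the full dataset
dataset.save("results/run_1/full_dataset.json", format="json")

# Flattened DataFrame written as JSON Lines (one record per line)
dataset.save("results/run_1/full_dataset.jsonl", format="dataframe")

# Exact Python objects, preserved via pickle
dataset.save("results/run_1/full_dataset.pkl", format="pickle")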

Source code in stringsight/core/data_objects.py
def save(self, path: str, format: str = "json", storage: StorageAdapter | None = None) -> None:
    """Save the dataset to *path* in either ``json``, ``dataframe``, ``parquet`` or ``pickle`` format.

    The JSON variant produces a fully human-readable file while the pickle
    variant preserves the exact Python objects.
    """
    import json, pickle, os

    if storage is None:
        storage = get_storage_adapter()

    fmt = format.lower()

    # Ensure parent directory exists
    parent_dir = os.path.dirname(path)
    if parent_dir:
        storage.ensure_directory(parent_dir)

    if fmt == "json":
        storage.write_json(path, self.to_serializable_dict())
    elif fmt == "dataframe":
        df_content = self.to_dataframe().to_json(orient="records", lines=True)
        storage.write_text(path, df_content)
    elif fmt == "parquet":
        # Parquet requires special handling - write to temp file then upload
        import tempfile
        with tempfile.NamedTemporaryFile(mode='wb', delete=False, suffix='.parquet') as tmp:
            tmp_path = tmp.name
            self.to_dataframe().to_parquet(tmp_path)
        # Read and write via storage
        with open(tmp_path, 'rb') as f:
            content = f.read()
        storage.write_text(path, content.decode('latin1'))  # Binary as text hack
        os.unlink(tmp_path)
    elif fmt in {"pkl", "pickle"}:
        # Pickle requires binary - use temp file approach
        import tempfile
        with tempfile.NamedTemporaryFile(mode='wb', delete=False, suffix='.pkl') as tmp:
            tmp_path = tmp.name
            pickle.dump(self, tmp)
        with open(tmp_path, 'rb') as f:
            content = f.read()
        storage.write_text(path, content.decode('latin1'))  # Binary as text hack
        os.unlink(tmp_path)
    else:
        raise ValueError(f"Unsupported format: {format}. Use 'json' or 'pickle'.")

load(path, format='json', storage=None) classmethod

Load a dataset previously saved with save().
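
Example

A minimal sketch mirroring the save() calls above; paths are illustrative.

from stringsight.core.data_objects import PropertyDataset

# Round-trip the JSON snapshot written by save()
dataset = PropertyDataset.load("results/run_1/full_dataset.json", format="json")

# Or reload the pickled objects exactly as they were saved
dataset = PropertyDataset.load("results/run_1/full_dataset.pkl", format="pickle")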

Source code in stringsight/core/data_objects.py
@classmethod
def load(cls, path: str, format: str = "json", storage: StorageAdapter | None = None) -> "PropertyDataset":
    """Load a dataset previously saved with :py:meth:`save`."""
    import json, pickle, io

    if storage is None:
        storage = get_storage_adapter()

    fmt = format.lower()
    logger.info(f"Loading dataset from {path} with format {fmt}")
    if fmt == "json":
        logger.info(f"Loading dataset from {path}")
        data = storage.read_json(path)
        logger.debug(f"Data: {data.keys()}")

        # Expected format: dictionary with keys like "conversations", "properties", etc.
        conversations = [ConversationRecord(**conv) for conv in data["conversations"]]
        properties = [Property(**prop) for prop in data.get("properties", [])]

        # Convert cluster data to Cluster objects
        clusters = [Cluster(**cluster) for cluster in data.get("clusters", [])]

        model_stats = data.get("model_stats", {})
        all_models = data.get("all_models", PropertyDataset.get_all_models(conversations))
        return cls(conversations=conversations, properties=properties, clusters=clusters, model_stats=model_stats, all_models=all_models)
    elif fmt == "dataframe":
        # Handle dataframe format - this creates a list of objects when saved
        import pandas as pd
        content = storage.read_text(path)
        try:
            # Try to load as JSON Lines first
            df = pd.read_json(io.StringIO(content), orient="records", lines=True)
        except ValueError:
            # If that fails, try regular JSON
            df = pd.read_json(io.StringIO(content), orient="records")

        # Detect method based on columns
        method = "side_by_side" if {"model_a", "model_b"}.issubset(df.columns) else "single_model"

        return cls.from_dataframe(df, method=method)
    elif fmt in {"pkl", "pickle"}:
        # Pickle requires binary - read as text then decode
        import tempfile
        content_text = storage.read_text(path)
        content_bytes = content_text.encode('latin1')
        obj = pickle.loads(content_bytes)
        if not isinstance(obj, cls):
            raise TypeError("Pickle file does not contain a PropertyDataset object")
        return obj
    elif fmt == "parquet":
        # Load DataFrame and reconstruct minimal PropertyDataset with clusters
        import pandas as pd, tempfile, os
        # Read parquet via storage
        content_text = storage.read_text(path)
        content_bytes = content_text.encode('latin1')
        # Write to temp file for pandas
        with tempfile.NamedTemporaryFile(mode='wb', delete=False, suffix='.parquet') as tmp:
            tmp.write(content_bytes)
            tmp_path = tmp.name
        df = pd.read_parquet(tmp_path)
        os.unlink(tmp_path)

        # Attempt to detect method
        method = "side_by_side" if {"model_a", "model_b"}.issubset(df.columns) else "single_model"

        dataset = cls.from_dataframe(df, method=method)

        # Reconstruct Cluster objects if cluster columns are present
        required_cols = {
            "cluster_id",
            "cluster_label",
            "property_description",
        }
        if required_cols.issubset(df.columns):
            clusters_dict: Dict[Any, Cluster] = {}
            for _, row in df.iterrows():
                cid = row["cluster_id"]
                if pd.isna(cid):
                    continue
                cluster = clusters_dict.setdefault(
                    cid,
                    Cluster(
                        id=int(cid),
                        label=row.get("cluster_label", str(cid)),
                        size=0,
                    ),
                )
                cluster.size += 1
                pd_desc = row.get("property_description")
                if pd_desc and pd_desc not in cluster.property_descriptions:
                    cluster.property_descriptions.append(pd_desc)
                cluster.question_ids.append(str(row.get("question_id", "")))

            dataset.clusters = list(clusters_dict.values())

        return dataset
    else:
        raise ValueError(f"Unsupported format: {format}. Use 'json', 'dataframe', 'parquet', or 'pickle'.") 

to_dataframe(type='all', method='side_by_side')

Convert PropertyDataset back to DataFrame format.

Returns:

Type Description
DataFrame

DataFrame with original data plus extracted properties and clusters

Source code in stringsight/core/data_objects.py
def to_dataframe(self, type: str = "all", method: str = "side_by_side") -> pd.DataFrame:
    """
    Convert PropertyDataset back to DataFrame format.

    Returns:
        DataFrame with original data plus extracted properties and clusters
    """

    assert type in ["base", "properties", "clusters", "all"], f"Invalid type: {type}. Must be one of 'base', 'properties', 'clusters', or 'all'"
    # Start with conversation data
    rows = []
    for conv in self.conversations:
        if isinstance(conv.model, str):
            base_row = {
                'question_id': conv.question_id,
                'prompt': conv.prompt,
                'model': conv.model,
                'model_response': conv.responses,
                'score': conv.scores,
                **conv.meta
            }
        elif isinstance(conv.model, list):
            # Side-by-side format: scores stored as [scores_a, scores_b]
            if isinstance(conv.scores, list) and len(conv.scores) == 2:
                scores_a, scores_b = conv.scores[0], conv.scores[1]
            else:
                # Fallback if scores isn't properly formatted
                scores_a, scores_b = {}, {}

            base_row = {
                'question_id': conv.question_id,
                'prompt': conv.prompt,
                'model_a': conv.model[0],
                'model_b': conv.model[1],
                'model_a_response': conv.responses[0],
                'model_b_response': conv.responses[1],
                'score_a': scores_a,
                'score_b': scores_b,
                'winner': conv.meta.get('winner'),  # Winner stored in meta
                **{k: v for k, v in conv.meta.items() if k != 'winner'}  # Exclude winner from other meta
            }
        else:
            raise ValueError(f"Invalid model type: {type(conv.model)}. Must be str or list.")

        rows.append(base_row)

    df = pd.DataFrame(rows)
    # Ensure question_id is a string
    if not df.empty and "question_id" in df.columns:
        df["question_id"] = df["question_id"].astype(str)
    logger.debug(f"Original unique questions: {df.question_id.nunique()}")

    # Add properties if they exist
    if self.properties and type in ["all", "properties", "clusters"]:
        # Create a mapping from (question_id, model) to properties
        prop_map: Dict[tuple, List[Property]] = {}
        for prop in self.properties:
            key = (prop.question_id, prop.model)
            if key not in prop_map:
                prop_map[key] = []
            prop_map[key].append(prop)

        # create property df
        prop_df = pd.DataFrame([p.to_dict() for p in self.properties])
        # Ensure question_id is a string in properties df
        if not prop_df.empty and "question_id" in prop_df.columns:
            prop_df["question_id"] = prop_df["question_id"].astype(str)
        logger.debug(f"len of base df {len(df)}")
        if "model_a" in df.columns and "model_b" in df.columns:
            # For side-by-side inputs, merge properties by question_id (both models share the question)
            df = df.merge(prop_df, on=["question_id"], how="left")

            # Handle id collision (id_x=conversation, id_y=property)
            if "id_y" in df.columns:
                df["property_id"] = df["id_y"]
                df["id"] = df["id_y"] # Ensure 'id' is property_id for downstream
            elif "id" in df.columns and "property_id" not in df.columns:
                df["property_id"] = df["id"]

            # Deduplicate by property id when available
            if "property_id" in df.columns:
                df = df.drop_duplicates(subset="property_id")
        else:
            # CHANGE: Use left join to preserve all conversations, including those without properties
            # Don't drop duplicates to ensure conversations without properties are preserved
            df = df.merge(prop_df, on=["question_id", "model"], how="left")

            # Handle id collision
            if "id_y" in df.columns:
                df["property_id"] = df["id_y"]
                df["id"] = df["id_y"]
            elif "id" in df.columns and "property_id" not in df.columns:
                df["property_id"] = df["id"]
        logger.debug(f"len of df after merge with properties {len(df)}")

        # ------------------------------------------------------------------
        # Ensure `model` column is present (avoid _x / _y duplicates)
        # ------------------------------------------------------------------
        if "model" not in df.columns:
            if "model_y" in df.columns:
                print(f"df.model_y.value_counts(): {df.model_y.value_counts()}")
            if "model_x" in df.columns:
                print(f"df.model_x.value_counts(): {df.model_x.value_counts()}")
            if "model_x" in df.columns or "model_y" in df.columns:
                df["model"] = df.get("model_y").combine_first(df.get("model_x"))
                df.drop(columns=[c for c in ["model_x", "model_y"] if c in df.columns], inplace=True)

    # Only print model value counts if the column exists
    if "model" in df.columns:
        logger.debug(f"df.model.value_counts() NEW: {df.model.value_counts()}")
    logger.debug(f"total questions: {df.question_id.nunique()}")

    if self.clusters and type in ["all", "clusters"]:
        # If cluster columns already exist (e.g. after reload from parquet)
        # skip the merge to avoid duplicate _x / _y columns.
        if "cluster_id" not in df.columns:
            cluster_df = pd.DataFrame([c.to_dict() for c in self.clusters])
            cluster_df.rename(
                columns={
                    "id": "cluster_id",
                    "label": "cluster_label",
                    "size": "cluster_size",
                    "property_descriptions": "property_description",
                },
                inplace=True,
            )
            # Explode aligned list columns so each row maps to a single property
            # Explode only aligned columns to avoid mismatched element counts
            list_cols = [
                col for col in [
                    "property_description",
                    "question_ids",
                ] if col in cluster_df.columns
            ]
            if list_cols:
                try:
                    cluster_df = cluster_df.explode(list_cols, ignore_index=True)
                except (TypeError, ValueError):
                    # Fallback: explode sequentially to avoid alignment constraints
                    for col in list_cols:
                        cluster_df = cluster_df.explode(col, ignore_index=True)
            df = df.merge(cluster_df, on=["property_description"], how="left")

    # CHANGE: Handle conversations without properties by creating a "No properties" cluster
    # This ensures all conversations are considered in metrics calculation
    if type in ["all", "clusters"]:
        # Identify rows without properties (no property_description or it's NaN)
        mask_no_properties = df["property_description"].isna() | (df["property_description"].astype(str).str.strip() == "")

        # Only add the synthetic cluster if *all* rows lack a property description.
        # If at least one property exists, we skip to avoid mixing partially
        # processed conversations into a global "No properties" cluster.

        if mask_no_properties.all():
            logger.info("All conversations lack properties – creating 'No properties' cluster")

            # Fill in missing data for conversations without properties
            df.loc[mask_no_properties, "property_description"] = "No properties"
            df.loc[mask_no_properties, "cluster_id"] = -2  # Use -2 since -1 is for outliers
            df.loc[mask_no_properties, "cluster_label"] = "No properties"

            # Handle missing scores for conversations without properties
            mask_no_score = mask_no_properties & (df["score"].isna() | (df["score"] == {}))
            if mask_no_score.any():
                df.loc[mask_no_score, "score"] = df.loc[mask_no_score, "score"].apply(lambda x: {"score": 0} if pd.isna(x) or x == {} else x)

    return df
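
For example, assuming dataset is a populated PropertyDataset from a single-model run, the flattened views can be pulled out like this (the column set depends on which stages have run):

# Conversations joined with extracted properties and cluster assignments
df_all = dataset.to_dataframe(type="all")

# Only the original conversation rows, without property/cluster columns
df_base = dataset.to_dataframe(type="base")

print(df_all.columns.tolist())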

ConversationRecord

ConversationRecord dataclass

A single conversation with prompt, responses, and metadata.

Source code in stringsight/core/data_objects.py
@dataclass
class ConversationRecord:
    """A single conversation with prompt, responses, and metadata."""
    question_id: str 
    prompt: str
    model: str | List[str]  # model name(s) - single string or list for side-by-side comparisons
    responses: str | List[str] # model response(s) - single string or list for side-by-side comparisons
    scores: Dict[str, Any] | List[Dict[str, Any]]     # For single model: {score_name: score_value}. For side-by-side: [scores_a, scores_b] 
    meta: Dict[str, Any] = field(default_factory=dict)  # winner, language, etc. (winner stored here for side-by-side)

    def __post_init__(self):
        """Migrate legacy score formats to the new list format for side-by-side."""
        # Ensure question_id is a string
        self.question_id = str(self.question_id)

        # Handle migration of score_a/score_b from meta field to scores list for side-by-side
        if isinstance(self.model, (list, tuple)) and len(self.model) == 2:
            model_a, model_b = self.model[0], self.model[1]

            # 1. Handle migration of score_a/score_b from meta field
            if (not self.scores or self.scores == {}) and ('score_a' in self.meta and 'score_b' in self.meta):
                scores_a = self.meta.pop('score_a', {})
                scores_b = self.meta.pop('score_b', {})
                self.scores = [scores_a, scores_b]

            # 2. Handle "winner" -> numeric scores conversion
            # Check if we need to derive scores from a winner field
            # Winner can be in self.scores['winner'] or self.meta['winner']
            winner = None
            if isinstance(self.scores, dict) and 'winner' in self.scores:
                winner = self.scores.get('winner')
            elif 'winner' in self.meta:
                winner = self.meta.get('winner')

            # If we have a winner but no explicit per-model scores list, generate it
            # Also handle case where scores is a list of empty dicts [{}, {}] which can happen from_dataframe
            is_effectively_empty = False
            if not self.scores:
                is_effectively_empty = True
            elif isinstance(self.scores, list):
                # Check if all elements are empty dicts or None
                is_effectively_empty = all(not s for s in self.scores)
            elif isinstance(self.scores, dict) and not self.scores:
                 is_effectively_empty = True

            if winner is not None and is_effectively_empty:
                # Calculate scores (+1 winner, -1 loser, 0 tie)
                s_a, s_b = {}, {}

                if winner == model_a:
                    s_a['winner'] = 1.0
                    s_b['winner'] = -1.0
                elif winner == model_b:
                    s_a['winner'] = -1.0
                    s_b['winner'] = 1.0
                elif isinstance(winner, str) and 'tie' in winner.lower():
                    s_a['winner'] = 0.0
                    s_b['winner'] = 0.0
                else:
                     # Unknown winner string or format - leave empty
                     pass

                if s_a or s_b:
                    self.scores = [s_a, s_b]
                    # Ensure winner is also in meta for reference
                    self.meta['winner'] = winner

__post_init__()

Migrate legacy score formats to the new list format for side-by-side.

Source code in stringsight/core/data_objects.py
def __post_init__(self):
    """Migrate legacy score formats to the new list format for side-by-side."""
    # Ensure question_id is a string
    self.question_id = str(self.question_id)

    # Handle migration of score_a/score_b from meta field to scores list for side-by-side
    if isinstance(self.model, (list, tuple)) and len(self.model) == 2:
        model_a, model_b = self.model[0], self.model[1]

        # 1. Handle migration of score_a/score_b from meta field
        if (not self.scores or self.scores == {}) and ('score_a' in self.meta and 'score_b' in self.meta):
            scores_a = self.meta.pop('score_a', {})
            scores_b = self.meta.pop('score_b', {})
            self.scores = [scores_a, scores_b]

        # 2. Handle "winner" -> numeric scores conversion
        # Check if we need to derive scores from a winner field
        # Winner can be in self.scores['winner'] or self.meta['winner']
        winner = None
        if isinstance(self.scores, dict) and 'winner' in self.scores:
            winner = self.scores.get('winner')
        elif 'winner' in self.meta:
            winner = self.meta.get('winner')

        # If we have a winner but no explicit per-model scores list, generate it
        # Also handle case where scores is a list of empty dicts [{}, {}] which can happen from_dataframe
        is_effectively_empty = False
        if not self.scores:
            is_effectively_empty = True
        elif isinstance(self.scores, list):
            # Check if all elements are empty dicts or None
            is_effectively_empty = all(not s for s in self.scores)
        elif isinstance(self.scores, dict) and not self.scores:
             is_effectively_empty = True

        if winner is not None and is_effectively_empty:
            # Calculate scores (+1 winner, -1 loser, 0 tie)
            s_a, s_b = {}, {}

            if winner == model_a:
                s_a['winner'] = 1.0
                s_b['winner'] = -1.0
            elif winner == model_b:
                s_a['winner'] = -1.0
                s_b['winner'] = 1.0
            elif isinstance(winner, str) and 'tie' in winner.lower():
                s_a['winner'] = 0.0
                s_b['winner'] = 0.0
            else:
                 # Unknown winner string or format - leave empty
                 pass

            if s_a or s_b:
                self.scores = [s_a, s_b]
                # Ensure winner is also in meta for reference
                self.meta['winner'] = winner
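
A small illustration of the winner-to-scores migration above; the model names, prompt, and responses are placeholders:

from stringsight.core.data_objects import ConversationRecord

conv = ConversationRecord(
    question_id=42,                      # coerced to the string "42"
    prompt="Explain list comprehensions.",
    model=["model-a", "model-b"],        # side-by-side record
    responses=["response from a", "response from b"],
    scores={},                           # empty, so scores are derived from the winner
    meta={"winner": "model-a"},
)

print(conv.scores)  # [{'winner': 1.0}, {'winner': -1.0}]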

Property

Property dataclass

An extracted behavioral property from a model response.

Source code in stringsight/core/data_objects.py
@dataclass
class Property:
    """An extracted behavioral property from a model response."""
    id: str # unique id for the property
    question_id: str
    model: str | list[str]
    # Parsed fields (filled by LLMJsonParser)
    property_description: str | None = None
    category: str | None = None
    reason: str | None = None
    evidence: str | None = None
    behavior_type: str | None = None # Positive|Negative (non-critical)|Negative (critical)|Style

    # Raw LLM response (captured by extractor before parsing)
    raw_response: str | None = None
    contains_errors: bool | None = None
    unexpected_behavior: bool | None = None
    meta: Dict[str, Any] = field(default_factory=dict) # all other metadata

    def to_dict(self):
        return asdict(self)

    def __post_init__(self):
        """Validate property fields after initialization."""
        # Ensure ids are strings
        self.id = str(self.id)
        self.question_id = str(self.question_id)

        # Require that the model has been resolved to a known value
        if isinstance(self.model, str) and self.model.lower() == "unknown":
            raise ValueError("Property must have a known model; got 'unknown'.")

__post_init__()

Validate property fields after initialization.

Source code in stringsight/core/data_objects.py
def __post_init__(self):
    """Validate property fields after initialization."""
    # Ensure ids are strings
    self.id = str(self.id)
    self.question_id = str(self.question_id)

    # Require that the model has been resolved to a known value
    if isinstance(self.model, str) and self.model.lower() == "unknown":
        raise ValueError("Property must have a known model; got 'unknown'.")

Cluster

Cluster dataclass

A cluster of properties.

Source code in stringsight/core/data_objects.py
@dataclass
class Cluster:
    """A cluster of properties."""
    id: str | int # cluster id
    label: str # cluster label
    size: int # cluster size
    property_descriptions: List[str] = field(default_factory=list) # property descriptions in the cluster
    property_ids: List[str] = field(default_factory=list) # property ids in the cluster
    question_ids: List[str] = field(default_factory=list) # ids of the conversations in the cluster
    meta: Dict[str, Any] = field(default_factory=dict) # all other metadata

    def __post_init__(self):
        """Ensure consistent types."""
        self.id = str(self.id)
        # Ensure lists contain strings
        if self.property_ids:
            self.property_ids = [str(pid) for pid in self.property_ids]
        if self.question_ids:
            self.question_ids = [str(qid) for qid in self.question_ids]

    def to_dict(self):
        return asdict(self)

    def to_sample_dict(self, n: int = 5):
        """Return a dictionary that samples n property descriptions and ids from the cluster."""
        return {
            "id": self.id,
            "label": self.label,
            "size": self.size,
            "property_descriptions": random.sample(self.property_descriptions, n),
            "question_ids": random.sample(self.question_ids, n),
            "property_ids": random.sample(self.property_ids, n),
            "meta": self.meta,
        }

__post_init__()

Ensure consistent types.

Source code in stringsight/core/data_objects.py
def __post_init__(self):
    """Ensure consistent types."""
    self.id = str(self.id)
    # Ensure lists contain strings
    if self.property_ids:
        self.property_ids = [str(pid) for pid in self.property_ids]
    if self.question_ids:
        self.question_ids = [str(qid) for qid in self.question_ids]

to_sample_dict(n=5)

Return a dictionary that samples n property descriptions and ids from the cluster.

Source code in stringsight/core/data_objects.py
def to_sample_dict(self, n: int = 5):
    """Return a dictionary that samples n property descriptions and ids from the cluster."""
    return {
        "id": self.id,
        "label": self.label,
        "size": self.size,
        "property_descriptions": random.sample(self.property_descriptions, n),
        "question_ids": random.sample(self.question_ids, n),
        "property_ids": random.sample(self.property_ids, n),
        "meta": self.meta,
    }
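
A short sketch (ids and labels are invented); note that to_sample_dict uses random.sample, so n must not exceed the length of the shortest list:

from stringsight.core.data_objects import Cluster

cluster = Cluster(
    id=3,                                # coerced to the string "3"
    label="Step-by-step reasoning",
    size=2,
    property_descriptions=["Uses numbered steps", "Shows intermediate results"],
    property_ids=["p-001", "p-002"],
    question_ids=["42", "43"],
)

print(cluster.to_sample_dict(n=2)["property_descriptions"])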

Pipeline Components

PipelineStage

PipelineStage

Bases: ABC

Abstract base class for all pipeline stages.

Each stage takes a PropertyDataset as input and returns a PropertyDataset as output. This allows stages to be composed into pipelines.

Source code in stringsight/core/stage.py
class PipelineStage(ABC):
    """
    Abstract base class for all pipeline stages.

    Each stage takes a PropertyDataset as input and returns a PropertyDataset as output.
    This allows stages to be composed into pipelines.
    """

    def __init__(self, *args, **kwargs):
        """Initialize the stage with configuration parameters and propagate to mixins."""
        # Store config before passing to mixins (copy to avoid mutating original)
        self.config = dict(kwargs)
        self.name = self.__class__.__name__

        # Call next __init__ in MRO – no kwargs so they don't reach object.__init__
        super().__init__()

    @abstractmethod
    def run(self, data: PropertyDataset, progress_callback: Any = None, **kwargs: Any) -> PropertyDataset | Any:
        """
        Process the input data and return the modified data.

        Can be either sync or async (returning PropertyDataset or Coroutine[Any, Any, PropertyDataset]).

        Args:
            data: Input PropertyDataset
            progress_callback: Optional callback(completed, total) for progress updates
            **kwargs: Additional keyword arguments specific to the stage implementation

        Returns:
            Modified PropertyDataset (or Coroutine that resolves to PropertyDataset for async stages)
        """
        pass

    def validate_input(self, data: PropertyDataset) -> None:
        """
        Validate that the input data meets the requirements for this stage.

        Args:
            data: Input PropertyDataset

        Raises:
            ValueError: If the input data is invalid
        """
        if not isinstance(data, PropertyDataset):
            raise ValueError(f"Input must be a PropertyDataset, got {type(data)}")

    def validate_output(self, data: PropertyDataset) -> None:
        """
        Validate that the output data is valid.

        Args:
            data: Output PropertyDataset

        Raises:
            ValueError: If the output data is invalid
        """
        if not isinstance(data, PropertyDataset):
            raise ValueError(f"Output must be a PropertyDataset, got {type(data)}")

    async def __call__(self, data: PropertyDataset, progress_callback: Any = None) -> PropertyDataset:
        """
        Convenience method to run the stage.

        This allows stages to be called directly: stage(data) or await stage(data)
        Handles both sync and async run() methods automatically.
        """
        import inspect
        self.validate_input(data)

        # Check if run() is a coroutine function (async)
        if inspect.iscoroutinefunction(self.run):
            # Check if run accepts progress_callback
            sig = inspect.signature(self.run)
            if 'progress_callback' in sig.parameters:
                result = await self.run(data, progress_callback=progress_callback)
            else:
                result = await self.run(data)
        else:
            # Check if run accepts progress_callback
            sig = inspect.signature(self.run)
            if 'progress_callback' in sig.parameters:
                result = self.run(data, progress_callback=progress_callback)
            else:
                result = self.run(data)

        self.validate_output(result)
        return result

    def __repr__(self) -> str:
        return f"{self.name}({self.config})"

__init__(*args, **kwargs)

Initialize the stage with configuration parameters and propagate to mixins.

Source code in stringsight/core/stage.py
def __init__(self, *args, **kwargs):
    """Initialize the stage with configuration parameters and propagate to mixins."""
    # Store config before passing to mixins (copy to avoid mutating original)
    self.config = dict(kwargs)
    self.name = self.__class__.__name__

    # Call next __init__ in MRO – no kwargs so they don't reach object.__init__
    super().__init__()

run(data, progress_callback=None, **kwargs) abstractmethod

Process the input data and return the modified data.

Can be either sync or async (returning PropertyDataset or Coroutine[Any, Any, PropertyDataset]).

Parameters:

Name Type Description Default
data PropertyDataset

Input PropertyDataset

required
progress_callback Any

Optional callback(completed, total) for progress updates

None
**kwargs Any

Additional keyword arguments specific to the stage implementation

{}

Returns:

Type Description
PropertyDataset | Any

Modified PropertyDataset (or Coroutine that resolves to PropertyDataset for async stages)

Source code in stringsight/core/stage.py
@abstractmethod
def run(self, data: PropertyDataset, progress_callback: Any = None, **kwargs: Any) -> PropertyDataset | Any:
    """
    Process the input data and return the modified data.

    Can be either sync or async (returning PropertyDataset or Coroutine[Any, Any, PropertyDataset]).

    Args:
        data: Input PropertyDataset
        progress_callback: Optional callback(completed, total) for progress updates
        **kwargs: Additional keyword arguments specific to the stage implementation

    Returns:
        Modified PropertyDataset (or Coroutine that resolves to PropertyDataset for async stages)
    """
    pass

validate_input(data)

Validate that the input data meets the requirements for this stage.

Parameters:

Name Type Description Default
data PropertyDataset

Input PropertyDataset

required

Raises:

Type Description
ValueError

If the input data is invalid

Source code in stringsight/core/stage.py
def validate_input(self, data: PropertyDataset) -> None:
    """
    Validate that the input data meets the requirements for this stage.

    Args:
        data: Input PropertyDataset

    Raises:
        ValueError: If the input data is invalid
    """
    if not isinstance(data, PropertyDataset):
        raise ValueError(f"Input must be a PropertyDataset, got {type(data)}")

validate_output(data)

Validate that the output data is valid.

Parameters:

Name Type Description Default
data PropertyDataset

Output PropertyDataset

required

Raises:

Type Description
ValueError

If the output data is invalid

Source code in stringsight/core/stage.py
def validate_output(self, data: PropertyDataset) -> None:
    """
    Validate that the output data is valid.

    Args:
        data: Output PropertyDataset

    Raises:
        ValueError: If the output data is invalid
    """
    if not isinstance(data, PropertyDataset):
        raise ValueError(f"Output must be a PropertyDataset, got {type(data)}")

__call__(data, progress_callback=None) async

Convenience method to run the stage.

This allows stages to be called directly: stage(data) or await stage(data) Handles both sync and async run() methods automatically.

Source code in stringsight/core/stage.py
async def __call__(self, data: PropertyDataset, progress_callback: Any = None) -> PropertyDataset:
    """
    Convenience method to run the stage.

    This allows stages to be called directly: stage(data) or await stage(data)
    Handles both sync and async run() methods automatically.
    """
    import inspect
    self.validate_input(data)

    # Check if run() is a coroutine function (async)
    if inspect.iscoroutinefunction(self.run):
        # Check if run accepts progress_callback
        sig = inspect.signature(self.run)
        if 'progress_callback' in sig.parameters:
            result = await self.run(data, progress_callback=progress_callback)
        else:
            result = await self.run(data)
    else:
        # Check if run accepts progress_callback
        sig = inspect.signature(self.run)
        if 'progress_callback' in sig.parameters:
            result = self.run(data, progress_callback=progress_callback)
        else:
            result = self.run(data)

    self.validate_output(result)
    return result
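
A minimal custom stage sketch. It relies only on the interface shown above (run may be sync or async, and progress_callback is optional):

from stringsight.core.data_objects import PropertyDataset
from stringsight.core.stage import PipelineStage

class CountingStage(PipelineStage):
    """Toy stage that reports how many conversations it received."""

    def run(self, data: PropertyDataset, progress_callback=None, **kwargs) -> PropertyDataset:
        if progress_callback:
            progress_callback(1, 1)  # (completed, total) style callback
        print(f"{self.name}: {len(data.conversations)} conversations")
        return data

# To run it directly, use the async __call__: await CountingStage()(dataset)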

Pipeline

Pipeline

Bases: LoggingMixin, TimingMixin, ErrorHandlingMixin, WandbMixin

A pipeline for processing data through multiple stages.

The Pipeline class coordinates the execution of multiple pipeline stages, handles error recovery, and provides logging and timing information.

Source code in stringsight/pipeline.py
class Pipeline(LoggingMixin, TimingMixin, ErrorHandlingMixin, WandbMixin):
    """
    A pipeline for processing data through multiple stages.

    The Pipeline class coordinates the execution of multiple pipeline stages,
    handles error recovery, and provides logging and timing information.
    """

    def __init__(
        self,
        name: str,
        stages: List[PipelineStage] | None = None,
        storage: StorageAdapter | None = None,
        **kwargs: Any
    ):
        """
        Initialize a new Pipeline.

        Args:
            name: Name of the pipeline
            stages: List of pipeline stages to execute
            storage: Storage adapter for file I/O (defaults to configured adapter)
            **kwargs: Additional configuration options
        """
        # Set name first, before calling parent __init__ methods that might use it
        self.name = name
        self.stages = stages or []
        self.stage_times: dict[str, float] = {}
        self.stage_errors: dict[str, str] = {}
        # Store output directory (if any) so that we can automatically persist
        # intermediate pipeline results after each stage.  This enables tooling
        # such as compute_metrics_only() to pick up from any point in the
        # pipeline without the caller having to remember to save explicitly.
        self.output_dir: str | None = kwargs.get('output_dir')
        self.storage = storage or get_storage_adapter()

        # Now call parent __init__ methods safely
        super().__init__(**kwargs)

        # Initialize wandb if enabled (after all parent inits are done)
        if hasattr(self, 'use_wandb') and self.use_wandb:
            self.init_wandb()

        # Mark all stages as using the same wandb run
        for stage in self.stages:
            if hasattr(stage, 'use_wandb') and stage.use_wandb and hasattr(stage, '_wandb_ok'):
                stage._wandb_ok = True  # Mark that wandb is available

    def add_stage(self, stage: PipelineStage) -> None:
        """Add a stage to the end of the pipeline."""
        self.stages.append(stage)

        # Mark the new stage as using the same wandb run if wandb is enabled
        if hasattr(self, 'use_wandb') and self.use_wandb and hasattr(stage, 'use_wandb') and stage.use_wandb and hasattr(stage, '_wandb_ok'):
            stage._wandb_ok = True  # Mark that wandb is available

    def insert_stage(self, index: int, stage: PipelineStage) -> None:
        """Insert a stage at a specific position in the pipeline."""
        self.stages.insert(index, stage)

        # Mark the inserted stage as using the same wandb run if wandb is enabled
        if hasattr(self, 'use_wandb') and self.use_wandb and hasattr(stage, 'use_wandb') and stage.use_wandb and hasattr(stage, '_wandb_ok'):
            stage._wandb_ok = True  # Mark that wandb is available

    def remove_stage(self, index: int) -> PipelineStage:
        """Remove and return a stage at a specific position."""
        return self.stages.pop(index)

    async def run(self, data: PropertyDataset, progress_callback: Any = None) -> PropertyDataset:
        """
        Execute all stages in the pipeline.

        Args:
            data: Input PropertyDataset
            progress_callback: Optional callback(float) -> None to report progress (0.0-1.0)

        Returns:
            PropertyDataset after processing through all stages
        """
        self.log(f"Starting pipeline '{self.name}' with {len(self.stages)} stages")
        self.start_timer()

        # Count initial models
        initial_models = set()
        for conv in data.conversations:
            if isinstance(conv.model, list):
                initial_models.update(conv.model)
            else:
                initial_models.add(conv.model)

        print(f"\n🚀 Starting pipeline '{self.name}'")
        print(f"   • Input conversations: {len(data.conversations)}")
        print(f"   • Input models: {len(initial_models)}")
        if len(initial_models) <= 20:
            model_list = sorted(list(initial_models))
            print(f"   • Model names: {', '.join(model_list)}")
        print()

        current_data = data

        for i, stage in enumerate(self.stages):
            # Report progress at start of stage
            if progress_callback:
                # Progress is fraction of stages completed
                # We can also use i / len(self.stages) + something for intra-stage progress if we had it
                progress = i / len(self.stages)
                try:
                    progress_callback(progress)
                except Exception as e:
                    print(f"Warning: progress callback failed: {e}")

            stage_start_time = time.time()

            # try:
            self.log(f"Running stage {i+1}/{len(self.stages)}: {stage.name}")

            # Create a stage-specific progress callback
            stage_progress_callback = None
            if progress_callback:
                def make_callback(stage_idx, total_stages):
                    def callback(progress_or_completed, total=None):
                        # Handle both callback(progress) and callback(completed, total) signatures
                        if total is not None and total > 0:
                            stage_progress = progress_or_completed / total
                        else:
                            stage_progress = progress_or_completed

                        # stage_progress is 0.0 to 1.0 within the stage
                        # overall progress = (stage_idx + stage_progress) / total_stages
                        # Ensure we don't exceed 1.0 or go backwards (though backwards is possible if stage resets)
                        if isinstance(stage_progress, (int, float)):
                            overall = (stage_idx + min(max(stage_progress, 0.0), 1.0)) / total_stages
                            try:
                                progress_callback(overall)
                            except Exception:
                                pass
                    return callback

                stage_progress_callback = make_callback(i, len(self.stages))

            # Pass progress callback to stage
            # The stage.__call__ method we updated handles checking if the underlying run() accepts it
            current_data = await stage(current_data, progress_callback=stage_progress_callback)

            # Track timing
            stage_execution_time = time.time() - stage_start_time
            self.stage_times[stage.name] = stage_execution_time

            self.log(f"Stage {stage.name} completed in {stage_execution_time:.2f}s")

            # Log stage-specific metrics
            self._log_stage_metrics(stage, current_data)

            # --------------------------------------------------------------
            # 📝  Auto-save full dataset snapshot after each stage
            # --------------------------------------------------------------
            output_dir = getattr(self, "output_dir", None)
            if output_dir:
                from pathlib import Path
                import os
                import json

                # Ensure the directory exists
                self.storage.ensure_directory(output_dir)

                # The snapshot is overwritten after every stage, so the latest
                # pipeline state always lives at full_dataset.json
                snapshot_name = "full_dataset.json"
                snapshot_path = os.path.join(output_dir, snapshot_name)

                # Persist using the JSON format for maximum portability
                current_data.save(snapshot_path, storage=self.storage)

                # Also save conversations separately as JSONL
                conversation_path = os.path.join(output_dir, "conversation.jsonl")
                conv_records: list[dict[str, Any]] = []
                for conv in current_data.conversations:
                    # Build base conversation dict
                    conv_dict = cast(
                        dict[str, Any],
                        {
                            "question_id": conv.question_id,
                            "prompt": conv.prompt,
                        },
                    )

                    # Handle side-by-side vs single model format
                    if isinstance(conv.model, list):
                        # Side-by-side format
                        conv_dict["model_a"] = str(conv.model[0]) if len(conv.model) > 0 else ""
                        conv_dict["model_b"] = str(conv.model[1]) if len(conv.model) > 1 else ""

                        # Type narrow responses to list
                        responses = conv.responses if isinstance(conv.responses, list) else [conv.responses]
                        conv_dict["model_a_response"] = str(responses[0]) if len(responses) > 0 else ""
                        conv_dict["model_b_response"] = str(responses[1]) if len(responses) > 1 else ""

                        # Convert scores list to score_a/score_b
                        if isinstance(conv.scores, list) and len(conv.scores) == 2:
                            score_a_val = cast(Dict[str, Any], conv.scores[0]) if isinstance(conv.scores[0], dict) else {}
                            score_b_val = cast(Dict[str, Any], conv.scores[1]) if isinstance(conv.scores[1], dict) else {}
                            conv_dict["score_a"] = score_a_val
                            conv_dict["score_b"] = score_b_val
                        else:
                            conv_dict["score_a"] = cast(dict[str, Any], {})
                            conv_dict["score_b"] = cast(dict[str, Any], {})

                        # Add meta fields (includes winner)
                        conv_dict.update(conv.meta)
                    else:
                        # Single model format
                        model_str = str(conv.model) if not isinstance(conv.model, list) else (str(conv.model[0]) if len(conv.model) > 0 else "")
                        response_str = str(conv.responses) if not isinstance(conv.responses, list) else (str(conv.responses[0]) if len(conv.responses) > 0 else "")
                        score_val = cast(Dict[str, Any], conv.scores) if isinstance(conv.scores, dict) else (cast(Dict[str, Any], conv.scores[0]) if isinstance(conv.scores, list) and len(conv.scores) > 0 and isinstance(conv.scores[0], dict) else {})

                        conv_dict["model"] = model_str
                        conv_dict["model_response"] = response_str
                        conv_dict["score"] = score_val

                        # Add meta fields
                        conv_dict.update(conv.meta)

                    # Make JSON-safe and add to records
                    conv_dict = current_data._json_safe(conv_dict)
                    conv_records.append(conv_dict)

                # Write all conversations at once
                self.storage.write_jsonl(conversation_path, conv_records)

                # Save properties separately as JSONL
                if current_data.properties:
                    properties_path = os.path.join(output_dir, "properties.jsonl")
                    prop_records = [current_data._json_safe(prop.to_dict()) for prop in current_data.properties]
                    self.storage.write_jsonl(properties_path, prop_records)

                # Save clusters separately as JSONL
                if current_data.clusters:
                    clusters_path = os.path.join(output_dir, "clusters.jsonl")
                    cluster_records = [current_data._json_safe(cluster.to_dict()) for cluster in current_data.clusters]
                    self.storage.write_jsonl(clusters_path, cluster_records)

                if getattr(self, "verbose", False):
                    print(f"   • Saved dataset snapshot: {snapshot_path}")
                    print(f"   • Saved conversations: {conversation_path}")
                    if current_data.properties:
                        print(f"   • Saved properties: {properties_path}")
                    if current_data.clusters:
                        print(f"   • Saved clusters: {clusters_path}")

            # except Exception as e:
            #     self.stage_errors[stage.name] = str(e)
            #     self.handle_error(e, f"stage {i+1} ({stage.name})")

        total_time = self.end_timer()
        self.log(f"Pipeline '{self.name}' completed in {total_time:.2f}s")

        # Print final summary
        final_models = set()
        for conv in current_data.conversations:
            if isinstance(conv.model, list):
                final_models.update(conv.model)
            else:
                final_models.add(conv.model)

        print(f"\n🎉 Pipeline '{self.name}' completed!")
        print(f"   • Total execution time: {total_time:.2f}s")
        print(f"   • Final conversations: {len(current_data.conversations)}")
        print(f"   • Final properties: {len(current_data.properties)}")
        print(f"   • Final models: {len(final_models)}")
        if current_data.clusters:
            print(f"   • Final clusters: {len(current_data.clusters)}")
        if current_data.model_stats:
            print(f"   • Models with final stats: {len(current_data.model_stats)}")
        print()

        return current_data

    def _log_stage_metrics(self, stage: PipelineStage, data: PropertyDataset) -> None:
        """Log metrics for a completed stage."""
        metrics = {
            'conversations': len(data.conversations),
            'properties': len(data.properties),
            'clusters': len(data.clusters),
            'models_in_stats': len(data.model_stats)
        }

        # Count unique models from conversations
        unique_models = set()
        for conv in data.conversations:
            if isinstance(conv.model, list):
                unique_models.update(conv.model)
            else:
                unique_models.add(conv.model)

        total_models = len(unique_models)

        # Add model count to metrics
        metrics['total_models'] = total_models

        self.log(f"Stage {stage.name} metrics: {metrics}")

        # Print specific model count information
        print(f"\n📊 Stage '{stage.name}' completed:")
        print(f"   • Total conversations: {len(data.conversations)}")
        print(f"   • Total properties: {len(data.properties)}")
        print(f"   • Total models: {total_models}")
        if data.clusters:
            print(f"   • Total clusters: {len(data.clusters)}")
        if data.model_stats:
            print(f"   • Models with stats: {len(data.model_stats)}")

        # Show model names if verbose
        if hasattr(self, 'verbose') and self.verbose and total_models <= 20:
            model_list = sorted(list(unique_models))
            print(f"   • Models: {', '.join(model_list)}")

        print()  # Add spacing

        # Log to wandb as summary metrics (not regular metrics)
        if hasattr(self, 'log_wandb'):
            wandb_data: dict[str, int | float] = {f"{stage.name}_{k}": v for k, v in metrics.items()}
            wandb_data[f"{stage.name}_execution_time"] = self.stage_times.get(stage.name, 0.0)
            self.log_wandb(wandb_data, is_summary=True)

    def log_final_summary(self) -> None:
        """Log all accumulated summary metrics to wandb."""
        if hasattr(self, 'log_summary_metrics'):
            # Add pipeline-level summary metrics
            pipeline_summary = {
                'pipeline_total_stages': len(self.stages),
                'pipeline_total_time': self.get_execution_time(),
                'pipeline_success': len(self.stage_errors) == 0,
                'pipeline_error_count': len(self.stage_errors)
            }
            self.log_wandb(pipeline_summary, is_summary=True)

            # Log all accumulated summary metrics
            self.log_summary_metrics()

            if hasattr(self, 'log'):
                self.log("Logged final summary metrics to wandb", level="debug")

    def get_stage_summary(self) -> Dict[str, Any]:
        """Get a summary of pipeline execution."""
        return {
            'total_stages': len(self.stages),
            'total_time': self.get_execution_time(),
            'stage_times': self.stage_times,
            'stage_errors': self.stage_errors,
            'success': len(self.stage_errors) == 0
        }

    def validate_pipeline(self) -> List[str]:
        """
        Validate that the pipeline is correctly configured.

        Returns:
            List of validation errors (empty if valid)
        """
        errors = []

        if not self.stages:
            errors.append("Pipeline has no stages")

        for i, stage in enumerate(self.stages):
            if not isinstance(stage, PipelineStage):
                errors.append(f"Stage {i} is not a PipelineStage instance")

        return errors

    def __repr__(self) -> str:
        stage_names = [stage.name for stage in self.stages]
        return f"Pipeline({self.name}, stages={stage_names})"

    def __len__(self) -> int:
        return len(self.stages)

    def __getitem__(self, index: int) -> PipelineStage:
        return self.stages[index]

    def __iter__(self) -> Any:
        return iter(self.stages)

run(data, progress_callback=None) async

Execute all stages in the pipeline.

Parameters:

Name Type Description Default
data PropertyDataset

Input PropertyDataset

required
progress_callback Any

Optional callback(float) -> None to report progress (0.0-1.0)

None

Returns:

Type Description
PropertyDataset

PropertyDataset after processing through all stages

Source code in stringsight/pipeline.py
async def run(self, data: PropertyDataset, progress_callback: Any = None) -> PropertyDataset:
    """
    Execute all stages in the pipeline.

    Args:
        data: Input PropertyDataset
        progress_callback: Optional callback(float) -> None to report progress (0.0-1.0)

    Returns:
        PropertyDataset after processing through all stages
    """
    self.log(f"Starting pipeline '{self.name}' with {len(self.stages)} stages")
    self.start_timer()

    # Count initial models
    initial_models = set()
    for conv in data.conversations:
        if isinstance(conv.model, list):
            initial_models.update(conv.model)
        else:
            initial_models.add(conv.model)

    print(f"\n🚀 Starting pipeline '{self.name}'")
    print(f"   • Input conversations: {len(data.conversations)}")
    print(f"   • Input models: {len(initial_models)}")
    if len(initial_models) <= 20:
        model_list = sorted(list(initial_models))
        print(f"   • Model names: {', '.join(model_list)}")
    print()

    current_data = data

    for i, stage in enumerate(self.stages):
        # Report progress at start of stage
        if progress_callback:
            # Progress is fraction of stages completed
            # We can also use i / len(self.stages) + something for intra-stage progress if we had it
            progress = i / len(self.stages)
            try:
                progress_callback(progress)
            except Exception as e:
                print(f"Warning: progress callback failed: {e}")

        stage_start_time = time.time()

        # try:
        self.log(f"Running stage {i+1}/{len(self.stages)}: {stage.name}")

        # Create a stage-specific progress callback
        stage_progress_callback = None
        if progress_callback:
            def make_callback(stage_idx, total_stages):
                def callback(progress_or_completed, total=None):
                    # Handle both callback(progress) and callback(completed, total) signatures
                    if total is not None and total > 0:
                        stage_progress = progress_or_completed / total
                    else:
                        stage_progress = progress_or_completed

                    # stage_progress is 0.0 to 1.0 within the stage
                    # overall progress = (stage_idx + stage_progress) / total_stages
                    # Ensure we don't exceed 1.0 or go backwards (though backwards is possible if stage resets)
                    if isinstance(stage_progress, (int, float)):
                        overall = (stage_idx + min(max(stage_progress, 0.0), 1.0)) / total_stages
                        try:
                            progress_callback(overall)
                        except Exception:
                            pass
                return callback

            stage_progress_callback = make_callback(i, len(self.stages))

        # Pass progress callback to stage
        # The stage.__call__ method we updated handles checking if the underlying run() accepts it
        current_data = await stage(current_data, progress_callback=stage_progress_callback)

        # Track timing
        stage_execution_time = time.time() - stage_start_time
        self.stage_times[stage.name] = stage_execution_time

        self.log(f"Stage {stage.name} completed in {stage_execution_time:.2f}s")

        # Log stage-specific metrics
        self._log_stage_metrics(stage, current_data)

        # --------------------------------------------------------------
        # 📝  Auto-save full dataset snapshot after each stage
        # --------------------------------------------------------------
        output_dir = getattr(self, "output_dir", None)
        if output_dir:
            from pathlib import Path
            import os
            import json

            # Ensure the directory exists
            self.storage.ensure_directory(output_dir)

            # The snapshot is overwritten after every stage, so the latest
            # pipeline state always lives at full_dataset.json
            snapshot_name = "full_dataset.json"
            snapshot_path = os.path.join(output_dir, snapshot_name)

            # Persist using the JSON format for maximum portability
            current_data.save(snapshot_path, storage=self.storage)

            # Also save conversations separately as JSONL
            conversation_path = os.path.join(output_dir, "conversation.jsonl")
            conv_records: list[dict[str, Any]] = []
            for conv in current_data.conversations:
                # Build base conversation dict
                conv_dict = cast(
                    dict[str, Any],
                    {
                        "question_id": conv.question_id,
                        "prompt": conv.prompt,
                    },
                )

                # Handle side-by-side vs single model format
                if isinstance(conv.model, list):
                    # Side-by-side format
                    conv_dict["model_a"] = str(conv.model[0]) if len(conv.model) > 0 else ""
                    conv_dict["model_b"] = str(conv.model[1]) if len(conv.model) > 1 else ""

                    # Type narrow responses to list
                    responses = conv.responses if isinstance(conv.responses, list) else [conv.responses]
                    conv_dict["model_a_response"] = str(responses[0]) if len(responses) > 0 else ""
                    conv_dict["model_b_response"] = str(responses[1]) if len(responses) > 1 else ""

                    # Convert scores list to score_a/score_b
                    if isinstance(conv.scores, list) and len(conv.scores) == 2:
                        score_a_val = cast(Dict[str, Any], conv.scores[0]) if isinstance(conv.scores[0], dict) else {}
                        score_b_val = cast(Dict[str, Any], conv.scores[1]) if isinstance(conv.scores[1], dict) else {}
                        conv_dict["score_a"] = score_a_val
                        conv_dict["score_b"] = score_b_val
                    else:
                        conv_dict["score_a"] = cast(dict[str, Any], {})
                        conv_dict["score_b"] = cast(dict[str, Any], {})

                    # Add meta fields (includes winner)
                    conv_dict.update(conv.meta)
                else:
                    # Single model format
                    model_str = str(conv.model) if not isinstance(conv.model, list) else (str(conv.model[0]) if len(conv.model) > 0 else "")
                    response_str = str(conv.responses) if not isinstance(conv.responses, list) else (str(conv.responses[0]) if len(conv.responses) > 0 else "")
                    score_val = cast(Dict[str, Any], conv.scores) if isinstance(conv.scores, dict) else (cast(Dict[str, Any], conv.scores[0]) if isinstance(conv.scores, list) and len(conv.scores) > 0 and isinstance(conv.scores[0], dict) else {})

                    conv_dict["model"] = model_str
                    conv_dict["model_response"] = response_str
                    conv_dict["score"] = score_val

                    # Add meta fields
                    conv_dict.update(conv.meta)

                # Make JSON-safe and add to records
                conv_dict = current_data._json_safe(conv_dict)
                conv_records.append(conv_dict)

            # Write all conversations at once
            self.storage.write_jsonl(conversation_path, conv_records)

            # Save properties separately as JSONL
            if current_data.properties:
                properties_path = os.path.join(output_dir, "properties.jsonl")
                prop_records = [current_data._json_safe(prop.to_dict()) for prop in current_data.properties]
                self.storage.write_jsonl(properties_path, prop_records)

            # Save clusters separately as JSONL
            if current_data.clusters:
                clusters_path = os.path.join(output_dir, "clusters.jsonl")
                cluster_records = [current_data._json_safe(cluster.to_dict()) for cluster in current_data.clusters]
                self.storage.write_jsonl(clusters_path, cluster_records)

            if getattr(self, "verbose", False):
                print(f"   • Saved dataset snapshot: {snapshot_path}")
                print(f"   • Saved conversations: {conversation_path}")
                if current_data.properties:
                    print(f"   • Saved properties: {properties_path}")
                if current_data.clusters:
                    print(f"   • Saved clusters: {clusters_path}")

        # except Exception as e:
        #     self.stage_errors[stage.name] = str(e)
        #     self.handle_error(e, f"stage {i+1} ({stage.name})")

    total_time = self.end_timer()
    self.log(f"Pipeline '{self.name}' completed in {total_time:.2f}s")

    # Print final summary
    final_models = set()
    for conv in current_data.conversations:
        if isinstance(conv.model, list):
            final_models.update(conv.model)
        else:
            final_models.add(conv.model)

    print(f"\n🎉 Pipeline '{self.name}' completed!")
    print(f"   • Total execution time: {total_time:.2f}s")
    print(f"   • Final conversations: {len(current_data.conversations)}")
    print(f"   • Final properties: {len(current_data.properties)}")
    print(f"   • Final models: {len(final_models)}")
    if current_data.clusters:
        print(f"   • Final clusters: {len(current_data.clusters)}")
    if current_data.model_stats:
        print(f"   • Models with final stats: {len(current_data.model_stats)}")
    print()

    return current_data

add_stage(stage)

Add a stage to the end of the pipeline.

Source code in stringsight/pipeline.py
def add_stage(self, stage: PipelineStage) -> None:
    """Add a stage to the end of the pipeline."""
    self.stages.append(stage)

    # Mark the new stage as using the same wandb run if wandb is enabled
    if hasattr(self, 'use_wandb') and self.use_wandb and hasattr(stage, 'use_wandb') and stage.use_wandb and hasattr(stage, '_wandb_ok'):
        stage._wandb_ok = True  # Mark that wandb is available
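
A minimal usage sketch: the stage class below is hypothetical (only add_stage itself comes from the source above), and the PipelineStage import path is assumed.

from stringsight.pipeline import PipelineStage  # import path assumed for this sketch

class LoggingStage(PipelineStage):
    """Hypothetical pass-through stage that reports dataset size."""

    async def run(self, data, **kwargs):
        print(f"{len(data.conversations)} conversations, {len(data.properties)} properties")
        return data

# `pipeline` is an existing Pipeline instance built elsewhere:
# pipeline.add_stage(LoggingStage())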

Extractors

get_extractor()

get_extractor(model_name='gpt-4.1-mini', system_prompt='one_sided_system_prompt', prompt_builder=None, temperature=0.6, top_p=0.95, max_tokens=16000, max_workers=DEFAULT_MAX_WORKERS, include_scores_in_prompt=False, **kwargs)

Factory function to get the appropriate extractor based on model name.

Parameters:

Name Type Description Default
model_name str

Name of the LLM to use for extraction

'gpt-4.1-mini'
system_prompt str

System prompt for property extraction

'one_sided_system_prompt'
prompt_builder Optional[Callable]

Optional custom prompt builder function

None
temperature float

Temperature for LLM

0.6
top_p float

Top-p for LLM

0.95
max_tokens int

Max tokens for LLM

16000
max_workers int

Max parallel workers for API calls

DEFAULT_MAX_WORKERS
include_scores_in_prompt bool

Whether to include numeric scores/winner context in extraction prompts

False
**kwargs

Additional configuration

{}

Returns:

Type Description
PipelineStage

Configured extractor stage

Source code in stringsight/extractors/__init__.py
def get_extractor(
    model_name: str = "gpt-4.1-mini",
    system_prompt: str = "one_sided_system_prompt",
    prompt_builder: Optional[Callable] = None,
    temperature: float = 0.6,
    top_p: float = 0.95,
    max_tokens: int = 16000,
    max_workers: int = DEFAULT_MAX_WORKERS,
    include_scores_in_prompt: bool = False,
    **kwargs
) -> PipelineStage:
    """
    Factory function to get the appropriate extractor based on model name.

    Args:
        model_name: Name of the LLM to use for extraction
        system_prompt: System prompt for property extraction
        prompt_builder: Optional custom prompt builder function
        temperature: Temperature for LLM
        top_p: Top-p for LLM  
        max_tokens: Max tokens for LLM
        max_workers: Max parallel workers for API calls
        include_scores_in_prompt: Whether to include scores/winner context in prompts
        **kwargs: Additional configuration

    Returns:
        Configured extractor stage
    """

    # Route common hosted providers through the LiteLLM-backed extractor.
    lower_name = model_name.lower().strip()
    litellm_prefixes = (
        "gpt",           # OpenAI (bare)
        "openai/",      # OpenAI (provider-prefixed)
        "claude",       # Anthropic (bare)
        "anthropic/",   # Anthropic (provider-prefixed)
        "gemini",       # Google Gemini (bare)
        "google/",      # Google (provider-prefixed)
        "vertex",       # Vertex AI (provider-prefixed is usually vertex/..., allow bare prefix)
        "azure/",       # Azure OpenAI
        "cohere/",      # Cohere
        "mistral/",     # Mistral hosted
        "bedrock/",     # AWS Bedrock
    )

    # All models go through OpenAIExtractor which uses LiteLLM
    # LiteLLM handles routing to OpenAI, Anthropic, vLLM, etc.
    from .openai import OpenAIExtractor
    return OpenAIExtractor(
        model=model_name,
        system_prompt=system_prompt,
        prompt_builder=prompt_builder,
        temperature=temperature,
        top_p=top_p,
        max_tokens=max_tokens,
        max_workers=max_workers,
        include_scores_in_prompt=include_scores_in_prompt,
        **kwargs
    )
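
A brief usage sketch of the factory. The module path comes from the source reference above; running the returned stage outside of a full pipeline is indicated only in the comment.

from stringsight.extractors import get_extractor

extractor = get_extractor(
    model_name="gpt-4.1-mini",
    system_prompt="one_sided_system_prompt",
    temperature=0.6,
    max_workers=8,
)
# Within a pipeline the stage is awaited on a PropertyDataset:
# dataset = await extractor.run(dataset)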

OpenAIExtractor

OpenAIExtractor

Bases: LoggingMixin, TimingMixin, ErrorHandlingMixin, WandbMixin, PipelineStage

Extract behavioral properties using OpenAI models.

This stage takes conversations and extracts structured properties describing model behaviors, differences, and characteristics.

Source code in stringsight/extractors/openai.py
class OpenAIExtractor(LoggingMixin, TimingMixin, ErrorHandlingMixin, WandbMixin, PipelineStage):
    """
    Extract behavioral properties using OpenAI models.

    This stage takes conversations and extracts structured properties describing
    model behaviors, differences, and characteristics.
    """

    def __init__(
        self,
        model: str = "gpt-4.1",
        system_prompt: str = "one_sided_system_prompt_no_examples",
        prompt_builder: Optional[Callable] = None,
        temperature: float = 0.7,
        top_p: float = 0.95,
        max_tokens: int = 16000,
        max_workers: int = DEFAULT_MAX_WORKERS,
        include_scores_in_prompt: bool = False,
        **kwargs
    ):
        """
        Initialize the OpenAI extractor.

        Args:
            model: OpenAI model name (e.g., "gpt-4.1-mini")
            system_prompt: System prompt for property extraction
            prompt_builder: Optional custom prompt builder function
            temperature: Temperature for LLM
            top_p: Top-p for LLM
            max_tokens: Max tokens for LLM
            max_workers: Max parallel workers for API calls
            include_scores_in_prompt: Whether to include scores in prompts
            **kwargs: Additional configuration

        Note:
            Caching is handled automatically by UnifiedCache singleton.
            Configure cache via STRINGSIGHT_* environment variables.
        """
        super().__init__(**kwargs)
        self.model = model
        # Allow caller to pass the name of a prompt template or the prompt text itself
        if isinstance(system_prompt, str) and hasattr(_extractor_prompts, system_prompt):
            self.system_prompt = getattr(_extractor_prompts, system_prompt)
        else:
            self.system_prompt = system_prompt

        self.prompt_builder = prompt_builder or self._default_prompt_builder
        self.temperature = temperature
        self.top_p = top_p
        self.max_tokens = max_tokens
        self.max_workers = max_workers
        # Control whether to include numeric scores/winner context in prompts
        self.include_scores_in_prompt = include_scores_in_prompt
        # Note: Caching is handled by parallel_completions via UnifiedCache singleton

    async def run(self, data: PropertyDataset, progress_callback: Any = None, **kwargs: Any) -> PropertyDataset:
        """Run OpenAI extraction for all conversations.

        Each conversation is formatted with ``prompt_builder`` and sent to the
        OpenAI model in parallel using async.  The raw LLM response is
        stored inside a *placeholder* ``Property`` object (one per
        conversation).  Down-stream stages (``LLMJsonParser``) will parse these
        raw strings into fully-formed properties.

        Args:
            data: PropertyDataset with conversations to extract from
            progress_callback: Optional callback(completed, total) for progress updates
        """

        n_conv = len(data.conversations)
        if n_conv == 0:
            self.log("No conversations found – skipping extraction")
            return data

        self.log(f"Extracting properties from {n_conv} conversations using {self.model}")


        # ------------------------------------------------------------------
        # 1️⃣  Build user messages for every conversation (in parallel)
        # ------------------------------------------------------------------
        user_messages: List[Union[str, List[Dict[str, Any]]]] = [""] * len(data.conversations)

        def _build_prompt(idx: int, conv):
            return idx, self.prompt_builder(conv)

        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = {executor.submit(_build_prompt, idx, conv): idx
                      for idx, conv in enumerate(data.conversations)}
            for future in as_completed(futures):
                idx, prompt = future.result()
                user_messages[idx] = prompt

        # ------------------------------------------------------------------
        # 2️⃣  Call the OpenAI API in parallel batches via shared async LLM utils
        # ------------------------------------------------------------------
        raw_responses = await parallel_completions_async(
            user_messages,
            model=self.model,
            system_prompt=self.system_prompt,
            max_workers=self.max_workers,
            temperature=self.temperature,
            top_p=self.top_p,
            max_tokens=self.max_tokens,
            show_progress=True,
            progress_desc="Property extraction",
            progress_callback=progress_callback
        )

        # ------------------------------------------------------------------
        # 3️⃣  Wrap raw responses in placeholder Property objects (filter None)
        # ------------------------------------------------------------------
        properties: List[Property] = []
        skipped_count = 0
        for conv, raw in zip(data.conversations, raw_responses):
            # Skip failed LLM calls (None responses)
            if raw is None:
                skipped_count += 1
                continue

            # We don't yet know which model(s) the individual properties will
            # belong to; the parser will figure it out from the model label in
            # each extracted property JSON.
            #
            # Important for side-by-side: preserve the model pair on the
            # placeholder property so `LLMJsonParser` can map "Model A"/"Model B"
            # (or equivalent) onto the correct concrete model name.
            model_name = conv.model
            prop = Property(
                id=str(uuid.uuid4()),
                question_id=conv.question_id,
                model=model_name,
                raw_response=raw,
            )
            properties.append(prop)

        if skipped_count > 0:
            self.log(f"Skipped {skipped_count} conversations due to failed LLM calls", level="warning")

        self.log(f"Received {len(properties)} valid LLM responses")


        # Log to wandb if enabled
        if hasattr(self, 'use_wandb') and self.use_wandb:
            self._log_extraction_to_wandb(user_messages, raw_responses, data.conversations)

        # ------------------------------------------------------------------
        # 4️⃣  Return updated dataset
        # ------------------------------------------------------------------
        return PropertyDataset(
            conversations=data.conversations,
            all_models=data.all_models,
            properties=properties,
            clusters=data.clusters,
            model_stats=data.model_stats,
        )

    # ----------------------------------------------------------------------
    # Helper methods
    # ----------------------------------------------------------------------

    # Legacy helpers removed in favor of centralized llm_utils

    def _default_prompt_builder(self, conversation) -> Union[str, List[Dict[str, Any]]]:
        """
        Default prompt builder for side-by-side comparisons, with multimodal support.

        Args:
            conversation: ConversationRecord

        Returns:
            - If no images present: a plain string prompt (backwards compatible)
            - If images present: a full OpenAI messages list including a single
              user turn with ordered text/image parts (and a system turn)
        """
        # Check if this is a side-by-side comparison or single model
        if isinstance(conversation.model, list) and len(conversation.model) == 2:
            # Side-by-side format
            model_a, model_b = conversation.model
            try:
                responses_a = conversation.responses[0]
                responses_b = conversation.responses[1]
            except Exception as e:
                raise ValueError(
                    f"Failed to access conversation responses for side-by-side format. "
                    f"Expected two response lists. Error: {str(e)}"
                )

            # Normalize both to our internal segments format
            conv_a = openai_messages_to_conv(responses_a) if isinstance(responses_a, list) else responses_a
            conv_b = openai_messages_to_conv(responses_b) if isinstance(responses_b, list) else responses_b

            has_images = self._conversation_has_images(conv_a) or self._conversation_has_images(conv_b)

            if has_images:
                return self._build_side_by_side_messages(model_a, model_b, conv_a, conv_b)

            # No images: keep string behavior for compatibility
            response_a = conv_to_str(responses_a)
            response_b = conv_to_str(responses_b)

            scores = conversation.scores

            # Handle list format [scores_a, scores_b]
            if isinstance(scores, list) and len(scores) == 2:
                scores_a, scores_b = scores[0], scores[1]
                winner = conversation.meta.get("winner")  # Winner stored in meta

                # Build the prompt with separate scores for each model
                prompt_parts = [
                    f"<beginning of Model A trace>\n {response_a}\n<end of Model A trace>\n\n--------------------------------\n\n"
                ]

                if self.include_scores_in_prompt and scores_a:
                    prompt_parts.append(f"<Quality scores on Model A trace>\n {scores_a}\n</Quality scores on Model A trace>\n\n")
                prompt_parts.append("--------------------------------")
                prompt_parts.append(f"<beginning of Model B trace>\n {response_b}\n<end of Model B trace>\n\n--------------------------------\n\n")

                if self.include_scores_in_prompt and scores_b:
                    prompt_parts.append(f"<Quality scores on Model B trace>\n {scores_b}\n</Quality scores on Model B trace>\n\n")

                if self.include_scores_in_prompt and winner:
                    prompt_parts.append(f"<Winner of side-by-side comparison>\n {winner}\n</Winner of side-by-side comparison>\n\n")

                return "\n\n".join(prompt_parts)
            else:
                # No scores available
                return (
                    f"<beginning of Model A trace>\n {response_a}\n<end of Model A trace>\n\n--------------------------------\n\n"
                    f"--------------------------------\n"
                    f"<beginning of Model B trace>\n {response_b}\n<end of Model B trace>\n\n--------------------------------\n\n"
                )
        elif isinstance(conversation.model, str):
            # Single model format
            model = conversation.model if isinstance(conversation.model, str) else str(conversation.model)
            responses = conversation.responses

            # Normalize to our internal segments format only to detect images
            conv_norm = openai_messages_to_conv(responses) if isinstance(responses, list) else responses
            if self._conversation_has_images(conv_norm):
                return self._build_single_user_messages(conv_norm)

            # No images: keep string behavior
            try:
                response = conv_to_str(responses)
            except Exception as e:
                raise ValueError(
                    f"Failed to convert conversation response to string format. "
                    f"Expected OpenAI conversation format (list of message dicts with 'role' and 'content' fields). "
                    f"Got: {type(responses)}. "
                    f"Error: {str(e)}"
                )
            scores = conversation.scores

            if not scores or not self.include_scores_in_prompt:
                return response
            return (
                f"{response}\n\n"
                f"<Quality scores on the trace>\n {scores}\n</Quality scores on the trace>\n\n"
            )
        else:
            raise ValueError(f"Invalid conversation format: {conversation}")

    def _conversation_has_images(self, conv_msgs: List[Dict[str, Any]]) -> bool:
        """Return True if any message contains an image segment in ordered segments format."""
        for msg in conv_msgs:
            content = msg.get("content", {})
            segs = content.get("segments") if isinstance(content, dict) else None
            if isinstance(segs, list):
                for seg in segs:
                    if isinstance(seg, dict) and seg.get("kind") == "image":
                        return True
        return False

    def _collapse_segments_to_openai_content(self, conv_msgs: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """
        Collapse ordered segments into an OpenAI multimodal content list with aggregated text.

        Algorithm:
        - Walk through messages in order, accumulating non-image content in a buffer
        - When an image is encountered, flush the buffer (convert to string via conv_to_str),
          then add the image as a separate item
        - Continue until all messages processed, then flush any remaining buffer

        This ensures consecutive non-image turns are aggregated into single text items,
        with images interspersed at their proper positions.

        Produces items like:
          - {"type": "text", "text": str}  (potentially aggregated from multiple turns)
          - {"type": "image_url", "image_url": {"url": str}}
        """
        content: List[Dict[str, Any]] = []
        message_buffer: List[Dict[str, Any]] = []  # Buffer for messages without images

        def flush_buffer():
            """Convert buffered messages to a single text string using conv_to_str."""
            if message_buffer:
                text_str = conv_to_str(message_buffer)
                if text_str and text_str.strip():
                    content.append({"type": "text", "text": text_str})
                message_buffer.clear()

        for msg in conv_msgs:
            # Extract segments from this message
            msg_content = msg.get("content", {})
            segs = msg_content.get("segments", []) if isinstance(msg_content, dict) else []

            # Check if this message contains any images
            images_in_msg: List[str] = []
            non_image_segments: List[Dict[str, Any]] = []

            for seg in segs:
                if not isinstance(seg, dict):
                    non_image_segments.append(seg)
                    continue

                kind = seg.get("kind")
                if kind == "image":
                    # Extract image URL
                    img = seg.get("image")
                    url: Optional[str] = None
                    if isinstance(img, str):
                        url = img
                    elif isinstance(img, dict):
                        if isinstance(img.get("url"), str):
                            url = img.get("url")
                        elif isinstance(img.get("image_url"), dict) and isinstance(img["image_url"].get("url"), str):
                            url = img["image_url"].get("url")
                        elif isinstance(img.get("source"), str):
                            url = img.get("source")
                    if url:
                        images_in_msg.append(url)
                else:
                    # Keep non-image segments (text, tool, etc.)
                    non_image_segments.append(seg)

            # Build a message dict with only non-image content for the buffer
            if non_image_segments:
                msg_for_buffer = dict(msg)  # Copy message structure
                msg_for_buffer["content"] = {
                    "segments": non_image_segments
                }
                message_buffer.append(msg_for_buffer)

            # If we encountered images, flush buffer then add images
            if images_in_msg:
                flush_buffer()
                for img_url in images_in_msg:
                    content.append({"type": "image_url", "image_url": {"url": img_url}})

        # Flush any remaining buffered messages at the end
        flush_buffer()

        return content

    def _build_single_user_messages(self, conv_msgs: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Build a full messages list with system + single multimodal user turn."""
        content = self._collapse_segments_to_openai_content(conv_msgs)
        messages: List[Dict[str, Any]] = []
        if self.system_prompt:
            messages.append({"role": "system", "content": self.system_prompt})
        messages.append({"role": "user", "content": f"<beginning of Model trace>\n {content}\n<end of Model trace>\n\n"})
        return messages

    def _build_side_by_side_messages(
        self,
        model_a: str,
        model_b: str,
        conv_a: List[Dict[str, Any]],
        conv_b: List[Dict[str, Any]],
    ) -> List[Dict[str, Any]]:
        """Build a full messages list with system + single user turn containing A/B sections."""
        content: List[Dict[str, Any]] = []
        content += (
            [{"type": "text", "text": "<beginning of Model A trace>"}]
            + self._collapse_segments_to_openai_content(conv_a)
            + [{"type": "text", "text": "<end of Model A trace>\n\n--------------------------------\n\n<beginning of Model B trace>"}]
            + self._collapse_segments_to_openai_content(conv_b)
            + [{"type": "text", "text": "<end of Model B trace>"}]
        )

        messages: List[Dict[str, Any]] = []
        if self.system_prompt:
            messages.append({"role": "system", "content": self.system_prompt})
        messages.append({"role": "user", "content": content})
        return messages

    def _log_extraction_to_wandb(self, user_messages: List[Union[str, List[Dict[str, Any]]]], raw_responses: List[str | None], conversations):
        """Log extraction inputs/outputs to wandb."""
        try:
            import wandb
            # import weave

            # Create a table of inputs and outputs
            extraction_data = []
            for i, (msg, response, conv) in enumerate(zip(user_messages, raw_responses, conversations)):
                # Handle None responses (failed LLM calls)
                if response is None:
                    extraction_data.append({
                        "question_id": conv.question_id,
                        "system_prompt": self.system_prompt,
                        "input_message": msg,
                        "raw_response": "FAILED: None",
                        "response_length": 0,
                        "has_error": True,
                    })
                else:
                    extraction_data.append({
                        "question_id": conv.question_id,
                        "system_prompt": self.system_prompt,
                        "input_message": msg,
                        "raw_response": response,
                        "response_length": len(response),
                        "has_error": False,
                    })

            # Log extraction table (as table, not summary)
            self.log_wandb({
                "Property_Extraction/extraction_inputs_outputs": wandb.Table(
                    columns=["question_id", "system_prompt", "input_message", "raw_response", "response_length", "has_error"],
                    data=[[row[col] for col in ["question_id", "system_prompt", "input_message", "raw_response", "response_length", "has_error"]]
                          for row in extraction_data]
                )
            })

            # Log extraction metrics as summary metrics (not regular metrics)
            error_count = sum(1 for r in raw_responses if r is None)
            valid_responses = [r for r in raw_responses if r is not None]
            extraction_metrics = {
                "extraction_total_requests": len(raw_responses),
                "extraction_error_count": error_count,
                "extraction_success_rate": (len(raw_responses) - error_count) / len(raw_responses) if raw_responses else 0,
                "extraction_avg_response_length": sum(len(r) for r in valid_responses) / len(valid_responses) if valid_responses else 0,
            }
            self.log_wandb(extraction_metrics, is_summary=True)

        except Exception as e:
            self.log(f"Failed to log extraction to wandb: {e}", level="warning")        

run(data, progress_callback=None, **kwargs) async

Run OpenAI extraction for all conversations.

Each conversation is formatted with prompt_builder and sent to the OpenAI model in parallel using async. The raw LLM response is stored inside a placeholder Property object (one per conversation). Down-stream stages (LLMJsonParser) will parse these raw strings into fully-formed properties.

Parameters:

Name Type Description Default
data PropertyDataset

PropertyDataset with conversations to extract from

required
progress_callback Any

Optional callback(completed, total) for progress updates

None
Source code in stringsight/extractors/openai.py
async def run(self, data: PropertyDataset, progress_callback: Any = None, **kwargs: Any) -> PropertyDataset:
    """Run OpenAI extraction for all conversations.

    Each conversation is formatted with ``prompt_builder`` and sent to the
    OpenAI model in parallel using async.  The raw LLM response is
    stored inside a *placeholder* ``Property`` object (one per
    conversation).  Down-stream stages (``LLMJsonParser``) will parse these
    raw strings into fully-formed properties.

    Args:
        data: PropertyDataset with conversations to extract from
        progress_callback: Optional callback(completed, total) for progress updates
    """

    n_conv = len(data.conversations)
    if n_conv == 0:
        self.log("No conversations found – skipping extraction")
        return data

    self.log(f"Extracting properties from {n_conv} conversations using {self.model}")


    # ------------------------------------------------------------------
    # 1️⃣  Build user messages for every conversation (in parallel)
    # ------------------------------------------------------------------
    user_messages: List[Union[str, List[Dict[str, Any]]]] = [""] * len(data.conversations)

    def _build_prompt(idx: int, conv):
        return idx, self.prompt_builder(conv)

    with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
        futures = {executor.submit(_build_prompt, idx, conv): idx
                  for idx, conv in enumerate(data.conversations)}
        for future in as_completed(futures):
            idx, prompt = future.result()
            user_messages[idx] = prompt

    # ------------------------------------------------------------------
    # 2️⃣  Call the OpenAI API in parallel batches via shared async LLM utils
    # ------------------------------------------------------------------
    raw_responses = await parallel_completions_async(
        user_messages,
        model=self.model,
        system_prompt=self.system_prompt,
        max_workers=self.max_workers,
        temperature=self.temperature,
        top_p=self.top_p,
        max_tokens=self.max_tokens,
        show_progress=True,
        progress_desc="Property extraction",
        progress_callback=progress_callback
    )

    # ------------------------------------------------------------------
    # 3️⃣  Wrap raw responses in placeholder Property objects (filter None)
    # ------------------------------------------------------------------
    properties: List[Property] = []
    skipped_count = 0
    for conv, raw in zip(data.conversations, raw_responses):
        # Skip failed LLM calls (None responses)
        if raw is None:
            skipped_count += 1
            continue

        # We don't yet know which model(s) the individual properties will
        # belong to; the parser will figure it out from the model label in
        # each extracted property JSON.
        #
        # Important for side-by-side: preserve the model pair on the
        # placeholder property so `LLMJsonParser` can map "Model A"/"Model B"
        # (or equivalent) onto the correct concrete model name.
        model_name = conv.model
        prop = Property(
            id=str(uuid.uuid4()),
            question_id=conv.question_id,
            model=model_name,
            raw_response=raw,
        )
        properties.append(prop)

    if skipped_count > 0:
        self.log(f"Skipped {skipped_count} conversations due to failed LLM calls", level="warning")

    self.log(f"Received {len(properties)} valid LLM responses")


    # Log to wandb if enabled
    if hasattr(self, 'use_wandb') and self.use_wandb:
        self._log_extraction_to_wandb(user_messages, raw_responses, data.conversations)

    # ------------------------------------------------------------------
    # 4️⃣  Return updated dataset
    # ------------------------------------------------------------------
    return PropertyDataset(
        conversations=data.conversations,
        all_models=data.all_models,
        properties=properties,
        clusters=data.clusters,
        model_stats=data.model_stats,
    )
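
If the default prompt builder does not match your trace format, a custom prompt_builder can be supplied to get_extractor (or to explain). A minimal sketch for single-model traces stored as OpenAI-style message lists; the function name and the flattening logic are ours, not part of the library:

def custom_prompt_builder(conversation) -> str:
    """Hypothetical builder: flatten an OpenAI-style message list into a tagged trace."""
    responses = conversation.responses
    if isinstance(responses, list):
        turns = [f"{m.get('role', 'unknown')}: {m.get('content', '')}" for m in responses]
        body = "\n".join(turns)
    else:
        body = str(responses)
    return f"<beginning of Model trace>\n{body}\n<end of Model trace>\n"

# extractor = get_extractor(prompt_builder=custom_prompt_builder)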

Clusterers

get_clusterer()

get_clusterer(method='hdbscan', min_cluster_size=None, embedding_model='sentence-transformers/all-MiniLM-L6-v2', assign_outliers=False, include_embeddings=False, cluster_positive=True, **kwargs)

Factory function to get the appropriate clusterer.

Parameters:

Name Type Description Default
method str

Clustering method ("hdbscan", "dummy")

'hdbscan'
min_cluster_size int | None

Minimum cluster size

None
embedding_model str

Embedding model to use

'sentence-transformers/all-MiniLM-L6-v2'
assign_outliers bool

Whether to assign outliers to nearest clusters

False
include_embeddings bool

Whether to include embeddings in output

False
use_gpu bool | None

Enable GPU acceleration for embeddings and HDBSCAN. Accepted via **kwargs rather than as a named parameter; None (default) = auto-detect based on CUDA availability.

None
cluster_positive bool

If False and groupby_column is "behavior_type", skip clustering positive behaviors. Defaults to True.

True
**kwargs

Additional configuration

{}

Returns:

Type Description
PipelineStage

Configured clusterer stage

Source code in stringsight/clusterers/__init__.py
def get_clusterer(
    method: str = "hdbscan",
    min_cluster_size: int | None = None,
    embedding_model: str = "sentence-transformers/all-MiniLM-L6-v2",
    assign_outliers: bool = False,
    include_embeddings: bool = False,
    cluster_positive: bool = True,
    **kwargs
) -> PipelineStage:
    """
    Factory function to get the appropriate clusterer.

    Args:
        method: Clustering method ("hdbscan", "dummy")
        min_cluster_size: Minimum cluster size
        embedding_model: Embedding model to use
        assign_outliers: Whether to assign outliers to nearest clusters
        include_embeddings: Whether to include embeddings in output
        use_gpu: Enable GPU acceleration for embeddings and HDBSCAN.
                None (default) = auto-detect based on CUDA availability.
        cluster_positive: If False and groupby_column is "behavior_type", skip clustering positive behaviors.
                         Defaults to True.
        **kwargs: Additional configuration

    Returns:
        Configured clusterer stage
    """

    if method == "hdbscan":
        from .hdbscan import HDBSCANClusterer
        return HDBSCANClusterer(
            min_cluster_size=min_cluster_size,
            embedding_model=embedding_model,
            assign_outliers=assign_outliers,
            include_embeddings=include_embeddings,
            cluster_positive=cluster_positive,
            **kwargs
        )
    # 'hdbscan_stratified' alias has been removed; users should pass
    # `method="hdbscan"` and supply `groupby_column` if stratification is
    # desired.
    elif method == "dummy":
        from .dummy_clusterer import DummyClusterer
        return DummyClusterer(**kwargs)
    else:
        raise ValueError(f"Unknown clustering method: {method}")
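
A brief usage sketch of the factory; groupby_column is not a named parameter here and is forwarded to the clusterer via **kwargs, as the comment in the source notes.

from stringsight.clusterers import get_clusterer

clusterer = get_clusterer(
    method="hdbscan",
    min_cluster_size=10,
    embedding_model="sentence-transformers/all-MiniLM-L6-v2",
    assign_outliers=False,
    groupby_column="behavior_type",  # forwarded via **kwargs for stratified clustering
)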

HDBSCANClusterer

HDBSCANClusterer

Bases: BaseClusterer

HDBSCAN clustering stage.

This stage migrates the hdbscan_cluster_categories function from clustering/hierarchical_clustering.py into the pipeline architecture.

Source code in stringsight/clusterers/hdbscan.py
class HDBSCANClusterer(BaseClusterer):
    """
    HDBSCAN clustering stage.

    This stage migrates the hdbscan_cluster_categories function from
    clustering/hierarchical_clustering.py into the pipeline architecture.
    """

    def __init__(
        self,
        min_cluster_size: int | None = None,
        embedding_model: str = "sentence-transformers/all-MiniLM-L6-v2",
        include_embeddings: bool = False,
        use_wandb: bool = False,
        wandb_project: str | None = None,
        output_dir: str | None = None,
        # Additional explicit configuration parameters
        min_samples: int | None = None,
        cluster_selection_epsilon: float = 0.0,
        disable_dim_reduction: bool = False,
        dim_reduction_method: str = "adaptive",
        context: str | None = None,
        groupby_column: str | None = None,
        parallel_clustering: bool = True,
        cluster_positive: bool = True,
        precomputed_embeddings: Any | None = None,
        cache_embeddings: bool = True,
        input_model_name: str | None = None,
        summary_model: str = "gpt-4.1",
        cluster_assignment_model: str = "gpt-4.1-mini",
        verbose: bool = True,
        llm_max_workers: int = DEFAULT_MAX_WORKERS,
        **kwargs,
    ):
        """Initialize the HDBSCAN clusterer with explicit, overridable parameters.

        """
        super().__init__(
            output_dir=output_dir,
            include_embeddings=include_embeddings,
            use_wandb=use_wandb,
            wandb_project=wandb_project,
            **kwargs,
        )

        # Build a unified ClusterConfig (no hardcoded values)
        self.config = ClusterConfig(
            # core
            min_cluster_size=min_cluster_size,
            verbose=verbose,
            include_embeddings=include_embeddings,
            context=context,
            precomputed_embeddings=precomputed_embeddings,
            disable_dim_reduction=disable_dim_reduction,
            input_model_name=input_model_name,
            min_samples=min_samples,
            cluster_selection_epsilon=cluster_selection_epsilon,
            cache_embeddings=cache_embeddings,
            # models
            embedding_model=embedding_model,
            summary_model=summary_model,
            cluster_assignment_model=cluster_assignment_model,
            llm_max_workers=llm_max_workers,
            # dim reduction
            dim_reduction_method=dim_reduction_method,
            # groupby
            groupby_column=groupby_column,
            parallel_clustering=parallel_clustering,
            cluster_positive=cluster_positive,
            # wandb
            use_wandb=use_wandb,
            wandb_project=wandb_project,
        )


    async def cluster(self, data: PropertyDataset, column_name: str, progress_callback=None) -> pd.DataFrame:
        """Cluster the dataset.

        If ``self.config.groupby_column`` is provided and present in the data, the
        input DataFrame is first partitioned by that column and each partition is
        clustered independently (stratified clustering). Results are then
        concatenated back together. Otherwise, the entire dataset is clustered
        at once.
        """

        df = data.to_dataframe(type="properties")

        # `to_dataframe(type="properties")` may include conversation rows without any extracted
        # properties (i.e., missing/NaN `column_name`). Those rows cannot be clustered and can
        # also coerce cluster id dtypes to float downstream, which breaks group metadata mapping.
        if column_name in df.columns:
            df = df[df[column_name].notna()].copy()

        if getattr(self, "verbose", False):
            logger.debug(f"DataFrame shape after to_dataframe: {df.shape}")
            logger.debug(f"DataFrame columns: {list(df.columns)}")
            logger.debug(f"DataFrame head:")
            logger.debug(df.head())

        if column_name in df.columns:
            if getattr(self, "verbose", False):
                logger.debug(f"{column_name} unique values: {df[column_name].nunique()}")
                logger.debug(f"{column_name} value counts:")
                logger.debug(df[column_name].value_counts())
                logger.debug(f"Sample {column_name} values: {df[column_name].head().tolist()}")
        else:
            logger.error(f"Column '{column_name}' not found in DataFrame!")

        group_col = getattr(self.config, "groupby_column", None)
        cluster_positive = getattr(self.config, "cluster_positive", True)

        # Filter out positive behaviors if cluster_positive is False and groupby_column is behavior_type
        positive_mask = None
        positive_df = None
        if group_col == "behavior_type" and not cluster_positive and "behavior_type" in df.columns:
            positive_mask = df["behavior_type"] == "Positive"
            positive_df = df[positive_mask].copy()
            df = df[~positive_mask].copy()
            if len(positive_df) > 0:
                self.log(f"Filtering out {len(positive_df)} positive behaviors from clustering (cluster_positive=False)")
            if len(df) == 0:
                self.log("All behaviors are positive and cluster_positive=False - skipping clustering")

        # Handle case where all behaviors were filtered out
        if len(df) == 0:
            # If we have positive behaviors, return them with special cluster assignment
            if positive_df is not None and len(positive_df) > 0:
                id_col = f"{column_name}_cluster_id"
                label_col = f"{column_name}_cluster_label"
                positive_df[id_col] = -2
                positive_df[label_col] = "Positive (not clustered)"
                if "meta" not in positive_df.columns:
                    positive_df["meta"] = [{} for _ in range(len(positive_df))]
                return positive_df
            # Otherwise return empty DataFrame with required columns
            id_col = f"{column_name}_cluster_id"
            label_col = f"{column_name}_cluster_label"
            df_empty = pd.DataFrame(columns=[column_name, id_col, label_col, "question_id", "id", "meta"])
            return df_empty

        if group_col is not None and group_col in df.columns:
            parallel_clustering = getattr(self.config, "parallel_clustering", False)

            if parallel_clustering:
                # Parallelize clustering per group for better performance (using async)
                groups = list(df.groupby(group_col))

                async def _cluster_group_async(group_info):
                    group, group_df = group_info
                    if getattr(self, "verbose", False):
                        logger.info(f"--------------------------------\nClustering group {group}\n--------------------------------")
                    part = await hdbscan_cluster_categories(
                        group_df,
                        column_name=column_name,
                        config=self.config,
                    )
                    return group, part

                # Process groups in parallel using async
                clustered_parts = []
                max_workers = min(len(groups), getattr(self.config, 'llm_max_workers', DEFAULT_MAX_WORKERS))

                # Create coroutines (not tasks yet - we're not in event loop)
                coros = [_cluster_group_async(group_info) for group_info in groups]

                # Use gather to run them all - this works even if not in event loop yet
                # We'll iterate with as_completed for progress tracking
                tasks = [asyncio.ensure_future(coro) for coro in coros]

                # Add progress bar for parallel clustering
                total_groups = len(groups)
                completed_groups = 0
                with tqdm(total=total_groups, desc=f"Clustering {total_groups} groups in parallel", disable=not getattr(self, "verbose", False)) as pbar:
                    for task in asyncio.as_completed(tasks):
                        group, part = await task
                        clustered_parts.append(part)
                        pbar.update(1)
                        completed_groups += 1
                        if progress_callback:
                            try:
                                progress_callback(completed_groups / total_groups)
                            except Exception:
                                pass
                clustered_df = pd.concat(clustered_parts, ignore_index=True)
            else:
                # Process groups sequentially (default behavior)
                clustered_parts = []
                groups = list(df.groupby(group_col))

                # Add progress bar for sequential clustering
                total_groups = len(groups)
                for i, (group, group_df) in enumerate(tqdm(groups, desc=f"Clustering {len(groups)} groups sequentially", disable=not getattr(self, "verbose", False))):
                    if getattr(self, "verbose", False):
                        logger.info(f"--------------------------------\nClustering group {group}\n--------------------------------")
                    part = await hdbscan_cluster_categories(
                        group_df,
                        column_name=column_name,
                        config=self.config,
                    )
                    clustered_parts.append(part)
                    if progress_callback:
                        try:
                            progress_callback((i + 1) / total_groups)
                        except Exception:
                            pass
                clustered_df = pd.concat(clustered_parts, ignore_index=True)
        else:
            clustered_df = await hdbscan_cluster_categories(
                df,
                column_name=column_name,
                config=self.config,
            )

        # Add back positive behaviors with special cluster assignment if they were filtered out
        if positive_df is not None and len(positive_df) > 0:
            id_col = f"{column_name}_cluster_id"
            label_col = f"{column_name}_cluster_label"

            # Assign special cluster ID and label for positive behaviors that weren't clustered
            positive_df[id_col] = -2
            positive_df[label_col] = "Positive (not clustered)"
            if "meta" not in positive_df.columns:
                positive_df["meta"] = [{} for _ in range(len(positive_df))]

            # Concatenate back with clustered results
            clustered_df = pd.concat([clustered_df, positive_df], ignore_index=True)
            self.log(f"Added back {len(positive_df)} positive behaviors with cluster_id=-2 (not clustered)")

        return clustered_df

    async def postprocess_clustered_df(self, df: pd.DataFrame, column_name: str, prettify_labels: bool = False) -> pd.DataFrame:
        """Standard post-processing plus stratified ID re-assignment when needed."""

        label_col = f"{column_name}_cluster_label"
        id_col = f"{column_name}_cluster_id"

        # Ensure no NaN values in label column (causes downstream issues in metrics)
        if label_col in df.columns and df[label_col].isna().any():
            n_nans = df[label_col].isna().sum()
            self.log(f"Found {n_nans} properties with NaN cluster labels. Assigning to 'Outliers'.")
            df[label_col] = df[label_col].fillna("Outliers")
            df.loc[df[label_col] == "Outliers", id_col] = -1

        df = await super().postprocess_clustered_df(df, label_col, prettify_labels)

        # 1️⃣  Move tiny clusters to Outliers
        label_counts = df[label_col].value_counts()
        min_size_threshold = int((getattr(self.config, "min_cluster_size", 1) or 1))
        too_small_labels = label_counts[label_counts < min_size_threshold].index
        for label in too_small_labels:
            mask = df[label_col] == label
            cid = df.loc[mask, id_col].iloc[0] if not df.loc[mask].empty else None
            self.log(
                f"Assigning cluster {cid} (label '{label}') to Outliers because it has {label_counts[label]} items"
            )

            # Check if we're using groupby and assign group-specific outlier labels
            group_col = getattr(self.config, "groupby_column", None)
            if group_col is not None and group_col in df.columns:
                # Assign group-specific outlier labels
                for group_value in df.loc[mask, group_col].unique():
                    group_mask = mask & (df[group_col] == group_value)
                    outlier_label = f"Outliers - {group_value}"
                    df.loc[group_mask, label_col] = outlier_label
                    df.loc[group_mask, id_col] = -1
            else:
                # Standard outlier assignment
                df.loc[mask, label_col] = "Outliers"
                df.loc[mask, id_col] = -1

        # 2️⃣  For stratified mode: ensure cluster IDs are unique across partitions
        group_col = getattr(self.config, "groupby_column", None)
        if group_col is not None and group_col in df.columns:
            # Handle group-specific outlier labels
            outlier_mask = df[label_col].str.startswith("Outliers - ") if df[label_col].dtype == 'object' else df[label_col] == "Outliers"
            non_outlier_mask = ~outlier_mask

            unique_pairs = (
                df.loc[non_outlier_mask, [group_col, label_col]]
                .drop_duplicates()
                .reset_index(drop=True)
            )
            pair_to_new_id = {
                (row[group_col], row[label_col]): idx for idx, row in unique_pairs.iterrows()
            }
            for (gval, lbl), new_id in pair_to_new_id.items():
                pair_mask = (df[group_col] == gval) & (df[label_col] == lbl) & non_outlier_mask
                df.loc[pair_mask, id_col] = new_id

            # Handle group-specific outliers: assign unique IDs to each outlier group
            if outlier_mask.any():
                # Get unique outlier labels
                unique_outlier_labels = df.loc[outlier_mask, label_col].unique()

                # Assign unique IDs to each outlier group, starting from a high negative number
                # to avoid conflicts with regular cluster IDs
                outlier_id_start = -1000
                for i, outlier_label in enumerate(unique_outlier_labels):
                    outlier_label_mask = df[label_col] == outlier_label
                    unique_outlier_id = outlier_id_start - i
                    df.loc[outlier_label_mask, id_col] = unique_outlier_id

        return df

    # ------------------------------------------------------------------
    # 🏷️  Cluster construction helper with group metadata
    # ------------------------------------------------------------------
    def _build_clusters_from_df(self, df: pd.DataFrame, column_name: str):
        """Build clusters and, in stratified mode, add group info to metadata."""

        clusters = super()._build_clusters_from_df(df, column_name)

        group_col = getattr(self.config, "groupby_column", None)
        if group_col is not None and group_col in df.columns:
            id_col = f"{column_name}_cluster_id"

            # Pandas may upcast cluster ids to floats if there are any NaNs in the column.
            # Normalize ids to integers before converting to string keys (Cluster.id is a str).
            id_group_df = df.loc[df[id_col].notna(), [id_col, group_col]].dropna().copy()
            if not id_group_df.empty:
                # If ids are floats like 0.0, cast back to int safely.
                id_group_df[id_col] = id_group_df[id_col].astype(float).astype(int)

            id_to_group = (
                id_group_df.groupby(id_col)[group_col]
                .agg(lambda s: s.iloc[0])
                .to_dict()
            )
            id_to_group = {str(int(k)): v for k, v in id_to_group.items()}

            for c in clusters:
                cid = getattr(c, "id", None)
                if cid in id_to_group:
                    c.meta = dict(c.meta or {})
                    group_val = id_to_group[cid]
                    # Frontend expects `metadata.group` for chart categorization.
                    c.meta["group"] = group_val
                    # Also store under the actual grouping column name (e.g. "behavior_type")
                    # for explicitness and easier debugging.
                    c.meta[group_col] = group_val

        return clusters
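
For finer control than the factory exposes by name, the clusterer can also be constructed directly; every keyword below appears in the __init__ signature above, while the column_name in the commented call is only an assumed example.

from stringsight.clusterers.hdbscan import HDBSCANClusterer

clusterer = HDBSCANClusterer(
    min_cluster_size=15,
    min_samples=5,
    cluster_selection_epsilon=0.0,
    dim_reduction_method="adaptive",
    groupby_column="behavior_type",
    parallel_clustering=True,
    summary_model="gpt-4.1",
    cluster_assignment_model="gpt-4.1-mini",
)
# clustered_df = await clusterer.cluster(dataset, column_name="property_description")
# `dataset` is a PropertyDataset; column_name names the property text column and
# "property_description" is an assumed example, not a documented default.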

Metrics

get_metrics()

get_metrics(method, **kwargs)

Factory function for metrics stages.

Parameters:

Name Type Description Default
method str

"side_by_side", "single_model", "functional", or "single_model_legacy"

required
**kwargs

Additional configuration for the metrics stage

{}

Returns:

Type Description
PipelineStage

Configured metrics stage

Source code in stringsight/metrics/__init__.py
def get_metrics(method: str, **kwargs) -> "PipelineStage":
    """
    Factory function for metrics stages.

    Args:
        method: "side_by_side", "single_model", "functional", or "single_model_legacy"
        **kwargs: Additional configuration for the metrics stage

    Returns:
        Configured metrics stage
    """
    # Remap legacy flag name for wandb to the functional parameter
    if "use_wandb" in kwargs and "log_to_wandb" not in kwargs:
        kwargs["log_to_wandb"] = kwargs.pop("use_wandb")

    if method == "side_by_side":
        return SideBySideMetrics(**kwargs)
    elif method == "single_model":
        # NEW: Default to functional metrics for single_model
        return SingleModelMetrics(**kwargs)
    # elif method == "functional":
    #     return FunctionalMetrics(**kwargs)
    else:
        raise ValueError(f"Unknown metrics method: {method}. Available: 'side_by_side', 'single_model', 'functional', 'single_model_legacy'") 
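
A brief usage sketch; note that a legacy use_wandb flag is remapped to log_to_wandb by the factory, as shown in the source above.

from stringsight.metrics import get_metrics

single_metrics = get_metrics("single_model")
sxs_metrics = get_metrics("side_by_side", use_wandb=False)  # remapped to log_to_wandb=False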

Utilities

Format Detection

detect_method(columns)

Return the best-matching method based on available columns.

Parameters:

Name Type Description Default
columns List[str]

List of column names present in the dataset

required

Returns:

Type Description
Optional[Method]

"single_model" | "side_by_side" if a set of required columns is satisfied,

Optional[Method]

otherwise None.

Source code in stringsight/formatters/traces.py
def detect_method(columns: List[str]) -> Optional[Method]:
    """Return the best-matching method based on available columns.

    Args:
        columns: List of column names present in the dataset

    Returns:
        "single_model" | "side_by_side" if a set of required columns is satisfied,
        otherwise None.
    """
    col_set = set(columns)
    if set(REQUIRED_COLUMNS["side_by_side"]).issubset(col_set):
        return "side_by_side"
    if set(REQUIRED_COLUMNS["single_model"]).issubset(col_set):
        return "single_model"
    return None
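
A brief usage sketch; the column names follow the defaults used elsewhere in this reference, but the authoritative required sets live in REQUIRED_COLUMNS.

from stringsight.formatters.traces import detect_method

print(detect_method(["prompt", "model", "model_response"]))      # likely "single_model"
print(detect_method(["prompt", "model_a", "model_b",
                     "model_a_response", "model_b_response"]))   # likely "side_by_side"
print(detect_method(["prompt"]))                                 # None: no method satisfied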

Validation

validate_required_columns(df, method)

Return the list of missing required columns for the given method.

Empty list indicates the DataFrame satisfies the requirement.

Source code in stringsight/formatters/traces.py
def validate_required_columns(df: pd.DataFrame, method: Method) -> List[str]:
    """Return the list of missing required columns for the given method.

    Empty list indicates the DataFrame satisfies the requirement.
    """
    missing = [c for c in REQUIRED_COLUMNS[method] if c not in df.columns]
    return missing
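
A brief usage sketch; with the default single-model columns described elsewhere in this reference, the missing list would typically flag the absent response column.

import pandas as pd
from stringsight.formatters.traces import validate_required_columns

df = pd.DataFrame({"prompt": ["What is 2 + 2?"], "model": ["gpt-4.1"]})
missing = validate_required_columns(df, "single_model")
if missing:
    raise ValueError(f"Dataset is missing required columns: {missing}")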