
The Pipeline Class#

A practical guide to building, fitting, transforming, deploying, and moving preprocessing logic with tdprepview.


Overview#

What is Pipeline?

A declarative, SQL-generating preprocessing pipeline for Teradata Vantage. You describe what to do; Pipeline builds a DAG and emits a single, reviewable SQL query (or a view).

How it differs from scikit-learn pipelines
  • In-DB pushdown: work happens in Vantage, not in Python.
  • DAG, not just a list: supports fan-in/fan-out nodes.
  • SQL output: get a query or create a view.
  • No Python at runtime: once deployed as a SQL query or view, serving doesn’t need a Python process.

Why a pipeline#

  • Reproducible: one object defines selection, transforms, and naming.
  • Auditable: SQL is reviewable by DBAs; JSON captures full logic.
  • Portable: use as a DataFrame, a SQL string, a database view, or serialized JSON.
  • Clear fit vs transform: statistics are computed during fit; transform emits pure SQL (see the sketch below).
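
A minimal fit-then-transform sketch of that split. The connection details, schema, table, and column names are placeholders, and it is assumed that fit accepts the same DF / schema_name + table_name arguments described in the fitting section below:

import teradataml as tdml
from tdprepview import Pipeline, Impute, Scale

# placeholder connection and source table
tdml.create_context(host="vantage.example.com", username="demo_user", password="***")
DF_train = tdml.DataFrame(tdml.in_schema("demo_schema", "customers_raw"))

pl = Pipeline([
    (["age", "salary"], Impute(kind="median")),
    ({"dtype_include": ["int"]}, Scale(kind="zscore")),
])

pl.fit(DF=DF_train)                      # statistics are computed in Vantage
DF_prepared = pl.transform(DF=DF_train)  # pure SQL, returns a tdml.DataFrame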

Core concepts#

Step#

A step is a tuple: (input selector, preprocessor(s), optional rename).

from tdprepview import Pipeline
from tdprepview import Impute, Scale, OneHotEncoder, PCA, Cast
pl = Pipeline([
    (["age", "salary"], Impute(kind="median")),          # (selector, preprocessor)
    ({"dtype_include": ["int"]}, Scale(kind="zscore")),  # dict selector
    ("city", OneHotEncoder(categories="auto"), {"prefix": "oh_"})  # with rename
])

Column selectors#

  • str → one column
  • list[str] → explicit columns
  • dict with any of:

    • prefix, suffix, pattern
    • dtype_include, dtype_exclude
    • columns_exclude

The keys of a dict selector compose: each key further filters down the current set of leaf columns.

({"prefix": "num_", "dtype_exclude": ["varchar"]}, Scale(kind="minmax"))

Preprocessors#

  • Per-column vs combined-inputs (e.g., PCA). Combined inputs create fan-in nodes (see the sketch below).
  • Single-output vs multi-output (affects generated column names).
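
A sketch of a fan-in step: all columns matching the num_ prefix are first scaled per column, then fed together into a single PCA node that emits several output columns (the prefix and component count are assumptions):

from tdprepview import Pipeline, Scale, PCA

pl = Pipeline([
    # per-column: every num_* column gets its own scaling node
    ({"prefix": "num_"}, Scale(kind="zscore")),
    # combined inputs: the scaled num_* columns fan in to one PCA node,
    # which is multi-output and emits several component columns
    ({"prefix": "num_"}, PCA(n_components=3), {"prefix": "pca_"}),
])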

Renaming options#

Add {"prefix": "...", "suffix": "..."} to control output names. This is especially useful for chaining:

({"prefix": "num_"},
    [Scale(kind="zscore"), PCA(n_components=3)],
{"prefix": "pca_"})

DAG model#

Internally, a DAG of ColumnNode and PreProcesserNode objects is built. A WITH ... AS query assembles the intermediate steps; the DAG's leaves become the final output columns.


Building a pipeline (syntax patterns)#

# single column
pl = Pipeline([
    ("age", Impute(kind="median"))
])

# explicit list of columns
pl = Pipeline([
    (["height", "weight"], Scale(kind="zscore"))
])

# dict selectors, optionally with a rename option
pl = Pipeline([
    ({"pattern": "num_"}, Scale(kind="minmax")),
    ({"suffix": "_cat"}, OneHotEncoder(categories="auto"), {"suffix": "_oh"})
])

# chained preprocessors with a rename option
pl = Pipeline([
    (["income"], [Impute(kind="median"), Scale(kind="zscore")], {"suffix": "_scaled"})
])
Step expansion rule (when you pass a list of preprocessors plus rename options; see the sketch after this list):
  • The first preprocessor runs with your provided prefix/suffix.
  • Subsequent preprocessors are auto-rewired to take the renamed outputs as inputs, and run without re-applying the rename.
  • For dict selectors, prefix/suffix are composed when chaining (pattern is removed to avoid ambiguity).
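
As a rough illustration, a chained step expands internally into something like the following two-step form (shown only for intuition; the name income_scaled and the exact rewiring are assumptions):

from tdprepview import Pipeline, Impute, Scale

# one chained step with a rename option ...
pl = Pipeline([
    (["income"], [Impute(kind="median"), Scale(kind="zscore")], {"suffix": "_scaled"}),
])

# ... behaves roughly like: the first preprocessor applies the rename,
# the second is rewired to consume the renamed output without renaming again
pl_equivalent = Pipeline([
    (["income"], Impute(kind="median"), {"suffix": "_scaled"}),
    ("income_scaled", Scale(kind="zscore")),
])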

Input validation (what will assert)#

  • Steps must be tuples of length 2 or 3.
  • Selectors must be str | list[str] | dict with only allowed keys.
  • Lists must be deduplicated (["a","a"] will assert).
  • Preprocessors must be instances of Preprocessor (or a list of them).
  • Rename options accept only "prefix" / "suffix" strings.
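
For example, a duplicated list selector should be rejected (a hypothetical illustration, assuming validation runs when the Pipeline is constructed):

from tdprepview import Pipeline, Scale

try:
    Pipeline([(["age", "age"], Scale(kind="zscore"))])  # duplicate entry in list selector
except AssertionError as err:
    print("step rejected:", err)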

Naming conventions#

Prefer early, explicit renames to avoid collisions and to make downstream selectors stable (e.g., prefix PCA outputs with pca_…).
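
For example, giving PCA outputs an explicit pca_ prefix lets a later step select them reliably by that prefix (a sketch reusing the chaining pattern from above; the Cast step is illustrative):

from tdprepview import Pipeline, Scale, PCA, Cast

pl = Pipeline([
    # rename early: PCA outputs get a stable, explicit prefix
    ({"prefix": "num_"}, [Scale(kind="zscore"), PCA(n_components=3)], {"prefix": "pca_"}),
    # downstream steps can now target the components by prefix
    ({"prefix": "pca_"}, Cast(new_type="FLOAT")),
])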


Deep Dive into Fitting: what actually happens#

  1. Requirements

    • A live teradataml context.
    • Provide either DF or schema_name + table_name.
  2. DAG growth

    • For each step, the selector resolves against current leaves.
    • Nodes are appended; combined-input preprocessors create fan-in.
  3. Statistics collection (only if needed)

    • Triggered by preprocessor.necessary_statistics.
    • Sources:

      • In-DB: UnivariateStatistics (mean, std, median, percentiles), top values, top tokens.
      • sklearn helpers (when applicable): e.g., IterativeImputer params, PCA loadings, decision tree bin edges, power transform lambdas.
  4. Stored after fit

    • The constructed DAG and any fitted statistics per node.
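
Putting the requirements above together, a sketch of triggering a fit (assuming pl is a Pipeline constructed as above, that fit takes either a DF or a schema_name/table_name pair as stated, and that the schema and table names here are placeholders):

import teradataml as tdml

# option 1: fit against an existing teradataml DataFrame
DF_train = tdml.DataFrame(tdml.in_schema("train_schema", "customers_raw"))
pl.fit(DF=DF_train)

# option 2: fit directly against a schema/table reference
pl.fit(schema_name="train_schema", table_name="customers_raw")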

Deep Dive into Transforming: what actually happens#

  • Inputs: DF or schema/table (same checks as fit).

  • Outputs via return_type:

    • "df"tdml.DataFrame (default)
    • "str" → SQL string
    • None → side-effects only (e.g., create view)
  • Column forwarding & resilience

    • Columns present in DF but not in DAG are forwarded unchanged.
    • Columns expected by DAG but missing in DF are skipped.
  • Deterministic as long as schema and dtypes match what you fitted on.

import teradataml as tdml

# default: returns a tdml.DataFrame
DF_out = pl.transform(schema_name="scoring_schema", table_name="raw_view")

# return the generated SQL as a string
sql = pl.transform(DF=DF_in, return_type="str")

# side effect only: create or replace a database view
pl.transform(
    DF=DF_in,
    return_type=None,
    create_replace_view=True,
    output_schema_name="prod_features",
    output_view_name="customer_prepared"
)
# later:
DF_out = tdml.DataFrame(tdml.in_schema("prod_features", "customer_prepared"))

Deploying preprocessing#

Think of three persistence strategies:

  1. Persist a single run. Materialize the result or create a view once; downstream jobs just read it.

  2. Persist the query. Use create_replace_view=True as a "deploy button": the logic becomes a database view that always reflects the current base table.

  3. Persist the logic. Serialize to JSON; Python is needed to reconstruct the query at serve time. You can store the JSON in a CLOB for governance and later rebuild the pipeline from it.

# 1) Get SQL once
sql = pl.transform(schema_name="s", table_name="t", return_type="str")

# 2) Deploy as view
pl.transform(DF=DF_in, return_type=None, create_replace_view=True,
             output_schema_name="prod", output_view_name="features_v")

# 3) Store logic as JSON
pl.to_json("pipeline.json")
# ... store file content in a CLOB column for versioning
restored = Pipeline.from_json("pipeline.json")

Versioning ideas
  • Store each JSON in a table with metadata (hash, author, created_at).
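
A minimal sketch of such a versioning table, assuming pandas and teradataml's copy_to_sql are available (the governance schema, table, and column names are placeholders):

import hashlib, datetime
import pandas as pd
import teradataml as tdml

pipeline_json = open("pipeline.json").read()
version_row = pd.DataFrame([{
    "pipeline_json": pipeline_json,
    "json_hash": hashlib.sha256(pipeline_json.encode()).hexdigest(),
    "author": "demo_user",
    "created_at": datetime.datetime.utcnow().isoformat(),
}])

# append one row per pipeline version into a governance table
tdml.copy_to_sql(df=version_row, table_name="pipeline_versions",
                 schema_name="governance", if_exists="append")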

Visualization#

Use a Sankey diagram to inspect lineage and fan-in/fan-out.

fig = pl.plot_sankey()
fig.update_layout(height=900)
fig.show()
(Sankey diagram output)


Understand the produced query#

Assuming this is the input data

Input Data
customerid exited surname creditscore geography gender age tenure balance hascrcard isactivemember estimatedsalary bank_products
15768104 0 Wright 788 Spain Male 37.0 8 141541.25 0 0 66013.27 RetirementAccount
15809826 1 Craigie 728 France Female 46.0 2 109705.52 1 0 20276.87 PersonalLoan
15717736 0 Dr. Shen 639 Germany Female 46.0 10 110031.09 1 1 133995.59 CertificateOfDeposit,CheckingAccount
15748589 0 Winter 736 France Female 30.0 9 0.0 1 0 34180.33 CreditCard,HomeEquityLoan
15704053 1 T'ang 710 Spain Male 62.0 3 131078.42 1 0 119348.76 RetirementAccount,PersonalLoan
15806808 1 Hope 834 Germany Female None 8 112281.60 1 0 140225.14 CheckingAccount,AutoLoan,RetirementAccount
15694530 0 Porter 672 France Male 28.0 4 167268.98 1 1 169469.30 HomeEquityLoan
15712903 0 Diaz 499 France Female 21.0 3 176511.08 1 1 153920.22 InvestmentFund
15791045 0 Boni 568 France Female 38.0 3 132951.92 0 1 124486.28 RetirementAccount
15803790 0 Allen 638 Germany Male 37.0 2 89728.86 1 1 37294.88 CertificateOfDeposit,PersonalLoan

and assuming this is the Pipeline plan we have produced

Pipeline steps
import tdprepview
steps = [
    (['creditscore', 'age', 'tenure', 'balance', 'estimatedsalary'],
        tdprepview.SimpleImputer(strategy='mean')),

    (['geography', 'gender', 'bank_products'],
        tdprepview.ImputeText(kind='mode')),

    (['hascrcard', 'isactivemember'],
        tdprepview.SimpleImputer(strategy='most_frequent')),

    (['creditscore', 'age', 'tenure', 'balance', 'estimatedsalary'],
        tdprepview.MinMaxScaler()),

    (['age'],
        tdprepview.PowerTransformer(method='yeo-johnson')),

    (['geography'],
        tdprepview.OneHotEncoder(max_categories=20)),

    (['gender'],
        tdprepview.LabelEncoder(elements='TOP1')),

    (['bank_products'],
        tdprepview.MultiLabelBinarizer(delimiter=',', max_categories=20)),

    ({'columns_exclude': ['customerid', 'exited', 'surname']},
        tdprepview.Cast(new_type='FLOAT'))
]

pl = tdprepview.Pipeline(steps)

After fitting, we get the following query. The numbered markers (e.g., -- (1)!) are explained in the annotation list below the query.

WITH preprocessing_steps AS
(
SELECT
   customerid AS c_i_0, -- (1)!
   exited AS c_i_1,
   surname AS c_i_2,
   creditscore AS c_i_3,
   geography AS c_i_4,
   gender AS c_i_5,
   age AS c_i_6,
   tenure AS c_i_7,
   balance AS c_i_8,
   hascrcard AS c_i_9,
   isactivemember AS c_i_10,
   estimatedsalary AS c_i_11,
   bank_products AS c_i_12,
   COALESCE( c_i_3 , 650.7018 ) AS c_i_13, -- (2)!
   COALESCE( c_i_4 , 'France' ) AS c_i_18, -- (3)!
   COALESCE( c_i_5 , 'Male' ) AS c_i_19,
   COALESCE( c_i_6 , 38.87352 ) AS c_i_14,
   COALESCE( c_i_7 , 4.9726 ) AS c_i_15,
   COALESCE( c_i_8 , 75582.44 ) AS c_i_16,
   COALESCE( c_i_9 , 1.0 ) AS c_i_21, -- (4)!
   COALESCE( c_i_10 , 1.0 ) AS c_i_22,
   COALESCE( c_i_11 , 100690.6 ) AS c_i_17,
   COALESCE( c_i_12 , 'RetirementAccount' ) AS c_i_20,
   ( ( c_i_13 )  - 350.0 ) / NULLIF( 500.0 , 0)  AS c_i_23, -- (5)!
   CASE c_i_18 WHEN 'France' THEN 1 ELSE 0 END  AS c_i_29, -- (6)!
   CASE c_i_18 WHEN 'Germany' THEN 1 ELSE 0 END  AS c_i_30,
   CASE c_i_18 WHEN 'Spain' THEN 1 ELSE 0 END  AS c_i_31,
   CASE WHEN (c_i_18) IS NOT IN ('France', 'Germany', 'Spain') THEN 1 ELSE 0 END  AS c_i_32,
   CASE c_i_19 WHEN 'Male' THEN 1 ELSE 0 END  AS c_i_33, -- (7)!
   ( ( c_i_14 )  - 18.0 ) / NULLIF( 74.0 , 0)  AS c_i_24,
   ( ( c_i_15 )  - 0.0 ) / NULLIF( 10.0 , 0)  AS c_i_25,
   ( ( c_i_16 )  - 0.0 ) / NULLIF( 250898.1 , 0)  AS c_i_26,
   CAST( (c_i_21) AS FLOAT ) AS c_i_53, 
   CAST( (c_i_22) AS FLOAT ) AS c_i_54,
   ( ( c_i_17 )  - 11.58 ) / NULLIF( 199941.8 , 0)  AS c_i_27,
   CASE WHEN (POSITION(',MortgageLoan,' IN ','||c_i_20||','))>0 THEN 1 ELSE 0 END AS c_i_34, -- (9)!
   CASE WHEN (POSITION(',RetirementAccount,' IN ','||c_i_20||','))>0 THEN 1 ELSE 0 END AS c_i_35,
   CASE WHEN (POSITION(',CreditCard,' IN ','||c_i_20||','))>0 THEN 1 ELSE 0 END AS c_i_36,
   CASE WHEN (POSITION(',InvestmentFund,' IN ','||c_i_20||','))>0 THEN 1 ELSE 0 END AS c_i_37,
   CASE WHEN (POSITION(',HomeEquityLoan,' IN ','||c_i_20||','))>0 THEN 1 ELSE 0 END AS c_i_38,
   CASE WHEN (POSITION(',PersonalLoan,' IN ','||c_i_20||','))>0 THEN 1 ELSE 0 END AS c_i_39,
   CASE WHEN (POSITION(',CheckingAccount,' IN ','||c_i_20||','))>0 THEN 1 ELSE 0 END AS c_i_40,
   CASE WHEN (POSITION(',SavingsAccount,' IN ','||c_i_20||','))>0 THEN 1 ELSE 0 END AS c_i_41,
   CASE WHEN (POSITION(',CertificateOfDeposit,' IN ','||c_i_20||','))>0 THEN 1 ELSE 0 END AS c_i_42,
   CASE WHEN (POSITION(',AutoLoan,' IN ','||c_i_20||','))>0 THEN 1 ELSE 0 END AS c_i_43,
   CAST( (c_i_23) AS FLOAT ) AS c_i_44, -- (10)!
   CAST( (c_i_29) AS FLOAT ) AS c_i_45,
   CAST( (c_i_30) AS FLOAT ) AS c_i_46,
   CAST( (c_i_31) AS FLOAT ) AS c_i_47,
   CAST( (c_i_32) AS FLOAT ) AS c_i_48,
   CAST( (c_i_33) AS FLOAT ) AS c_i_49,
   CASE WHEN c_i_24 >= 0.0 THEN (POWER(c_i_24 + 1 ,-1.968491)-1)/(-1.968491) ELSE -(POWER(-c_i_24 + 1,2-(-1.968491))-1)/(2-(-1.968491)) END AS c_i_28, -- (8)!
   CAST( (c_i_25) AS FLOAT ) AS c_i_51,
   CAST( (c_i_26) AS FLOAT ) AS c_i_52,
   CAST( (c_i_27) AS FLOAT ) AS c_i_55,
   CAST( (c_i_34) AS FLOAT ) AS c_i_56,
   CAST( (c_i_35) AS FLOAT ) AS c_i_57,
   CAST( (c_i_36) AS FLOAT ) AS c_i_58,
   CAST( (c_i_37) AS FLOAT ) AS c_i_59,
   CAST( (c_i_38) AS FLOAT ) AS c_i_60,
   CAST( (c_i_39) AS FLOAT ) AS c_i_61,
   CAST( (c_i_40) AS FLOAT ) AS c_i_62,
   CAST( (c_i_41) AS FLOAT ) AS c_i_63,
   CAST( (c_i_42) AS FLOAT ) AS c_i_64,
   CAST( (c_i_43) AS FLOAT ) AS c_i_65,
   CAST( (c_i_28) AS FLOAT ) AS c_i_50
FROM
   demo_user.order_raw_trainig t
)

SELECT
   c_i_0 AS customerid, -- (11)!
   c_i_1 AS exited,
   c_i_2 AS surname,
   c_i_44 AS creditscore,
   c_i_45 AS geography__OHE_1_France, 
   c_i_46 AS geography__OHE_2_Germany,
   c_i_47 AS geography__OHE_3_Spain,
   c_i_48 AS geography__OHE_0_otherwise,
   c_i_49 AS gender,
   c_i_50 AS age,
   c_i_51 AS tenure,
   c_i_52 AS balance,
   c_i_53 AS hascrcard,
   c_i_54 AS isactivemember,
   c_i_55 AS estimatedsalary,
   c_i_56 AS bank_products__MLB_1_MortgageLoan, -- (12)!
   c_i_57 AS bank_products__MLB_2_RetirementAccount,
   c_i_58 AS bank_products__MLB_3_CreditCard,
   c_i_59 AS bank_products__MLB_4_InvestmentFund,
   c_i_60 AS bank_products__MLB_5_HomeEquityLoan,
   c_i_61 AS bank_products__MLB_6_PersonalLoan,
   c_i_62 AS bank_products__MLB_7_CheckingAccount,
   c_i_63 AS bank_products__MLB_8_SavingsAccount,
   c_i_64 AS bank_products__MLB_9_CertificateOfDeposit,
   c_i_65 AS bank_products__MLB_10_AutoLoan
FROM
   preprocessing_steps t
  1. Aliasing of possibly lengthy column names.
  2. SimpleImputer(strategy='mean') on numeric columns (creditscore, age, tenure, balance, estimatedsalary) becomes COALESCE(value, learned_mean) → c_i_13..17.
  3. ImputeText(kind='mode') on geography, gender, bank_products: COALESCE(text_col, learned_mode) → c_i_18..20.
  4. SimpleImputer(strategy='most_frequent') on hascrcard, isactivemember: COALESCE(int_col, mode) → c_i_21..22.
  5. MinMaxScaler() on numeric set: (x - min) / NULLIF(range, 0) using fitted stats → c_i_23 (creditscore shown), and similarly c_i_24..27 for age, tenure, balance, estimatedsalary.
  6. OneHotEncoder(max_categories=20) on geography: one column per top category plus “otherwise” bucket → c_i_29..32.
  7. LabelEncoder(elements='TOP1') on gender: binary indicator for the top element (here 'Male' as 1, else 0) → c_i_33.
  8. PowerTransformer(method='yeo-johnson') on scaled age (c_i_24): applies learned λ (here ≈ −1.9685) to produce c_i_28.
  9. MultiLabelBinarizer(delimiter=',') on bank_products: token presence via POSITION(',Token,' IN ','||col||',') → c_i_34..43.
  10. Cast(new_type='FLOAT') on all features except excluded identifiers: ensures numeric dtypes for ML-serving (c_i_44..65, c_i_50, c_i_53..55).
  11. Forwarding untouched inputs (no preprocessors applied): customerid, exited, surname are passed through unchanged.
  12. Final naming in SELECT: stable, human-readable feature names with suffixes that reflect the preprocessor (__OHE_*, __MLB_*, raw names for scaled/casted numerics).