Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
797d9ca
implement source_is_single_batch and disable_hwm parameters for (mult…
tkiehn Feb 3, 2025
0c5330a
fix ma_sat_v0 source_cte-variable assignment
tkiehn Feb 4, 2025
be56f22
move source_cte value assignment
tkiehn Feb 6, 2025
9f626e5
fix case-sensitivity for hashing standardization and change all multi…
tkiehn Feb 7, 2025
5061c55
Merge branch 'unify-multi-active-hash-standardization' into ma_sat_hw…
tkiehn Feb 7, 2025
db5d924
exasol__attribute_standardise, remove unnecessary escape character
tkiehn Feb 11, 2025
352eba1
Merge branch 'unify-multi-active-hash-standardization' into ma_sat_hw…
tkiehn Feb 11, 2025
a3d59ad
fix fabric (multi active) hash standardization for varbinary and varchar
tkiehn Feb 11, 2025
89bee8b
Merge branch 'unify-multi-active-hash-standardization' into ma_sat_hw…
tkiehn Feb 11, 2025
0049965
unify synapse hash standardization
tkiehn Feb 11, 2025
2eb5641
Merge branch 'unify-multi-active-hash-standardization' into ma_sat_hw…
tkiehn Feb 11, 2025
4b23f5d
oracle unify hash standardization
tkiehn Feb 11, 2025
807b219
bigquery: fix conversion of base64 encoded md5 hashes to strings
tkiehn Feb 11, 2025
5e0f893
Merge branch 'unify-multi-active-hash-standardization' into ma_sat_hw…
tkiehn Feb 11, 2025
afdac9e
remove brackets
tkiehn Feb 11, 2025
db4ef38
Merge branch 'unify-multi-active-hash-standardization' into ma_sat_hw…
tkiehn Feb 11, 2025
cc81a7d
add source_is_single_batch for ref_sats
tkiehn Feb 12, 2025
6cbc494
add source_is_single_batch to ma_sat_v0 where missing
tkiehn Feb 12, 2025
cab89c0
add source_is_single_batch to sat_v0 where missing
tkiehn Feb 12, 2025
d07834c
make satellite hwm safe against outside truncate where missing
tkiehn Feb 12, 2025
ff06f4d
Merge branch '176-ma-sat-v0-add-disable_hwm-and-source_is_single_batc…
tkiehn Feb 12, 2025
e406ee1
add missing if
tkiehn Feb 12, 2025
3e3cddd
Merge remote-tracking branch 'origin/176-ma-sat-v0-add-disable_hwm-an…
tkiehn Feb 12, 2025
138f2dc
databricks: change sort order for multi active hashdiffs
tkiehn Feb 12, 2025
b487b01
Merge branch 'unify-multi-active-hash-standardization' into ma_sat_hw…
tkiehn Feb 12, 2025
f7675fb
databricks: replicate list_agg for multi-active hashdiffs
tkiehn Feb 12, 2025
6f7289b
Merge branch 'unify-multi-active-hash-standardization' into ma_sat_hw…
tkiehn Feb 12, 2025
7c88a54
databricks: add support for multiple multi-active keys for multi_acti…
tkiehn Feb 13, 2025
713901b
Merge branch 'unify-multi-active-hash-standardization' into ma_sat_hw…
tkiehn Feb 13, 2025
bbef06f
extend ghost_records-macro hash_default_values call with the datatype
tkiehn Feb 17, 2025
b92d5fc
extend exasol hash default values with distinction of hashtype and ch…
tkiehn Feb 17, 2025
807699e
Merge branch 'unify-multi-active-hash-standardization' into ma_sat_hw…
tkiehn Feb 17, 2025
4f0b1cd
align oracle hash concat string with other adapters
tkiehn Feb 17, 2025
46ddfba
Merge branch 'unify-multi-active-hash-standardization' into ma_sat_hw…
tkiehn Feb 17, 2025
893da90
align redshift hash concat string with other adapters
tkiehn Feb 17, 2025
b11b1fb
Merge branch 'unify-multi-active-hash-standardization' into ma_sat_hw…
tkiehn Feb 17, 2025
829a4dc
lower hash output for fabric varchars
tkiehn Feb 17, 2025
b8817eb
Merge branch 'unify-multi-active-hash-standardization' into ma_sat_hw…
tkiehn Feb 17, 2025
3ceee5d
Merge remote-tracking branch 'origin/main' into release_hash_unificat…
tkiehn Feb 18, 2025
a3bcb76
fabric: change ledts calculation to use -1 microsecond
tkiehn Feb 18, 2025
a1978d9
synapse_ma_sat_v1: change subtraction of ledts from 1 millisecond to …
tkiehn Feb 18, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions macros/supporting/ghost_record_per_datatype.sql
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,10 @@
{%- set error_value__STRING = var('datavault4dbt.error_value__STRING', '(error)') -%}
{%- set unknown_value_alt__STRING = var('datavault4dbt.unknown_value_alt__STRING', 'u') -%}
{%- set error_value_alt__STRING = var('datavault4dbt.error_value_alt__STRING', 'e') -%}

{%- set hash = datavault4dbt.hash_method() -%}
{%- set hash_default_values = fromjson(datavault4dbt.hash_default_values(hash_function=hash)) -%}
{%- set hash_dtype = var('datavault4dbt.hash_datatype', 'HASHTYPE') -%}
{%- set hash_default_values = fromjson(datavault4dbt.hash_default_values(hash_function=hash,hash_datatype=hash_dtype)) -%}
{%- set hash_alg= hash_default_values['hash_alg'] -%}
{%- set unknown_value__HASHTYPE = hash_default_values['unknown_key'] -%}
{%- set error_value__HASHTYPE = hash_default_values['error_key'] -%}
Expand Down Expand Up @@ -260,7 +262,8 @@
{%- set error_value_alt__STRING = var('datavault4dbt.error_value_alt__STRING', 'e') -%}

{%- set hash = datavault4dbt.hash_method() -%}
{%- set hash_default_values = fromjson(datavault4dbt.hash_default_values(hash_function=hash)) -%}
{%- set hash_dtype = var('datavault4dbt.hash_datatype', 'BINARY(16)') -%}
{%- set hash_default_values = fromjson(datavault4dbt.hash_default_values(hash_function=hash,hash_datatype=hash_dtype)) -%}
{%- set hash_alg= hash_default_values['hash_alg'] -%}

{%- set unknown_value__HASHTYPE = hash_default_values['unknown_key'] -%}
Expand Down Expand Up @@ -459,7 +462,8 @@
{%- set error_value__numeric = var('datavault4dbt.error_value__numeric', -2) -%}

{%- set hash = datavault4dbt.hash_method() -%}
{%- set hash_default_values = fromjson(datavault4dbt.hash_default_values(hash_function=hash)) -%}
{%- set hash_dtype = var('datavault4dbt.hash_datatype', 'VARBINARY(16)') -%}
{%- set hash_default_values = fromjson(datavault4dbt.hash_default_values(hash_function=hash,hash_datatype=hash_dtype)) -%}
{%- set hash_alg= hash_default_values['hash_alg'] -%}
{%- set unknown_value__HASHTYPE = hash_default_values['unknown_key'] -%}
{%- set error_value__HASHTYPE = hash_default_values['error_key'] -%}
Expand Down
4 changes: 2 additions & 2 deletions macros/supporting/hash.sql
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@
{%- macro redshift__hash(columns, alias, is_hashdiff, multi_active_key, main_hashkey_column) -%}

{%- set hash = var('datavault4dbt.hash', 'MD5') -%}
{%- set concat_string = var('concat_string', '|') -%}
{%- set concat_string = var('concat_string', '||') -%}
{%- set quote = var('quote', '"') -%}
{%- set null_placeholder_string = var('null_placeholder_string', '^^') -%}

Expand Down Expand Up @@ -514,7 +514,7 @@
{%- macro oracle__hash(columns, alias, is_hashdiff, multi_active_key, main_hashkey_column) -%}

{%- set hash = var('datavault4dbt.hash', 'MD5') -%}
{%- set concat_string = var('concat_string', '|') -%}
{%- set concat_string = var('concat_string', '||') -%}
{%- set quote = var('quote', '"') -%}
{%- set null_placeholder_string = var('null_placeholder_string', '^^') -%}

Expand Down
26 changes: 22 additions & 4 deletions macros/supporting/hash_default_values.sql
Original file line number Diff line number Diff line change
Expand Up @@ -86,16 +86,34 @@
{%- set unknown_key = '' -%}
{%- set error_key = '' -%}

{%- if hash_function == 'MD5' -%}
{%- set hash_alg = 'HASHTYPE_MD5' -%}
{%- if 'MD5' in hash_function -%}

{%- if 'HASHTYPE' in hash_datatype -%}
{%- set hash_alg = 'HASHTYPE_MD5' -%}
{%- elif 'CHAR' in hash_datatype -%}
{%- set hash_alg = 'HASH_MD5' -%}
{%- endif -%}

{%- set unknown_key = '!00000000000000000000000000000000' -%}
{%- set error_key = '!ffffffffffffffffffffffffffffffff' -%}
{%- elif (hash_function == 'SHA' or hash_function == 'SHA1') -%}
{%- set hash_alg = 'HASHTYPE_SHA1' -%}

{%- if 'HASHTYPE' in hash_datatype -%}
{%- set hash_alg = 'HASHTYPE_SHA1' -%}
{%- elif 'CHAR' in hash_datatype -%}
{%- set hash_alg = 'HASH_SHA1' -%}
{%- endif -%}

{%- set unknown_key = '!0000000000000000000000000000000000000000' -%}
{%- set error_key = '!ffffffffffffffffffffffffffffffffffffffff' -%}
{%- elif (hash_function == 'SHA2' or hash_function == 'SHA256') -%}
{%- set hash_alg = 'HASHTYPE_SHA256' -%}

{%- if 'HASHTYPE' in hash_datatype -%}
{%- set hash_alg = 'HASHTYPE_SHA256' -%}
{%- elif 'CHAR' in hash_datatype -%}
{%- set hash_alg = 'HASH_SHA256' -%}
{%- endif -%}

{%- set unknown_key = '!0000000000000000000000000000000000000000000000000000000000000000' -%}
{%- set error_key = '!ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff' -%}
{%- endif -%}
Expand Down
318 changes: 184 additions & 134 deletions macros/supporting/hash_standardization.sql

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion macros/tables/bigquery/eff_sat_v0.sql
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ source_data AS (
{%- if is_incremental() and not disable_hwm %}
AND src.{{ src_ldts }} > (
SELECT
MAX({{ src_ldts }})
COALESCE(MAX({{ src_ldts }}), {{ datavault4dbt.string_to_timestamp(timestamp_format, beginning_of_all_times) }})
FROM {{ this }}
WHERE {{ src_ldts }} != {{ datavault4dbt.string_to_timestamp(timestamp_format, end_of_all_times) }}
)
Expand Down
36 changes: 21 additions & 15 deletions macros/tables/bigquery/ma_sat_v0.sql
Original file line number Diff line number Diff line change
@@ -1,26 +1,27 @@
{%- macro default__ma_sat_v0(parent_hashkey, src_hashdiff, src_ma_key, src_payload, src_ldts, src_rsrc, source_model) -%}
{%- macro default__ma_sat_v0(parent_hashkey, src_hashdiff, src_ma_key, src_payload, src_ldts, src_rsrc, source_model, disable_hwm, source_is_single_batch) -%}

{%- set beginning_of_all_times = datavault4dbt.beginning_of_all_times() -%}
{%- set end_of_all_times = datavault4dbt.end_of_all_times() -%}
{%- set timestamp_format = datavault4dbt.timestamp_format() -%}

{%- set ns=namespace(src_hashdiff="", hdiff_alias="") %}
{%- if src_hashdiff is mapping and src_hashdiff is not none -%}
{% set ns.src_hashdiff = src_hashdiff["source_column"] %}
{% set ns.hdiff_alias = src_hashdiff["alias"] %}
{% else %}
{% set ns.src_hashdiff = src_hashdiff %}
{% set ns.hdiff_alias = src_hashdiff %}
{%- set ns.src_hashdiff = src_hashdiff["source_column"] -%}
{%- set ns.hdiff_alias = src_hashdiff["alias"] -%}
{%- else -%}
{% set ns.src_hashdiff = src_hashdiff -%}
{% set ns.hdiff_alias = src_hashdiff -%}
{%- endif -%}

{%- set source_cols = datavault4dbt.expand_column_list(columns=[src_rsrc, src_ldts, src_ma_key, src_payload]) -%}

{%- set source_relation = ref(source_model) -%}

{{ datavault4dbt.prepend_generated_by() }}

WITH

{# Selecting all source data, that is newer than latest data in sat if incremental #}
{# Selecting all source data, that is newer than latest data in sat if incremental and hwm not disabled -#}
source_data AS (

SELECT
Expand All @@ -29,14 +30,15 @@ source_data AS (
{{ datavault4dbt.print_list(source_cols) }}
FROM {{ source_relation }}

{%- if is_incremental() %}
{%- if is_incremental() and not disable_hwm %}
WHERE {{ src_ldts }} > (
SELECT
MAX({{ src_ldts }}) FROM {{ this }}
COALESCE(MAX({{ src_ldts }}), {{ datavault4dbt.string_to_timestamp(timestamp_format, beginning_of_all_times) }}) FROM {{ this }}
WHERE {{ src_ldts }} != {{ datavault4dbt.string_to_timestamp(timestamp_format, end_of_all_times) }}
)
{%- endif %}

{%- set source_cte = 'source_data' -%}
),

{# Get the latest record for each parent hashkey in existing sat, if incremental. #}
Expand All @@ -52,6 +54,7 @@ latest_entries_in_sat AS (
),
{%- endif %}

{% if not source_is_single_batch %}
{# Get a list of all distinct hashdiffs that exist for each parent_hashkey. #}
deduped_row_hashdiff AS (

Expand Down Expand Up @@ -79,21 +82,24 @@ deduped_rows AS (
AND {{ datavault4dbt.multikey(src_ldts, prefix=['source_data', 'deduped_row_hashdiff'], condition='=') }}
AND {{ datavault4dbt.multikey(ns.hdiff_alias, prefix=['source_data', 'deduped_row_hashdiff'], condition='=') }}


{%- set source_cte = 'deduped_rows' -%}
),
{% endif %}

records_to_insert AS (

SELECT
deduped_rows.{{ parent_hashkey }},
deduped_rows.{{ ns.hdiff_alias }},
{{ datavault4dbt.alias_all(columns=source_cols, prefix='deduped_rows') }}
FROM deduped_rows
{{ source_cte }}.{{ parent_hashkey }},
{{ source_cte }}.{{ ns.hdiff_alias }},
{{ datavault4dbt.alias_all(columns=source_cols, prefix=source_cte) }}
FROM {{ source_cte }}
{%- if is_incremental() %}
WHERE NOT EXISTS (
SELECT 1
FROM latest_entries_in_sat
WHERE {{ datavault4dbt.multikey(parent_hashkey, prefix=['latest_entries_in_sat', 'deduped_rows'], condition='=') }}
AND {{ datavault4dbt.multikey(ns.hdiff_alias, prefix=['latest_entries_in_sat', 'deduped_rows'], condition='=') }}
WHERE {{ datavault4dbt.multikey(parent_hashkey, prefix=['latest_entries_in_sat', source_cte], condition='=') }}
AND {{ datavault4dbt.multikey(ns.hdiff_alias, prefix=['latest_entries_in_sat', source_cte], condition='=') }}
)
{%- endif %}

Expand Down
2 changes: 1 addition & 1 deletion macros/tables/bigquery/nh_sat.sql
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ source_data AS (
{%- if is_incremental() %}
WHERE {{ src_ldts }} > (
SELECT
MAX({{ src_ldts }}) FROM {{ this }}
COALESCE(MAX({{ src_ldts }}), {{ datavault4dbt.string_to_timestamp(timestamp_format, beginning_of_all_times) }}) FROM {{ this }}
WHERE {{ src_ldts }} != {{ datavault4dbt.string_to_timestamp(timestamp_format, end_of_all_times) }}
)
{%- endif %}
Expand Down
19 changes: 12 additions & 7 deletions macros/tables/bigquery/ref_sat_v0.sql
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,11 @@ source_data AS (
{%- if is_incremental() and not disable_hwm %}
WHERE {{ src_ldts }} > (
SELECT
MAX({{ src_ldts }}) FROM {{ this }}
COALESCE(MAX({{ src_ldts }}), {{ datavault4dbt.string_to_timestamp(timestamp_format, beginning_of_all_times) }}) FROM {{ this }}
WHERE {{ src_ldts }} != {{ datavault4dbt.string_to_timestamp(timestamp_format, end_of_all_times) }}
)
{%- endif %}
{%- set source_cte = 'source_data' -%}
),

{# Get the latest record for each parent ref key combination in existing sat, if incremental. #}
Expand All @@ -58,7 +59,7 @@ latest_entries_in_sat AS (
QUALIFY ROW_NUMBER() OVER(PARTITION BY {%- for ref_key in parent_ref_keys %} {{ref_key}} {%- if not loop.last %}, {% endif %}{% endfor %} ORDER BY {{ src_ldts }} DESC) = 1
),
{%- endif %}

{%- if not source_is_single_batch %}
{#
Deduplicate source by comparing each hashdiff to the hashdiff of the previous record, for each parent ref key combination.
Additionally adding a row number based on that order, if incremental.
Expand All @@ -80,8 +81,9 @@ deduplicated_numbered_source AS (
WHEN {{ ns.hdiff_alias }} = LAG({{ ns.hdiff_alias }}) OVER(PARTITION BY {%- for ref_key in parent_ref_keys %} {{ref_key}} {%- if not loop.last %}, {% endif %}{% endfor %} ORDER BY {{ src_ldts }}) THEN FALSE
ELSE TRUE
END
{%- set source_cte = 'deduplicated_numbered_source' -%}
),

{% endif -%}
{#
Select all records from the previous CTE. If incremental, compare the oldest incoming entry to
the existing records in the satellite.
Expand All @@ -94,17 +96,20 @@ records_to_insert AS (
{% endfor %}
{{ ns.hdiff_alias }},
{{ datavault4dbt.print_list(source_cols) }}
FROM deduplicated_numbered_source
FROM {{ source_cte }}
{%- if is_incremental() %}
WHERE NOT EXISTS (
SELECT 1
FROM latest_entries_in_sat
WHERE 1=1
{% for ref_key in parent_ref_keys %}
AND {{ datavault4dbt.multikey(ref_key, prefix=['latest_entries_in_sat', 'deduplicated_numbered_source'], condition='=') }}
AND {{ datavault4dbt.multikey(ref_key, prefix=['latest_entries_in_sat', source_cte], condition='=') }}
{% endfor %}
AND {{ datavault4dbt.multikey(ns.hdiff_alias, prefix=['latest_entries_in_sat', 'deduplicated_numbered_source'], condition='=') }}
AND deduplicated_numbered_source.rn = 1)
AND {{ datavault4dbt.multikey(ns.hdiff_alias, prefix=['latest_entries_in_sat', source_cte], condition='=') }}
{%- if not source_is_single_batch %}
AND {{ source_cte }}.rn = 1
{%- endif %}
)
{%- endif %}

)
Expand Down
24 changes: 18 additions & 6 deletions macros/tables/bigquery/sat_v0.sql
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,16 @@ source_data AS (
{{ datavault4dbt.print_list(source_cols) }}
FROM {{ source_relation }}

{%- if is_incremental() %}
{%- if is_incremental() and not disable_hwm %}
WHERE {{ src_ldts }} > (
SELECT
MAX({{ src_ldts }}) FROM {{ this }}
COALESCE(MAX({{ src_ldts }}), {{ datavault4dbt.string_to_timestamp(timestamp_format, beginning_of_all_times) }}) FROM {{ this }}
WHERE {{ src_ldts }} != {{ datavault4dbt.string_to_timestamp(timestamp_format, end_of_all_times) }}
)
{%- endif %}

{%- set source_cte = 'source_data' -%}

),

{# Get the latest record for each parent hashkey in existing sat, if incremental. #}
Expand All @@ -53,6 +56,7 @@ latest_entries_in_sat AS (
),
{%- endif %}

{%- if not source_is_single_batch %}
{#
Deduplicate source by comparing each hashdiff to the hashdiff of the previous record, for each hashkey.
Additionally adding a row number based on that order, if incremental.
Expand All @@ -72,8 +76,13 @@ deduplicated_numbered_source AS (
WHEN {{ ns.hdiff_alias }} = LAG({{ ns.hdiff_alias }}) OVER(PARTITION BY {{ parent_hashkey|lower }} ORDER BY {{ src_ldts }}) THEN FALSE
ELSE TRUE
END

{%- set source_cte = 'deduplicated_numbered_source' -%}

),

{% endif -%}

{#
Select all records from the previous CTE. If incremental, compare the oldest incoming entry to
the existing records in the satellite.
Expand All @@ -84,14 +93,17 @@ records_to_insert AS (
{{ parent_hashkey }},
{{ ns.hdiff_alias }},
{{ datavault4dbt.print_list(source_cols) }}
FROM deduplicated_numbered_source
FROM {{ source_cte }}
{%- if is_incremental() %}
WHERE NOT EXISTS (
SELECT 1
FROM latest_entries_in_sat
WHERE {{ datavault4dbt.multikey(parent_hashkey, prefix=['latest_entries_in_sat', 'deduplicated_numbered_source'], condition='=') }}
AND {{ datavault4dbt.multikey(ns.hdiff_alias, prefix=['latest_entries_in_sat', 'deduplicated_numbered_source'], condition='=') }}
AND deduplicated_numbered_source.rn = 1)
WHERE {{ datavault4dbt.multikey(parent_hashkey, prefix=['latest_entries_in_sat', source_cte], condition='=') }}
AND {{ datavault4dbt.multikey(ns.hdiff_alias, prefix=['latest_entries_in_sat', source_cte], condition='=') }}
{%- if not source_is_single_batch %}
AND {{ source_cte }}.rn = 1
{%- endif %}
)
{%- endif %}

)
Expand Down
2 changes: 1 addition & 1 deletion macros/tables/databricks/eff_sat_v0.sql
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ source_data AS (
{%- if is_incremental() and not disable_hwm %}
AND src.{{ src_ldts }} > (
SELECT
MAX({{ src_ldts }})
COALESCE(MAX({{ src_ldts }}), {{ datavault4dbt.string_to_timestamp(timestamp_format, beginning_of_all_times) }})
FROM {{ this }}
WHERE {{ src_ldts }} != {{ datavault4dbt.string_to_timestamp(timestamp_format, end_of_all_times) }}
)
Expand Down
24 changes: 15 additions & 9 deletions macros/tables/databricks/ma_sat_v0.sql
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{%- macro databricks__ma_sat_v0(parent_hashkey, src_hashdiff, src_ma_key, src_payload, src_ldts, src_rsrc, source_model) -%}
{%- macro databricks__ma_sat_v0(parent_hashkey, src_hashdiff, src_ma_key, src_payload, src_ldts, src_rsrc, source_model, disable_hwm, source_is_single_batch) -%}

{%- set beginning_of_all_times = datavault4dbt.beginning_of_all_times() -%}
{%- set end_of_all_times = datavault4dbt.end_of_all_times() -%}
Expand All @@ -17,6 +17,7 @@

{%- set source_relation = ref(source_model) -%}

{{ datavault4dbt.prepend_generated_by() }}

WITH

Expand All @@ -29,13 +30,15 @@ source_data AS (
{{ datavault4dbt.print_list(source_cols) }}
FROM {{ source_relation }}

{%- if is_incremental() %}
{%- if is_incremental() and not disable_hwm %}
WHERE {{ src_ldts }} > (
SELECT
MAX({{ src_ldts }}) FROM {{ this }}
COALESCE(MAX({{ src_ldts }}), {{ datavault4dbt.string_to_timestamp(timestamp_format, beginning_of_all_times) }}) FROM {{ this }}
WHERE {{ src_ldts }} != {{ datavault4dbt.string_to_timestamp(timestamp_format, end_of_all_times) }}
)
{%- endif %}

{% set source_cte = 'source_data' %}

),

Expand All @@ -52,6 +55,7 @@ latest_entries_in_sat AS (
),
{%- endif %}

{%- if not source_is_single_batch -%}
{# Get a list of all distinct hashdiffs that exist for each parent_hashkey. #}
deduped_row_hashdiff AS (

Expand Down Expand Up @@ -79,21 +83,23 @@ deduped_rows AS (
AND {{ datavault4dbt.multikey(src_ldts, prefix=['source_data', 'deduped_row_hashdiff'], condition='=') }}
AND {{ datavault4dbt.multikey(ns.hdiff_alias, prefix=['source_data', 'deduped_row_hashdiff'], condition='=') }}

{%- set source_cte = 'deduped_rows' -%}
),
{%- endif %}

records_to_insert AS (

SELECT
deduped_rows.{{ parent_hashkey }},
deduped_rows.{{ ns.hdiff_alias }},
{{ datavault4dbt.alias_all(columns=source_cols, prefix='deduped_rows') }}
FROM deduped_rows
{{ source_cte }}.{{ parent_hashkey }},
{{ source_cte }}.{{ ns.hdiff_alias }},
{{ datavault4dbt.alias_all(columns=source_cols, prefix=source_cte) }}
FROM {{ source_cte }}
{%- if is_incremental() %}
WHERE NOT EXISTS (
SELECT 1
FROM latest_entries_in_sat
WHERE {{ datavault4dbt.multikey(parent_hashkey, prefix=['latest_entries_in_sat', 'deduped_rows'], condition='=') }}
AND {{ datavault4dbt.multikey(ns.hdiff_alias, prefix=['latest_entries_in_sat', 'deduped_rows'], condition='=') }}
WHERE {{ datavault4dbt.multikey(parent_hashkey, prefix=['latest_entries_in_sat', source_cte], condition='=') }}
AND {{ datavault4dbt.multikey(ns.hdiff_alias, prefix=['latest_entries_in_sat', source_cte], condition='=') }}
)
{%- endif %}

Expand Down
Loading