-- Код полного пересоздания слоёв staging, core, report вместе с инкрементальной загрузкой в core.dim_staff


-- In the source database.

-- Soft-delete marker for staff: NULL means the row is active, a timestamp
-- marks when it was deleted (see the "deleted is null" / "is not null" checks
-- in core.load_staff()).
-- The column must be named "deleted": the warehouse foreign table below maps
-- column_name 'deleted', so any other spelling breaks every read through it.
alter table public.staff add column deleted timestamp null;



-- In the warehouse database.

-- Expose the new source column through the foreign table; column_name must
-- match the column created in public.staff above.
alter table film_src.staff add column deleted timestamp options(column_name 'deleted') null;

-- Required by the "on conflict (staff_id)" upsert in core.load_staff().
alter table core.dim_staff add unique(staff_id);


-- изменения в скрипте полного пересоздания хранилища:

-- Staging copy of the source staff rows picked up by staging.staff_load().
-- deleted is NULL for active staff and holds the soft-delete timestamp otherwise.
create table staging.staff (
    staff_id   int4        not null,
    first_name varchar(45) not null,
    last_name  varchar(45) not null,
    store_id   int2        not null,
    deleted    timestamp   null
);

-- Incremental extract of staff rows from the source foreign table into staging.staff.
-- A row is copied when its last_update or deleted timestamp is at or after the
-- watermark recorded by the previous run in staging.last_update. With no watermark
-- yet (first run), 1900-01-01 is used, so every row qualifies. staging.staff is
-- emptied first, so after the call it contains only this run's delta.
create or replace procedure staging.staff_load()
as $$
declare
    v_since timestamp;
begin
    -- Watermark of the previous load (max over an empty set is NULL -> far-past default).
    select coalesce(max(lu.update_dt), '1900-01-01'::date)
      into v_since
      from staging.last_update lu
     where lu.table_name = 'staging.staff';

    -- Staging keeps only the current delta: clear it before refilling.
    delete from staging.staff;

    insert into staging.staff (staff_id, first_name, last_name, store_id, deleted)
    select s.staff_id, s.first_name, s.last_name, s.store_id, s.deleted
      from film_src.staff s
     where s.last_update >= v_since
        or s.deleted >= v_since;

    -- Record this run as the new watermark (now() is the start time of the
    -- current transaction).
    insert into staging.last_update (table_name, update_dt)
    values ('staging.staff', now());
end;
$$ language plpgsql;


-- Staff dimension: one row per source staff_id, denormalised with the address,
-- district and city of the staff member's store. The unique constraint on
-- staff_id is what the "on conflict (staff_id)" upsert in core.load_staff() targets.
create table core.dim_staff (
    staff_pk   serial      primary key,
    staff_id   integer     not null unique,
    first_name varchar(45) not null,
    last_name  varchar(45) not null,
    address    varchar(50) not null,
    district   varchar(20) not null,
    city_name  varchar(50) not null
);


-- Applies the current staging.staff delta to core.dim_staff:
--   1. staff soft-deleted in the source (deleted is not null) are removed;
--   2. active staff are upserted by staff_id, enriched with the address,
--      district and city of their store.
-- NOTE(review): the inner joins silently skip any staged staff row whose store,
-- address or city is absent from the staging tables; this presumably relies on
-- staging.store / staging.address / staging.city holding complete data - confirm.
create or replace procedure core.load_staff()
as $$
begin
    -- 1. Soft-deleted in the source -> removed from the dimension.
    delete from core.dim_staff ds
     where exists (
               select 1
                 from staging.staff ss
                where ss.staff_id = ds.staff_id
                  and ss.deleted is not null
           );

    -- 2. Insert new staff and overwrite the attributes of existing ones.
    insert into core.dim_staff (staff_id, first_name, last_name, address, district, city_name)
    select ss.staff_id,
           ss.first_name,
           ss.last_name,
           adr.address,
           adr.district,
           cty.city
      from staging.staff ss
      join staging.store   sto using (store_id)
      join staging.address adr using (address_id)
      join staging.city    cty using (city_id)
     where ss.deleted is null
    on conflict (staff_id) do update
       set first_name = excluded.first_name,
           last_name  = excluded.last_name,
           address    = excluded.address,
           district   = excluded.district,
           city_name  = excluded.city_name;
end;
$$ language plpgsql;


-- Last modified: Saturday, 13 August 2022, 9:10 PM