-- Решение ДЗ (homework solution)
-- In the source database: add a nullable deletion timestamp to staff.
-- The column has to be spelled "deleted": the warehouse-side column on
-- film_src.staff is declared with column_name 'deleted', and the load
-- procedures read and filter on "deleted".
alter table public.staff add column deleted timestamp null;
-- In the warehouse database.
-- Expose the new source column on film_src.staff; the column_name option
-- names the remote column that the local "deleted" column is mapped to.
alter foreign table film_src.staff
    add column deleted timestamp options (column_name 'deleted') null;

-- staff_id must be unique so that "on conflict (staff_id)" in
-- core.load_staff has a unique constraint to arbitrate on.
alter table core.dim_staff
    add unique (staff_id);
-- Changes to the full warehouse re-creation script.

-- Staging copy of the staff rows read from film_src.staff.
-- deleted stays null until the source row carries a deletion timestamp;
-- core.load_staff branches on whether it is null.
create table staging.staff (
    staff_id   integer     not null,
    first_name varchar(45) not null,
    last_name  varchar(45) not null,
    store_id   smallint    not null,
    deleted    timestamp   null
);

-- Reloads staging.staff with the rows of film_src.staff that changed since
-- the previous load.
-- The cut-off is the latest update_dt recorded in staging.last_update for
-- 'staging.staff', or 1900-01-01 when nothing has been recorded yet. A row
-- qualifies when its last_update or its deleted timestamp is at or after the
-- cut-off. Afterwards now() is recorded as the new load time.
create or replace procedure staging.staff_load()
as $$
declare
    v_since timestamp;  -- cut-off: time of the previous recorded load
begin
    -- max() over zero rows yields null, which coalesce turns into the
    -- 1900-01-01 fallback.
    select
        coalesce(max(lu.update_dt), '1900-01-01'::date)
    into v_since
    from
        staging.last_update lu
    where
        lu.table_name = 'staging.staff';

    -- Intentionally unfiltered: staging only ever holds the current delta.
    delete from staging.staff;

    insert into staging.staff (
        staff_id,
        first_name,
        last_name,
        store_id,
        deleted
    )
    select
        src.staff_id,
        src.first_name,
        src.last_name,
        src.store_id,
        src.deleted
    from
        film_src.staff src
    where
        src.last_update >= v_since
        or src.deleted >= v_since;

    -- Record this load so the next run starts from here.
    insert into staging.last_update (
        table_name,
        update_dt
    )
    values (
        'staging.staff',
        now()
    );
end;
$$
language plpgsql;

-- Staff dimension, keyed by the surrogate staff_pk.
-- staff_id is unique because core.load_staff upserts with
-- "on conflict (staff_id)".
create table core.dim_staff (
    staff_pk   serial      primary key,
    staff_id   integer     not null unique,
    first_name varchar(45) not null,
    last_name  varchar(45) not null,
    address    varchar(50) not null,
    district   varchar(20) not null,
    city_name  varchar(50) not null
);

-- Applies the contents of staging.staff to core.dim_staff:
--   1. staff whose staging row has a deleted timestamp are removed;
--   2. the remaining staging staff are inserted, or updated in place when
--      their staff_id already exists in the dimension.
create or replace procedure core.load_staff()
as $$
begin
    -- Step 1: remove dimension rows for staff marked as deleted in staging.
    delete from core.dim_staff dim
    where exists (
        select
            1
        from
            staging.staff stg
        where
            stg.staff_id = dim.staff_id
            and stg.deleted is not null
    );

    -- Step 2: upsert the non-deleted staff. Address, district and city are
    -- looked up by joining staging.store, staging.address and staging.city;
    -- inner joins mean staff without a match in any of them are skipped.
    insert into core.dim_staff (
        staff_id,
        first_name,
        last_name,
        address,
        district,
        city_name
    )
    select
        stg.staff_id,
        stg.first_name,
        stg.last_name,
        adr.address,
        adr.district,
        cty.city
    from
        staging.staff stg
        inner join staging.store sto using (store_id)
        inner join staging.address adr using (address_id)
        inner join staging.city cty using (city_id)
    where
        stg.deleted is null
    on conflict (staff_id) do update
    set
        first_name = excluded.first_name,
        last_name = excluded.last_name,
        address = excluded.address,
        district = excluded.district,
        city_name = excluded.city_name;
end;
$$
language plpgsql;