-- Первоначальный скрипт пересоздания staging и core слоев:
-- https://drive.google.com/file/d/1qLtjbnbPOV0lqelcCiMuyZUmfQ75Fo_p/view?usp=sharing


-- создание staging слоя

-- создание таблиц staging слоя


-- Staging table for film rows (filled by staging.film_load from film_src.film).
-- Dropped and recreated every time this script runs.
DROP TABLE IF EXISTS staging.film;

CREATE TABLE staging.film (
    film_id          integer       NOT NULL,
    title            varchar(255)  NOT NULL,
    description      text          NULL,
    release_year     smallint      NULL,
    language_id      smallint      NOT NULL,
    rental_duration  smallint      NOT NULL,
    rental_rate      numeric(4, 2) NOT NULL,
    length           smallint      NULL,
    replacement_cost numeric(5, 2) NOT NULL,
    rating           varchar(10)   NULL,
    last_update      timestamp     NOT NULL,
    special_features text[]        NULL,      -- text[] is the same type as "_text"
    fulltext         tsvector      NOT NULL
);


-- Staging table for inventory rows (filled by staging.inventory_load from film_src.inventory).
DROP TABLE IF EXISTS staging.inventory;

CREATE TABLE staging.inventory (
    inventory_id integer  NOT NULL,
    film_id      smallint NOT NULL,
    store_id     smallint NOT NULL
);


-- Staging table for rental rows (filled by staging.rental_load from film_src.rental).
DROP TABLE IF EXISTS staging.rental;

CREATE TABLE staging.rental (
    rental_id    integer   NOT NULL,
    rental_date  timestamp NOT NULL,
    inventory_id integer   NOT NULL,
    customer_id  smallint  NOT NULL,
    return_date  timestamp NULL,      -- the only nullable column in this table
    staff_id     smallint  NOT NULL
);


-- Staging table for payment rows (filled by staging.payment_load from film_src.payment).
DROP TABLE IF EXISTS staging.payment;

CREATE TABLE staging.payment (
    payment_id   integer       NOT NULL,
    customer_id  smallint      NOT NULL,
    staff_id     smallint      NOT NULL,
    rental_id    integer       NOT NULL,
    amount       numeric(5, 2) NOT NULL,
    payment_date timestamp     NOT NULL
);


-- Staging table for staff rows (filled by staging.staff_load from film_src.staff).
DROP TABLE IF EXISTS staging.staff;

CREATE TABLE staging.staff (
    staff_id   integer     NOT NULL,
    first_name varchar(45) NOT NULL,
    last_name  varchar(45) NOT NULL,
    store_id   smallint    NOT NULL
);


-- Staging table for address rows (filled by staging.address_load from film_src.address).
DROP TABLE IF EXISTS staging.address;

CREATE TABLE staging.address (
    address_id integer     NOT NULL,
    address    varchar(50) NOT NULL,
    district   varchar(20) NOT NULL,
    city_id    smallint    NOT NULL
);


-- Staging table for city rows (filled by staging.city_load from film_src.city).
DROP TABLE IF EXISTS staging.city;

CREATE TABLE staging.city (
    city_id integer     NOT NULL,
    city    varchar(50) NOT NULL
);


-- Staging table for store rows (filled by staging.store_load from film_src.store).
DROP TABLE IF EXISTS staging.store;

CREATE TABLE staging.store (
    store_id   integer  NOT NULL,
    address_id smallint NOT NULL
);


-- создание процедур загрузки данных в staging слой


-- Full refresh of staging.film: empties the table, then copies every row
-- from film_src.film. Takes no parameters.
CREATE OR REPLACE PROCEDURE staging.film_load()
LANGUAGE plpgsql
AS $$
BEGIN
    -- Unfiltered delete is deliberate: the staging table is reloaded in full.
    DELETE FROM staging.film;

    INSERT INTO staging.film (
        film_id,
        title,
        description,
        release_year,
        language_id,
        rental_duration,
        rental_rate,
        length,
        replacement_cost,
        rating,
        last_update,
        special_features,
        fulltext
    )
    SELECT
        src.film_id,
        src.title,
        src.description,
        src.release_year,
        src.language_id,
        src.rental_duration,
        src.rental_rate,
        src.length,
        src.replacement_cost,
        src.rating,
        src.last_update,
        src.special_features,
        src.fulltext
    FROM film_src.film AS src;
END;
$$;


-- Full refresh of staging.inventory: empties the table, then copies every row
-- from film_src.inventory. Takes no parameters.
CREATE OR REPLACE PROCEDURE staging.inventory_load()
LANGUAGE plpgsql
AS $$
BEGIN
    -- Unfiltered delete is deliberate: the staging table is reloaded in full.
    DELETE FROM staging.inventory;

    INSERT INTO staging.inventory (inventory_id, film_id, store_id)
    SELECT
        src.inventory_id,
        src.film_id,
        src.store_id
    FROM film_src.inventory AS src;
END;
$$;


-- Full refresh of staging.rental: empties the table, then copies every row
-- from film_src.rental. Takes no parameters.
CREATE OR REPLACE PROCEDURE staging.rental_load()
LANGUAGE plpgsql
AS $$
BEGIN
    -- Unfiltered delete is deliberate: the staging table is reloaded in full.
    DELETE FROM staging.rental;

    INSERT INTO staging.rental (
        rental_id,
        rental_date,
        inventory_id,
        customer_id,
        return_date,
        staff_id
    )
    SELECT
        src.rental_id,
        src.rental_date,
        src.inventory_id,
        src.customer_id,
        src.return_date,
        src.staff_id
    FROM film_src.rental AS src;
END;
$$;


-- Full refresh of staging.payment: empties the table, then copies every row
-- from film_src.payment. Takes no parameters.
CREATE OR REPLACE PROCEDURE staging.payment_load()
LANGUAGE plpgsql
AS $$
BEGIN
    -- Unfiltered delete is deliberate: the staging table is reloaded in full.
    DELETE FROM staging.payment;

    INSERT INTO staging.payment (
        payment_id,
        customer_id,
        staff_id,
        rental_id,
        amount,
        payment_date
    )
    SELECT
        src.payment_id,
        src.customer_id,
        src.staff_id,
        src.rental_id,
        src.amount,
        src.payment_date
    FROM film_src.payment AS src;
END;
$$;


-- Full refresh of staging.staff: empties the table, then copies the four
-- staged columns of every row from film_src.staff. Takes no parameters.
CREATE OR REPLACE PROCEDURE staging.staff_load()
LANGUAGE plpgsql
AS $$
BEGIN
    -- Unfiltered delete is deliberate: the staging table is reloaded in full.
    DELETE FROM staging.staff;

    INSERT INTO staging.staff (staff_id, first_name, last_name, store_id)
    SELECT
        src.staff_id,
        src.first_name,
        src.last_name,
        src.store_id
    FROM film_src.staff AS src;
END;
$$;



-- Full refresh of staging.address: empties the table, then copies the four
-- staged columns of every row from film_src.address. Takes no parameters.
CREATE OR REPLACE PROCEDURE staging.address_load()
LANGUAGE plpgsql
AS $$
BEGIN
    -- Unfiltered delete is deliberate: the staging table is reloaded in full.
    DELETE FROM staging.address;

    INSERT INTO staging.address (address_id, address, district, city_id)
    SELECT
        src.address_id,
        src.address,
        src.district,
        src.city_id
    FROM film_src.address AS src;
END;
$$;


-- Full refresh of staging.city: empties the table, then copies city_id and
-- city of every row from film_src.city. Takes no parameters.
CREATE OR REPLACE PROCEDURE staging.city_load()
LANGUAGE plpgsql
AS $$
BEGIN
    -- Unfiltered delete is deliberate: the staging table is reloaded in full.
    DELETE FROM staging.city;

    INSERT INTO staging.city (city_id, city)
    SELECT
        src.city_id,
        src.city
    FROM film_src.city AS src;
END;
$$;


-- Full refresh of staging.store: empties the table, then copies store_id and
-- address_id of every row from film_src.store. Takes no parameters.
CREATE OR REPLACE PROCEDURE staging.store_load()
LANGUAGE plpgsql
AS $$
BEGIN
    -- Unfiltered delete is deliberate: the staging table is reloaded in full.
    DELETE FROM staging.store;

    INSERT INTO staging.store (store_id, address_id)
    SELECT
        src.store_id,
        src.address_id
    FROM film_src.store AS src;
END;
$$;


-- создание таблиц core слоя


-- Drop the core layer before recreating it. The fact tables go first because
-- they hold foreign keys to the three dimension tables dropped after them.
DROP TABLE IF EXISTS core.fact_payment;
DROP TABLE IF EXISTS core.fact_rental;

DROP TABLE IF EXISTS core.dim_date;
DROP TABLE IF EXISTS core.dim_inventory;
DROP TABLE IF EXISTS core.dim_staff;



-- Calendar dimension: one row per day, keyed by an integer that
-- core.load_date builds as yyyymmdd.
CREATE TABLE core.dim_date (
    date_dim_pk            integer     PRIMARY KEY,
    date_actual            date        NOT NULL,
    epoch                  bigint      NOT NULL,
    day_suffix             varchar(4)  NOT NULL,
    day_name               varchar(11) NOT NULL,
    day_of_week            integer     NOT NULL,
    day_of_month           integer     NOT NULL,
    day_of_quarter         integer     NOT NULL,
    day_of_year            integer     NOT NULL,
    week_of_month          integer     NOT NULL,
    week_of_year           integer     NOT NULL,
    week_of_year_iso       char(10)    NOT NULL,
    month_actual           integer     NOT NULL,
    month_name             varchar(9)  NOT NULL,
    month_name_abbreviated char(3)     NOT NULL,
    quarter_actual         integer     NOT NULL,
    quarter_name           varchar(9)  NOT NULL,
    year_actual            integer     NOT NULL,
    first_day_of_week      date        NOT NULL,
    last_day_of_week       date        NOT NULL,
    first_day_of_month     date        NOT NULL,
    last_day_of_month      date        NOT NULL,
    first_day_of_quarter   date        NOT NULL,
    last_day_of_quarter    date        NOT NULL,
    first_day_of_year      date        NOT NULL,
    last_day_of_year       date        NOT NULL,
    mmyyyy                 char(6)     NOT NULL,
    mmddyyyy               char(10)    NOT NULL,
    weekend_indr           boolean     NOT NULL
);

-- The fact loaders join to this table on date_actual, not on the key.
CREATE INDEX dim_date_date_actual_idx ON core.dim_date (date_actual);


-- Inventory dimension: each inventory item together with the attributes of
-- its film. inventory_pk is the surrogate key referenced by the fact tables;
-- inventory_id and film_id are the identifiers carried over from staging.
CREATE TABLE core.dim_inventory (
    inventory_pk    serial        PRIMARY KEY,
    inventory_id    integer       NOT NULL,
    film_id         integer       NOT NULL,
    title           varchar(255)  NOT NULL,
    rental_duration smallint      NOT NULL,
    rental_rate     numeric(4, 2) NOT NULL,
    length          smallint,
    rating          varchar(10)
);


-- Staff dimension: staff member plus denormalised address, district and city.
-- staff_pk is the surrogate key referenced by the fact tables.
CREATE TABLE core.dim_staff (
    staff_pk   serial      PRIMARY KEY,
    staff_id   integer     NOT NULL,
    first_name varchar(45) NOT NULL,
    last_name  varchar(45) NOT NULL,
    address    varchar(50) NOT NULL,
    district   varchar(20) NOT NULL,
    city_name  varchar(50) NOT NULL
);


-- Payment fact: one row per loaded payment, pointing at the date, inventory
-- and staff dimensions through their surrogate keys.
CREATE TABLE core.fact_payment (
    payment_pk      serial        PRIMARY KEY,
    payment_id      integer       NOT NULL,
    amount          numeric(7, 2) NOT NULL,
    payment_date_fk integer       NOT NULL REFERENCES core.dim_date (date_dim_pk),
    inventory_fk    integer       NOT NULL REFERENCES core.dim_inventory (inventory_pk),
    staff_fk        integer       NOT NULL REFERENCES core.dim_staff (staff_pk)
);


-- Rental fact. return_date_fk and amount are nullable; every other column
-- is mandatory.
CREATE TABLE core.fact_rental (
    rental_pk      serial        PRIMARY KEY,
    rental_id      integer       NOT NULL,
    inventory_fk   integer       NOT NULL REFERENCES core.dim_inventory (inventory_pk),
    staff_fk       integer       NOT NULL REFERENCES core.dim_staff (staff_pk),
    rental_date_fk integer       NOT NULL REFERENCES core.dim_date (date_dim_pk),
    return_date_fk integer                REFERENCES core.dim_date (date_dim_pk),
    cnt            smallint      NOT NULL,
    amount         numeric(7, 2)
);


-- Populates core.dim_date with one row per calendar day.
--
-- Parameters:
--   sdate  first calendar date to generate
--   nm     number of consecutive days to generate, i.e. offsets 0 .. nm - 1
--          from sdate; a value of 0 or less generates no rows
--
-- Behaviour notes:
--   * lc_time is set through the procedure's SET clause rather than a SET
--     statement in the body, so the value only applies while the procedure
--     runs and the caller's session setting is restored on exit.
--   * day_name and month_name use the TM prefix and therefore follow lc_time;
--     month_name_abbreviated ('Mon') and quarter_name do not.
--   * The target columns are listed explicitly, so the insert does not depend
--     on the physical column order of core.dim_date.
--   * Days already present (same date_dim_pk) are skipped, so the procedure
--     can be re-run or called with overlapping ranges without failing on the
--     primary key.
CREATE OR REPLACE PROCEDURE core.load_date(sdate date, nm integer)
LANGUAGE plpgsql
SET lc_time = 'ru_RU'
AS $$
BEGIN
    INSERT INTO core.dim_date (
        date_dim_pk,
        date_actual,
        epoch,
        day_suffix,
        day_name,
        day_of_week,
        day_of_month,
        day_of_quarter,
        day_of_year,
        week_of_month,
        week_of_year,
        week_of_year_iso,
        month_actual,
        month_name,
        month_name_abbreviated,
        quarter_actual,
        quarter_name,
        year_actual,
        first_day_of_week,
        last_day_of_week,
        first_day_of_month,
        last_day_of_month,
        first_day_of_quarter,
        last_day_of_quarter,
        first_day_of_year,
        last_day_of_year,
        mmyyyy,
        mmddyyyy,
        weekend_indr
    )
    SELECT
        TO_CHAR(datum, 'yyyymmdd')::INT AS date_dim_pk,
        datum AS date_actual,
        EXTRACT(EPOCH FROM datum) AS epoch,
        TO_CHAR(datum, 'fmDDth') AS day_suffix,
        TO_CHAR(datum, 'TMDay') AS day_name,
        EXTRACT(ISODOW FROM datum) AS day_of_week,
        EXTRACT(DAY FROM datum) AS day_of_month,
        datum - DATE_TRUNC('quarter', datum)::DATE + 1 AS day_of_quarter,
        EXTRACT(DOY FROM datum) AS day_of_year,
        TO_CHAR(datum, 'W')::INT AS week_of_month,
        EXTRACT(WEEK FROM datum) AS week_of_year,
        -- ISO year, "-W", ISO week number, "-", ISO day of week
        EXTRACT(ISOYEAR FROM datum) || TO_CHAR(datum, '"-W"IW-') || EXTRACT(ISODOW FROM datum) AS week_of_year_iso,
        EXTRACT(MONTH FROM datum) AS month_actual,
        TO_CHAR(datum, 'TMMonth') AS month_name,
        TO_CHAR(datum, 'Mon') AS month_name_abbreviated,
        EXTRACT(QUARTER FROM datum) AS quarter_actual,
        CASE
            WHEN EXTRACT(QUARTER FROM datum) = 1 THEN 'First'
            WHEN EXTRACT(QUARTER FROM datum) = 2 THEN 'Second'
            WHEN EXTRACT(QUARTER FROM datum) = 3 THEN 'Third'
            WHEN EXTRACT(QUARTER FROM datum) = 4 THEN 'Fourth'
        END AS quarter_name,
        EXTRACT(YEAR FROM datum) AS year_actual,
        -- ISODOW is 1 (Monday) .. 7 (Sunday), so weeks run Monday to Sunday
        datum + (1 - EXTRACT(ISODOW FROM datum))::INT AS first_day_of_week,
        datum + (7 - EXTRACT(ISODOW FROM datum))::INT AS last_day_of_week,
        datum + (1 - EXTRACT(DAY FROM datum))::INT AS first_day_of_month,
        (DATE_TRUNC('MONTH', datum) + INTERVAL '1 MONTH - 1 day')::DATE AS last_day_of_month,
        DATE_TRUNC('quarter', datum)::DATE AS first_day_of_quarter,
        (DATE_TRUNC('quarter', datum) + INTERVAL '3 MONTH - 1 day')::DATE AS last_day_of_quarter,
        TO_DATE(EXTRACT(YEAR FROM datum) || '-01-01', 'YYYY-MM-DD') AS first_day_of_year,
        TO_DATE(EXTRACT(YEAR FROM datum) || '-12-31', 'YYYY-MM-DD') AS last_day_of_year,
        TO_CHAR(datum, 'mmyyyy') AS mmyyyy,
        TO_CHAR(datum, 'mmddyyyy') AS mmddyyyy,
        EXTRACT(ISODOW FROM datum) IN (6, 7) AS weekend_indr
    FROM (
        SELECT sdate + seq.day_offset AS datum
        FROM GENERATE_SERIES(0, nm - 1) AS seq (day_offset)
    ) AS dq
    ORDER BY datum
    ON CONFLICT (date_dim_pk) DO NOTHING;
END;
$$;



-- Rebuilds core.dim_inventory from staging: every staged inventory item is
-- combined with the attributes of its film. Takes no parameters.
CREATE OR REPLACE PROCEDURE core.load_inventory()
LANGUAGE plpgsql
AS $$
BEGIN
    -- Unfiltered delete is deliberate: the dimension is reloaded in full.
    -- The fact tables reference this table, so they must be empty at this point.
    DELETE FROM core.dim_inventory;

    INSERT INTO core.dim_inventory (
        inventory_id,
        film_id,
        title,
        rental_duration,
        rental_rate,
        length,
        rating
    )
    SELECT
        inv.inventory_id,
        inv.film_id,
        flm.title,
        flm.rental_duration,
        flm.rental_rate,
        flm.length,
        flm.rating
    FROM staging.inventory AS inv
    INNER JOIN staging.film AS flm USING (film_id);
END;
$$;


-- Rebuilds core.dim_staff from staging. The address is reached through the
-- staff member's store (staff -> store -> address -> city), because
-- staging.staff carries no address_id of its own. Takes no parameters.
CREATE OR REPLACE PROCEDURE core.load_staff()
LANGUAGE plpgsql
AS $$
BEGIN
    -- Unfiltered delete is deliberate: the dimension is reloaded in full.
    -- The fact tables reference this table, so they must be empty at this point.
    DELETE FROM core.dim_staff;

    INSERT INTO core.dim_staff (
        staff_id,
        first_name,
        last_name,
        address,
        district,
        city_name
    )
    SELECT
        stf.staff_id,
        stf.first_name,
        stf.last_name,
        adr.address,
        adr.district,
        cty.city
    FROM staging.staff AS stf
    INNER JOIN staging.store AS sto USING (store_id)
    INNER JOIN staging.address AS adr USING (address_id)
    INNER JOIN staging.city AS cty USING (city_id);
END;
$$;


-- Rebuilds core.fact_payment from staging.payment. Each payment is resolved
-- to dimension surrogate keys: inventory through the payment's rental, staff
-- through the payment's own staff_id, date through the calendar day of
-- payment_date. All joins are inner joins, so a payment with no matching
-- rental, dimension row or calendar day is not loaded. Takes no parameters.
CREATE OR REPLACE PROCEDURE core.load_payment()
LANGUAGE plpgsql
AS $$
BEGIN
    -- Unfiltered delete is deliberate: the fact table is reloaded in full.
    DELETE FROM core.fact_payment;

    INSERT INTO core.fact_payment (
        payment_id,
        amount,
        payment_date_fk,
        inventory_fk,
        staff_fk
    )
    SELECT
        pay.payment_id,
        pay.amount,
        pay_day.date_dim_pk,
        inv.inventory_pk,
        stf.staff_pk
    FROM staging.payment AS pay
    INNER JOIN staging.rental AS rent USING (rental_id)
    INNER JOIN core.dim_inventory AS inv USING (inventory_id)
    INNER JOIN core.dim_staff AS stf
        ON pay.staff_id = stf.staff_id
    INNER JOIN core.dim_date AS pay_day
        ON pay_day.date_actual = pay.payment_date::date;
END;
$$;


-- Rebuilds core.fact_rental from staging.rental. Inventory, staff and rental
-- date are resolved with inner joins (rentals without a match are not
-- loaded); payments and the return date are optional (left joins), so
-- amount and return_date_fk may be NULL. Takes no parameters.
CREATE OR REPLACE PROCEDURE core.load_rental()
LANGUAGE plpgsql
AS $$
BEGIN
    -- Unfiltered delete is deliberate: the fact table is reloaded in full.
    DELETE FROM core.fact_rental;

    INSERT INTO core.fact_rental (
        rental_id,
        inventory_fk,
        staff_fk,
        rental_date_fk,
        return_date_fk,
        amount,
        cnt
    )
    SELECT
        rent.rental_id,
        inv.inventory_pk,
        stf.staff_pk,
        rented_on.date_dim_pk,
        returned_on.date_dim_pk,
        SUM(pay.amount),
        -- COUNT(*) counts joined rows: 1 for a rental without payments,
        -- k for a rental with k payments.
        -- NOTE(review): confirm this is the intended meaning of cnt.
        COUNT(*)
    FROM staging.rental AS rent
    INNER JOIN core.dim_inventory AS inv USING (inventory_id)
    INNER JOIN core.dim_staff AS stf
        ON stf.staff_id = rent.staff_id
    INNER JOIN core.dim_date AS rented_on
        ON rented_on.date_actual = rent.rental_date::date
    LEFT JOIN staging.payment AS pay USING (rental_id)
    LEFT JOIN core.dim_date AS returned_on
        ON returned_on.date_actual = rent.return_date::date
    GROUP BY
        rent.rental_id,
        inv.inventory_pk,
        stf.staff_pk,
        rented_on.date_dim_pk,
        returned_on.date_dim_pk;
END;
$$;



-- Empties both fact tables. They hold foreign keys to the dimension tables,
-- so full_load calls this before the dimensions are reloaded.
-- Takes no parameters.
CREATE OR REPLACE PROCEDURE core.fact_delete()
LANGUAGE plpgsql
AS $$
BEGIN
    -- Unfiltered deletes are deliberate: both tables are cleared completely.
    DELETE FROM core.fact_payment;
    DELETE FROM core.fact_rental;
END;
$$;


-- создание data mart слоя


-- Data mart table with payment totals per calendar day
-- (filled by report.sales_date_calc).
DROP TABLE IF EXISTS report.sales_date;

CREATE TABLE report.sales_date (
    date_title varchar(20)   NOT NULL,  -- display label built from day, month name and year
    amount     numeric(7, 2) NOT NULL,
    date_sort  integer       NOT NULL   -- core.dim_date key of the day, usable for ordering
);


-- Rebuilds report.sales_date: the sum of core.fact_payment.amount per
-- calendar day, with a display label and the day's dimension key.
-- Takes no parameters.
CREATE OR REPLACE PROCEDURE report.sales_date_calc()
LANGUAGE plpgsql
AS $$
BEGIN
    -- Unfiltered delete is deliberate: the report is recalculated in full.
    DELETE FROM report.sales_date;

    INSERT INTO report.sales_date (
        date_title,  -- label in the form "<day> <month name> <year>", e.g. '1 September 2022'
        amount,
        date_sort
    )
    SELECT
        cal.day_of_month || ' ' || cal.month_name || ' ' || cal.year_actual,
        SUM(pay.amount),
        cal.date_dim_pk
    FROM core.fact_payment AS pay
    INNER JOIN core.dim_date AS cal
        ON pay.payment_date_fk = cal.date_dim_pk
    -- date_dim_pk identifies the day; the label parts are listed as well so
    -- the grouping is the same as grouping by the label expression itself.
    GROUP BY
        cal.date_dim_pk,
        cal.day_of_month,
        cal.month_name,
        cal.year_actual;
END;
$$;




--


-- Runs the whole pipeline in order: staging refresh, core rebuild, report
-- recalculation. core.load_date is not called here, so core.dim_date must
-- be populated separately. Takes no parameters.
CREATE OR REPLACE PROCEDURE full_load()
LANGUAGE plpgsql
AS $$
BEGIN
    -- 1. Staging layer: full copies of the film_src tables.
    CALL staging.film_load();
    CALL staging.inventory_load();
    CALL staging.rental_load();
    CALL staging.payment_load();
    CALL staging.staff_load();
    CALL staging.address_load();
    CALL staging.city_load();
    CALL staging.store_load();

    -- 2. Core layer: facts are cleared first because they reference the
    --    dimensions, then dimensions are rebuilt, then facts.
    CALL core.fact_delete();
    CALL core.load_inventory();
    CALL core.load_staff();
    CALL core.load_payment();
    CALL core.load_rental();

    -- 3. Data mart layer.
    CALL report.sales_date_calc();
END;
$$;



-- Fill the calendar dimension with 5843 consecutive days starting on
-- 2007-01-01, which ends on 2022-12-30.
-- NOTE(review): 5844 would be needed to include 2022-12-31 - confirm the intended range.
CALL core.load_date(DATE '2007-01-01', 5843);

-- Run the staging, core and report loads once.
CALL full_load();




-- Код с занятия в одном файле:
-- https://docs.google.com/document/d/1QKaMxQKVcq4JEcBK1fIkmT2DzgXpfur2FlLRDZgi5RY/edit?usp=sharing

-- Last modified: Sunday, 31 July 2022, 3:58 PM