Amazon RDS PostgreSQL 로그를 Amazon S3로 자동 내보내기
RDS for PostgreSQL의 로그를 Amazon S3로 복사하거나 아카이브해야 하는 상황이 있다.
보통 보안 감사때문인데, 데이터베이스에서 모든 DDL 또는 DML 활동을 감사해야 하는 규제 요구사항이 있다.
Amazon RDS for PostgreSQL(버전 12.5+) 및 Amazon Aurora PostgreSQL(버전 12.6+)에서는 pg_cron, log_fdw 및 aws_s3와 같은 확장을 도입하여 PostgreSQL 데이터베이스 로그를 S3로 자동으로 내보내는 것이 훨씬 쉬워졌으니 아래에서 확인해보자.
사전 준비 사항
- AWS 계정 및 접근 권한
- AWS CLI 설치 및 구성
- RDS PostgreSQL 인스턴스(버전 12.5+)
- 데이터베이스 인스턴스에 접근할 수 있는 psql 클라이언트가 설치 및 구성된 Amazon EC2 인스턴스
- 로그를 저장할 S3 버킷
- S3 버킷에 접근할 수 있는 IAM 역할 및 정책
사용자 지정 파라미터 그룹 생성 및 확장 지원 허용
먼저, RDS PostgreSQL 인스턴스를 위한 파라미터 그룹을 생성하여 로그를 S3로 자동으로 내보낼 수 있도록 설정해야 한다.
사용중인 파라미터 그룹을 수정하여 pg_cron 및 log_fdw를 지원하도록 한다.
IAM 역할 및 정책 생성
Amazon RDS가 Amazon S3에 접근할 수 있도록 하는 IAM 역할과 정책을 설정하는데, 최소 권한 원칙을 준수하여 보안을 강화한다.
해당 예제에서는 s3-export-role이라는 역할을 생성하겠다.
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": "rds.amazonaws.com"
},
"Action": "sts:AssumeRole"
}
]
}
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "s3export",
"Action": [
"s3:PutObject",
"s3:AbortMultipartUpload"
],
"Effect": "Allow",
"Resource": [
"arn:aws:s3:::hamster-test-s3/*",
"arn:aws:s3:::hamster-test-s3"
]
}
]
}
당연히 위의 Resoure 섹션은 사용자의 S3를 사용하는걸로
그리고 해당 역할을 RDS PostgreSQL에 연결하자.
PostgreSQL에 접속하기
다음 명령어로 PostgreSQL에 연결한다
psql -host={RDS_HOST} --port=5432 --username={USER} {DB} --password
예시:
psql --host=hamster1-instance-1.abcde10abcde.us-east-1.rds.amazonaws.com --port=5432 --user=hamster postgres --password
이제 로그를 내보낼 데이터베이스와 기존 PostgreSQL 데이터베이스에서 작업해야 한다.
로그 추출 작업 설정
로그를 추출할 데이터베이스에서 작업을 시작하는데, 여기선 예제로 hamsterdb를 사용하겠다.
먼저 확장을 설치하고 확인한다:
CREATE EXTENSION aws_s3 CASCADE;
CREATE EXTENSION log_fdw;
\dx
확장이 로드되면 PostgreSQL DB 로그 파일을 테이블로 로드하는 함수를 생성하는데,
아래 함수 정의는 GitHub에서 확인할 수 있다.
PostgreSQL 로그 파일 로드 함수 생성
CREATE OR REPLACE FUNCTION public.load_postgres_log_files(v_schema_name TEXT DEFAULT 'logs', v_table_name TEXT DEFAULT 'postgres_logs', v_prefer_csv BOOLEAN DEFAULT TRUE)
RETURNS TEXT
AS
$BODY$
-- 함수 내용 생략 (GitHub에서 확인 가능)
$BODY$
LANGUAGE plpgsql;
링크 타고 들어가기 귀찮은 사람들을 위해 복사도 해놨다. ^^
-- Yaser Raja
-- AWS Professional Services
--
-- This function uses log_fdw to load all the available RDS / Aurora PostgreSQL DB log files as a table.
--
-- Usage:
-- 1) Create this function
-- 2) Run the following to load all the log files
-- SELECT public.load_postgres_log_files();
-- 3) Start looking at the logs
-- SELECT * FROM logs.postgres_logs;
--
-- Here are the key features:
-- - By default, a table named "postgres_logs" is created in schema "logs".
-- - The schema name and table name can be changed via arguments.
-- - If the table already exists, it will be DROPPED
-- - If the schema 'logs' does not exist, it will be created.
-- - Each log file is loaded as a foreign table and then made child of table logs.postgres_logs
-- - By default, CSV file format is preferred, it can be changed via argument v_prefer_csv
-- - Daily, hourly and minute-based log file name formats are supported for CSV and non-CSV output files
-- - postgresql.log.YYYY-MM-DD-HHMI
-- - postgresql.log.YYYY-MM-DD-HH
-- - postgresql.log.YYYY-MM-DD
-- - Supports the scenario where log files list consist of both the file name formats
-- - When CSV format is used, a check-constraint is added to the child table created for each log file
--
CREATE OR REPLACE FUNCTION public.load_postgres_log_files(v_schema_name TEXT DEFAULT 'logs', v_table_name TEXT DEFAULT 'postgres_logs', v_prefer_csv BOOLEAN DEFAULT TRUE)
RETURNS TEXT
AS
$BODY$
DECLARE
v_csv_supported INT := 0;
v_hour_pattern_used INT := 0;
v_filename TEXT;
v_dt timestamptz;
v_dt_max timestamptz;
v_partition_name TEXT;
v_ext_exists INT := 0;
v_server_exists INT := 0;
v_table_exists INT := 0;
v_server_name TEXT := 'log_server';
v_filelist_sql TEXT;
v_enable_csv BOOLEAN := TRUE;
BEGIN
EXECUTE FORMAT('SELECT count(1) FROM pg_catalog.pg_extension WHERE extname=%L', 'log_fdw') INTO v_ext_exists;
IF v_ext_exists = 0 THEN
CREATE EXTENSION log_fdw;
END IF;
EXECUTE 'SELECT count(1) FROM pg_catalog.pg_foreign_server WHERE srvname=$1' INTO v_server_exists USING v_server_name;
IF v_server_exists = 0 THEN
EXECUTE FORMAT('CREATE SERVER %s FOREIGN DATA WRAPPER log_fdw', v_server_name);
END IF;
EXECUTE FORMAT('CREATE SCHEMA IF NOT EXISTS %I', v_schema_name);
-- Set the search path to make sure the tables are created in dblogs schema
EXECUTE FORMAT('SELECT set_config(%L, %L, TRUE)', 'search_path', v_schema_name);
-- The db log files are in UTC timezone so that date extracted from filename will also be UTC.
-- Setting timezone to get correct table constraints.
EXECUTE FORMAT('SELECT set_config(%L, %L, TRUE)', 'timezone', 'UTC');
-- Check the parent table exists
EXECUTE 'SELECT count(1) FROM information_schema.tables WHERE table_schema=$1 AND table_name=$2' INTO v_table_exists USING v_schema_name, v_table_name;
IF v_table_exists = 1 THEN
RAISE NOTICE 'Table % already exists. It will be dropped.', v_table_name;
EXECUTE FORMAT('SELECT set_config(%L, %L, TRUE)', 'client_min_messages', 'WARNING');
EXECUTE FORMAT('DROP TABLE %I CASCADE', v_table_name);
EXECUTE FORMAT('SELECT set_config(%L, %L, TRUE)', 'client_min_messages', 'NOTICE');
v_table_exists = 0;
END IF;
-- Check the pg log format
SELECT 1 INTO v_csv_supported FROM pg_catalog.pg_settings WHERE name='log_destination' AND setting LIKE '%csvlog%';
IF v_csv_supported = 1 AND v_prefer_csv = TRUE THEN
RAISE NOTICE 'CSV log format will be used.';
v_filelist_sql = FORMAT('SELECT file_name FROM public.list_postgres_log_files() WHERE file_name LIKE %L ORDER BY 1 DESC', '%.csv');
ELSE
RAISE NOTICE 'Default log format will be used.';
v_filelist_sql = FORMAT('SELECT file_name FROM public.list_postgres_log_files() WHERE file_name NOT LIKE %L ORDER BY 1 DESC', '%.csv');
v_enable_csv = FALSE;
END IF;
FOR v_filename IN EXECUTE (v_filelist_sql)
LOOP
RAISE NOTICE 'Processing log file - %', v_filename;
IF v_enable_csv = TRUE THEN
-- Dynamically checking the file name pattern so that both allowed file names patters are parsed
IF v_filename like 'postgresql.log.____-__-__-____.csv' THEN
v_dt=substring(v_filename from 'postgresql.log.#"%#"-____.csv' for '#')::timestamp + INTERVAL '1 HOUR' * (substring(v_filename from 'postgresql.log.____-__-__-#"%#"__.csv' for '#')::int);
v_dt_max = v_dt + INTERVAL '1 HOUR';
v_dt=substring(v_filename from 'postgresql.log.#"%#"-____.csv' for '#')::timestamp + INTERVAL '1 HOUR' * (substring(v_filename from 'postgresql.log.____-__-__-#"%#"__.csv' for '#')::int) + INTERVAL '1 MINUTE' * (substring(v_filename from 'postgresql.log.____-__-__-__#"%#".csv' for '#')::int);
ELSIF v_filename like 'postgresql.log.____-__-__-__.csv' THEN
v_dt=substring(v_filename from 'postgresql.log.#"%#"-__.csv' for '#')::timestamp + INTERVAL '1 HOUR' * (substring(v_filename from 'postgresql.log.____-__-__-#"%#".csv' for '#')::int);
v_dt_max = v_dt + INTERVAL '1 HOUR';
ELSIF v_filename like 'postgresql.log.____-__-__.csv' THEN
v_dt=substring(v_filename from 'postgresql.log.#"%#".csv' for '#')::timestamp;
v_dt_max = v_dt + INTERVAL '1 DAY';
ELSE
RAISE NOTICE ' Skipping file';
CONTINUE;
END IF;
ELSE
IF v_filename like 'postgresql.log.____-__-__-____' THEN
v_dt=substring(v_filename from 'postgresql.log.#"%#"-____' for '#')::timestamp + INTERVAL '1 HOUR' * (substring(v_filename from 'postgresql.log.____-__-__-#"%#"__' for '#')::int) + INTERVAL '1 MINUTE' * (substring(v_filename from 'postgresql.log.____-__-__-__#"%#"' for '#')::int);
ELSIF v_filename like 'postgresql.log.____-__-__-__' THEN
v_dt=substring(v_filename from 'postgresql.log.#"%#"-__' for '#')::timestamp + INTERVAL '1 HOUR' * (substring(v_filename from 'postgresql.log.____-__-__-#"%#"' for '#')::int);
ELSIF v_filename like 'postgresql.log.____-__-__' THEN
v_dt=substring(v_filename from 'postgresql.log.#"%#"' for '#')::timestamp;
ELSE
RAISE NOTICE ' Skipping file';
CONTINUE;
END IF;
END IF;
v_partition_name=CONCAT(v_table_name, '_', to_char(v_dt, 'YYYYMMDD_HH24MI'));
EXECUTE FORMAT('SELECT public.create_foreign_table_for_log_file(%L, %L, %L)', v_partition_name, v_server_name, v_filename);
IF v_table_exists = 0 THEN
EXECUTE FORMAT('CREATE TABLE %I (LIKE %I INCLUDING ALL)', v_table_name, v_partition_name);
v_table_exists = 1;
END IF;
EXECUTE FORMAT('ALTER TABLE %I INHERIT %I', v_partition_name, v_table_name);
IF v_enable_csv = TRUE THEN
EXECUTE FORMAT('ALTER TABLE %I ADD CONSTRAINT check_date_range CHECK (log_time>=%L and log_time < %L)', v_partition_name, v_dt, v_dt_max);
END IF;
END LOOP;
RETURN FORMAT('Postgres logs loaded to table %I.%I', v_schema_name, v_table_name);
END;
$BODY$
LANGUAGE plpgsql;
함수 생성 후 다음 명령어로 PostgreSQL 로그를 데이터베이스에 로드할 수 있다.
SELECT public.load_postgres_log_files();
다음 PostgreSQL 데이터베이스로 전환하여 pg_cron 확장을 설치하고 확인한다
\c postgres
CREATE EXTENSION pg_cron;
\dx
크론 작업 설정
크론 스케줄러를 설정하여 로그를 S3로 매 시간 내보내도록 한다.
다음은 한 시간 단위로 내보내는 예시이다.
SELECT cron.schedule(
'postgres-s3-log-uploads',
'0 * * * *',
$BODY$
DO $$
BEGIN
PERFORM public.load_postgres_log_files();
PERFORM aws_s3.query_export_to_s3(
'SELECT * FROM logs.postgres_logs WHERE log_time >= NOW() - INTERVAL ''1 hour'' ORDER BY log_time DESC;',
'hamster-test-s3',
'logs_exported',
'us-east-1'
);
END;
$$;
$BODY$
) AS jobid;
이 작업이 실행되지 않으면 다음 쿼리로 로그를 확인할 수 있다.
SELECT * FROM cron.job_run_details
WHERE jobid IN (SELECT jobid FROM cron.job WHERE jobname = 'postgres-s3-log-uploads')
ORDER BY start_time DESC
LIMIT 5;
동적 파일 이름 설정
동일한 이름의 파일이 생성되어 S3로 내보낼 때 덮어쓰기가 발생하지 않도록 동적 파일 이름을 설정하는 함수도 생성할 수 있다.
\c your_database
CREATE OR REPLACE FUNCTION export_logs_to_s3_dynamic() RETURNS void AS $$
DECLARE
dynamic_filename TEXT;
BEGIN
-- 동적 파일 이름 생성
dynamic_filename := 'logs_exported_' || to_char(now(), 'YYYYMMDD_HH24MI');
-- 로그 로드 및 내보내기
PERFORM public.load_postgres_log_files();
PERFORM aws_s3.query_export_to_s3(
'SELECT * FROM logs.postgres_logs WHERE log_time >= NOW() - INTERVAL ''1 hour'' ORDER BY log_time DESC;',
aws_commons.create_s3_uri('hamster-test-s3', dynamic_filename, 'us-east-1')
);
END;
$$ LANGUAGE plpgsql;
이제 PostgreSQL 데이터베이스에서 크론 작업을 설정하여 위의 함수를 사용하면 되겠다.
\c postgres
SELECT cron.schedule(
'postgres-s3-log-uploads',
'0 * * * *',
$$SELECT export_logs_to_s3_dynamic();$$
) AS jobid;
참고 자료
Automate PostgreSQL Log Exports to Amazon S3 Using Extensions