Cloud/Aws

Amazon RDS PostgreSQL 로그를 Amazon S3로 자동 내보내기

HemMu 2024. 6. 20. 16:16

RDS for PostgreSQL의 로그를 Amazon S3로 복사하거나 아카이브해야 하는 상황이 있다.

보통 보안 감사때문인데, 데이터베이스에서 모든 DDL 또는 DML 활동을 감사해야 하는 규제 요구사항이 있다.

 

Amazon RDS for PostgreSQL(버전 12.5+) 및 Amazon Aurora PostgreSQL(버전 12.6+)에서는 pg_cron, log_fdw 및 aws_s3와 같은 확장을 도입하여 PostgreSQL 데이터베이스 로그를 S3로 자동으로 내보내는 것이 훨씬 쉬워졌으니 아래에서 확인해보자.

사전 준비 사항

  1. AWS 계정 및 접근 권한
  2. AWS CLI 설치 및 구성
  3. RDS PostgreSQL 인스턴스(버전 12.5+)
  4. 데이터베이스 인스턴스에 접근할 수 있는 psql 클라이언트가 설치 및 구성된 Amazon EC2 인스턴스
  5. 로그를 저장할 S3 버킷
  6. S3 버킷에 접근할 수 있는 IAM 역할 및 정책

사용자 지정 파라미터 그룹 생성 및 확장 지원 허용

먼저, RDS PostgreSQL 인스턴스를 위한 파라미터 그룹을 생성하여 로그를 S3로 자동으로 내보낼 수 있도록 설정해야 한다.

사용중인 파라미터 그룹을 수정하여 pg_cron 및 log_fdw를 지원하도록 한다.

IAM 역할 및 정책 생성

Amazon RDS가 Amazon S3에 접근할 수 있도록 하는 IAM 역할과 정책을 설정하는데, 최소 권한 원칙을 준수하여 보안을 강화한다.

해당 예제에서는 s3-export-role이라는 역할을 생성하겠다.

IAM 신뢰관계

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "Service": "rds.amazonaws.com"
            },
            "Action": "sts:AssumeRole"
        }
    ]
}

 

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "s3export",
            "Action": [
                "s3:PutObject",
                "s3:AbortMultipartUpload"
            ],
            "Effect": "Allow",
            "Resource": [
                "arn:aws:s3:::hamster-test-s3/*",
                "arn:aws:s3:::hamster-test-s3"
            ]
        }
    ]
}


당연히 위의 Resoure 섹션은 사용자의 S3를 사용하는걸로

 

그리고 해당 역할을 RDS  PostgreSQL에 연결하자.

PostgreSQL에 접속하기

다음 명령어로 PostgreSQL에 연결한다

psql -host={RDS_HOST} --port=5432 --username={USER} {DB} --password

예시:

psql --host=hamster1-instance-1.abcde10abcde.us-east-1.rds.amazonaws.com --port=5432 --user=hamster postgres --password

이제 로그를 내보낼 데이터베이스와 기존 PostgreSQL 데이터베이스에서 작업해야 한다.

로그 추출 작업 설정

로그를 추출할 데이터베이스에서 작업을 시작하는데, 여기선 예제로 hamsterdb를 사용하겠다.

먼저 확장을 설치하고 확인한다:

CREATE EXTENSION aws_s3 CASCADE;
CREATE EXTENSION log_fdw;
\dx

확장이 로드되면 PostgreSQL DB 로그 파일을 테이블로 로드하는 함수를 생성하는데,

아래 함수 정의는 GitHub에서 확인할 수 있다.

PostgreSQL 로그 파일 로드 함수 생성

CREATE OR REPLACE FUNCTION public.load_postgres_log_files(v_schema_name TEXT DEFAULT 'logs', v_table_name TEXT DEFAULT 'postgres_logs', v_prefer_csv BOOLEAN DEFAULT TRUE)
RETURNS TEXT
AS
$BODY$
-- 함수 내용 생략 (GitHub에서 확인 가능)
$BODY$
LANGUAGE plpgsql;

링크 타고 들어가기 귀찮은 사람들을 위해 복사도 해놨다. ^^

더보기

-- Yaser Raja
-- AWS Professional Services
--
-- This function uses log_fdw to load all the available RDS / Aurora PostgreSQL DB log files as a table.
--
-- Usage:
--    1) Create this function
--    2) Run the following to load all the log files
--          SELECT public.load_postgres_log_files();
--    3) Start looking at the logs
--          SELECT * FROM logs.postgres_logs;
--
-- Here are the key features:
--   - By default, a table named "postgres_logs" is created in schema "logs".
--   - The schema name and table name can be changed via arguments.
--   - If the table already exists, it will be DROPPED
--   - If the schema 'logs' does not exist, it will be created.
--   - Each log file is loaded as a foreign table and then made child of table logs.postgres_logs
--   - By default, CSV file format is preferred, it can be changed via argument v_prefer_csv
--   - Daily, hourly and minute-based log file name formats are supported for CSV and non-CSV output files
--       - postgresql.log.YYYY-MM-DD-HHMI
--       - postgresql.log.YYYY-MM-DD-HH
--       - postgresql.log.YYYY-MM-DD
--   - Supports the scenario where log files list consist of both the file name formats
--   - When CSV format is used, a check-constraint is added to the child table created for each log file
--
CREATE OR REPLACE FUNCTION public.load_postgres_log_files(v_schema_name TEXT DEFAULT 'logs', v_table_name TEXT DEFAULT 'postgres_logs', v_prefer_csv BOOLEAN DEFAULT TRUE)
RETURNS TEXT
AS
$BODY$
DECLARE
    v_csv_supported INT := 0;
    v_hour_pattern_used INT := 0;
    v_filename TEXT;
    v_dt timestamptz;
    v_dt_max timestamptz;
    v_partition_name TEXT;
    v_ext_exists INT := 0;
    v_server_exists INT := 0;
    v_table_exists INT := 0;
    v_server_name TEXT := 'log_server';
    v_filelist_sql TEXT;
    v_enable_csv BOOLEAN := TRUE;
BEGIN
    EXECUTE FORMAT('SELECT count(1) FROM pg_catalog.pg_extension WHERE extname=%L', 'log_fdw') INTO v_ext_exists;
    IF v_ext_exists = 0 THEN
        CREATE EXTENSION log_fdw;
    END IF;

    EXECUTE 'SELECT count(1) FROM pg_catalog.pg_foreign_server WHERE srvname=$1' INTO v_server_exists USING v_server_name;
    IF v_server_exists = 0 THEN
        EXECUTE FORMAT('CREATE SERVER %s FOREIGN DATA WRAPPER log_fdw', v_server_name);
    END IF;

    EXECUTE FORMAT('CREATE SCHEMA IF NOT EXISTS %I', v_schema_name);

    -- Set the search path to make sure the tables are created in dblogs schema
    EXECUTE FORMAT('SELECT set_config(%L, %L, TRUE)', 'search_path', v_schema_name);

    -- The db log files are in UTC timezone so that date extracted from filename will also be UTC.
    --    Setting timezone to get correct table constraints.
    EXECUTE FORMAT('SELECT set_config(%L, %L, TRUE)', 'timezone', 'UTC');

    -- Check the parent table exists
    EXECUTE 'SELECT count(1) FROM information_schema.tables WHERE table_schema=$1 AND table_name=$2' INTO v_table_exists USING v_schema_name, v_table_name;
    IF v_table_exists = 1 THEN
        RAISE NOTICE 'Table % already exists. It will be dropped.', v_table_name;
        EXECUTE FORMAT('SELECT set_config(%L, %L, TRUE)', 'client_min_messages', 'WARNING');
        EXECUTE FORMAT('DROP TABLE %I CASCADE', v_table_name);
        EXECUTE FORMAT('SELECT set_config(%L, %L, TRUE)', 'client_min_messages', 'NOTICE');
        v_table_exists = 0;
    END IF;

    -- Check the pg log format
    SELECT 1 INTO v_csv_supported FROM pg_catalog.pg_settings WHERE name='log_destination' AND setting LIKE '%csvlog%';
    IF v_csv_supported = 1 AND v_prefer_csv = TRUE THEN
        RAISE NOTICE 'CSV log format will be used.';
        v_filelist_sql = FORMAT('SELECT file_name FROM public.list_postgres_log_files() WHERE file_name LIKE %L ORDER BY 1 DESC', '%.csv');
    ELSE
        RAISE NOTICE 'Default log format will be used.';
        v_filelist_sql = FORMAT('SELECT file_name FROM public.list_postgres_log_files() WHERE file_name NOT LIKE %L ORDER BY 1 DESC', '%.csv');
        v_enable_csv = FALSE;
    END IF;

    FOR v_filename IN EXECUTE (v_filelist_sql)
    LOOP
        RAISE NOTICE 'Processing log file - %', v_filename;

        IF v_enable_csv = TRUE THEN
            -- Dynamically checking the file name pattern so that both allowed file names patters are parsed
            IF v_filename like 'postgresql.log.____-__-__-____.csv' THEN
                v_dt=substring(v_filename from 'postgresql.log.#"%#"-____.csv' for '#')::timestamp + INTERVAL '1 HOUR' * (substring(v_filename from 'postgresql.log.____-__-__-#"%#"__.csv' for '#')::int);
                v_dt_max = v_dt + INTERVAL '1 HOUR';
                v_dt=substring(v_filename from 'postgresql.log.#"%#"-____.csv' for '#')::timestamp + INTERVAL '1 HOUR' * (substring(v_filename from 'postgresql.log.____-__-__-#"%#"__.csv' for '#')::int) + INTERVAL '1 MINUTE' * (substring(v_filename from 'postgresql.log.____-__-__-__#"%#".csv' for '#')::int);
            ELSIF v_filename like 'postgresql.log.____-__-__-__.csv' THEN
                v_dt=substring(v_filename from 'postgresql.log.#"%#"-__.csv' for '#')::timestamp + INTERVAL '1 HOUR' * (substring(v_filename from 'postgresql.log.____-__-__-#"%#".csv' for '#')::int);
                v_dt_max = v_dt + INTERVAL '1 HOUR';
            ELSIF v_filename like 'postgresql.log.____-__-__.csv' THEN
                v_dt=substring(v_filename from 'postgresql.log.#"%#".csv' for '#')::timestamp;
                v_dt_max = v_dt + INTERVAL '1 DAY';
            ELSE
                RAISE NOTICE '        Skipping file';
                CONTINUE;
            END IF;
        ELSE
            IF v_filename like 'postgresql.log.____-__-__-____' THEN
                v_dt=substring(v_filename from 'postgresql.log.#"%#"-____' for '#')::timestamp + INTERVAL '1 HOUR' * (substring(v_filename from 'postgresql.log.____-__-__-#"%#"__' for '#')::int) + INTERVAL '1 MINUTE' * (substring(v_filename from 'postgresql.log.____-__-__-__#"%#"' for '#')::int);
            ELSIF v_filename like 'postgresql.log.____-__-__-__' THEN
                v_dt=substring(v_filename from 'postgresql.log.#"%#"-__' for '#')::timestamp + INTERVAL '1 HOUR' * (substring(v_filename from 'postgresql.log.____-__-__-#"%#"' for '#')::int);
            ELSIF v_filename like 'postgresql.log.____-__-__' THEN
                v_dt=substring(v_filename from 'postgresql.log.#"%#"' for '#')::timestamp;
            ELSE
                RAISE NOTICE '        Skipping file';
                CONTINUE;
            END IF;
        END IF;
        v_partition_name=CONCAT(v_table_name, '_', to_char(v_dt, 'YYYYMMDD_HH24MI'));
        EXECUTE FORMAT('SELECT public.create_foreign_table_for_log_file(%L, %L, %L)', v_partition_name, v_server_name, v_filename);

        IF v_table_exists = 0 THEN
            EXECUTE FORMAT('CREATE TABLE %I (LIKE %I INCLUDING ALL)', v_table_name, v_partition_name);
            v_table_exists = 1;
        END IF;

        EXECUTE FORMAT('ALTER TABLE %I INHERIT %I', v_partition_name, v_table_name);

        IF v_enable_csv = TRUE THEN
            EXECUTE FORMAT('ALTER TABLE %I ADD CONSTRAINT check_date_range CHECK (log_time>=%L and log_time < %L)', v_partition_name, v_dt, v_dt_max);
        END IF;

    END LOOP;

    RETURN FORMAT('Postgres logs loaded to table %I.%I', v_schema_name, v_table_name);
END;
$BODY$
LANGUAGE plpgsql;

함수 생성 후 다음 명령어로 PostgreSQL 로그를 데이터베이스에 로드할 수 있다.

SELECT public.load_postgres_log_files();

다음 PostgreSQL 데이터베이스로 전환하여 pg_cron 확장을 설치하고 확인한다

\c postgres
CREATE EXTENSION pg_cron;
\dx

크론 작업 설정

크론 스케줄러를 설정하여 로그를 S3로 매 시간 내보내도록 한다.

다음은 한 시간 단위로 내보내는 예시이다.

SELECT cron.schedule(
    'postgres-s3-log-uploads',
    '0 * * * *',
    $BODY$
    DO $$
    BEGIN
        PERFORM public.load_postgres_log_files();
        PERFORM aws_s3.query_export_to_s3(
            'SELECT * FROM logs.postgres_logs WHERE log_time >= NOW() - INTERVAL ''1 hour'' ORDER BY log_time DESC;',
            'hamster-test-s3',
            'logs_exported',
            'us-east-1'
        );
    END;
    $$;
    $BODY$
) AS jobid;


이 작업이 실행되지 않으면 다음 쿼리로 로그를 확인할 수 있다.

SELECT * FROM cron.job_run_details
WHERE jobid IN (SELECT jobid FROM cron.job WHERE jobname = 'postgres-s3-log-uploads')
ORDER BY start_time DESC
LIMIT 5;

동적 파일 이름 설정

동일한 이름의 파일이 생성되어 S3로 내보낼 때 덮어쓰기가 발생하지 않도록 동적 파일 이름을 설정하는 함수도 생성할 수 있다.

\c your_database

CREATE OR REPLACE FUNCTION export_logs_to_s3_dynamic() RETURNS void AS $$
DECLARE
    dynamic_filename TEXT;
BEGIN
    -- 동적 파일 이름 생성
    dynamic_filename := 'logs_exported_' || to_char(now(), 'YYYYMMDD_HH24MI');

    -- 로그 로드 및 내보내기
    PERFORM public.load_postgres_log_files();
    PERFORM aws_s3.query_export_to_s3(
        'SELECT * FROM logs.postgres_logs WHERE log_time >= NOW() - INTERVAL ''1 hour'' ORDER BY log_time DESC;',
        aws_commons.create_s3_uri('hamster-test-s3', dynamic_filename, 'us-east-1')
    );
END;
$$ LANGUAGE plpgsql;

이제 PostgreSQL 데이터베이스에서 크론 작업을 설정하여 위의 함수를 사용하면 되겠다.

\c postgres

SELECT cron.schedule(
    'postgres-s3-log-uploads',
    '0 * * * *',
    $$SELECT export_logs_to_s3_dynamic();$$
) AS jobid;

참고 자료

Automate PostgreSQL Log Exports to Amazon S3 Using Extensions