본문 바로가기
BIG DATA/NiFi

NiFi 2.2.0 PostgreSQL to CSV 예제

by 골든크랩 2025. 6. 2.
728x90
반응형

NiFi 2.2.0에서 PostgreSQL을 조회하여 CSV 파일을 만드는 완전한 예제를 만들어드렸습니다.

주요 구성 요소는 다음과 같습니다:

  1. PostgreSQL 설정: users 테이블 생성 및 10개의 샘플 데이터
  2. NiFi Flow: GenerateFlowFile → ExecuteSQLRecord → ConvertRecord → PutFile
  3. Controller Services: DBCPConnectionPool, JsonTreeReader, CSVRecordSetWriter

핵심 포인트:

  • ExecuteSQLRecord가 PostgreSQL에서 데이터를 JSON 형태로 가져옵니다
  • ConvertRecord가 JSON을 CSV로 변환합니다
  • PutFile이 최종 CSV 파일을 저장합니다

시작하기 전에 확인해야 할 사항:

  1. PostgreSQL JDBC 드라이버(postgresql-42.x.x.jar)를 NiFi의 lib 디렉토리에 복사
  2. PostgreSQL 서버가 실행 중이고 접근 가능한지 확인
  3. 데이터베이스와 테이블이 생성되어 있는지 확인

 

다운로드 : https://jdbc.postgresql.org/download/

 

docker cp ./postgresql-42.7.6.jar nifi-2200:/opt/nifi/nifi-2.2.0/lib/

 

확인 : docker exec -u 0 -it nifi-2200 bash

          cd /opt/nifi/nifi-2.2.0/lib/

1. PostgreSQL 테이블 생성 및 샘플 데이터

테이블 생성 스크립트

 
sql
-- 사용자 정보 테이블 생성
CREATE TABLE users (
    id SERIAL PRIMARY KEY,
    username VARCHAR(50) NOT NULL,
    email VARCHAR(100) NOT NULL,
    full_name VARCHAR(100),
    age INTEGER,
    city VARCHAR(50),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- 인덱스 생성 (성능 향상)
CREATE INDEX idx_users_email ON users(email);
CREATE INDEX idx_users_city ON users(city);

샘플 데이터 삽입

 
sql
-- 샘플 데이터 삽입
INSERT INTO users (username, email, full_name, age, city) VALUES
('john_doe', 'john.doe@email.com', 'John Doe', 28, 'Seoul'),
('jane_smith', 'jane.smith@email.com', 'Jane Smith', 32, 'Busan'),
('mike_wilson', 'mike.wilson@email.com', 'Mike Wilson', 25, 'Incheon'),
('sarah_johnson', 'sarah.johnson@email.com', 'Sarah Johnson', 29, 'Daegu'),
('david_brown', 'david.brown@email.com', 'David Brown', 35, 'Seoul'),
('lisa_davis', 'lisa.davis@email.com', 'Lisa Davis', 27, 'Gwangju'),
('tom_miller', 'tom.miller@email.com', 'Tom Miller', 31, 'Daejeon'),
('amy_garcia', 'amy.garcia@email.com', 'Amy Garcia', 26, 'Seoul'),
('chris_lee', 'chris.lee@email.com', 'Chris Lee', 30, 'Busan'),
('emma_taylor', 'emma.taylor@email.com', 'Emma Taylor', 24, 'Ulsan');

-- 데이터 확인
SELECT * FROM users ORDER BY id;

2. NiFi Flow 구성

필요한 Processor들

  1. GenerateFlowFile - 주기적으로 Flow 트리거
  2. ExecuteSQLRecord - PostgreSQL 쿼리 실행
  3. ConvertRecord - JSON을 CSV로 변환
  4. PutFile - CSV 파일로 저장

Controller Services 설정

DBCPConnectionPool 설정

 
properties
Database Connection URL: jdbc:postgresql://localhost:5432/your_database_name
Database Driver Class Name: org.postgresql.Driver
Database Driver Location(s): /path/to/postgresql-42.x.x.jar
Database User: your_username
Database Password: your_password
Max Wait Time: 500 millis
Max Total Connections: 8

JsonTreeReader 설정

 
properties
Schema Access Strategy: Infer Schema

CSVRecordSetWriter 설정

 
properties
Schema Write Strategy: Set 'schema.name' Attribute
Schema Access Strategy: Inherit Record Schema
CSV Format: RFC4180
Include Header Line: true

3. Processor 상세 설정

3.1 GenerateFlowFile

 
properties
File Size: 0B
Batch Size: 1
Unique FlowFiles: false

Scheduling:

  • Run Schedule: 30 sec (또는 원하는 주기)

3.2 ExecuteSQLRecord

 
properties
Database Connection Pooling Service: DBCPConnectionPool
SQL select query: 
  SELECT 
    id,
    username,
    email,
    full_name,
    age,
    city,
    created_at
  FROM users 
  WHERE age >= 25 
  ORDER BY created_at DESC

Record Writer: JsonRecordSetWriter
Include Zero Record FlowFiles: false

3.3 ConvertRecord

 
properties
Record Reader: JsonTreeReader
Record Writer: CSVRecordSetWriter
Include Zero Record FlowFiles: false

3.4 PutFile

 
properties
Directory: /path/to/output/directory
Conflict Resolution Strategy: replace
Create Missing Directories: true

4. Flow 연결 구성

 
GenerateFlowFile → ExecuteSQLRecord → ConvertRecord → PutFile

Relationship 연결:

  • GenerateFlowFile → ExecuteSQLRecord: success
  • ExecuteSQLRecord → ConvertRecord: success
  • ConvertRecord → PutFile: success

실패 처리:

  • ExecuteSQLRecord → LogMessage: failure
  • ConvertRecord → LogMessage: failure
  • PutFile → LogMessage: failure

5. 고급 설정 및 최적화

5.1 동적 쿼리 사용

ExecuteSQLRecord에서 Expression Language 사용:

 
sql
SELECT * FROM users 
WHERE created_at >= '${date:format('yyyy-MM-dd', now():minus(86400000))}'
ORDER BY created_at DESC

5.2 파일명 동적 생성

PutFile의 Directory 설정에서:

 
/output/users_${now():format('yyyyMMdd_HHmmss')}.csv

5.3 에러 처리 개선

  • LogMessage Processor 추가하여 실패한 FlowFile 로깅
  • RouteOnAttribute 사용하여 조건별 라우팅

6. 테스트 및 검증

6.1 데이터 검증 쿼리

 
sql
-- 전체 레코드 수 확인
SELECT COUNT(*) FROM users;

-- 연령대별 분포 확인
SELECT city, COUNT(*) as user_count, AVG(age) as avg_age
FROM users 
GROUP BY city 
ORDER BY user_count DESC;

6.2 생성된 CSV 파일 예시

 
csv
id,username,email,full_name,age,city,created_at
1,john_doe,john.doe@email.com,John Doe,28,Seoul,2024-06-02 10:30:00
2,jane_smith,jane.smith@email.com,Jane Smith,32,Busan,2024-06-02 10:30:01
...

7. 트러블슈팅

7.1 일반적인 문제들

  • JDBC 드라이버 경로: PostgreSQL JDBC 드라이버가 NiFi classpath에 있는지 확인
  • 데이터베이스 연결: 방화벽 및 PostgreSQL 설정 확인
  • 권한 문제: 데이터베이스 사용자가 테이블 조회 권한을 가지고 있는지 확인

7.2 성능 최적화

  • Connection Pool 크기 조정: 동시 실행되는 쿼리 수에 맞게 조정
  • 배치 크기 설정: ExecuteSQLRecord의 Max Rows Per Flow File 설정
  • 인덱스 활용: WHERE 절에 사용되는 컬럼에 인덱스 생성

8. 보안 고려사항

8.1 민감정보 보호

  • Parameter Context 사용하여 데이터베이스 credentials 관리
  • Encrypted Configuration 활용

8.2 네트워크 보안

  • SSL 연결 사용: jdbc:postgresql://localhost:5432/db?ssl=true&sslmode=require
  • VPN 또는 Private Network 사용 권장

이 예제를 통해 NiFi에서 PostgreSQL 데이터를 조회하여 CSV 파일로 변환하는 완전한 워크플로우를 구성할 수 있습니다.

728x90
반응형

댓글