728x90
반응형
NiFi 2.2.0에서 PostgreSQL을 조회하여 CSV 파일을 만드는 완전한 예제를 만들어드렸습니다.
주요 구성 요소는 다음과 같습니다:
- PostgreSQL 설정: users 테이블 생성 및 10개의 샘플 데이터
- NiFi Flow: GenerateFlowFile → ExecuteSQLRecord → ConvertRecord → PutFile
- Controller Services: DBCPConnectionPool, JsonTreeReader, CSVRecordSetWriter
핵심 포인트:
- ExecuteSQLRecord가 PostgreSQL에서 데이터를 JSON 형태로 가져옵니다
- ConvertRecord가 JSON을 CSV로 변환합니다
- PutFile이 최종 CSV 파일을 저장합니다
시작하기 전에 확인해야 할 사항:
- PostgreSQL JDBC 드라이버(postgresql-42.x.x.jar)를 NiFi의 lib 디렉토리에 복사
- PostgreSQL 서버가 실행 중이고 접근 가능한지 확인
- 데이터베이스와 테이블이 생성되어 있는지 확인
다운로드 : https://jdbc.postgresql.org/download/
docker cp ./postgresql-42.7.6.jar nifi-2200:/opt/nifi/nifi-2.2.0/lib/
확인 : docker exec -u 0 -it nifi-2200 bash
cd /opt/nifi/nifi-2.2.0/lib/
1. PostgreSQL 테이블 생성 및 샘플 데이터
테이블 생성 스크립트
sql
-- 사용자 정보 테이블 생성
CREATE TABLE users (
id SERIAL PRIMARY KEY,
username VARCHAR(50) NOT NULL,
email VARCHAR(100) NOT NULL,
full_name VARCHAR(100),
age INTEGER,
city VARCHAR(50),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- 인덱스 생성 (성능 향상)
CREATE INDEX idx_users_email ON users(email);
CREATE INDEX idx_users_city ON users(city);
샘플 데이터 삽입
sql
-- 샘플 데이터 삽입
INSERT INTO users (username, email, full_name, age, city) VALUES
('john_doe', 'john.doe@email.com', 'John Doe', 28, 'Seoul'),
('jane_smith', 'jane.smith@email.com', 'Jane Smith', 32, 'Busan'),
('mike_wilson', 'mike.wilson@email.com', 'Mike Wilson', 25, 'Incheon'),
('sarah_johnson', 'sarah.johnson@email.com', 'Sarah Johnson', 29, 'Daegu'),
('david_brown', 'david.brown@email.com', 'David Brown', 35, 'Seoul'),
('lisa_davis', 'lisa.davis@email.com', 'Lisa Davis', 27, 'Gwangju'),
('tom_miller', 'tom.miller@email.com', 'Tom Miller', 31, 'Daejeon'),
('amy_garcia', 'amy.garcia@email.com', 'Amy Garcia', 26, 'Seoul'),
('chris_lee', 'chris.lee@email.com', 'Chris Lee', 30, 'Busan'),
('emma_taylor', 'emma.taylor@email.com', 'Emma Taylor', 24, 'Ulsan');
-- 데이터 확인
SELECT * FROM users ORDER BY id;
2. NiFi Flow 구성
필요한 Processor들
- GenerateFlowFile - 주기적으로 Flow 트리거
- ExecuteSQLRecord - PostgreSQL 쿼리 실행
- ConvertRecord - JSON을 CSV로 변환
- PutFile - CSV 파일로 저장
Controller Services 설정
DBCPConnectionPool 설정
properties
Database Connection URL: jdbc:postgresql://localhost:5432/your_database_name
Database Driver Class Name: org.postgresql.Driver
Database Driver Location(s): /path/to/postgresql-42.x.x.jar
Database User: your_username
Database Password: your_password
Max Wait Time: 500 millis
Max Total Connections: 8
JsonTreeReader 설정
properties
Schema Access Strategy: Infer Schema
CSVRecordSetWriter 설정
properties
Schema Write Strategy: Set 'schema.name' Attribute
Schema Access Strategy: Inherit Record Schema
CSV Format: RFC4180
Include Header Line: true
3. Processor 상세 설정
3.1 GenerateFlowFile
properties
File Size: 0B
Batch Size: 1
Unique FlowFiles: false
Scheduling:
- Run Schedule: 30 sec (또는 원하는 주기)
3.2 ExecuteSQLRecord
properties
Database Connection Pooling Service: DBCPConnectionPool
SQL select query:
SELECT
id,
username,
email,
full_name,
age,
city,
created_at
FROM users
WHERE age >= 25
ORDER BY created_at DESC
Record Writer: JsonRecordSetWriter
Include Zero Record FlowFiles: false
3.3 ConvertRecord
properties
Record Reader: JsonTreeReader
Record Writer: CSVRecordSetWriter
Include Zero Record FlowFiles: false
3.4 PutFile
properties
Directory: /path/to/output/directory
Conflict Resolution Strategy: replace
Create Missing Directories: true
4. Flow 연결 구성
GenerateFlowFile → ExecuteSQLRecord → ConvertRecord → PutFile
Relationship 연결:
- GenerateFlowFile → ExecuteSQLRecord: success
- ExecuteSQLRecord → ConvertRecord: success
- ConvertRecord → PutFile: success
실패 처리:
- ExecuteSQLRecord → LogMessage: failure
- ConvertRecord → LogMessage: failure
- PutFile → LogMessage: failure
5. 고급 설정 및 최적화
5.1 동적 쿼리 사용
ExecuteSQLRecord에서 Expression Language 사용:
sql
SELECT * FROM users
WHERE created_at >= '${date:format('yyyy-MM-dd', now():minus(86400000))}'
ORDER BY created_at DESC
5.2 파일명 동적 생성
PutFile의 Directory 설정에서:
/output/users_${now():format('yyyyMMdd_HHmmss')}.csv
5.3 에러 처리 개선
- LogMessage Processor 추가하여 실패한 FlowFile 로깅
- RouteOnAttribute 사용하여 조건별 라우팅
6. 테스트 및 검증
6.1 데이터 검증 쿼리
sql
-- 전체 레코드 수 확인
SELECT COUNT(*) FROM users;
-- 연령대별 분포 확인
SELECT city, COUNT(*) as user_count, AVG(age) as avg_age
FROM users
GROUP BY city
ORDER BY user_count DESC;
6.2 생성된 CSV 파일 예시
csv
id,username,email,full_name,age,city,created_at
1,john_doe,john.doe@email.com,John Doe,28,Seoul,2024-06-02 10:30:00
2,jane_smith,jane.smith@email.com,Jane Smith,32,Busan,2024-06-02 10:30:01
...
7. 트러블슈팅
7.1 일반적인 문제들
- JDBC 드라이버 경로: PostgreSQL JDBC 드라이버가 NiFi classpath에 있는지 확인
- 데이터베이스 연결: 방화벽 및 PostgreSQL 설정 확인
- 권한 문제: 데이터베이스 사용자가 테이블 조회 권한을 가지고 있는지 확인
7.2 성능 최적화
- Connection Pool 크기 조정: 동시 실행되는 쿼리 수에 맞게 조정
- 배치 크기 설정: ExecuteSQLRecord의 Max Rows Per Flow File 설정
- 인덱스 활용: WHERE 절에 사용되는 컬럼에 인덱스 생성
8. 보안 고려사항
8.1 민감정보 보호
- Parameter Context 사용하여 데이터베이스 credentials 관리
- Encrypted Configuration 활용
8.2 네트워크 보안
- SSL 연결 사용: jdbc:postgresql://localhost:5432/db?ssl=true&sslmode=require
- VPN 또는 Private Network 사용 권장
이 예제를 통해 NiFi에서 PostgreSQL 데이터를 조회하여 CSV 파일로 변환하는 완전한 워크플로우를 구성할 수 있습니다.
728x90
반응형
'BIG DATA > NiFi' 카테고리의 다른 글
NiFi에서 FetchFile Processor란? (0) | 2025.05.28 |
---|---|
NiFi 2.2.0 에 ListFile 프로세스 (0) | 2025.05.28 |
NiFi 1.16.3 수동설치 - Windows 11에 (0) | 2025.05.15 |
Apache NiFi Expression Language(NiFi EL) (0) | 2025.05.14 |
Apache NiFi 에서 FlowFile 이란...개념과 디버깅방법 (1) | 2025.05.14 |
댓글