Logstash Pipeline-to-Pipeline Communication 적용하기
by 뚜부니Logstash Pipeline-to-Pipeline Communucation을 사용하면 길이가 너무 긴 설정 파일을 줄일 수 있지 않을까 싶어 사용해 봤어요. 그리고 설정을 다른 input에서 재사용하기도 좋을 것 같아서..! 한 번 사용해 보았습니다. 🥰
00. 실습 환경
- Windows 10
- Elasticsearch 7.10.2
- Logstash 7.10.2
- Kibana 7.10.2
- Filebeat 7.10.2
- Git Bash
Elasticsearch와 Kibana가 실행된 상태로 진행했어요. Elasticsearch와 Kibana 설치 및 실행에 대한 내용은 여기에 포함되어 있지 않아요.
01. 실습 파일 준비하기
실습을 진행하기 위해 임의로 로그 파일을 생성했어요.
각 로그 파일을 처리하여 Elasticsearch에 저장하는 Logstash 파일도 준비했어요. 이 파일을 사용에 Pipeline-to-Pipeline Communication을 적용해보려고 합니다.
그 전에, Logstash로 데이터를 전달하기 위한 Filebeat 설정부터 했어요. access.log
, error.log
, transaction.log
각각에 대해 설정을 추가했고, multiline을 설정하여 각 로그를 날짜를 기준으로 묶어 보낼 수 있게 설정했습니다.
multiline_defaults: &multiline_defaults
multiline:
pattern: '^\d{4}-\d{2}-\d{2}'
negate: true
match: after
filebeat.inputs:
- type: log
enabled: true
paths:
- E:/personal/Workspace/TIL/practice/elastic-stack-posting/post009/access.log
multiline:
pattern: '^(?:[0-9]{1,3}\.){3}[0-9]{1,3}$|^(?:[0-9a-fA-F]{1,4}:){7}[0-9a-fA-F]{1,4}$'
negate: false
match: after
fields:
log_type: access
- type: log
enabled: true
paths:
- E:/personal/Workspace/TIL/practice/elastic-stack-posting/post009/error.log
<<: *multiline_defaults
fields:
log_type: error
- type: log
enabled: true
paths:
- E:/personal/Workspace/TIL/practice/elastic-stack-posting/post009/transaction.log
<<: *multiline_defaults
fields:
log_type: transaction
output.logstash:
hosts: ["localhost:5044"]
이제 실습을 위한 모든 준비가 끝났습니다!! Pipeline-to-Pipeline Communucation을 적용하러 가볼까요?!
02. Pipeline 설정하기
먼저 Pipeline을 설정하며 logstash.conf
파일을 어떻게 분리할지 정해봤어요.
현재 logstash.conf
파일을 Filebeat를 통해 파일을 읽어오고, Elasticsearch로 파일을 내보냅니다. 읽어온 파일은 log_type
을 기준으로 각각 다르게 처리되며, log_type
이 transaction
이면서 로그 메시지가 REQUEST
로 시작하는 경우에 대해서는 또 다른 방식으로 데이터를 처리합니다. 그리고 모든 데이터는 Elasticsearch로 내보내기 전, 로그에 찍힌 날짜를 data
플러그인을 통해 기본 날짜/시간 포맷으로 변경합니다.
전체 흐름을 정리하면 다음과 같습니다.
이 흐름을 바탕으로 pipline.yml
에 필요한 Config 파일을 작성했습니다.
- pipeline.id: inputstream
path.config: "../config/pipelines/inputstream.conf"
- pipeline.id: accesspipe
path.config: "../config/pipelines/accesspipe.conf"
- pipeline.id: errorpipe
path.config: "../config/pipelines/errorpipe.conf"
- pipeline.id: transactionpipe
path.config: "../config/pipelines/transactionpipe.conf"
- pipeline.id: requestpipe
path.config: "../config/pipelines/requestpipe.conf"
- pipeline.id: datepipe
path.config: "../config/pipelines/datepipe.conf"
- pipeline.id: outputstream
path.config: "../config/pipelines/outputstream.conf"
03. Config 파일 분리하기
이제 흐름에 맞게 logstash.conf
파일을 분리하도록 하겠습니다. 이때, outputstream
을 제외하고 나머지 Config는 다음 Pipeline으로 데이터를 전달하는 구조로 작성해야 합니다. Pipeline을 사용할 때는, sent_to
를 사용하여 다음 Pipeline으로 데이터를 전달하고 address
를 사용하여 데이터를 받아옵니다.
먼저 inputstream.conf
을 다음과 같이 작성했어요. log_type
에 따라 각각 다른 Pipeline으로 보낼 수 있게 if-else문으로 구성했습니다.
input {
beats {
port => 5044
}
}
output {
if [fields][log_type] == "access" {
pipeline { send_to => accesslog }
} else if [fields][log_type] == "error" {
pipeline { send_to => errorlog }
} else {
pipeline { send_to => [transactionlog] }
}
}
그다음 accesspipe.conf
는 send_to
로 보냈던 값을 그대로 address
에 넣으면 데이터를 받아옵니다. errorpipe.conf
와 transactionpipe.conf
는 이와 유사하게 작성하였는데, 내용이 길어질 것 같아 생략했어요. input과 output은 같고 filter가 다르게 구성되어 있어요.
input {
pipeline { address => accesslog }
}
filter {
dissect {
mapping => {
"message" => "%{ip} - - [%{log_datetime} %{}] %{message}"
}
}
}
output {
pipeline { send_to => [dateplugin] }
}
이런 식으로 각 파일을 분리한 다음, datepipe.conf
에서 데이터 설정을 합니다. 참고로, accesspipe, errorpipe, transactionpipe는 모두 output이 동일하게 datepipe로 보내요!
input {
pipeline { address => dateplugin }
}
filter {
date {
match => ["log_datetime", "YYYY-MM-dd HH:mm:ss.SSS", "dd/MMM/YYYY:HH:mm:ss"]
target => "datetime"
timezone => "Asia/Seoul"
}
}
output {
pipeline { send_to => [outputdata] }
}
그다음 Elasticsearch로 내보내는 outputstream.conf
는 다음과 같이 작성했어요.
input {
pipeline { address => outputdata }
}
filter {
ruby {
code => "event.set('index_day', event.get('datetime').time.localtime('+09:00').strftime('%Y%m%d'))"
}
}
output {
elasticsearch {
hosts => ["localhost:9200"]
index => "%{[fields][log_type]}-%{index_day}"
}
}
04. 실행하기
작성한 내용이 정상적으로 동작하는지 확인하기 위해 Logstash와 Filebeat를 실행합니다. 여기서는 실행 명령어에 대해서 생략했어요.
잘 동작하는지 확인하기 위해 Kibana 대시보드에 접속하여 DevTools
를 열고 아래 명령어를 입력합니다.
GET _cat/indices/*-20230810?v
명령어를 실행하면 아래 이미지와 같이 정상적으로 인덱스가 생성된 것을 확인할 수 있습니다.
전체 Config 파일 내용은 Github에서 확인하실 수 있습니다.
Logstash Pipeline-to-Pipeline Communucation을 적용하실 때 도움이 되셨으면 좋겠네요.
참고
'Elastic Stack (ELK)' 카테고리의 다른 글
Elastic Stack 또는 ELK에 OpenDistro Security 적용하기 (0) | 2024.11.13 |
---|---|
Logstash에서 Elasticsearch 인덱스 한국 시간(KST) 추가해서 생성하기 (0) | 2024.11.11 |
Elastic Stack 또는 ELK에 Search Guard 적용하기 (0) | 2024.11.10 |
CentOS에서 Elasticsearch 운영 환경 설정하기 (0) | 2024.11.09 |
Elasticsearch Cluster 설정하기 (0) | 2023.06.11 |
블로그의 정보
개발하는 두부
뚜부니