개발하는 두부

Logstash Pipeline-to-Pipeline Communication 적용하기

by 뚜부니

Logstash Pipeline-to-Pipeline Communucation을 사용하면 길이가 너무 긴 설정 파일을 줄일 수 있지 않을까 싶어 사용해 봤어요. 그리고 설정을 다른 input에서 재사용하기도 좋을 것 같아서..! 한 번 사용해 보았습니다. 🥰

헤헿 열심히 정리했어요

00. 실습 환경

  • Windows 10
  • Elasticsearch 7.10.2
  • Logstash 7.10.2
  • Kibana 7.10.2
  • Filebeat 7.10.2
  • Git Bash

Elasticsearch와 Kibana가 실행된 상태로 진행했어요. Elasticsearch와 Kibana 설치 및 실행에 대한 내용은 여기에 포함되어 있지 않아요.

 

01. 실습 파일 준비하기

실습을 진행하기 위해 임의로 로그 파일을 생성했어요.

각 로그 파일을 처리하여 Elasticsearch에 저장하는 Logstash 파일도 준비했어요. 이 파일을 사용에 Pipeline-to-Pipeline Communication을 적용해보려고 합니다.

그 전에, Logstash로 데이터를 전달하기 위한 Filebeat 설정부터 했어요. access.log, error.log, transaction.log 각각에 대해 설정을 추가했고, multiline을 설정하여 각 로그를 날짜를 기준으로 묶어 보낼 수 있게 설정했습니다.

multiline_defaults: &multiline_defaults
  multiline:
    pattern: '^\d{4}-\d{2}-\d{2}'
    negate: true
    match: after
filebeat.inputs:
- type: log
  enabled: true
  paths:
     - E:/personal/Workspace/TIL/practice/elastic-stack-posting/post009/access.log
  multiline:
    pattern: '^(?:[0-9]{1,3}\.){3}[0-9]{1,3}$|^(?:[0-9a-fA-F]{1,4}:){7}[0-9a-fA-F]{1,4}$'
    negate: false
    match: after
  fields:
    log_type: access

- type: log
  enabled: true
  paths:
     - E:/personal/Workspace/TIL/practice/elastic-stack-posting/post009/error.log
  <<: *multiline_defaults
  fields:
    log_type: error

- type: log
  enabled: true
  paths:
     - E:/personal/Workspace/TIL/practice/elastic-stack-posting/post009/transaction.log
  <<: *multiline_defaults
  fields:
    log_type: transaction

output.logstash:
  hosts: ["localhost:5044"]

이제 실습을 위한 모든 준비가 끝났습니다!! Pipeline-to-Pipeline Communucation을 적용하러 가볼까요?!

실습 출바알!! 가보자고!!

 

02. Pipeline 설정하기

먼저 Pipeline을 설정하며 logstash.conf 파일을 어떻게 분리할지 정해봤어요.

현재 logstash.conf 파일을 Filebeat를 통해 파일을 읽어오고, Elasticsearch로 파일을 내보냅니다. 읽어온 파일은 log_type을 기준으로 각각 다르게 처리되며, log_typetransaction이면서 로그 메시지가 REQUEST로 시작하는 경우에 대해서는 또 다른 방식으로 데이터를 처리합니다. 그리고 모든 데이터는 Elasticsearch로 내보내기 전, 로그에 찍힌 날짜를 data 플러그인을 통해 기본 날짜/시간 포맷으로 변경합니다.

전체 흐름을 정리하면 다음과 같습니다.

image

이 흐름을 바탕으로 pipline.yml에 필요한 Config 파일을 작성했습니다.

- pipeline.id: inputstream
  path.config: "../config/pipelines/inputstream.conf"

- pipeline.id: accesspipe
  path.config: "../config/pipelines/accesspipe.conf"

- pipeline.id: errorpipe
  path.config: "../config/pipelines/errorpipe.conf"

- pipeline.id: transactionpipe
  path.config: "../config/pipelines/transactionpipe.conf"

- pipeline.id: requestpipe
  path.config: "../config/pipelines/requestpipe.conf"

- pipeline.id: datepipe
  path.config: "../config/pipelines/datepipe.conf"

- pipeline.id: outputstream
  path.config: "../config/pipelines/outputstream.conf"

 

03. Config 파일 분리하기

이제 흐름에 맞게 logstash.conf 파일을 분리하도록 하겠습니다. 이때, outputstream을 제외하고 나머지 Config는 다음 Pipeline으로 데이터를 전달하는 구조로 작성해야 합니다. Pipeline을 사용할 때는, sent_to를 사용하여 다음 Pipeline으로 데이터를 전달하고 address를 사용하여 데이터를 받아옵니다.

먼저 inputstream.conf을 다음과 같이 작성했어요. log_type에 따라 각각 다른 Pipeline으로 보낼 수 있게 if-else문으로 구성했습니다.

input {
  beats {
    port => 5044
  }
}

output {
  if [fields][log_type] == "access" {
    pipeline { send_to => accesslog }
  } else if [fields][log_type] == "error" {
    pipeline { send_to => errorlog }
  } else {
    pipeline { send_to => [transactionlog] }
  }
}

그다음 accesspipe.confsend_to로 보냈던 값을 그대로 address에 넣으면 데이터를 받아옵니다. errorpipe.conftransactionpipe.conf는 이와 유사하게 작성하였는데, 내용이 길어질 것 같아 생략했어요. input과 output은 같고 filter가 다르게 구성되어 있어요.

input {
  pipeline { address => accesslog }
}

filter {
  dissect {
    mapping => {
      "message" => "%{ip} - - [%{log_datetime} %{}] %{message}"
    }
  }
}

output {
  pipeline { send_to => [dateplugin] }
}

이런 식으로 각 파일을 분리한 다음, datepipe.conf에서 데이터 설정을 합니다. 참고로, accesspipe, errorpipe, transactionpipe는 모두 output이 동일하게 datepipe로 보내요!

input {
  pipeline { address => dateplugin }
}

filter {
  date {
    match => ["log_datetime", "YYYY-MM-dd HH:mm:ss.SSS", "dd/MMM/YYYY:HH:mm:ss"]
    target => "datetime"
    timezone => "Asia/Seoul"
  }
}

output {
  pipeline { send_to => [outputdata] }
}

그다음 Elasticsearch로 내보내는 outputstream.conf는 다음과 같이 작성했어요.

input {
  pipeline { address => outputdata }
}

filter {
  ruby {
    code => "event.set('index_day', event.get('datetime').time.localtime('+09:00').strftime('%Y%m%d'))"
  }
}

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "%{[fields][log_type]}-%{index_day}"
  }
}

 

04. 실행하기

작성한 내용이 정상적으로 동작하는지 확인하기 위해 Logstash와 Filebeat를 실행합니다. 여기서는 실행 명령어에 대해서 생략했어요.

잘 동작하는지 확인하기 위해 Kibana 대시보드에 접속하여 DevTools를 열고 아래 명령어를 입력합니다.

GET _cat/indices/*-20230810?v

명령어를 실행하면 아래 이미지와 같이 정상적으로 인덱스가 생성된 것을 확인할 수 있습니다.

image

전체 Config 파일 내용은 Github에서 확인하실 수 있습니다.

 

Logstash Pipeline-to-Pipeline Communucation을 적용하실 때 도움이 되셨으면 좋겠네요.

❤️🧡💛💚💙💜

 

참고

블로그의 정보

개발하는 두부

뚜부니

활동하기