[AWS] Building a Centralized Log System

Objective

- Build web server logging with Filebeat + Logstash + S3 + Lambda + AWS Elasticsearch

Notes)
- Use an IAM role for AWS permissions (EC2/S3 full access) instead of an access_key/secret key (a minimal CLI sketch follows below)
- At minimum, ports 5044 (Logstash Beats input) and 9600 (Logstash monitoring API) must be open in the Security Group
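
The IAM note above means attaching an instance profile to the EC2 host so Filebeat/Logstash can write to S3 without embedded credentials. A minimal sketch with the AWS CLI; the role and profile names (weblog-ec2-role, weblog-ec2-profile) are placeholders, and only the managed AmazonS3FullAccess policy is shown (attach AmazonEC2FullAccess the same way if needed):

# ec2-trust.json contents (standard EC2 trust policy)
{
  "Version": "2012-10-17",
  "Statement": [
    { "Effect": "Allow", "Principal": { "Service": "ec2.amazonaws.com" }, "Action": "sts:AssumeRole" }
  ]
}

# Create the role, attach the policy, wrap it in an instance profile, attach it to the instance
$> aws iam create-role --role-name weblog-ec2-role --assume-role-policy-document file://ec2-trust.json
$> aws iam attach-role-policy --role-name weblog-ec2-role --policy-arn arn:aws:iam::aws:policy/AmazonS3FullAccess
$> aws iam create-instance-profile --instance-profile-name weblog-ec2-profile
$> aws iam add-role-to-instance-profile --instance-profile-name weblog-ec2-profile --role-name weblog-ec2-role
$> aws ec2 associate-iam-instance-profile --instance-id <instance-id> --iam-instance-profile Name=weblog-ec2-profile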



Install Java 8 in Amazon Linux

See: https://alamis-infostash.blogspot.com/2018/11/aws-amazon-linux-java-18.html
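
In case the link goes stale, the usual route on Amazon Linux is roughly the following (package names are the standard OpenJDK ones from the Amazon Linux repositories):

# Install OpenJDK 8 and make it the default java
$> sudo yum install -y java-1.8.0-openjdk
$> sudo alternatives --config java
# Verify
$> java -version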

Install Filebeat in Amazon Linux

# Elasticsearch GPG KEY
$> sudo rpm --import https://packages.elastic.co/GPG-KEY-elasticsearch
# Create elastic.repo
$> sudo vi /etc/yum.repos.d/elastic.repo
  
# elastic.repo contents
[elastic-6.x]
name=Elastic repository for 6.x packages
baseurl=https://artifacts.elastic.co/packages/6.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md
$> sudo yum install filebeat
  
# register to run at all run levels
$> sudo chkconfig --add filebeat


Install Logstash in Amazon Linux

# Elasticsearch GPG KEY
$> sudo rpm --import https://packages.elastic.co/GPG-KEY-elasticsearch
# Create logstash.repo
$> sudo vi /etc/yum.repos.d/logstash.repo
  
# logstash.repo contents
[logstash-6.x]
name=Elastic repository for 6.x packages
baseurl=https://artifacts.elastic.co/packages/6.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md
$> sudo yum install logstash

Install Logstash Plugins (logstash-input-beats / logstash-output-s3)

# plugin listing
/usr/share/logstash/bin$> sudo ./logstash-plugin list
  
# Install the Beats input plugin
/usr/share/logstash/bin$> sudo ./logstash-plugin install logstash-input-beats
# Install the S3 output plugin
/usr/share/logstash/bin$> sudo ./logstash-plugin install logstash-output-s3

Filebeat Setting

/etc/filebeat/filebeat.yml
#=========================== Filebeat inputs =============================
filebeat.inputs:

# Each - is an input. Most options can be set at the input level, so
# you can use different inputs for various configurations.
# Below are the input specific configurations.

- type: log

  # Change to true to enable this input configuration.
  enabled: true

  # Paths that should be crawled and fetched. Glob based paths.
  paths:
    - /var/log/*.log
    #- c:\programdata\elasticsearch\logs\*

...

#-------------------------- Elasticsearch output ------------------------------
#output.elasticsearch:
  # Array of hosts to connect to.
  #hosts: ["localhost:9200"]
  # Optional protocol and basic auth credentials.
  #protocol: "https"
  #username: "elastic"
  #password: "changeme"

#----------------------------- Logstash output --------------------------------
output.logstash:
  # The Logstash hosts
  hosts: ["ip:port"]
  1. Enable the log input (enabled: true)
  2. Set the log file paths
  3. Comment out the Elasticsearch output
  4. Turn on the Logstash output and point hosts at the Logstash server (a quick config check follows below)
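
Before starting the service, the configuration and the connection to Logstash can be sanity-checked with Filebeat's test subcommands (available in the 6.x packages):

# Validate filebeat.yml syntax
$> sudo filebeat test config -c /etc/filebeat/filebeat.yml
# Check connectivity to the Logstash output configured above
$> sudo filebeat test output -c /etc/filebeat/filebeat.yml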

Logstash Setting

/etc/logstash/conf.d/logstash.conf
input {
  beats {
    port => 5044
  }
}

# filter omitted

output {
  s3 {
    region => "<region name>"
    bucket => "<bucket name>"
  }
}
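
The minimal output above works, but the logstash-output-s3 plugin is usually tuned with a key prefix and rotation settings so objects arrive at a predictable rate. A hedged sketch; the values below are illustrative, not the settings used in this post:

output {
  s3 {
    region    => "<region name>"
    bucket    => "<bucket name>"
    prefix    => "weblog/"   # key prefix inside the bucket (example value)
    time_file => 5           # upload the temporary file every 5 minutes...
    size_file => 5242880     # ...or once it reaches about 5 MB
  }
}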


Run Logstash


# Option: -w [number of threads] increases pipeline workers for parallel processing.
#         Worth considering when heavy filters are applied.
/usr/share/logstash/bin$> sudo ./logstash --path.settings /etc/logstash

* Making Logstash autostart

# Check whether logstash is active
$> systemctl status logstash
# Enable logstash at boot
$> sudo systemctl enable logstash
$> sudo reboot   or   $> sudo systemctl start logstash
# Disable logstash
$> sudo systemctl disable logstash

Run Filebeat

/usr/bin$> sudo ./filebeat -e -d "publish"
  
# Or, if the service was registered via chkconfig
$> sudo service filebeat (start | stop | restart)

Check that files are created in S3 in the format below (there is a short delay before they appear)




Lambda function for S3 PUT event

s3 to aws es emitter

/*
 * S3 -> Lambda -> ES for web server logging.
 */
/* Imports */
var AWS = require('aws-sdk');
var LineStream = require('byline').LineStream;
var path = require('path');
var stream = require('stream');
/* Globals */
var esDomain = {
    endpoint: 'abcd.example.es.amazonaws.com', // input es endpoint
    region: 'example', // input region name
    index: 'weblog',   // input es index name
    doctype: 'tomcat'  // input es document type
};
var endpoint =  new AWS.Endpoint(esDomain.endpoint);
var s3 = new AWS.S3();
var totLogLines = 0;    // Total number of log lines in the file
var numDocsAdded = 0;   // Number of log lines added to ES so far
/*
 * The AWS credentials are picked up from the environment.
 * They belong to the IAM role assigned to the Lambda function.
 * Since the ES requests are signed using these credentials,
 * make sure to apply a policy that permits ES domain operations
 * to the role.
 */
var creds = new AWS.EnvironmentCredentials('AWS');
/*
 * Get the log file from the given S3 bucket and key.  Parse it and add
 * each log record to the ES domain.
 */
function s3LogsToES(bucket, key, context, lineStream, recordStream) {
    var s3Stream = s3.getObject({Bucket: bucket, Key: key}).createReadStream();
    // Flow: S3 file stream -> Log Line stream -> record extractor -> ES
    s3Stream
        .pipe(lineStream)
        .pipe(recordStream)
        .on('data', function(record) {
            postDocumentToES(record, context);
    });
    s3Stream.on('error', function() {
        console.log(
            'Error getting object "' + key + '" from bucket "' + bucket + '".  ' +
            'Make sure they exist and your bucket is in the same region as this function.');
        context.fail();
    });
}
/*
 * Add the given document to the ES domain.
 * If all records are successfully added, indicate success to lambda
 * (using the "context" parameter).
 */
function postDocumentToES(doc, context) {
    var req = new AWS.HttpRequest(endpoint);
    req.method = 'POST';
    req.path = path.join('/', esDomain.index, esDomain.doctype);
    req.region = esDomain.region;
    req.body = doc;
    req.headers['presigned-expires'] = false;
    req.headers['Host'] = endpoint.host;
    req.headers['Content-Type'] = 'application/json';
    console.log('request to ES: ', JSON.stringify(req, null, 2));
    // Sign the request (Sigv4)
    var signer = new AWS.Signers.V4(req, 'es');
    signer.addAuthorization(creds, new Date());
    // Post document to ES
    var send = new AWS.NodeHttpClient();
    send.handleRequest(req, null, function (httpResp) {
        httpResp.on('data', function (chunk) {
            console.log('on data: ' + chunk);
        });
        httpResp.on('end', function (chunk) {
            console.log('on end: ' + chunk);
            numDocsAdded++;
            if (numDocsAdded === totLogLines) {
                // Mark lambda success.  If not done so, it will be retried.
                console.log('All ' + numDocsAdded + ' log records added to ES.');
                // reset counter
                numDocsAdded = 0;
                totLogLines = 0;
                context.succeed();
            }
        });
    }, function (err) {
        console.log('Error: ' + err);
        console.log(numDocsAdded + ' of ' + totLogLines + ' log records added to ES.');
        context.fail();
    });
}
/* Lambda "main": Execution starts here */
exports.handler = function(event, context) {
    console.log('Received event: ', JSON.stringify(event, null, 2));
    /* == Streams ==
    * To avoid loading an entire (typically large) log file into memory,
    * this is implemented as a pipeline of filters, streaming log data from S3 to ES.
    * Flow: S3 file stream -> Log Line stream -> Log extractor -> ES
    */
    // exclude filebeat date and ip address.
    var recordStream = new stream.Transform({objectMode: true});
    recordStream._transform = function(line, encoding, done) {
        var str,
            startIdxOfLog,
            logRecord;
        str = line.toString();
        startIdxOfLog = str.indexOf("{\"@timestamp\"");
        logRecord = str.substr(startIdxOfLog);
        console.log("log record: " + logRecord);
        this.push(logRecord);
        totLogLines ++;
        done();
    };
    event.Records.forEach(function(record) {
        var bucket = record.s3.bucket.name;
        var objKey = decodeURIComponent(record.s3.object.key.replace(/\+/g, ' '));
        s3LogsToES(bucket, objKey, context, new LineStream(), recordStream);
    });
};

cf) A modified version of https://github.com/aws-samples/amazon-elasticsearch-lambda-samples
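
To exercise the handler from the Lambda console before wiring up the real trigger, a minimal test event shaped like the S3 PUT records the code reads can be used. The bucket name and object key below are placeholders and must point at a real object in S3 for the full flow to succeed:

{
  "Records": [
    {
      "s3": {
        "bucket": { "name": "<bucket name>" },
        "object": { "key": "<object key of an uploaded log file>" }
      }
    }
  ]
}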


Add an event trigger to the S3 bucket
  • When a log file arrives, fire an event and push it to the Lambda function
  • S3 bucket → Properties → Events
  • Add notification → hook it up to the Lambda function written above
  • A prefix and suffix can be set to filter which object names in the bucket fire the event (a CLI sketch follows this list)
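
The same notification can also be set up from the AWS CLI. A hedged sketch; the bucket name, Lambda ARN, and prefix/suffix values are placeholders:

# notification.json contents
{
  "LambdaFunctionConfigurations": [
    {
      "LambdaFunctionArn": "arn:aws:lambda:<region>:<account-id>:function:<function-name>",
      "Events": ["s3:ObjectCreated:Put"],
      "Filter": {
        "Key": {
          "FilterRules": [
            { "Name": "prefix", "Value": "<prefix>" },
            { "Name": "suffix", "Value": ".txt" }
          ]
        }
      }
    }
  ]
}

$> aws s3api put-bucket-notification-configuration --bucket <bucket name> --notification-configuration file://notification.json

When configured this way (rather than through the console), S3 also has to be granted permission to invoke the function, e.g. via aws lambda add-permission with principal s3.amazonaws.com.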



Configuring the AWS ES Domain


Caveats when configuring AWS ES in a VPC

  • When the domain is placed in a VPC, the ES endpoint moves inside a subnet (see the diagram below)
  • So unless the Lambda function is placed in the same subnet, the endpoint cannot be resolved and ES cannot be reached
  • Change the Lambda function to use a subnet in the same VPC as ES
  • Once the Lambda function is inside the subnet, it can no longer reach S3 (which sits outside the VPC), so it cannot fetch the PUT object
  • Creating an S3 endpoint under VPC → Endpoints for the subnet the Lambda is in updates that subnet's routing table so traffic can reach S3 (a CLI sketch follows this list)
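
A hedged sketch of creating that gateway endpoint for S3 with the AWS CLI; the VPC ID, region, and route table ID are placeholders:

# Create a Gateway-type S3 endpoint; the listed route tables get an S3 route added automatically
$> aws ec2 create-vpc-endpoint --vpc-id <vpc-id> --vpc-endpoint-type Gateway \
       --service-name com.amazonaws.<region>.s3 --route-table-ids <route-table-id>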


Endpoint and routing table update



Accessing Kibana when ES is in a VPC

1) Port-forward localhost to the ES VPC endpoint
port forwarding
$> ssh -i ~/.ssh/<pem file> ec2-user@ec2-xxx-xxx-xxx-xxx.<region>.compute.amazonaws.com -N -L 9200:<es vpc endpoint>:443
2) Open https://localhost:9200/_plugin/kibana in a browser (a quick check of the tunnel follows below)
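
To confirm the tunnel works before opening the browser, a quick check against the forwarded port (the -k flag skips certificate verification, since the ES certificate is issued for the VPC endpoint name rather than localhost):

$> curl -k https://localhost:9200/_cluster/health?pretty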

VPC and endpoint concepts



