[AWS] Building a Centralized Log System
Objective
- Build a web server logging pipeline with Filebeat + Logstash + S3 + Lambda + AWS Elasticsearch
- Use IAM roles for AWS permissions (EC2/S3 full access), so that no access_key/secret_key has to be embedded
- The Security Group must have ports 5044 (Logstash) and 9600 (ES) open by default (a CLI sketch follows below)
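If the ports are not open yet, they can also be added from the CLI; a minimal sketch, where the Security Group ID and CIDR range are placeholders for your own environment:
# Assumption: sg-xxxxxxxx and 10.0.0.0/16 are placeholders for your Security Group
# and the address range of the servers that ship logs.
$> aws ec2 authorize-security-group-ingress --group-id sg-xxxxxxxx --protocol tcp --port 5044 --cidr 10.0.0.0/16
$> aws ec2 authorize-security-group-ingress --group-id sg-xxxxxxxx --protocol tcp --port 9600 --cidr 10.0.0.0/16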
Install Java 8 in Amazon Linux
Reference link: https://alamis-infostash.blogspot.com/2018/11/aws-amazon-linux-java-18.html
Install Filebeat in Amazon Linux
# Elasticsearch GPG KEY
$> sudo rpm --import https://packages.elastic.co/GPG-KEY-elasticsearch

# Create the elastic.repo file
$> sudo vi /etc/yum.repos.d/elastic.repo

# elastic.repo contents
[elastic-6.x]
name=Elastic repository for 6.x packages
baseurl=https://artifacts.elastic.co/packages/6.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md

$> sudo yum install filebeat

# Run in all run-levels
$> sudo chkconfig --add filebeat
Install Logstash in Amazon Linux
# Elasticsearch GPG KEY
$> sudo rpm --import https://packages.elastic.co/GPG-KEY-elasticsearch

# Create the logstash.repo file
$> sudo vi /etc/yum.repos.d/logstash.repo

# logstash.repo contents
[logstash-6.x]
name=Elastic repository for 6.x packages
baseurl=https://artifacts.elastic.co/packages/6.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md

$> sudo yum install logstash
Install Logstash Plugins (logstash-input-beats / logstash-output-s3)
# List installed plugins
/usr/share/logstash/bin $> sudo ./logstash-plugin list

# Install the filebeat input plugin
/usr/share/logstash/bin $> sudo ./logstash-plugin install logstash-input-beats

# Install the s3 output plugin
/usr/share/logstash/bin $> sudo ./logstash-plugin install logstash-output-s3
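After installation, both plugins should show up in the plugin list; a quick check (the grep pattern is just a convenience):
# Confirm both plugins are installed
/usr/share/logstash/bin $> sudo ./logstash-plugin list | grep -E 'logstash-input-beats|logstash-output-s3'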
Filebeat Setting
/etc/filebeat/filebeat.yml
#=========================== Filebeat inputs =============================
filebeat.inputs:
# Each - is an input. Most options can be set at the input level, so
# you can use different inputs for various configurations.
# Below are the input specific configurations.
- type: log
# Change to true to enable this input configuration.
enabled: true
# Paths that should be crawled and fetched. Glob based paths.
paths:
- /var/log/*.log
#- c:\programdata\elasticsearch\logs\*
...
#-------------------------- Elasticsearch output ------------------------------
#output.elasticsearch:
# Array of hosts to connect to.
#hosts: ["localhost:9200"]
# Optional protocol and basic auth credentials.
#protocol: "https"
#username: "elastic"
#password: "changeme"
#----------------------------- Logstash output --------------------------------
output.logstash:
# The Logstash hosts
hosts: ["ip:port"]
- Enable the log input (enabled: true)
- Set the log file path(s)
- Comment out the Elasticsearch output
- Enable the Logstash output (see the config test sketch below)
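Before starting the service, the edited configuration and the connection to Logstash can be validated with Filebeat's built-in test subcommands; a quick check:
# Validate filebeat.yml syntax
$> sudo filebeat test config -c /etc/filebeat/filebeat.yml

# Check that the Logstash output is reachable
$> sudo filebeat test output -c /etc/filebeat/filebeat.yml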
Logstash Setting
/etc/logstash/conf.d/logstash.conf
input {
beats {
port => 5044
}
}
# filter omitted
output {
s3 {
region => "<region-name>"
bucket => "<bucket-name>"
}
}
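The pipeline definition can be checked for syntax errors before starting Logstash; a minimal sketch using the config-test flag. The s3 output also accepts options such as prefix and time_file to control object naming and rotation if the defaults do not fit.
# Parse the pipeline and exit without starting Logstash
/usr/share/logstash/bin $> sudo ./logstash --path.settings /etc/logstash --config.test_and_exit -f /etc/logstash/conf.d/logstash.conf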
Run Logstash
# Option: -w [number of threads] runs the pipeline with more worker threads in parallel.
# Worth considering if a lot of filters are applied.
/usr/share/logstash/bin $> sudo ./logstash --path.settings /etc/logstash
* Enabling Logstash autostart
# Check whether logstash is running
$> systemctl status logstash

# Enable logstash at boot
$> sudo systemctl enable logstash
$> sudo reboot
# or
$> sudo systemctl start logstash

# Disable logstash at boot
$> sudo systemctl disable logstash
Run Filebeat
/usr/bin $> sudo ./filebeat -e -d "publish"

# Or, if filebeat was registered as a service via chkconfig
$> sudo service filebeat (start | stop | restart)
Check that files are created in S3 in the format below (there is a slight delay before they appear)
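The bucket contents can also be checked from the CLI; the bucket name below is a placeholder:
# List objects written by the logstash s3 output (bucket name is a placeholder)
$> aws s3 ls s3://<your-log-bucket>/ --recursive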
Lambda function for S3 PUT event
s3 to aws es emitter
/*
 * S3 -> Lambda -> ES for web server logging.
 */

/* Imports */
var AWS = require('aws-sdk');
var LineStream = require('byline').LineStream;
var path = require('path');
var stream = require('stream');

/* Globals */
var esDomain = {
    endpoint: 'abcd.example.es.amazonaws.com',  // input es endpoint
    region: 'example',                          // input region name
    index: 'weblog',                            // input es index name
    doctype: 'tomcat'                           // input es document name
};
var endpoint = new AWS.Endpoint(esDomain.endpoint);
var s3 = new AWS.S3();
var totLogLines = 0;    // Total number of log lines in the file
var numDocsAdded = 0;   // Number of log lines added to ES so far

/*
 * The AWS credentials are picked up from the environment.
 * They belong to the IAM role assigned to the Lambda function.
 * Since the ES requests are signed using these credentials,
 * make sure to apply a policy that permits ES domain operations
 * to the role.
 */
var creds = new AWS.EnvironmentCredentials('AWS');

/*
 * Get the log file from the given S3 bucket and key. Parse it and add
 * each log record to the ES domain.
 */
function s3LogsToES(bucket, key, context, lineStream, recordStream) {
    var s3Stream = s3.getObject({Bucket: bucket, Key: key}).createReadStream();

    // Flow: S3 file stream -> Log Line stream -> record extractor -> ES
    s3Stream
        .pipe(lineStream)
        .pipe(recordStream)
        .on('data', function(record) {
            postDocumentToES(record, context);
        });

    s3Stream.on('error', function() {
        console.log(
            'Error getting object "' + key + '" from bucket "' + bucket + '". ' +
            'Make sure they exist and your bucket is in the same region as this function.');
        context.fail();
    });
}

/*
 * Add the given document to the ES domain.
 * If all records are successfully added, indicate success to lambda
 * (using the "context" parameter).
 */
function postDocumentToES(doc, context) {
    var req = new AWS.HttpRequest(endpoint);

    req.method = 'POST';
    req.path = path.join('/', esDomain.index, esDomain.doctype);
    req.region = esDomain.region;
    req.body = doc;
    req.headers['presigned-expires'] = false;
    req.headers['Host'] = endpoint.host;
    req.headers['Content-Type'] = 'application/json';

    console.log('request to ES: ', JSON.stringify(req, null, 2));

    // Sign the request (Sigv4)
    var signer = new AWS.Signers.V4(req, 'es');
    signer.addAuthorization(creds, new Date());

    // Post document to ES
    var send = new AWS.NodeHttpClient();
    send.handleRequest(req, null, function(httpResp) {
        httpResp.on('data', function(chunk) {
            console.log('on data: ' + chunk);
        });
        httpResp.on('end', function(chunk) {
            console.log('on end: ' + chunk);
            numDocsAdded++;
            if (numDocsAdded === totLogLines) {
                // Mark lambda success. If not done so, it will be retried.
                console.log('All ' + numDocsAdded + ' log records added to ES.');
                // reset counters
                numDocsAdded = 0;
                totLogLines = 0;
                context.succeed();
            }
        });
    }, function(err) {
        console.log('Error: ' + err);
        console.log(numDocsAdded + ' of ' + totLogLines + ' log records added to ES.');
        context.fail();
    });
}

/* Lambda "main": Execution starts here */
exports.handler = function(event, context) {
    console.log('Received event: ', JSON.stringify(event, null, 2));

    /* == Streams ==
     * To avoid loading an entire (typically large) log file into memory,
     * this is implemented as a pipeline of filters, streaming log data from S3 to ES.
     * Flow: S3 file stream -> Log Line stream -> Log extractor -> ES
     */
    // Exclude the filebeat date and ip address prefix from each line.
    var recordStream = new stream.Transform({objectMode: true});
    recordStream._transform = function(line, encoding, done) {
        var str, startIdxOfLog, logRecord;

        str = line.toString();
        startIdxOfLog = str.indexOf("{\"@timestamp\"");
        logRecord = str.substr(startIdxOfLog);
        console.log("log record: " + logRecord);

        this.push(logRecord);
        totLogLines++;
        done();
    };

    event.Records.forEach(function(record) {
        var bucket = record.s3.bucket.name;
        var objKey = decodeURIComponent(record.s3.object.key.replace(/\+/g, ' '));
        s3LogsToES(bucket, objKey, context, new LineStream(), recordStream);
    });
}
cf) A modified version of https://github.com/aws-samples/amazon-elasticsearch-lambda-samples
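As the code comments note, the Lambda execution role needs permission to read the S3 object and to post to the ES domain. A minimal sketch of attaching such an inline policy with the AWS CLI; the role name, bucket name, and domain ARN are placeholders for your own resources:
# Assumption: "lambda-s3-to-es-role" and the ARNs below are placeholders.
$> cat > /tmp/lambda-es-policy.json <<'EOF'
{
  "Version": "2012-10-17",
  "Statement": [
    { "Effect": "Allow", "Action": ["s3:GetObject"],
      "Resource": "arn:aws:s3:::<your-log-bucket>/*" },
    { "Effect": "Allow", "Action": ["es:ESHttpPost"],
      "Resource": "arn:aws:es:<region>:<account-id>:domain/<domain-name>/*" }
  ]
}
EOF
$> aws iam put-role-policy --role-name lambda-s3-to-es-role --policy-name s3-to-es-access --policy-document file:///tmp/lambda-es-policy.json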
Add an event trigger to the S3 bucket
- When a log file arrives, fire an event that pushes it to Lambda
- S3 bucket → Properties → Events
- Add notification → connect it to the Lambda function written above
- Prefix and suffix filters can restrict which object names in the bucket fire the event (a CLI sketch follows below)
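The same trigger can also be configured from the CLI; a minimal sketch, where the bucket name, Lambda ARN, and the prefix/suffix values are placeholders. Note that when configured this way, S3 must separately be allowed to invoke the function (aws lambda add-permission); the console does this wiring automatically.
# Assumption: bucket name, Lambda ARN, and filter values are placeholders.
$> cat > /tmp/notification.json <<'EOF'
{
  "LambdaFunctionConfigurations": [
    {
      "LambdaFunctionArn": "arn:aws:lambda:<region>:<account-id>:function:<s3-to-es-function>",
      "Events": ["s3:ObjectCreated:Put"],
      "Filter": {
        "Key": {
          "FilterRules": [
            { "Name": "prefix", "Value": "ls.s3." },
            { "Name": "suffix", "Value": ".txt" }
          ]
        }
      }
    }
  ]
}
EOF
$> aws s3api put-bucket-notification-configuration --bucket <your-log-bucket> --notification-configuration file:///tmp/notification.json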
Configuring the AWS ES Domain
- See the AWS Elasticsearch Service domain creation guide
- For security, place the domain inside a VPC
- If the domain is in a VPC, see the notes below
Caveats when running AWS ES inside a VPC
- With a VPC configuration, the ES endpoint moves inside the subnet (see the figure below)
- So unless the Lambda function is placed in the same subnet, the endpoint does not resolve and ES cannot be reached
- Change the Lambda function to use a subnet in the same VPC as ES
- Since the Lambda function is now inside the subnet, it can no longer reach the global S3 service and cannot fetch the PUT object
- Creating an S3 endpoint under VPC → Endpoints for the subnet where the Lambda lives updates that subnet's routing table so traffic can reach S3 (a CLI sketch follows below)
Endpoint and routing table update
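A minimal sketch of creating that S3 gateway endpoint from the CLI, where the VPC ID, route table ID, and region are placeholders for your own environment:
# Assumption: vpc-xxxxxxxx, rtb-xxxxxxxx, and <region> are placeholders for your VPC,
# the route table of the Lambda subnet, and the AWS region.
$> aws ec2 create-vpc-endpoint --vpc-id vpc-xxxxxxxx --vpc-endpoint-type Gateway --service-name com.amazonaws.<region>.s3 --route-table-ids rtb-xxxxxxxx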
Accessing Kibana when ES is in a VPC
1) Port-forward localhost to the VPC endpoint
Port forwarding
$> ssh -i ~/.ssh/<pem file> ec2-user@ec2-xxx-xxx-xxx-xxx.<region>.compute.amazonaws.com -N -L 9200:<es vpc endpoint>:443
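With the tunnel up, the domain should be reachable through the local port; a quick sanity check, assuming the standard AWS ES Kibana path:
# Sanity check through the tunnel (-k skips certificate-name verification,
# since the certificate is issued for the ES endpoint, not localhost)
$> curl -k https://localhost:9200

# Kibana in the browser
https://localhost:9200/_plugin/kibana/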