Openresty + Lua + Kafka實(shí)現實(shí)時(shí)日志采集
優(yōu)采云 發(fā)布時(shí)間: 2020-08-06 09:06簡(jiǎn)介
在許多數據采集方案中,Flume是高性能的日志采集工具. 我相信每個(gè)人都知道. 許多人認為Flume是一個(gè)組件,可以將它們中的大多數與Flume和Kafka的組合相關(guān)聯(lián)以進(jìn)行日志采集. 該解決方案具有許多優(yōu)勢,例如高性能,高吞吐量和數據可靠性. 但是,如果我們需要實(shí)時(shí)采集日志,顯然這不是一個(gè)好的解決方案. 原因如下:
目前,Flume可以支持對目錄中數據文件的實(shí)時(shí)監視. 某個(gè)目錄的文件采集完成后,將使用完成的符號進(jìn)行標記. 如果以后有數據輸入此文件,則不會(huì )檢測到Flume.
因此,我們通常使用這種方案進(jìn)行計時(shí)采集. 只要生成新的數據目錄,我們就會(huì )將數據文件采集到該目錄中.
然后,本文將向您介紹基于Openresty + Lua + Kafka的實(shí)時(shí)日志采集.
要求
很多時(shí)候,我們需要實(shí)時(shí)采集用戶(hù)的掩埋點(diǎn)數據,然后使用這些數據對用戶(hù)的行為進(jìn)行一些實(shí)時(shí)分析. 因此,第一步當然是解決如何實(shí)時(shí)采集數據.
我們在這里使用的解決方案是Openresty + Lua + Kafka.
原理介紹
那么什么是Openresty?這是官方報價(jià):
OpenResty是基于Nginx和Lua的高性能Web平臺. 它集成了許多復雜的Lua庫,第三方模塊及其大多數依賴(lài)項. 它用于方便地構建可處理超高并發(fā)性和高可伸縮性的動(dòng)態(tài)Web應用程序,Web服務(wù)和動(dòng)態(tài)網(wǎng)關(guān).
OpenResty通過(guò)融合各種精心設計的Nginx模塊,有效地將Nginx變成了功能強大的通用Web應用程序平臺. 這樣,Web開(kāi)發(fā)人員和系統工程師可以使用Lu腳本語(yǔ)言來(lái)調動(dòng)Nginx支持的各種C和Lua模塊,并快速構建一個(gè)具有10K甚至1000個(gè)以上的單機并發(fā)連接的高性能Web應用程序系統.
OpenResty的目標是使您的Web服務(wù)直接在Nginx服務(wù)內部運行,充分利用Nginx的非阻塞I / O模型,不僅用于HTTP客戶(hù)端請求,甚至用于遠程后端(例如MySQL,PostgreSQL) ,Memcached和Redis等具有一致的高性能響應.
簡(jiǎn)單來(lái)說(shuō),就是通過(guò)Nginx發(fā)送客戶(hù)端的請求(本文指的是用戶(hù)的行為日志),以將用戶(hù)的數據傳遞到我們指定的位置(卡夫卡),為了達到這一要求,我們使用Lua腳本,因為Openresty封裝了各種Lua模塊,其中之一是子安裝Kafka模塊,所以我們只需要編寫(xiě)一個(gè)簡(jiǎn)單的腳本即可通過(guò)Nginx將用戶(hù)數據轉發(fā)到Kafka,以便隨后使用數據.
以下是供大家理解的體系*敏*感*詞*:
以下是使用Openresty + Lua + Kafka的優(yōu)點(diǎn)的簡(jiǎn)要摘要:
1. 支持多種業(yè)務(wù)數據,不同的業(yè)務(wù)數據,只需要配置不同的Lua腳本,就可以將不同的業(yè)務(wù)數據發(fā)送到Kafka中的不同主題.
2. 實(shí)時(shí)采集用戶(hù)觸發(fā)的埋藏點(diǎn)數據
3. 高度可靠的集群. 由于Openresty基于Nginx,因此其群集具有非常高的性能和穩定性.
4. 高并發(fā). 與tomcat,apache和其他Web服務(wù)器相比,Nginx的并發(fā)性比其他兩個(gè)要高得多. 在正常情況下處理數萬(wàn)個(gè)并發(fā)并不難.
接下來(lái)讓我們做一些實(shí)際的工作.
安裝Openresty
此示例使用獨立部署表單. 成功完成獨立部署后,將在獨立計算機上構建集群,只是在不同的計算機上執行相同的步驟.
注意: 本實(shí)驗基于centos7.0操作系統
1. 下載Openresty依賴(lài)項:
yum install readline-devel pcre-devel openssl-devel gcc
2. 編譯并安裝Openresty:
#1.安裝openresty:
mkdir /opt/software
mkdir /opt/module
cd /opt/software/ # 安裝文件所在目錄
wget https://openresty.org/download/openresty-1.9.7.4.tar.gz
tar -xzf openresty-1.9.7.4.tar.gz -C /opt/module/
cd /opt/module/openresty-1.9.7.4
#2.配置:
# 指定目錄為/opt/openresty,默認在/usr/local。
./configure --prefix=/opt/openresty \
--with-luajit \
--without-http_redis2_module \
--with-http_iconv_module
make
make install
3. 安裝lua-resty-kafka
因為我們需要通過(guò)nginx + lua腳本將數據轉發(fā)到Kafka,所以在編寫(xiě)lua腳本時(shí)需要在lua模塊中使用一些Kafka依賴(lài)項.
#下載lua-resty-kafka:
cd /opt/software/
wget https://github.com/doujiang24/lua-resty-kafka/archive/master.zip
unzip master.zip -d /opt/module/
#拷貝kafka相關(guān)依賴(lài)腳本到openresty
cp -rf /opt/module/lua-resty-kafka-master/lib/resty/kafka/ /opt/openresty/lualib/resty/
注意: 由于每個(gè)人都熟悉Kafka,因此這里不會(huì )介紹其安裝.
安裝Openresty之后,目錄結構如下:
drwxr-xr-x 2 root root 4096 Mar 24 14:26 bin
drwxr-xr-x 6 root root 4096 Mar 24 14:26 luajit
drwxr-xr-x 7 root root 4096 Mar 24 14:29 lualib
drwxr-xr-x 12 root root 4096 Mar 24 14:40 nginx
4. 配置文件
編輯/opt/openresty/nginx/conf/nginx.conf
user nginx; #Linux的用戶(hù)
worker_processes auto;
worker_rlimit_nofile 100000;
#error_log logs/error.log;
#error_log logs/error.log notice;
#error_log logs/error.log info;
#pid logs/nginx.pid;
events {
worker_connections 102400;
multi_accept on;
use epoll;
}
http {
include mime.types;
default_type application/octet-stream;
log_format main '$remote_addr - $remote_user [$time_local] "$request" '
'$status $body_bytes_sent "$http_referer" '
'"$http_user_agent" "$http_x_forwarded_for"';
access_log /var/log/nginx/access.log main;
resolver 8.8.8.8;
#resolver 127.0.0.1 valid=3600s;
sendfile on;
keepalive_timeout 65;
underscores_in_headers on;
gzip on;
include /opt/openresty/nginx/conf/conf.d/common.conf; #common.conf這個(gè)文件名字可自定義
}
編輯/opt/openresty/nginx/conf/conf.d/common.conf
##api
lua_package_path "/opt/openresty/lualib/resty/kafka/?.lua;;";
lua_package_cpath "/opt/openresty/lualib/?.so;;";
lua_shared_dict ngx_cache 128m; # cache
lua_shared_dict cache_lock 100k; # lock for cache
server {
listen 8887; #*敏*感*詞*端口
server_name 192.168.3.215; #埋點(diǎn)日志的ip地址或域名,多個(gè)域名之間用空格分開(kāi)
root html; #root指令用于指定虛擬主機的網(wǎng)頁(yè)根目錄,這個(gè)目錄可以是相對路徑,也可以是絕對路徑。
lua_need_request_body on; #打開(kāi)獲取消息體的開(kāi)關(guān),以便能獲取到消息體
access_log /var/log/nginx/message.access.log main;
error_log /var/log/nginx/message.error.log notice;
location = /lzp/message {
lua_code_cache on;
charset utf-8;
default_type 'application/json';
content_by_lua_file "/opt/openresty/nginx/lua/testMessage_kafka.lua";#引用的lua腳本
}
}
編輯/opt/openresty/nginx/lua/testMessage_kafka.lua
#創(chuàng )建目錄mkdir /opt/openresty/nginx/lua/
vim /opt/openresty/nginx/lua/testMessage_kafka.lua<br />#編輯內存如下:
-- require需要resty.kafka.producer的lua腳本,沒(méi)有會(huì )報錯
local producer = require("resty.kafka.producer")
-- kafka的集群信息,單機也是可以的
local broker_list = {
{host = "192.168.3.215", port = 9092},
}
-- 定義最終kafka接受到的數據是怎樣的json格式
local log_json = {}
--增加read_body之后即可獲取到消息體,默認情況下可能會(huì )是nil
log_json["body"] = ngx.req.read_body()
log_json["body_data"] = ngx.req.get_body_data()
-- 定義kafka同步生產(chǎn)者,也可設置為異步 async
-- -- 注意?。?!當設置為異步時(shí),在測試環(huán)境需要修改batch_num,默認是200條,若大不到200條kafka端接受不到消息
-- -- encode()將log_json日志轉換為字符串
-- -- 發(fā)送日志消息,send配套之第一個(gè)參數topic:
-- -- 發(fā)送日志消息,send配套之第二個(gè)參數key,用于kafka路由控制:
-- -- key為nill(空)時(shí),一段時(shí)間向同一partition寫(xiě)入數據
-- -- 指定key,按照key的hash寫(xiě)入到對應的partition
-- -- batch_num修改為1方便測試
local bp = producer:new(broker_list, { producer_type = "async",batch_num = 1 })
-- local bp = producer:new(broker_list)
local cjson = require("cjson.safe")
local sendMsg = cjson.encode(log_json)
local ok, err = bp:send("testMessage",nil, sendMsg)
if not ok then
ngx.log(ngx.ERR, 'kafka send err:', err)
elseif ok then
ngx.say("the message send successful")
else
ngx.say("未知錯誤")
end
5. 開(kāi)始服務(wù)操作:
useradd nginx #創(chuàng )建用戶(hù)
passwd nginx #設置密碼
#設置openresty的所有者nginx
chown -R nginx:nginx /opt/openresty/
#啟動(dòng)服務(wù)
cd /opt/openresty/nginx/sbin
./nginx -c /opt/openresty/nginx/conf/nginx.conf
查看服務(wù):
ps -aux | grep nginx
nginx 2351 0.0 0.1 231052 46444 ? S Mar30 0:33 nginx: worker process
nginx 2352 0.0 0.1 233396 48540 ? S Mar30 0:35 nginx: worker process
nginx 2353 0.0 0.1 233396 48536 ? S Mar30 0:33 nginx: worker process
nginx 2354 0.0 0.1 232224 47464 ? S Mar30 0:34 nginx: worker process
nginx 2355 0.0 0.1 231052 46404 ? S Mar30 0:33 nginx: worker process
nginx 2356 0.0 0.1 232224 47460 ? S Mar30 0:34 nginx: worker process
nginx 2357 0.0 0.1 231052 46404 ? S Mar30 0:34 nginx: worker process
nginx 2358 0.0 0.1 232224 47484 ? S Mar30 0:34 nginx: worker process
root 7009 0.0 0.0 185492 2516 ? Ss Mar24 0:00 nginx: master process ./nginx -c /opt/openresty/nginx/conf/nginx.conf
查看端口:
netstat -anput | grep 8887
tcp 0 0 0.0.0.0:8887 0.0.0.0:* LISTEN 2351/nginx: worke
看到上述過(guò)程,就可以證明服務(wù)正常運行
6. 使用郵遞員發(fā)送發(fā)帖請求以進(jìn)行簡(jiǎn)單測試,以查看Kafka是否可以接受數據
7.kafka消費數據:
kafka-console-consumer --bootstrap-server 192.168.3.215:9092 --topic testMessage --from-beginning
<p>如果消耗了數據,則說(shuō)明配置成功. 如果未調整,則可以檢查與/var/log/nginx/message.access.log和/var/log/nginx/message.error.log相關(guān)的錯誤日志以進(jìn)行調整