最近中文字幕2019高清,亚洲人成高清在线播放,男生淦哭男生图片动漫有字,国产亚洲精品九九久在线观看,无码av专区丝袜专区

文章實(shí)時(shí)采集

文章實(shí)時(shí)采集

Flume和Kafka完成實(shí)時(shí)數據采集

采集交流 ? 優(yōu)采云 發(fā)表了文章 ? 0 個(gè)評論 ? 419 次瀏覽 ? 2020-08-06 16:17 ? 來(lái)自相關(guān)話(huà)題

  Flume和Kafka完成實(shí)時(shí)數據采集
  寫(xiě)在前面
  在生產(chǎn)環(huán)境中,通常將水槽和Kafka結合使用. 可以同時(shí)使用它們兩者來(lái)采集實(shí)時(shí)日志信息非常重要. 如果您不懂水槽和卡夫卡,可以先檢查一下我寫(xiě)的關(guān)于這兩部分的知識. 讓我們再次學(xué)習,這部分操作也是可能的.
  實(shí)時(shí)數據的采集面臨一個(gè)問(wèn)題. 我們如何生成實(shí)時(shí)數據源?因為我們可能想直接獲取實(shí)時(shí)數據流不是那么方便. 我之前寫(xiě)了一篇有關(guān)實(shí)時(shí)數據流的python生成器的文章,文章地址:
  您可以先看看如何生成實(shí)時(shí)數據...
  在想什么? ?如何開(kāi)始? ?
  分析: 我們可以從數據流開(kāi)始. 數據首先位于Web服務(wù)器中. 我們的訪(fǎng)問(wèn)日志由Nginx服務(wù)器實(shí)時(shí)采集到指定文件中. 我們從該文件采集日志數據,即: webserver => flume => kafka
  Web服務(wù)器日志存儲文件位置
  此文件的位置通常由我們自己設置
  我們的網(wǎng)絡(luò )日志的存儲目錄為:
  /home/hadoop/data/project/logs/access.log下面
  [hadoop@hadoop000 logs]$ pwd
/home/hadoop/data/project/logs
[hadoop@hadoop000 logs]$ ls
access.log
[hadoop@hadoop000 logs]$
  水槽
  做水槽實(shí)際上是寫(xiě)一個(gè)conf文件,所以面臨選擇的問(wèn)題
  來(lái)源選擇?頻道選擇?選擇接收器?
  在這里,我們選擇exec源存儲通道kafka sink
  怎么寫(xiě)?
  按照前面提到的步驟1234
  在官方網(wǎng)站上,我們可以找到應如何選擇:
  1)配置源
  exec來(lái)源
  # Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/hadoop/data/project/logs/access.log
a1.sources.r1.shell = /bin/sh -c
  2)配置頻道
  內存頻道
  a1.channels.c1.type = memory
  3)配置接收器
  卡夫卡水槽
  對于flume1.6版本,請參閱#kafka-sink
  a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.brokerList = hadoop000:9092
a1.sinks.k1.topic = flume_kafka
a1.sinks.k1.batchSize = 5
a1.sinks.k1.requiredAcks =1
  將以上三個(gè)組成部分組合在一起
  a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
  我們的新文件稱(chēng)為test3.conf
  粘貼我們自己分析的代碼:
  [hadoop@hadoop000 conf]$ vim test3.conf
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/hadoop/data/project/logs/access.log
a1.sources.r1.shell = /bin/sh -c
a1.channels.c1.type = memory
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.brokerList = hadoop000:9092
a1.sinks.k1.topic = flume_kafka
a1.sinks.k1.batchSize = 5
a1.sinks.k1.requiredAcks =1
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
  我們不會(huì )從這里開(kāi)始,因為它涉及kafka的事情,我們必須首先部署kafka,
  Kafka部署
  如何部署Kafka? ?
  請訪(fǎng)問(wèn)官方網(wǎng)站,我們首先啟動(dòng)Zookeeper流程,然后才能啟動(dòng)kafka服務(wù)器
  第1步: 啟動(dòng)動(dòng)物園管理員
  [hadoop@hadoop000 ~]$
[hadoop@hadoop000 ~]$ jps
29147 Jps
[hadoop@hadoop000 ~]$ zkServer.sh start
JMX enabled by default
Using config: /home/hadoop/app/zk/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
[hadoop@hadoop000 ~]$ jps
29172 QuorumPeerMain
29189 Jps
[hadoop@hadoop000 ~]$
  第2步: 啟動(dòng)服務(wù)器
  [hadoop@hadoop000 ~]$ kafka-server-start.sh $KAFKA_HOME/config/server.properties
#外開(kāi)一個(gè)窗口,查看jps
[hadoop@hadoop000 ~]$ jps
29330 Jps
29172 QuorumPeerMain
29229 Kafka
[hadoop@hadoop000 ~]$
  如果這部分不是很熟悉,您可以參考
  第3步: 創(chuàng )建主題
  [hadoop@hadoop000 ~]$ kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic flume_kafka
WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
Created topic "flume_kafka".
[hadoop@hadoop000 ~]$
  第4步: 啟動(dòng)上一個(gè)代理
   [hadoop@hadoop000 conf]$ flume-ng agent --name a1 --conf . --conf-file ./test3.conf -Dflume.root.logger=INFO,console
  第5步: 啟動(dòng)消費者
  kafka-console-consumer.sh --zookeeper hadoop000:2181 –topic flume-kafka
  執行上述第五步后,您將收到刷新屏幕的結果,哈哈哈! !
  
  上面的消費者將不斷刷新屏幕,這仍然非常有趣?。?!
  這里的使用者將接收到的數據放在屏幕上
  稍后,我們將介紹使用SparkStreaming作為使用者來(lái)實(shí)時(shí)接收數據,并對接收到的數據進(jìn)行簡(jiǎn)單的數據清理,并從隨機生成的日志中過(guò)濾出所需的數據... 查看全部

  Flume和Kafka完成實(shí)時(shí)數據采集
  寫(xiě)在前面
  在生產(chǎn)環(huán)境中,通常將水槽和Kafka結合使用. 可以同時(shí)使用它們兩者來(lái)采集實(shí)時(shí)日志信息非常重要. 如果您不懂水槽和卡夫卡,可以先檢查一下我寫(xiě)的關(guān)于這兩部分的知識. 讓我們再次學(xué)習,這部分操作也是可能的.
  實(shí)時(shí)數據的采集面臨一個(gè)問(wèn)題. 我們如何生成實(shí)時(shí)數據源?因為我們可能想直接獲取實(shí)時(shí)數據流不是那么方便. 我之前寫(xiě)了一篇有關(guān)實(shí)時(shí)數據流的python生成器的文章,文章地址:
  您可以先看看如何生成實(shí)時(shí)數據...
  在想什么? ?如何開(kāi)始? ?
  分析: 我們可以從數據流開(kāi)始. 數據首先位于Web服務(wù)器中. 我們的訪(fǎng)問(wèn)日志由Nginx服務(wù)器實(shí)時(shí)采集到指定文件中. 我們從該文件采集日志數據,即: webserver => flume => kafka
  Web服務(wù)器日志存儲文件位置
  此文件的位置通常由我們自己設置
  我們的網(wǎng)絡(luò )日志的存儲目錄為:
  /home/hadoop/data/project/logs/access.log下面
  [hadoop@hadoop000 logs]$ pwd
/home/hadoop/data/project/logs
[hadoop@hadoop000 logs]$ ls
access.log
[hadoop@hadoop000 logs]$
  水槽
  做水槽實(shí)際上是寫(xiě)一個(gè)conf文件,所以面臨選擇的問(wèn)題
  來(lái)源選擇?頻道選擇?選擇接收器?
  在這里,我們選擇exec源存儲通道kafka sink
  怎么寫(xiě)?
  按照前面提到的步驟1234
  在官方網(wǎng)站上,我們可以找到應如何選擇:
  1)配置源
  exec來(lái)源
  # Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/hadoop/data/project/logs/access.log
a1.sources.r1.shell = /bin/sh -c
  2)配置頻道
  內存頻道
  a1.channels.c1.type = memory
  3)配置接收器
  卡夫卡水槽
  對于flume1.6版本,請參閱#kafka-sink
  a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.brokerList = hadoop000:9092
a1.sinks.k1.topic = flume_kafka
a1.sinks.k1.batchSize = 5
a1.sinks.k1.requiredAcks =1
  將以上三個(gè)組成部分組合在一起
  a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
  我們的新文件稱(chēng)為test3.conf
  粘貼我們自己分析的代碼:
  [hadoop@hadoop000 conf]$ vim test3.conf
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/hadoop/data/project/logs/access.log
a1.sources.r1.shell = /bin/sh -c
a1.channels.c1.type = memory
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.brokerList = hadoop000:9092
a1.sinks.k1.topic = flume_kafka
a1.sinks.k1.batchSize = 5
a1.sinks.k1.requiredAcks =1
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
  我們不會(huì )從這里開(kāi)始,因為它涉及kafka的事情,我們必須首先部署kafka,
  Kafka部署
  如何部署Kafka? ?
  請訪(fǎng)問(wèn)官方網(wǎng)站,我們首先啟動(dòng)Zookeeper流程,然后才能啟動(dòng)kafka服務(wù)器
  第1步: 啟動(dòng)動(dòng)物園管理員
  [hadoop@hadoop000 ~]$
[hadoop@hadoop000 ~]$ jps
29147 Jps
[hadoop@hadoop000 ~]$ zkServer.sh start
JMX enabled by default
Using config: /home/hadoop/app/zk/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
[hadoop@hadoop000 ~]$ jps
29172 QuorumPeerMain
29189 Jps
[hadoop@hadoop000 ~]$
  第2步: 啟動(dòng)服務(wù)器
  [hadoop@hadoop000 ~]$ kafka-server-start.sh $KAFKA_HOME/config/server.properties
#外開(kāi)一個(gè)窗口,查看jps
[hadoop@hadoop000 ~]$ jps
29330 Jps
29172 QuorumPeerMain
29229 Kafka
[hadoop@hadoop000 ~]$
  如果這部分不是很熟悉,您可以參考
  第3步: 創(chuàng )建主題
  [hadoop@hadoop000 ~]$ kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic flume_kafka
WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
Created topic "flume_kafka".
[hadoop@hadoop000 ~]$
  第4步: 啟動(dòng)上一個(gè)代理
   [hadoop@hadoop000 conf]$ flume-ng agent --name a1 --conf . --conf-file ./test3.conf -Dflume.root.logger=INFO,console
  第5步: 啟動(dòng)消費者
  kafka-console-consumer.sh --zookeeper hadoop000:2181 –topic flume-kafka
  執行上述第五步后,您將收到刷新屏幕的結果,哈哈哈! !
  
  上面的消費者將不斷刷新屏幕,這仍然非常有趣?。?!
  這里的使用者將接收到的數據放在屏幕上
  稍后,我們將介紹使用SparkStreaming作為使用者來(lái)實(shí)時(shí)接收數據,并對接收到的數據進(jìn)行簡(jiǎn)單的數據清理,并從隨機生成的日志中過(guò)濾出所需的數據...

Filebeat實(shí)時(shí)采集Nginx日志

采集交流 ? 優(yōu)采云 發(fā)表了文章 ? 0 個(gè)評論 ? 634 次瀏覽 ? 2020-08-06 10:13 ? 來(lái)自相關(guān)話(huà)題

  說(shuō)明Filebeat版本為5.3.0
  之所以使用Beats系列的Filebeat代替Logstash的原因是,Logstash消耗了太多資源(請忽略那些擁有足夠服務(wù)器資源的用戶(hù))
  在官方網(wǎng)站上,Logstash的下載量為89M,而Filebeat的下載量?jì)H為840M,這很明顯
  Logstash可以配置jvm參數. 經(jīng)過(guò)我自己的調試,內存分配很小,啟動(dòng)緩慢,有時(shí)甚至根本無(wú)法啟動(dòng). 如果分配量很大,其他服務(wù)將沒(méi)有資源
  對于低配置服務(wù)器而言,選擇Filebeat是最佳選擇,并且既然Filebeat已開(kāi)始取代Logstash,仍然有必要修改nginx日志格式nginx.config
  更改日志記錄的格式
   log_format json '{ "@timestamp": "$time_iso8601", '
'"time": "$time_iso8601", '
'"remote_addr": "$remote_addr", '
'"remote_user": "$remote_user", '
'"body_bytes_sent": "$body_bytes_sent", '
'"request_time": "$request_time", '
'"status": "$status", '
'"host": "$host", '
'"request": "$request", '
'"request_method": "$request_method", '
'"uri": "$uri", '
'"http_referrer": "$http_referer", '
'"body_bytes_sent":"$body_bytes_sent", '
'"http_x_forwarded_for": "$http_x_forwarded_for", '
'"http_user_agent": "$http_user_agent" '
'}';
access_log /var/log/nginx/access.log json;
  filebeat.yml
   #=========================== Filebeat prospectors =============================
filebeat.prospectors:
- input_type: log
# Paths that should be crawled and fetched. Glob based paths.
paths:
- /var/log/nginx/*access*.log
json.keys_under_root: true
json.overwrite_keys: true
#-------------------------- Elasticsearch output ------------------------------
output.elasticsearch:
# Array of hosts to connect to.
hosts: ["ip:port","ip:port"]
index: "filebeat_server_nginx_%{+YYYY-MM}"
  這里要注意的是
  json.keys_under_root: 默認值為FALSE,這意味著(zhù)我們的json日志將在解析后放置在json密鑰上. 設置為T(mén)RUE,所有鍵都將放置在根節點(diǎn)中
  json.overwrite_keys: 是否覆蓋原創(chuàng )密鑰,這是密鑰配置. 將keys_under_root設置為T(mén)RUE之后,然后將overwrite_keys設置為T(mén)RUE以覆蓋filebeat默認密鑰值
  還有其他配置
  json.add_error_key: 添加json_error密鑰以記錄json解析失敗錯誤
  json.message_key: 指定解析后放置json日志的鍵,默認為json,還可以指定日志等.
  坦率地說(shuō),區別在于配置前的elasticsearch數據如下:
   {
"_index": "filebeat_server_nginx_2018-05",
"_type": "log",
"_id": "AWM9sVOkCcRcg0IPg399",
"_version": 1,
"_score": 1,
"_source": {
"@timestamp": "2018-05-08T03:00:17.544Z",
"beat": {
"hostname": "VM_252_18_centos",
"name": "VM_252_18_centos",
"version": "5.3.0"
},
"input_type": "log",
"json": {},
"message": "{ "@timestamp": "2018-05-08T11:00:11+08:00", "time": "2018-05-08T11:00:11+08:00", "remote_addr": "113.16.251.67", "remote_user": "-", "body_bytes_sent": "403", "request_time": "0.000", "status": "200", "host": "blog.joylau.cn", "request": "GET /img/%E7%BD%91%E6%98%93%E4%BA%91%E9%9F%B3%E4%B9%90.png HTTP/1.1", "request_method": "GET", "uri": "/img/\xE7\xBD\x91\xE6\x98\x93\xE4\xBA\x91\xE9\x9F\xB3\xE4\xB9\x90.png", "http_referrer": "http://blog.joylau.cn/css/style.css", "body_bytes_sent":"403", "http_x_forwarded_for": "-", "http_user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/62.0.3202.94 Safari/537.36" }",
"offset": 7633,
"source": "/var/log/nginx/access.log",
"type": "log"
}
}
  配置后,它看起來(lái)像這樣:
   {
"_index": "filebeat_server_nginx_2018-05",
"_type": "log",
"_id": "AWM9rjLd8mVZNgvhdnN9",
"_version": 1,
"_score": 1,
"_source": {
"@timestamp": "2018-05-08T02:56:50.000Z",
"beat": {
"hostname": "VM_252_18_centos",
"name": "VM_252_18_centos",
"version": "5.3.0"
},
"body_bytes_sent": "12576",
"host": "blog.joylau.cn",
"http_referrer": "http://blog.joylau.cn/",
"http_user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/66.0.3359.139 Safari/537.36",
"http_x_forwarded_for": "-",
"input_type": "log",
"offset": 3916,
"remote_addr": "60.166.12.138",
"remote_user": "-",
"request": "GET /2018/03/01/JDK8-Stream-Distinct/ HTTP/1.1",
"request_method": "GET",
"request_time": "0.000",
"source": "/var/log/nginx/access.log",
"status": "200",
"time": "2018-05-08T10:56:50+08:00",
"type": "log",
"uri": "/2018/03/01/JDK8-Stream-Distinct/index.html"
}
}
  這樣看起來(lái)很舒服
  啟動(dòng)FileBeat
  進(jìn)入Filebeat目錄
   nohup sudo ./filebeat -e -c filebeat.yml >/dev/null 2>&1 &
  更新
  如果nginx日志收錄中文,則中文將轉換為Unicode編碼. 如果沒(méi)有,只需添加escape = json參數.
   log_format json escape=json '{ "@timestamp": "$time_iso8601", '
'"time": "$time_iso8601", '
'"remote_addr": "$remote_addr", '
'"remote_user": "$remote_user", '
'"body_bytes_sent": "$body_bytes_sent", '
'"request_time": "$request_time", '
'"status": "$status", '
'"host": "$host", '
'"request": "$request", '
'"request_method": "$request_method", '
'"uri": "$uri", '
'"http_referrer": "$http_referer", '
'"body_bytes_sent":"$body_bytes_sent", '
'"http_x_forwarded_for": "$http_x_forwarded_for", '
'"http_user_agent": "$http_user_agent" '
'}';
access_log /var/log/nginx/access.log json;
  消息 查看全部

  說(shuō)明Filebeat版本為5.3.0
  之所以使用Beats系列的Filebeat代替Logstash的原因是,Logstash消耗了太多資源(請忽略那些擁有足夠服務(wù)器資源的用戶(hù))
  在官方網(wǎng)站上,Logstash的下載量為89M,而Filebeat的下載量?jì)H為840M,這很明顯
  Logstash可以配置jvm參數. 經(jīng)過(guò)我自己的調試,內存分配很小,啟動(dòng)緩慢,有時(shí)甚至根本無(wú)法啟動(dòng). 如果分配量很大,其他服務(wù)將沒(méi)有資源
  對于低配置服務(wù)器而言,選擇Filebeat是最佳選擇,并且既然Filebeat已開(kāi)始取代Logstash,仍然有必要修改nginx日志格式nginx.config
  更改日志記錄的格式
   log_format json '{ "@timestamp": "$time_iso8601", '
'"time": "$time_iso8601", '
'"remote_addr": "$remote_addr", '
'"remote_user": "$remote_user", '
'"body_bytes_sent": "$body_bytes_sent", '
'"request_time": "$request_time", '
'"status": "$status", '
'"host": "$host", '
'"request": "$request", '
'"request_method": "$request_method", '
'"uri": "$uri", '
'"http_referrer": "$http_referer", '
'"body_bytes_sent":"$body_bytes_sent", '
'"http_x_forwarded_for": "$http_x_forwarded_for", '
'"http_user_agent": "$http_user_agent" '
'}';
access_log /var/log/nginx/access.log json;
  filebeat.yml
   #=========================== Filebeat prospectors =============================
filebeat.prospectors:
- input_type: log
# Paths that should be crawled and fetched. Glob based paths.
paths:
- /var/log/nginx/*access*.log
json.keys_under_root: true
json.overwrite_keys: true
#-------------------------- Elasticsearch output ------------------------------
output.elasticsearch:
# Array of hosts to connect to.
hosts: ["ip:port","ip:port"]
index: "filebeat_server_nginx_%{+YYYY-MM}"
  這里要注意的是
  json.keys_under_root: 默認值為FALSE,這意味著(zhù)我們的json日志將在解析后放置在json密鑰上. 設置為T(mén)RUE,所有鍵都將放置在根節點(diǎn)中
  json.overwrite_keys: 是否覆蓋原創(chuàng )密鑰,這是密鑰配置. 將keys_under_root設置為T(mén)RUE之后,然后將overwrite_keys設置為T(mén)RUE以覆蓋filebeat默認密鑰值
  還有其他配置
  json.add_error_key: 添加json_error密鑰以記錄json解析失敗錯誤
  json.message_key: 指定解析后放置json日志的鍵,默認為json,還可以指定日志等.
  坦率地說(shuō),區別在于配置前的elasticsearch數據如下:
   {
"_index": "filebeat_server_nginx_2018-05",
"_type": "log",
"_id": "AWM9sVOkCcRcg0IPg399",
"_version": 1,
"_score": 1,
"_source": {
"@timestamp": "2018-05-08T03:00:17.544Z",
"beat": {
"hostname": "VM_252_18_centos",
"name": "VM_252_18_centos",
"version": "5.3.0"
},
"input_type": "log",
"json": {},
"message": "{ "@timestamp": "2018-05-08T11:00:11+08:00", "time": "2018-05-08T11:00:11+08:00", "remote_addr": "113.16.251.67", "remote_user": "-", "body_bytes_sent": "403", "request_time": "0.000", "status": "200", "host": "blog.joylau.cn", "request": "GET /img/%E7%BD%91%E6%98%93%E4%BA%91%E9%9F%B3%E4%B9%90.png HTTP/1.1", "request_method": "GET", "uri": "/img/\xE7\xBD\x91\xE6\x98\x93\xE4\xBA\x91\xE9\x9F\xB3\xE4\xB9\x90.png", "http_referrer": "http://blog.joylau.cn/css/style.css", "body_bytes_sent":"403", "http_x_forwarded_for": "-", "http_user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/62.0.3202.94 Safari/537.36" }",
"offset": 7633,
"source": "/var/log/nginx/access.log",
"type": "log"
}
}
  配置后,它看起來(lái)像這樣:
   {
"_index": "filebeat_server_nginx_2018-05",
"_type": "log",
"_id": "AWM9rjLd8mVZNgvhdnN9",
"_version": 1,
"_score": 1,
"_source": {
"@timestamp": "2018-05-08T02:56:50.000Z",
"beat": {
"hostname": "VM_252_18_centos",
"name": "VM_252_18_centos",
"version": "5.3.0"
},
"body_bytes_sent": "12576",
"host": "blog.joylau.cn",
"http_referrer": "http://blog.joylau.cn/",
"http_user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/66.0.3359.139 Safari/537.36",
"http_x_forwarded_for": "-",
"input_type": "log",
"offset": 3916,
"remote_addr": "60.166.12.138",
"remote_user": "-",
"request": "GET /2018/03/01/JDK8-Stream-Distinct/ HTTP/1.1",
"request_method": "GET",
"request_time": "0.000",
"source": "/var/log/nginx/access.log",
"status": "200",
"time": "2018-05-08T10:56:50+08:00",
"type": "log",
"uri": "/2018/03/01/JDK8-Stream-Distinct/index.html"
}
}
  這樣看起來(lái)很舒服
  啟動(dòng)FileBeat
  進(jìn)入Filebeat目錄
   nohup sudo ./filebeat -e -c filebeat.yml >/dev/null 2>&1 &
  更新
  如果nginx日志收錄中文,則中文將轉換為Unicode編碼. 如果沒(méi)有,只需添加escape = json參數.
   log_format json escape=json '{ "@timestamp": "$time_iso8601", '
'"time": "$time_iso8601", '
'"remote_addr": "$remote_addr", '
'"remote_user": "$remote_user", '
'"body_bytes_sent": "$body_bytes_sent", '
'"request_time": "$request_time", '
'"status": "$status", '
'"host": "$host", '
'"request": "$request", '
'"request_method": "$request_method", '
'"uri": "$uri", '
'"http_referrer": "$http_referer", '
'"body_bytes_sent":"$body_bytes_sent", '
'"http_x_forwarded_for": "$http_x_forwarded_for", '
'"http_user_agent": "$http_user_agent" '
'}';
access_log /var/log/nginx/access.log json;
  消息

Openresty + Lua + Kafka實(shí)現實(shí)時(shí)日志采集

采集交流 ? 優(yōu)采云 發(fā)表了文章 ? 0 個(gè)評論 ? 575 次瀏覽 ? 2020-08-06 09:06 ? 來(lái)自相關(guān)話(huà)題

  簡(jiǎn)介
  在許多數據采集方案中,Flume是高性能的日志采集工具. 我相信每個(gè)人都知道. 許多人認為Flume是一個(gè)組件,可以將它們中的大多數與Flume和Kafka的組合相關(guān)聯(lián)以進(jìn)行日志采集. 該解決方案具有許多優(yōu)勢,例如高性能,高吞吐量和數據可靠性. 但是,如果我們需要實(shí)時(shí)采集日志,顯然這不是一個(gè)好的解決方案. 原因如下:
  目前,Flume可以支持對目錄中數據文件的實(shí)時(shí)監視. 某個(gè)目錄的文件采集完成后,將使用完成的符號進(jìn)行標記. 如果以后有數據輸入此文件,則不會(huì )檢測到Flume.
  因此,我們通常使用這種方案進(jìn)行計時(shí)采集. 只要生成新的數據目錄,我們就會(huì )將數據文件采集到該目錄中.
  然后,本文將向您介紹基于Openresty + Lua + Kafka的實(shí)時(shí)日志采集.
  要求
  很多時(shí)候,我們需要實(shí)時(shí)采集用戶(hù)的掩埋點(diǎn)數據,然后使用這些數據對用戶(hù)的行為進(jìn)行一些實(shí)時(shí)分析. 因此,第一步當然是解決如何實(shí)時(shí)采集數據.
  我們在這里使用的解決方案是Openresty + Lua + Kafka.
  原理介紹
  那么什么是Openresty?這是官方報價(jià):
  OpenResty是基于Nginx和Lua的高性能Web平臺. 它集成了許多復雜的Lua庫,第三方模塊及其大多數依賴(lài)項. 它用于方便地構建可處理超高并發(fā)性和高可伸縮性的動(dòng)態(tài)Web應用程序,Web服務(wù)和動(dòng)態(tài)網(wǎng)關(guān).
  OpenResty通過(guò)融合各種精心設計的Nginx模塊,有效地將Nginx變成了功能強大的通用Web應用程序平臺. 這樣,Web開(kāi)發(fā)人員和系統工程師可以使用Lu腳本語(yǔ)言來(lái)調動(dòng)Nginx支持的各種C和Lua模塊,并快速構建一個(gè)具有10K甚至1000個(gè)以上的單機并發(fā)連接的高性能Web應用程序系統.
  OpenResty的目標是使您的Web服務(wù)直接在Nginx服務(wù)內部運行,充分利用Nginx的非阻塞I / O模型,不僅用于HTTP客戶(hù)端請求,甚至用于遠程后端(例如MySQL,PostgreSQL) ,Memcached和Redis等具有一致的高性能響應.
  簡(jiǎn)單來(lái)說(shuō),就是通過(guò)Nginx發(fā)送客戶(hù)端的請求(本文指的是用戶(hù)的行為日志),以將用戶(hù)的數據傳遞到我們指定的位置(卡夫卡),為了達到這一要求,我們使用Lua腳本,因為Openresty封裝了各種Lua模塊,其中之一是子安裝Kafka模塊,所以我們只需要編寫(xiě)一個(gè)簡(jiǎn)單的腳本即可通過(guò)Nginx將用戶(hù)數據轉發(fā)到Kafka,以便隨后使用數據.
  以下是供大家理解的體系結構圖:
  
  以下是使用Openresty + Lua + Kafka的優(yōu)點(diǎn)的簡(jiǎn)要摘要:
  1. 支持多種業(yè)務(wù)數據,不同的業(yè)務(wù)數據,只需要配置不同的Lua腳本,就可以將不同的業(yè)務(wù)數據發(fā)送到Kafka中的不同主題.
  2. 實(shí)時(shí)采集用戶(hù)觸發(fā)的埋藏點(diǎn)數據
  3. 高度可靠的集群. 由于Openresty基于Nginx,因此其群集具有非常高的性能和穩定性.
  4. 高并發(fā). 與tomcat,apache和其他Web服務(wù)器相比,Nginx的并發(fā)性比其他兩個(gè)要高得多. 在正常情況下處理數萬(wàn)個(gè)并發(fā)并不難.
  接下來(lái)讓我們做一些實(shí)際的工作.
  安裝Openresty
  此示例使用獨立部署表單. 成功完成獨立部署后,將在獨立計算機上構建集群,只是在不同的計算機上執行相同的步驟.
  注意: 本實(shí)驗基于centos7.0操作系統
  1. 下載Openresty依賴(lài)項:
  yum install readline-devel pcre-devel openssl-devel gcc
  2. 編譯并安裝Openresty:
  #1.安裝openresty:
mkdir /opt/software
mkdir /opt/module
cd /opt/software/ # 安裝文件所在目錄
wget https://openresty.org/download ... ar.gz
tar -xzf openresty-1.9.7.4.tar.gz -C /opt/module/
cd /opt/module/openresty-1.9.7.4
#2.配置:
# 指定目錄為/opt/openresty,默認在/usr/local。
./configure --prefix=/opt/openresty \
--with-luajit \
--without-http_redis2_module \
--with-http_iconv_module
make
make install
  3. 安裝lua-resty-kafka
  因為我們需要通過(guò)nginx + lua腳本將數據轉發(fā)到Kafka,所以在編寫(xiě)lua腳本時(shí)需要在lua模塊中使用一些Kafka依賴(lài)項.
  #下載lua-resty-kafka:
cd /opt/software/
wget https://github.com/doujiang24/ ... r.zip
unzip master.zip -d /opt/module/

#拷貝kafka相關(guān)依賴(lài)腳本到openresty
cp -rf /opt/module/lua-resty-kafka-master/lib/resty/kafka/ /opt/openresty/lualib/resty/
  注意: 由于每個(gè)人都熟悉Kafka,因此這里不會(huì )介紹其安裝.
  安裝Openresty之后,目錄結構如下:
  drwxr-xr-x 2 root root 4096 Mar 24 14:26 bin
drwxr-xr-x 6 root root 4096 Mar 24 14:26 luajit
drwxr-xr-x 7 root root 4096 Mar 24 14:29 lualib
drwxr-xr-x 12 root root 4096 Mar 24 14:40 nginx
  4. 配置文件
  編輯/opt/openresty/nginx/conf/nginx.conf
  user nginx; #Linux的用戶(hù)
worker_processes auto;
worker_rlimit_nofile 100000;
#error_log logs/error.log;
#error_log logs/error.log notice;
#error_log logs/error.log info;
#pid logs/nginx.pid;
events {
worker_connections 102400;
multi_accept on;
use epoll;
}
http {
include mime.types;
default_type application/octet-stream;
log_format main '$remote_addr - $remote_user [$time_local] "$request" '
'$status $body_bytes_sent "$http_referer" '
'"$http_user_agent" "$http_x_forwarded_for"';
access_log /var/log/nginx/access.log main;
resolver 8.8.8.8;
#resolver 127.0.0.1 valid=3600s;
sendfile on;
keepalive_timeout 65;
underscores_in_headers on;
gzip on;
include /opt/openresty/nginx/conf/conf.d/common.conf; #common.conf這個(gè)文件名字可自定義
}
  編輯/opt/openresty/nginx/conf/conf.d/common.conf
  ##api
lua_package_path "/opt/openresty/lualib/resty/kafka/?.lua;;";
lua_package_cpath "/opt/openresty/lualib/?.so;;";
lua_shared_dict ngx_cache 128m; # cache
lua_shared_dict cache_lock 100k; # lock for cache
server {
listen 8887; #監聽(tīng)端口
server_name 192.168.3.215; #埋點(diǎn)日志的ip地址或域名,多個(gè)域名之間用空格分開(kāi)
root html; #root指令用于指定虛擬主機的網(wǎng)頁(yè)根目錄,這個(gè)目錄可以是相對路徑,也可以是絕對路徑。
lua_need_request_body on; #打開(kāi)獲取消息體的開(kāi)關(guān),以便能獲取到消息體
access_log /var/log/nginx/message.access.log main;
error_log /var/log/nginx/message.error.log notice;
location = /lzp/message {
lua_code_cache on;
charset utf-8;
default_type 'application/json';
content_by_lua_file "/opt/openresty/nginx/lua/testMessage_kafka.lua";#引用的lua腳本
}
}
  編輯/opt/openresty/nginx/lua/testMessage_kafka.lua
  #創(chuàng )建目錄mkdir /opt/openresty/nginx/lua/
vim /opt/openresty/nginx/lua/testMessage_kafka.lua<br />#編輯內存如下:
  -- require需要resty.kafka.producer的lua腳本,沒(méi)有會(huì )報錯
local producer = require("resty.kafka.producer")
-- kafka的集群信息,單機也是可以的
local broker_list = {
{host = "192.168.3.215", port = 9092},
}
-- 定義最終kafka接受到的數據是怎樣的json格式
local log_json = {}
--增加read_body之后即可獲取到消息體,默認情況下可能會(huì )是nil
log_json["body"] = ngx.req.read_body()
log_json["body_data"] = ngx.req.get_body_data()
-- 定義kafka同步生產(chǎn)者,也可設置為異步 async
-- -- 注意?。?!當設置為異步時(shí),在測試環(huán)境需要修改batch_num,默認是200條,若大不到200條kafka端接受不到消息
-- -- encode()將log_json日志轉換為字符串
-- -- 發(fā)送日志消息,send配套之第一個(gè)參數topic:
-- -- 發(fā)送日志消息,send配套之第二個(gè)參數key,用于kafka路由控制:
-- -- key為nill(空)時(shí),一段時(shí)間向同一partition寫(xiě)入數據
-- -- 指定key,按照key的hash寫(xiě)入到對應的partition
-- -- batch_num修改為1方便測試
local bp = producer:new(broker_list, { producer_type = "async",batch_num = 1 })
-- local bp = producer:new(broker_list)
local cjson = require("cjson.safe")
local sendMsg = cjson.encode(log_json)
local ok, err = bp:send("testMessage",nil, sendMsg)
if not ok then
ngx.log(ngx.ERR, 'kafka send err:', err)
elseif ok then
ngx.say("the message send successful")
else
ngx.say("未知錯誤")
end
  5. 開(kāi)始服務(wù)操作:
  useradd nginx #創(chuàng )建用戶(hù)
passwd nginx #設置密碼
#設置openresty的所有者nginx
chown -R nginx:nginx /opt/openresty/
#啟動(dòng)服務(wù)
cd /opt/openresty/nginx/sbin
./nginx -c /opt/openresty/nginx/conf/nginx.conf
查看服務(wù):
ps -aux | grep nginx
nginx 2351 0.0 0.1 231052 46444 ? S Mar30 0:33 nginx: worker process
nginx 2352 0.0 0.1 233396 48540 ? S Mar30 0:35 nginx: worker process
nginx 2353 0.0 0.1 233396 48536 ? S Mar30 0:33 nginx: worker process
nginx 2354 0.0 0.1 232224 47464 ? S Mar30 0:34 nginx: worker process
nginx 2355 0.0 0.1 231052 46404 ? S Mar30 0:33 nginx: worker process
nginx 2356 0.0 0.1 232224 47460 ? S Mar30 0:34 nginx: worker process
nginx 2357 0.0 0.1 231052 46404 ? S Mar30 0:34 nginx: worker process
nginx 2358 0.0 0.1 232224 47484 ? S Mar30 0:34 nginx: worker process
root 7009 0.0 0.0 185492 2516 ? Ss Mar24 0:00 nginx: master process ./nginx -c /opt/openresty/nginx/conf/nginx.conf
查看端口:
netstat -anput | grep 8887
tcp 0 0 0.0.0.0:8887 0.0.0.0:* LISTEN 2351/nginx: worke
  看到上述過(guò)程,就可以證明服務(wù)正常運行
  6. 使用郵遞員發(fā)送發(fā)帖請求以進(jìn)行簡(jiǎn)單測試,以查看Kafka是否可以接受數據
  
  7.kafka消費數據:
  kafka-console-consumer --bootstrap-server 192.168.3.215:9092 --topic testMessage --from-beginning
<p>如果消耗了數據,則說(shuō)明配置成功. 如果未調整,則可以檢查與/var/log/nginx/message.access.log和/var/log/nginx/message.error.log相關(guān)的錯誤日志以進(jìn)行調整 查看全部

  簡(jiǎn)介
  在許多數據采集方案中,Flume是高性能的日志采集工具. 我相信每個(gè)人都知道. 許多人認為Flume是一個(gè)組件,可以將它們中的大多數與Flume和Kafka的組合相關(guān)聯(lián)以進(jìn)行日志采集. 該解決方案具有許多優(yōu)勢,例如高性能,高吞吐量和數據可靠性. 但是,如果我們需要實(shí)時(shí)采集日志,顯然這不是一個(gè)好的解決方案. 原因如下:
  目前,Flume可以支持對目錄中數據文件的實(shí)時(shí)監視. 某個(gè)目錄的文件采集完成后,將使用完成的符號進(jìn)行標記. 如果以后有數據輸入此文件,則不會(huì )檢測到Flume.
  因此,我們通常使用這種方案進(jìn)行計時(shí)采集. 只要生成新的數據目錄,我們就會(huì )將數據文件采集到該目錄中.
  然后,本文將向您介紹基于Openresty + Lua + Kafka的實(shí)時(shí)日志采集.
  要求
  很多時(shí)候,我們需要實(shí)時(shí)采集用戶(hù)的掩埋點(diǎn)數據,然后使用這些數據對用戶(hù)的行為進(jìn)行一些實(shí)時(shí)分析. 因此,第一步當然是解決如何實(shí)時(shí)采集數據.
  我們在這里使用的解決方案是Openresty + Lua + Kafka.
  原理介紹
  那么什么是Openresty?這是官方報價(jià):
  OpenResty是基于Nginx和Lua的高性能Web平臺. 它集成了許多復雜的Lua庫,第三方模塊及其大多數依賴(lài)項. 它用于方便地構建可處理超高并發(fā)性和高可伸縮性的動(dòng)態(tài)Web應用程序,Web服務(wù)和動(dòng)態(tài)網(wǎng)關(guān).
  OpenResty通過(guò)融合各種精心設計的Nginx模塊,有效地將Nginx變成了功能強大的通用Web應用程序平臺. 這樣,Web開(kāi)發(fā)人員和系統工程師可以使用Lu腳本語(yǔ)言來(lái)調動(dòng)Nginx支持的各種C和Lua模塊,并快速構建一個(gè)具有10K甚至1000個(gè)以上的單機并發(fā)連接的高性能Web應用程序系統.
  OpenResty的目標是使您的Web服務(wù)直接在Nginx服務(wù)內部運行,充分利用Nginx的非阻塞I / O模型,不僅用于HTTP客戶(hù)端請求,甚至用于遠程后端(例如MySQL,PostgreSQL) ,Memcached和Redis等具有一致的高性能響應.
  簡(jiǎn)單來(lái)說(shuō),就是通過(guò)Nginx發(fā)送客戶(hù)端的請求(本文指的是用戶(hù)的行為日志),以將用戶(hù)的數據傳遞到我們指定的位置(卡夫卡),為了達到這一要求,我們使用Lua腳本,因為Openresty封裝了各種Lua模塊,其中之一是子安裝Kafka模塊,所以我們只需要編寫(xiě)一個(gè)簡(jiǎn)單的腳本即可通過(guò)Nginx將用戶(hù)數據轉發(fā)到Kafka,以便隨后使用數據.
  以下是供大家理解的體系結構圖:
  
  以下是使用Openresty + Lua + Kafka的優(yōu)點(diǎn)的簡(jiǎn)要摘要:
  1. 支持多種業(yè)務(wù)數據,不同的業(yè)務(wù)數據,只需要配置不同的Lua腳本,就可以將不同的業(yè)務(wù)數據發(fā)送到Kafka中的不同主題.
  2. 實(shí)時(shí)采集用戶(hù)觸發(fā)的埋藏點(diǎn)數據
  3. 高度可靠的集群. 由于Openresty基于Nginx,因此其群集具有非常高的性能和穩定性.
  4. 高并發(fā). 與tomcat,apache和其他Web服務(wù)器相比,Nginx的并發(fā)性比其他兩個(gè)要高得多. 在正常情況下處理數萬(wàn)個(gè)并發(fā)并不難.
  接下來(lái)讓我們做一些實(shí)際的工作.
  安裝Openresty
  此示例使用獨立部署表單. 成功完成獨立部署后,將在獨立計算機上構建集群,只是在不同的計算機上執行相同的步驟.
  注意: 本實(shí)驗基于centos7.0操作系統
  1. 下載Openresty依賴(lài)項:
  yum install readline-devel pcre-devel openssl-devel gcc
  2. 編譯并安裝Openresty:
  #1.安裝openresty:
mkdir /opt/software
mkdir /opt/module
cd /opt/software/ # 安裝文件所在目錄
wget https://openresty.org/download ... ar.gz
tar -xzf openresty-1.9.7.4.tar.gz -C /opt/module/
cd /opt/module/openresty-1.9.7.4
#2.配置:
# 指定目錄為/opt/openresty,默認在/usr/local。
./configure --prefix=/opt/openresty \
--with-luajit \
--without-http_redis2_module \
--with-http_iconv_module
make
make install
  3. 安裝lua-resty-kafka
  因為我們需要通過(guò)nginx + lua腳本將數據轉發(fā)到Kafka,所以在編寫(xiě)lua腳本時(shí)需要在lua模塊中使用一些Kafka依賴(lài)項.
  #下載lua-resty-kafka:
cd /opt/software/
wget https://github.com/doujiang24/ ... r.zip
unzip master.zip -d /opt/module/

#拷貝kafka相關(guān)依賴(lài)腳本到openresty
cp -rf /opt/module/lua-resty-kafka-master/lib/resty/kafka/ /opt/openresty/lualib/resty/
  注意: 由于每個(gè)人都熟悉Kafka,因此這里不會(huì )介紹其安裝.
  安裝Openresty之后,目錄結構如下:
  drwxr-xr-x 2 root root 4096 Mar 24 14:26 bin
drwxr-xr-x 6 root root 4096 Mar 24 14:26 luajit
drwxr-xr-x 7 root root 4096 Mar 24 14:29 lualib
drwxr-xr-x 12 root root 4096 Mar 24 14:40 nginx
  4. 配置文件
  編輯/opt/openresty/nginx/conf/nginx.conf
  user nginx; #Linux的用戶(hù)
worker_processes auto;
worker_rlimit_nofile 100000;
#error_log logs/error.log;
#error_log logs/error.log notice;
#error_log logs/error.log info;
#pid logs/nginx.pid;
events {
worker_connections 102400;
multi_accept on;
use epoll;
}
http {
include mime.types;
default_type application/octet-stream;
log_format main '$remote_addr - $remote_user [$time_local] "$request" '
'$status $body_bytes_sent "$http_referer" '
'"$http_user_agent" "$http_x_forwarded_for"';
access_log /var/log/nginx/access.log main;
resolver 8.8.8.8;
#resolver 127.0.0.1 valid=3600s;
sendfile on;
keepalive_timeout 65;
underscores_in_headers on;
gzip on;
include /opt/openresty/nginx/conf/conf.d/common.conf; #common.conf這個(gè)文件名字可自定義
}
  編輯/opt/openresty/nginx/conf/conf.d/common.conf
  ##api
lua_package_path "/opt/openresty/lualib/resty/kafka/?.lua;;";
lua_package_cpath "/opt/openresty/lualib/?.so;;";
lua_shared_dict ngx_cache 128m; # cache
lua_shared_dict cache_lock 100k; # lock for cache
server {
listen 8887; #監聽(tīng)端口
server_name 192.168.3.215; #埋點(diǎn)日志的ip地址或域名,多個(gè)域名之間用空格分開(kāi)
root html; #root指令用于指定虛擬主機的網(wǎng)頁(yè)根目錄,這個(gè)目錄可以是相對路徑,也可以是絕對路徑。
lua_need_request_body on; #打開(kāi)獲取消息體的開(kāi)關(guān),以便能獲取到消息體
access_log /var/log/nginx/message.access.log main;
error_log /var/log/nginx/message.error.log notice;
location = /lzp/message {
lua_code_cache on;
charset utf-8;
default_type 'application/json';
content_by_lua_file "/opt/openresty/nginx/lua/testMessage_kafka.lua";#引用的lua腳本
}
}
  編輯/opt/openresty/nginx/lua/testMessage_kafka.lua
  #創(chuàng )建目錄mkdir /opt/openresty/nginx/lua/
vim /opt/openresty/nginx/lua/testMessage_kafka.lua<br />#編輯內存如下:
  -- require需要resty.kafka.producer的lua腳本,沒(méi)有會(huì )報錯
local producer = require("resty.kafka.producer")
-- kafka的集群信息,單機也是可以的
local broker_list = {
{host = "192.168.3.215", port = 9092},
}
-- 定義最終kafka接受到的數據是怎樣的json格式
local log_json = {}
--增加read_body之后即可獲取到消息體,默認情況下可能會(huì )是nil
log_json["body"] = ngx.req.read_body()
log_json["body_data"] = ngx.req.get_body_data()
-- 定義kafka同步生產(chǎn)者,也可設置為異步 async
-- -- 注意?。?!當設置為異步時(shí),在測試環(huán)境需要修改batch_num,默認是200條,若大不到200條kafka端接受不到消息
-- -- encode()將log_json日志轉換為字符串
-- -- 發(fā)送日志消息,send配套之第一個(gè)參數topic:
-- -- 發(fā)送日志消息,send配套之第二個(gè)參數key,用于kafka路由控制:
-- -- key為nill(空)時(shí),一段時(shí)間向同一partition寫(xiě)入數據
-- -- 指定key,按照key的hash寫(xiě)入到對應的partition
-- -- batch_num修改為1方便測試
local bp = producer:new(broker_list, { producer_type = "async",batch_num = 1 })
-- local bp = producer:new(broker_list)
local cjson = require("cjson.safe")
local sendMsg = cjson.encode(log_json)
local ok, err = bp:send("testMessage",nil, sendMsg)
if not ok then
ngx.log(ngx.ERR, 'kafka send err:', err)
elseif ok then
ngx.say("the message send successful")
else
ngx.say("未知錯誤")
end
  5. 開(kāi)始服務(wù)操作:
  useradd nginx #創(chuàng )建用戶(hù)
passwd nginx #設置密碼
#設置openresty的所有者nginx
chown -R nginx:nginx /opt/openresty/
#啟動(dòng)服務(wù)
cd /opt/openresty/nginx/sbin
./nginx -c /opt/openresty/nginx/conf/nginx.conf
查看服務(wù):
ps -aux | grep nginx
nginx 2351 0.0 0.1 231052 46444 ? S Mar30 0:33 nginx: worker process
nginx 2352 0.0 0.1 233396 48540 ? S Mar30 0:35 nginx: worker process
nginx 2353 0.0 0.1 233396 48536 ? S Mar30 0:33 nginx: worker process
nginx 2354 0.0 0.1 232224 47464 ? S Mar30 0:34 nginx: worker process
nginx 2355 0.0 0.1 231052 46404 ? S Mar30 0:33 nginx: worker process
nginx 2356 0.0 0.1 232224 47460 ? S Mar30 0:34 nginx: worker process
nginx 2357 0.0 0.1 231052 46404 ? S Mar30 0:34 nginx: worker process
nginx 2358 0.0 0.1 232224 47484 ? S Mar30 0:34 nginx: worker process
root 7009 0.0 0.0 185492 2516 ? Ss Mar24 0:00 nginx: master process ./nginx -c /opt/openresty/nginx/conf/nginx.conf
查看端口:
netstat -anput | grep 8887
tcp 0 0 0.0.0.0:8887 0.0.0.0:* LISTEN 2351/nginx: worke
  看到上述過(guò)程,就可以證明服務(wù)正常運行
  6. 使用郵遞員發(fā)送發(fā)帖請求以進(jìn)行簡(jiǎn)單測試,以查看Kafka是否可以接受數據
  
  7.kafka消費數據:
  kafka-console-consumer --bootstrap-server 192.168.3.215:9092 --topic testMessage --from-beginning
<p>如果消耗了數據,則說(shuō)明配置成功. 如果未調整,則可以檢查與/var/log/nginx/message.access.log和/var/log/nginx/message.error.log相關(guān)的錯誤日志以進(jìn)行調整

Flume和Kafka完成實(shí)時(shí)數據采集

采集交流 ? 優(yōu)采云 發(fā)表了文章 ? 0 個(gè)評論 ? 419 次瀏覽 ? 2020-08-06 16:17 ? 來(lái)自相關(guān)話(huà)題

  Flume和Kafka完成實(shí)時(shí)數據采集
  寫(xiě)在前面
  在生產(chǎn)環(huán)境中,通常將水槽和Kafka結合使用. 可以同時(shí)使用它們兩者來(lái)采集實(shí)時(shí)日志信息非常重要. 如果您不懂水槽和卡夫卡,可以先檢查一下我寫(xiě)的關(guān)于這兩部分的知識. 讓我們再次學(xué)習,這部分操作也是可能的.
  實(shí)時(shí)數據的采集面臨一個(gè)問(wèn)題. 我們如何生成實(shí)時(shí)數據源?因為我們可能想直接獲取實(shí)時(shí)數據流不是那么方便. 我之前寫(xiě)了一篇有關(guān)實(shí)時(shí)數據流的python生成器的文章,文章地址:
  您可以先看看如何生成實(shí)時(shí)數據...
  在想什么? ?如何開(kāi)始? ?
  分析: 我們可以從數據流開(kāi)始. 數據首先位于Web服務(wù)器中. 我們的訪(fǎng)問(wèn)日志由Nginx服務(wù)器實(shí)時(shí)采集到指定文件中. 我們從該文件采集日志數據,即: webserver => flume => kafka
  Web服務(wù)器日志存儲文件位置
  此文件的位置通常由我們自己設置
  我們的網(wǎng)絡(luò )日志的存儲目錄為:
  /home/hadoop/data/project/logs/access.log下面
  [hadoop@hadoop000 logs]$ pwd
/home/hadoop/data/project/logs
[hadoop@hadoop000 logs]$ ls
access.log
[hadoop@hadoop000 logs]$
  水槽
  做水槽實(shí)際上是寫(xiě)一個(gè)conf文件,所以面臨選擇的問(wèn)題
  來(lái)源選擇?頻道選擇?選擇接收器?
  在這里,我們選擇exec源存儲通道kafka sink
  怎么寫(xiě)?
  按照前面提到的步驟1234
  在官方網(wǎng)站上,我們可以找到應如何選擇:
  1)配置源
  exec來(lái)源
  # Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/hadoop/data/project/logs/access.log
a1.sources.r1.shell = /bin/sh -c
  2)配置頻道
  內存頻道
  a1.channels.c1.type = memory
  3)配置接收器
  卡夫卡水槽
  對于flume1.6版本,請參閱#kafka-sink
  a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.brokerList = hadoop000:9092
a1.sinks.k1.topic = flume_kafka
a1.sinks.k1.batchSize = 5
a1.sinks.k1.requiredAcks =1
  將以上三個(gè)組成部分組合在一起
  a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
  我們的新文件稱(chēng)為test3.conf
  粘貼我們自己分析的代碼:
  [hadoop@hadoop000 conf]$ vim test3.conf
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/hadoop/data/project/logs/access.log
a1.sources.r1.shell = /bin/sh -c
a1.channels.c1.type = memory
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.brokerList = hadoop000:9092
a1.sinks.k1.topic = flume_kafka
a1.sinks.k1.batchSize = 5
a1.sinks.k1.requiredAcks =1
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
  我們不會(huì )從這里開(kāi)始,因為它涉及kafka的事情,我們必須首先部署kafka,
  Kafka部署
  如何部署Kafka? ?
  請訪(fǎng)問(wèn)官方網(wǎng)站,我們首先啟動(dòng)Zookeeper流程,然后才能啟動(dòng)kafka服務(wù)器
  第1步: 啟動(dòng)動(dòng)物園管理員
  [hadoop@hadoop000 ~]$
[hadoop@hadoop000 ~]$ jps
29147 Jps
[hadoop@hadoop000 ~]$ zkServer.sh start
JMX enabled by default
Using config: /home/hadoop/app/zk/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
[hadoop@hadoop000 ~]$ jps
29172 QuorumPeerMain
29189 Jps
[hadoop@hadoop000 ~]$
  第2步: 啟動(dòng)服務(wù)器
  [hadoop@hadoop000 ~]$ kafka-server-start.sh $KAFKA_HOME/config/server.properties
#外開(kāi)一個(gè)窗口,查看jps
[hadoop@hadoop000 ~]$ jps
29330 Jps
29172 QuorumPeerMain
29229 Kafka
[hadoop@hadoop000 ~]$
  如果這部分不是很熟悉,您可以參考
  第3步: 創(chuàng )建主題
  [hadoop@hadoop000 ~]$ kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic flume_kafka
WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
Created topic "flume_kafka".
[hadoop@hadoop000 ~]$
  第4步: 啟動(dòng)上一個(gè)代理
   [hadoop@hadoop000 conf]$ flume-ng agent --name a1 --conf . --conf-file ./test3.conf -Dflume.root.logger=INFO,console
  第5步: 啟動(dòng)消費者
  kafka-console-consumer.sh --zookeeper hadoop000:2181 –topic flume-kafka
  執行上述第五步后,您將收到刷新屏幕的結果,哈哈哈! !
  
  上面的消費者將不斷刷新屏幕,這仍然非常有趣?。?!
  這里的使用者將接收到的數據放在屏幕上
  稍后,我們將介紹使用SparkStreaming作為使用者來(lái)實(shí)時(shí)接收數據,并對接收到的數據進(jìn)行簡(jiǎn)單的數據清理,并從隨機生成的日志中過(guò)濾出所需的數據... 查看全部

  Flume和Kafka完成實(shí)時(shí)數據采集
  寫(xiě)在前面
  在生產(chǎn)環(huán)境中,通常將水槽和Kafka結合使用. 可以同時(shí)使用它們兩者來(lái)采集實(shí)時(shí)日志信息非常重要. 如果您不懂水槽和卡夫卡,可以先檢查一下我寫(xiě)的關(guān)于這兩部分的知識. 讓我們再次學(xué)習,這部分操作也是可能的.
  實(shí)時(shí)數據的采集面臨一個(gè)問(wèn)題. 我們如何生成實(shí)時(shí)數據源?因為我們可能想直接獲取實(shí)時(shí)數據流不是那么方便. 我之前寫(xiě)了一篇有關(guān)實(shí)時(shí)數據流的python生成器的文章,文章地址:
  您可以先看看如何生成實(shí)時(shí)數據...
  在想什么? ?如何開(kāi)始? ?
  分析: 我們可以從數據流開(kāi)始. 數據首先位于Web服務(wù)器中. 我們的訪(fǎng)問(wèn)日志由Nginx服務(wù)器實(shí)時(shí)采集到指定文件中. 我們從該文件采集日志數據,即: webserver => flume => kafka
  Web服務(wù)器日志存儲文件位置
  此文件的位置通常由我們自己設置
  我們的網(wǎng)絡(luò )日志的存儲目錄為:
  /home/hadoop/data/project/logs/access.log下面
  [hadoop@hadoop000 logs]$ pwd
/home/hadoop/data/project/logs
[hadoop@hadoop000 logs]$ ls
access.log
[hadoop@hadoop000 logs]$
  水槽
  做水槽實(shí)際上是寫(xiě)一個(gè)conf文件,所以面臨選擇的問(wèn)題
  來(lái)源選擇?頻道選擇?選擇接收器?
  在這里,我們選擇exec源存儲通道kafka sink
  怎么寫(xiě)?
  按照前面提到的步驟1234
  在官方網(wǎng)站上,我們可以找到應如何選擇:
  1)配置源
  exec來(lái)源
  # Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/hadoop/data/project/logs/access.log
a1.sources.r1.shell = /bin/sh -c
  2)配置頻道
  內存頻道
  a1.channels.c1.type = memory
  3)配置接收器
  卡夫卡水槽
  對于flume1.6版本,請參閱#kafka-sink
  a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.brokerList = hadoop000:9092
a1.sinks.k1.topic = flume_kafka
a1.sinks.k1.batchSize = 5
a1.sinks.k1.requiredAcks =1
  將以上三個(gè)組成部分組合在一起
  a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
  我們的新文件稱(chēng)為test3.conf
  粘貼我們自己分析的代碼:
  [hadoop@hadoop000 conf]$ vim test3.conf
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/hadoop/data/project/logs/access.log
a1.sources.r1.shell = /bin/sh -c
a1.channels.c1.type = memory
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.brokerList = hadoop000:9092
a1.sinks.k1.topic = flume_kafka
a1.sinks.k1.batchSize = 5
a1.sinks.k1.requiredAcks =1
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
  我們不會(huì )從這里開(kāi)始,因為它涉及kafka的事情,我們必須首先部署kafka,
  Kafka部署
  如何部署Kafka? ?
  請訪(fǎng)問(wèn)官方網(wǎng)站,我們首先啟動(dòng)Zookeeper流程,然后才能啟動(dòng)kafka服務(wù)器
  第1步: 啟動(dòng)動(dòng)物園管理員
  [hadoop@hadoop000 ~]$
[hadoop@hadoop000 ~]$ jps
29147 Jps
[hadoop@hadoop000 ~]$ zkServer.sh start
JMX enabled by default
Using config: /home/hadoop/app/zk/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
[hadoop@hadoop000 ~]$ jps
29172 QuorumPeerMain
29189 Jps
[hadoop@hadoop000 ~]$
  第2步: 啟動(dòng)服務(wù)器
  [hadoop@hadoop000 ~]$ kafka-server-start.sh $KAFKA_HOME/config/server.properties
#外開(kāi)一個(gè)窗口,查看jps
[hadoop@hadoop000 ~]$ jps
29330 Jps
29172 QuorumPeerMain
29229 Kafka
[hadoop@hadoop000 ~]$
  如果這部分不是很熟悉,您可以參考
  第3步: 創(chuàng )建主題
  [hadoop@hadoop000 ~]$ kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic flume_kafka
WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
Created topic "flume_kafka".
[hadoop@hadoop000 ~]$
  第4步: 啟動(dòng)上一個(gè)代理
   [hadoop@hadoop000 conf]$ flume-ng agent --name a1 --conf . --conf-file ./test3.conf -Dflume.root.logger=INFO,console
  第5步: 啟動(dòng)消費者
  kafka-console-consumer.sh --zookeeper hadoop000:2181 –topic flume-kafka
  執行上述第五步后,您將收到刷新屏幕的結果,哈哈哈! !
  
  上面的消費者將不斷刷新屏幕,這仍然非常有趣?。?!
  這里的使用者將接收到的數據放在屏幕上
  稍后,我們將介紹使用SparkStreaming作為使用者來(lái)實(shí)時(shí)接收數據,并對接收到的數據進(jìn)行簡(jiǎn)單的數據清理,并從隨機生成的日志中過(guò)濾出所需的數據...

Filebeat實(shí)時(shí)采集Nginx日志

采集交流 ? 優(yōu)采云 發(fā)表了文章 ? 0 個(gè)評論 ? 634 次瀏覽 ? 2020-08-06 10:13 ? 來(lái)自相關(guān)話(huà)題

  說(shuō)明Filebeat版本為5.3.0
  之所以使用Beats系列的Filebeat代替Logstash的原因是,Logstash消耗了太多資源(請忽略那些擁有足夠服務(wù)器資源的用戶(hù))
  在官方網(wǎng)站上,Logstash的下載量為89M,而Filebeat的下載量?jì)H為840M,這很明顯
  Logstash可以配置jvm參數. 經(jīng)過(guò)我自己的調試,內存分配很小,啟動(dòng)緩慢,有時(shí)甚至根本無(wú)法啟動(dòng). 如果分配量很大,其他服務(wù)將沒(méi)有資源
  對于低配置服務(wù)器而言,選擇Filebeat是最佳選擇,并且既然Filebeat已開(kāi)始取代Logstash,仍然有必要修改nginx日志格式nginx.config
  更改日志記錄的格式
   log_format json &#39;{ "@timestamp": "$time_iso8601", &#39;
&#39;"time": "$time_iso8601", &#39;
&#39;"remote_addr": "$remote_addr", &#39;
&#39;"remote_user": "$remote_user", &#39;
&#39;"body_bytes_sent": "$body_bytes_sent", &#39;
&#39;"request_time": "$request_time", &#39;
&#39;"status": "$status", &#39;
&#39;"host": "$host", &#39;
&#39;"request": "$request", &#39;
&#39;"request_method": "$request_method", &#39;
&#39;"uri": "$uri", &#39;
&#39;"http_referrer": "$http_referer", &#39;
&#39;"body_bytes_sent":"$body_bytes_sent", &#39;
&#39;"http_x_forwarded_for": "$http_x_forwarded_for", &#39;
&#39;"http_user_agent": "$http_user_agent" &#39;
&#39;}&#39;;
access_log /var/log/nginx/access.log json;
  filebeat.yml
   #=========================== Filebeat prospectors =============================
filebeat.prospectors:
- input_type: log
# Paths that should be crawled and fetched. Glob based paths.
paths:
- /var/log/nginx/*access*.log
json.keys_under_root: true
json.overwrite_keys: true
#-------------------------- Elasticsearch output ------------------------------
output.elasticsearch:
# Array of hosts to connect to.
hosts: ["ip:port","ip:port"]
index: "filebeat_server_nginx_%{+YYYY-MM}"
  這里要注意的是
  json.keys_under_root: 默認值為FALSE,這意味著(zhù)我們的json日志將在解析后放置在json密鑰上. 設置為T(mén)RUE,所有鍵都將放置在根節點(diǎn)中
  json.overwrite_keys: 是否覆蓋原創(chuàng )密鑰,這是密鑰配置. 將keys_under_root設置為T(mén)RUE之后,然后將overwrite_keys設置為T(mén)RUE以覆蓋filebeat默認密鑰值
  還有其他配置
  json.add_error_key: 添加json_error密鑰以記錄json解析失敗錯誤
  json.message_key: 指定解析后放置json日志的鍵,默認為json,還可以指定日志等.
  坦率地說(shuō),區別在于配置前的elasticsearch數據如下:
   {
"_index": "filebeat_server_nginx_2018-05",
"_type": "log",
"_id": "AWM9sVOkCcRcg0IPg399",
"_version": 1,
"_score": 1,
"_source": {
"@timestamp": "2018-05-08T03:00:17.544Z",
"beat": {
"hostname": "VM_252_18_centos",
"name": "VM_252_18_centos",
"version": "5.3.0"
},
"input_type": "log",
"json": {},
"message": "{ "@timestamp": "2018-05-08T11:00:11+08:00", "time": "2018-05-08T11:00:11+08:00", "remote_addr": "113.16.251.67", "remote_user": "-", "body_bytes_sent": "403", "request_time": "0.000", "status": "200", "host": "blog.joylau.cn", "request": "GET /img/%E7%BD%91%E6%98%93%E4%BA%91%E9%9F%B3%E4%B9%90.png HTTP/1.1", "request_method": "GET", "uri": "/img/\xE7\xBD\x91\xE6\x98\x93\xE4\xBA\x91\xE9\x9F\xB3\xE4\xB9\x90.png", "http_referrer": "http://blog.joylau.cn/css/style.css", "body_bytes_sent":"403", "http_x_forwarded_for": "-", "http_user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/62.0.3202.94 Safari/537.36" }",
"offset": 7633,
"source": "/var/log/nginx/access.log",
"type": "log"
}
}
  配置后,它看起來(lái)像這樣:
   {
"_index": "filebeat_server_nginx_2018-05",
"_type": "log",
"_id": "AWM9rjLd8mVZNgvhdnN9",
"_version": 1,
"_score": 1,
"_source": {
"@timestamp": "2018-05-08T02:56:50.000Z",
"beat": {
"hostname": "VM_252_18_centos",
"name": "VM_252_18_centos",
"version": "5.3.0"
},
"body_bytes_sent": "12576",
"host": "blog.joylau.cn",
"http_referrer": "http://blog.joylau.cn/",
"http_user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/66.0.3359.139 Safari/537.36",
"http_x_forwarded_for": "-",
"input_type": "log",
"offset": 3916,
"remote_addr": "60.166.12.138",
"remote_user": "-",
"request": "GET /2018/03/01/JDK8-Stream-Distinct/ HTTP/1.1",
"request_method": "GET",
"request_time": "0.000",
"source": "/var/log/nginx/access.log",
"status": "200",
"time": "2018-05-08T10:56:50+08:00",
"type": "log",
"uri": "/2018/03/01/JDK8-Stream-Distinct/index.html"
}
}
  這樣看起來(lái)很舒服
  啟動(dòng)FileBeat
  進(jìn)入Filebeat目錄
   nohup sudo ./filebeat -e -c filebeat.yml >/dev/null 2>&1 &
  更新
  如果nginx日志收錄中文,則中文將轉換為Unicode編碼. 如果沒(méi)有,只需添加escape = json參數.
   log_format json escape=json &#39;{ "@timestamp": "$time_iso8601", &#39;
&#39;"time": "$time_iso8601", &#39;
&#39;"remote_addr": "$remote_addr", &#39;
&#39;"remote_user": "$remote_user", &#39;
&#39;"body_bytes_sent": "$body_bytes_sent", &#39;
&#39;"request_time": "$request_time", &#39;
&#39;"status": "$status", &#39;
&#39;"host": "$host", &#39;
&#39;"request": "$request", &#39;
&#39;"request_method": "$request_method", &#39;
&#39;"uri": "$uri", &#39;
&#39;"http_referrer": "$http_referer", &#39;
&#39;"body_bytes_sent":"$body_bytes_sent", &#39;
&#39;"http_x_forwarded_for": "$http_x_forwarded_for", &#39;
&#39;"http_user_agent": "$http_user_agent" &#39;
&#39;}&#39;;
access_log /var/log/nginx/access.log json;
  消息 查看全部

  說(shuō)明Filebeat版本為5.3.0
  之所以使用Beats系列的Filebeat代替Logstash的原因是,Logstash消耗了太多資源(請忽略那些擁有足夠服務(wù)器資源的用戶(hù))
  在官方網(wǎng)站上,Logstash的下載量為89M,而Filebeat的下載量?jì)H為840M,這很明顯
  Logstash可以配置jvm參數. 經(jīng)過(guò)我自己的調試,內存分配很小,啟動(dòng)緩慢,有時(shí)甚至根本無(wú)法啟動(dòng). 如果分配量很大,其他服務(wù)將沒(méi)有資源
  對于低配置服務(wù)器而言,選擇Filebeat是最佳選擇,并且既然Filebeat已開(kāi)始取代Logstash,仍然有必要修改nginx日志格式nginx.config
  更改日志記錄的格式
   log_format json &#39;{ "@timestamp": "$time_iso8601", &#39;
&#39;"time": "$time_iso8601", &#39;
&#39;"remote_addr": "$remote_addr", &#39;
&#39;"remote_user": "$remote_user", &#39;
&#39;"body_bytes_sent": "$body_bytes_sent", &#39;
&#39;"request_time": "$request_time", &#39;
&#39;"status": "$status", &#39;
&#39;"host": "$host", &#39;
&#39;"request": "$request", &#39;
&#39;"request_method": "$request_method", &#39;
&#39;"uri": "$uri", &#39;
&#39;"http_referrer": "$http_referer", &#39;
&#39;"body_bytes_sent":"$body_bytes_sent", &#39;
&#39;"http_x_forwarded_for": "$http_x_forwarded_for", &#39;
&#39;"http_user_agent": "$http_user_agent" &#39;
&#39;}&#39;;
access_log /var/log/nginx/access.log json;
  filebeat.yml
   #=========================== Filebeat prospectors =============================
filebeat.prospectors:
- input_type: log
# Paths that should be crawled and fetched. Glob based paths.
paths:
- /var/log/nginx/*access*.log
json.keys_under_root: true
json.overwrite_keys: true
#-------------------------- Elasticsearch output ------------------------------
output.elasticsearch:
# Array of hosts to connect to.
hosts: ["ip:port","ip:port"]
index: "filebeat_server_nginx_%{+YYYY-MM}"
  這里要注意的是
  json.keys_under_root: 默認值為FALSE,這意味著(zhù)我們的json日志將在解析后放置在json密鑰上. 設置為T(mén)RUE,所有鍵都將放置在根節點(diǎn)中
  json.overwrite_keys: 是否覆蓋原創(chuàng )密鑰,這是密鑰配置. 將keys_under_root設置為T(mén)RUE之后,然后將overwrite_keys設置為T(mén)RUE以覆蓋filebeat默認密鑰值
  還有其他配置
  json.add_error_key: 添加json_error密鑰以記錄json解析失敗錯誤
  json.message_key: 指定解析后放置json日志的鍵,默認為json,還可以指定日志等.
  坦率地說(shuō),區別在于配置前的elasticsearch數據如下:
   {
"_index": "filebeat_server_nginx_2018-05",
"_type": "log",
"_id": "AWM9sVOkCcRcg0IPg399",
"_version": 1,
"_score": 1,
"_source": {
"@timestamp": "2018-05-08T03:00:17.544Z",
"beat": {
"hostname": "VM_252_18_centos",
"name": "VM_252_18_centos",
"version": "5.3.0"
},
"input_type": "log",
"json": {},
"message": "{ "@timestamp": "2018-05-08T11:00:11+08:00", "time": "2018-05-08T11:00:11+08:00", "remote_addr": "113.16.251.67", "remote_user": "-", "body_bytes_sent": "403", "request_time": "0.000", "status": "200", "host": "blog.joylau.cn", "request": "GET /img/%E7%BD%91%E6%98%93%E4%BA%91%E9%9F%B3%E4%B9%90.png HTTP/1.1", "request_method": "GET", "uri": "/img/\xE7\xBD\x91\xE6\x98\x93\xE4\xBA\x91\xE9\x9F\xB3\xE4\xB9\x90.png", "http_referrer": "http://blog.joylau.cn/css/style.css", "body_bytes_sent":"403", "http_x_forwarded_for": "-", "http_user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/62.0.3202.94 Safari/537.36" }",
"offset": 7633,
"source": "/var/log/nginx/access.log",
"type": "log"
}
}
  配置后,它看起來(lái)像這樣:
   {
"_index": "filebeat_server_nginx_2018-05",
"_type": "log",
"_id": "AWM9rjLd8mVZNgvhdnN9",
"_version": 1,
"_score": 1,
"_source": {
"@timestamp": "2018-05-08T02:56:50.000Z",
"beat": {
"hostname": "VM_252_18_centos",
"name": "VM_252_18_centos",
"version": "5.3.0"
},
"body_bytes_sent": "12576",
"host": "blog.joylau.cn",
"http_referrer": "http://blog.joylau.cn/",
"http_user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/66.0.3359.139 Safari/537.36",
"http_x_forwarded_for": "-",
"input_type": "log",
"offset": 3916,
"remote_addr": "60.166.12.138",
"remote_user": "-",
"request": "GET /2018/03/01/JDK8-Stream-Distinct/ HTTP/1.1",
"request_method": "GET",
"request_time": "0.000",
"source": "/var/log/nginx/access.log",
"status": "200",
"time": "2018-05-08T10:56:50+08:00",
"type": "log",
"uri": "/2018/03/01/JDK8-Stream-Distinct/index.html"
}
}
  這樣看起來(lái)很舒服
  啟動(dòng)FileBeat
  進(jìn)入Filebeat目錄
   nohup sudo ./filebeat -e -c filebeat.yml >/dev/null 2>&1 &
  更新
  如果nginx日志收錄中文,則中文將轉換為Unicode編碼. 如果沒(méi)有,只需添加escape = json參數.
   log_format json escape=json &#39;{ "@timestamp": "$time_iso8601", &#39;
&#39;"time": "$time_iso8601", &#39;
&#39;"remote_addr": "$remote_addr", &#39;
&#39;"remote_user": "$remote_user", &#39;
&#39;"body_bytes_sent": "$body_bytes_sent", &#39;
&#39;"request_time": "$request_time", &#39;
&#39;"status": "$status", &#39;
&#39;"host": "$host", &#39;
&#39;"request": "$request", &#39;
&#39;"request_method": "$request_method", &#39;
&#39;"uri": "$uri", &#39;
&#39;"http_referrer": "$http_referer", &#39;
&#39;"body_bytes_sent":"$body_bytes_sent", &#39;
&#39;"http_x_forwarded_for": "$http_x_forwarded_for", &#39;
&#39;"http_user_agent": "$http_user_agent" &#39;
&#39;}&#39;;
access_log /var/log/nginx/access.log json;
  消息

Openresty + Lua + Kafka實(shí)現實(shí)時(shí)日志采集

采集交流 ? 優(yōu)采云 發(fā)表了文章 ? 0 個(gè)評論 ? 575 次瀏覽 ? 2020-08-06 09:06 ? 來(lái)自相關(guān)話(huà)題

  簡(jiǎn)介
  在許多數據采集方案中,Flume是高性能的日志采集工具. 我相信每個(gè)人都知道. 許多人認為Flume是一個(gè)組件,可以將它們中的大多數與Flume和Kafka的組合相關(guān)聯(lián)以進(jìn)行日志采集. 該解決方案具有許多優(yōu)勢,例如高性能,高吞吐量和數據可靠性. 但是,如果我們需要實(shí)時(shí)采集日志,顯然這不是一個(gè)好的解決方案. 原因如下:
  目前,Flume可以支持對目錄中數據文件的實(shí)時(shí)監視. 某個(gè)目錄的文件采集完成后,將使用完成的符號進(jìn)行標記. 如果以后有數據輸入此文件,則不會(huì )檢測到Flume.
  因此,我們通常使用這種方案進(jìn)行計時(shí)采集. 只要生成新的數據目錄,我們就會(huì )將數據文件采集到該目錄中.
  然后,本文將向您介紹基于Openresty + Lua + Kafka的實(shí)時(shí)日志采集.
  要求
  很多時(shí)候,我們需要實(shí)時(shí)采集用戶(hù)的掩埋點(diǎn)數據,然后使用這些數據對用戶(hù)的行為進(jìn)行一些實(shí)時(shí)分析. 因此,第一步當然是解決如何實(shí)時(shí)采集數據.
  我們在這里使用的解決方案是Openresty + Lua + Kafka.
  原理介紹
  那么什么是Openresty?這是官方報價(jià):
  OpenResty是基于Nginx和Lua的高性能Web平臺. 它集成了許多復雜的Lua庫,第三方模塊及其大多數依賴(lài)項. 它用于方便地構建可處理超高并發(fā)性和高可伸縮性的動(dòng)態(tài)Web應用程序,Web服務(wù)和動(dòng)態(tài)網(wǎng)關(guān).
  OpenResty通過(guò)融合各種精心設計的Nginx模塊,有效地將Nginx變成了功能強大的通用Web應用程序平臺. 這樣,Web開(kāi)發(fā)人員和系統工程師可以使用Lu腳本語(yǔ)言來(lái)調動(dòng)Nginx支持的各種C和Lua模塊,并快速構建一個(gè)具有10K甚至1000個(gè)以上的單機并發(fā)連接的高性能Web應用程序系統.
  OpenResty的目標是使您的Web服務(wù)直接在Nginx服務(wù)內部運行,充分利用Nginx的非阻塞I / O模型,不僅用于HTTP客戶(hù)端請求,甚至用于遠程后端(例如MySQL,PostgreSQL) ,Memcached和Redis等具有一致的高性能響應.
  簡(jiǎn)單來(lái)說(shuō),就是通過(guò)Nginx發(fā)送客戶(hù)端的請求(本文指的是用戶(hù)的行為日志),以將用戶(hù)的數據傳遞到我們指定的位置(卡夫卡),為了達到這一要求,我們使用Lua腳本,因為Openresty封裝了各種Lua模塊,其中之一是子安裝Kafka模塊,所以我們只需要編寫(xiě)一個(gè)簡(jiǎn)單的腳本即可通過(guò)Nginx將用戶(hù)數據轉發(fā)到Kafka,以便隨后使用數據.
  以下是供大家理解的體系結構圖:
  
  以下是使用Openresty + Lua + Kafka的優(yōu)點(diǎn)的簡(jiǎn)要摘要:
  1. 支持多種業(yè)務(wù)數據,不同的業(yè)務(wù)數據,只需要配置不同的Lua腳本,就可以將不同的業(yè)務(wù)數據發(fā)送到Kafka中的不同主題.
  2. 實(shí)時(shí)采集用戶(hù)觸發(fā)的埋藏點(diǎn)數據
  3. 高度可靠的集群. 由于Openresty基于Nginx,因此其群集具有非常高的性能和穩定性.
  4. 高并發(fā). 與tomcat,apache和其他Web服務(wù)器相比,Nginx的并發(fā)性比其他兩個(gè)要高得多. 在正常情況下處理數萬(wàn)個(gè)并發(fā)并不難.
  接下來(lái)讓我們做一些實(shí)際的工作.
  安裝Openresty
  此示例使用獨立部署表單. 成功完成獨立部署后,將在獨立計算機上構建集群,只是在不同的計算機上執行相同的步驟.
  注意: 本實(shí)驗基于centos7.0操作系統
  1. 下載Openresty依賴(lài)項:
  yum install readline-devel pcre-devel openssl-devel gcc
  2. 編譯并安裝Openresty:
  #1.安裝openresty:
mkdir /opt/software
mkdir /opt/module
cd /opt/software/ # 安裝文件所在目錄
wget https://openresty.org/download ... ar.gz
tar -xzf openresty-1.9.7.4.tar.gz -C /opt/module/
cd /opt/module/openresty-1.9.7.4
#2.配置:
# 指定目錄為/opt/openresty,默認在/usr/local。
./configure --prefix=/opt/openresty \
--with-luajit \
--without-http_redis2_module \
--with-http_iconv_module
make
make install
  3. 安裝lua-resty-kafka
  因為我們需要通過(guò)nginx + lua腳本將數據轉發(fā)到Kafka,所以在編寫(xiě)lua腳本時(shí)需要在lua模塊中使用一些Kafka依賴(lài)項.
  #下載lua-resty-kafka:
cd /opt/software/
wget https://github.com/doujiang24/ ... r.zip
unzip master.zip -d /opt/module/

#拷貝kafka相關(guān)依賴(lài)腳本到openresty
cp -rf /opt/module/lua-resty-kafka-master/lib/resty/kafka/ /opt/openresty/lualib/resty/
  注意: 由于每個(gè)人都熟悉Kafka,因此這里不會(huì )介紹其安裝.
  安裝Openresty之后,目錄結構如下:
  drwxr-xr-x 2 root root 4096 Mar 24 14:26 bin
drwxr-xr-x 6 root root 4096 Mar 24 14:26 luajit
drwxr-xr-x 7 root root 4096 Mar 24 14:29 lualib
drwxr-xr-x 12 root root 4096 Mar 24 14:40 nginx
  4. 配置文件
  編輯/opt/openresty/nginx/conf/nginx.conf
  user nginx; #Linux的用戶(hù)
worker_processes auto;
worker_rlimit_nofile 100000;
#error_log logs/error.log;
#error_log logs/error.log notice;
#error_log logs/error.log info;
#pid logs/nginx.pid;
events {
worker_connections 102400;
multi_accept on;
use epoll;
}
http {
include mime.types;
default_type application/octet-stream;
log_format main '$remote_addr - $remote_user [$time_local] "$request" '
'$status $body_bytes_sent "$http_referer" '
'"$http_user_agent" "$http_x_forwarded_for"';
access_log /var/log/nginx/access.log main;
resolver 8.8.8.8;
#resolver 127.0.0.1 valid=3600s;
sendfile on;
keepalive_timeout 65;
underscores_in_headers on;
gzip on;
include /opt/openresty/nginx/conf/conf.d/common.conf; #common.conf這個(gè)文件名字可自定義
}
  編輯/opt/openresty/nginx/conf/conf.d/common.conf
  ##api
lua_package_path "/opt/openresty/lualib/resty/kafka/?.lua;;";
lua_package_cpath "/opt/openresty/lualib/?.so;;";
lua_shared_dict ngx_cache 128m; # cache
lua_shared_dict cache_lock 100k; # lock for cache
server {
listen 8887; #監聽(tīng)端口
server_name 192.168.3.215; #埋點(diǎn)日志的ip地址或域名,多個(gè)域名之間用空格分開(kāi)
root html; #root指令用于指定虛擬主機的網(wǎng)頁(yè)根目錄,這個(gè)目錄可以是相對路徑,也可以是絕對路徑。
lua_need_request_body on; #打開(kāi)獲取消息體的開(kāi)關(guān),以便能獲取到消息體
access_log /var/log/nginx/message.access.log main;
error_log /var/log/nginx/message.error.log notice;
location = /lzp/message {
lua_code_cache on;
charset utf-8;
default_type 'application/json';
content_by_lua_file "/opt/openresty/nginx/lua/testMessage_kafka.lua";#引用的lua腳本
}
}
  編輯/opt/openresty/nginx/lua/testMessage_kafka.lua
  #創(chuàng )建目錄mkdir /opt/openresty/nginx/lua/
vim /opt/openresty/nginx/lua/testMessage_kafka.lua<br />#編輯內存如下:
  -- require需要resty.kafka.producer的lua腳本,沒(méi)有會(huì )報錯
local producer = require("resty.kafka.producer")
-- kafka的集群信息,單機也是可以的
local broker_list = {
{host = "192.168.3.215", port = 9092},
}
-- 定義最終kafka接受到的數據是怎樣的json格式
local log_json = {}
--增加read_body之后即可獲取到消息體,默認情況下可能會(huì )是nil
log_json["body"] = ngx.req.read_body()
log_json["body_data"] = ngx.req.get_body_data()
-- 定義kafka同步生產(chǎn)者,也可設置為異步 async
-- -- 注意?。?!當設置為異步時(shí),在測試環(huán)境需要修改batch_num,默認是200條,若大不到200條kafka端接受不到消息
-- -- encode()將log_json日志轉換為字符串
-- -- 發(fā)送日志消息,send配套之第一個(gè)參數topic:
-- -- 發(fā)送日志消息,send配套之第二個(gè)參數key,用于kafka路由控制:
-- -- key為nill(空)時(shí),一段時(shí)間向同一partition寫(xiě)入數據
-- -- 指定key,按照key的hash寫(xiě)入到對應的partition
-- -- batch_num修改為1方便測試
local bp = producer:new(broker_list, { producer_type = "async",batch_num = 1 })
-- local bp = producer:new(broker_list)
local cjson = require("cjson.safe")
local sendMsg = cjson.encode(log_json)
local ok, err = bp:send("testMessage",nil, sendMsg)
if not ok then
ngx.log(ngx.ERR, 'kafka send err:', err)
elseif ok then
ngx.say("the message send successful")
else
ngx.say("未知錯誤")
end
  5. 開(kāi)始服務(wù)操作:
  useradd nginx #創(chuàng )建用戶(hù)
passwd nginx #設置密碼
#設置openresty的所有者nginx
chown -R nginx:nginx /opt/openresty/
#啟動(dòng)服務(wù)
cd /opt/openresty/nginx/sbin
./nginx -c /opt/openresty/nginx/conf/nginx.conf
查看服務(wù):
ps -aux | grep nginx
nginx 2351 0.0 0.1 231052 46444 ? S Mar30 0:33 nginx: worker process
nginx 2352 0.0 0.1 233396 48540 ? S Mar30 0:35 nginx: worker process
nginx 2353 0.0 0.1 233396 48536 ? S Mar30 0:33 nginx: worker process
nginx 2354 0.0 0.1 232224 47464 ? S Mar30 0:34 nginx: worker process
nginx 2355 0.0 0.1 231052 46404 ? S Mar30 0:33 nginx: worker process
nginx 2356 0.0 0.1 232224 47460 ? S Mar30 0:34 nginx: worker process
nginx 2357 0.0 0.1 231052 46404 ? S Mar30 0:34 nginx: worker process
nginx 2358 0.0 0.1 232224 47484 ? S Mar30 0:34 nginx: worker process
root 7009 0.0 0.0 185492 2516 ? Ss Mar24 0:00 nginx: master process ./nginx -c /opt/openresty/nginx/conf/nginx.conf
查看端口:
netstat -anput | grep 8887
tcp 0 0 0.0.0.0:8887 0.0.0.0:* LISTEN 2351/nginx: worke
  看到上述過(guò)程,就可以證明服務(wù)正常運行
  6. 使用郵遞員發(fā)送發(fā)帖請求以進(jìn)行簡(jiǎn)單測試,以查看Kafka是否可以接受數據
  
  7.kafka消費數據:
  kafka-console-consumer --bootstrap-server 192.168.3.215:9092 --topic testMessage --from-beginning
<p>如果消耗了數據,則說(shuō)明配置成功. 如果未調整,則可以檢查與/var/log/nginx/message.access.log和/var/log/nginx/message.error.log相關(guān)的錯誤日志以進(jìn)行調整 查看全部

  簡(jiǎn)介
  在許多數據采集方案中,Flume是高性能的日志采集工具. 我相信每個(gè)人都知道. 許多人認為Flume是一個(gè)組件,可以將它們中的大多數與Flume和Kafka的組合相關(guān)聯(lián)以進(jìn)行日志采集. 該解決方案具有許多優(yōu)勢,例如高性能,高吞吐量和數據可靠性. 但是,如果我們需要實(shí)時(shí)采集日志,顯然這不是一個(gè)好的解決方案. 原因如下:
  目前,Flume可以支持對目錄中數據文件的實(shí)時(shí)監視. 某個(gè)目錄的文件采集完成后,將使用完成的符號進(jìn)行標記. 如果以后有數據輸入此文件,則不會(huì )檢測到Flume.
  因此,我們通常使用這種方案進(jìn)行計時(shí)采集. 只要生成新的數據目錄,我們就會(huì )將數據文件采集到該目錄中.
  然后,本文將向您介紹基于Openresty + Lua + Kafka的實(shí)時(shí)日志采集.
  要求
  很多時(shí)候,我們需要實(shí)時(shí)采集用戶(hù)的掩埋點(diǎn)數據,然后使用這些數據對用戶(hù)的行為進(jìn)行一些實(shí)時(shí)分析. 因此,第一步當然是解決如何實(shí)時(shí)采集數據.
  我們在這里使用的解決方案是Openresty + Lua + Kafka.
  原理介紹
  那么什么是Openresty?這是官方報價(jià):
  OpenResty是基于Nginx和Lua的高性能Web平臺. 它集成了許多復雜的Lua庫,第三方模塊及其大多數依賴(lài)項. 它用于方便地構建可處理超高并發(fā)性和高可伸縮性的動(dòng)態(tài)Web應用程序,Web服務(wù)和動(dòng)態(tài)網(wǎng)關(guān).
  OpenResty通過(guò)融合各種精心設計的Nginx模塊,有效地將Nginx變成了功能強大的通用Web應用程序平臺. 這樣,Web開(kāi)發(fā)人員和系統工程師可以使用Lu腳本語(yǔ)言來(lái)調動(dòng)Nginx支持的各種C和Lua模塊,并快速構建一個(gè)具有10K甚至1000個(gè)以上的單機并發(fā)連接的高性能Web應用程序系統.
  OpenResty的目標是使您的Web服務(wù)直接在Nginx服務(wù)內部運行,充分利用Nginx的非阻塞I / O模型,不僅用于HTTP客戶(hù)端請求,甚至用于遠程后端(例如MySQL,PostgreSQL) ,Memcached和Redis等具有一致的高性能響應.
  簡(jiǎn)單來(lái)說(shuō),就是通過(guò)Nginx發(fā)送客戶(hù)端的請求(本文指的是用戶(hù)的行為日志),以將用戶(hù)的數據傳遞到我們指定的位置(卡夫卡),為了達到這一要求,我們使用Lua腳本,因為Openresty封裝了各種Lua模塊,其中之一是子安裝Kafka模塊,所以我們只需要編寫(xiě)一個(gè)簡(jiǎn)單的腳本即可通過(guò)Nginx將用戶(hù)數據轉發(fā)到Kafka,以便隨后使用數據.
  以下是供大家理解的體系結構圖:
  
  以下是使用Openresty + Lua + Kafka的優(yōu)點(diǎn)的簡(jiǎn)要摘要:
  1. 支持多種業(yè)務(wù)數據,不同的業(yè)務(wù)數據,只需要配置不同的Lua腳本,就可以將不同的業(yè)務(wù)數據發(fā)送到Kafka中的不同主題.
  2. 實(shí)時(shí)采集用戶(hù)觸發(fā)的埋藏點(diǎn)數據
  3. 高度可靠的集群. 由于Openresty基于Nginx,因此其群集具有非常高的性能和穩定性.
  4. 高并發(fā). 與tomcat,apache和其他Web服務(wù)器相比,Nginx的并發(fā)性比其他兩個(gè)要高得多. 在正常情況下處理數萬(wàn)個(gè)并發(fā)并不難.
  接下來(lái)讓我們做一些實(shí)際的工作.
  安裝Openresty
  此示例使用獨立部署表單. 成功完成獨立部署后,將在獨立計算機上構建集群,只是在不同的計算機上執行相同的步驟.
  注意: 本實(shí)驗基于centos7.0操作系統
  1. 下載Openresty依賴(lài)項:
  yum install readline-devel pcre-devel openssl-devel gcc
  2. 編譯并安裝Openresty:
  #1.安裝openresty:
mkdir /opt/software
mkdir /opt/module
cd /opt/software/ # 安裝文件所在目錄
wget https://openresty.org/download ... ar.gz
tar -xzf openresty-1.9.7.4.tar.gz -C /opt/module/
cd /opt/module/openresty-1.9.7.4
#2.配置:
# 指定目錄為/opt/openresty,默認在/usr/local。
./configure --prefix=/opt/openresty \
--with-luajit \
--without-http_redis2_module \
--with-http_iconv_module
make
make install
  3. 安裝lua-resty-kafka
  因為我們需要通過(guò)nginx + lua腳本將數據轉發(fā)到Kafka,所以在編寫(xiě)lua腳本時(shí)需要在lua模塊中使用一些Kafka依賴(lài)項.
  #下載lua-resty-kafka:
cd /opt/software/
wget https://github.com/doujiang24/ ... r.zip
unzip master.zip -d /opt/module/

#拷貝kafka相關(guān)依賴(lài)腳本到openresty
cp -rf /opt/module/lua-resty-kafka-master/lib/resty/kafka/ /opt/openresty/lualib/resty/
  注意: 由于每個(gè)人都熟悉Kafka,因此這里不會(huì )介紹其安裝.
  安裝Openresty之后,目錄結構如下:
  drwxr-xr-x 2 root root 4096 Mar 24 14:26 bin
drwxr-xr-x 6 root root 4096 Mar 24 14:26 luajit
drwxr-xr-x 7 root root 4096 Mar 24 14:29 lualib
drwxr-xr-x 12 root root 4096 Mar 24 14:40 nginx
  4. 配置文件
  編輯/opt/openresty/nginx/conf/nginx.conf
  user nginx; #Linux的用戶(hù)
worker_processes auto;
worker_rlimit_nofile 100000;
#error_log logs/error.log;
#error_log logs/error.log notice;
#error_log logs/error.log info;
#pid logs/nginx.pid;
events {
worker_connections 102400;
multi_accept on;
use epoll;
}
http {
include mime.types;
default_type application/octet-stream;
log_format main '$remote_addr - $remote_user [$time_local] "$request" '
'$status $body_bytes_sent "$http_referer" '
'"$http_user_agent" "$http_x_forwarded_for"';
access_log /var/log/nginx/access.log main;
resolver 8.8.8.8;
#resolver 127.0.0.1 valid=3600s;
sendfile on;
keepalive_timeout 65;
underscores_in_headers on;
gzip on;
include /opt/openresty/nginx/conf/conf.d/common.conf; #common.conf這個(gè)文件名字可自定義
}
  編輯/opt/openresty/nginx/conf/conf.d/common.conf
  ##api
lua_package_path "/opt/openresty/lualib/resty/kafka/?.lua;;";
lua_package_cpath "/opt/openresty/lualib/?.so;;";
lua_shared_dict ngx_cache 128m; # cache
lua_shared_dict cache_lock 100k; # lock for cache
server {
listen 8887; #監聽(tīng)端口
server_name 192.168.3.215; #埋點(diǎn)日志的ip地址或域名,多個(gè)域名之間用空格分開(kāi)
root html; #root指令用于指定虛擬主機的網(wǎng)頁(yè)根目錄,這個(gè)目錄可以是相對路徑,也可以是絕對路徑。
lua_need_request_body on; #打開(kāi)獲取消息體的開(kāi)關(guān),以便能獲取到消息體
access_log /var/log/nginx/message.access.log main;
error_log /var/log/nginx/message.error.log notice;
location = /lzp/message {
lua_code_cache on;
charset utf-8;
default_type 'application/json';
content_by_lua_file "/opt/openresty/nginx/lua/testMessage_kafka.lua";#引用的lua腳本
}
}
  編輯/opt/openresty/nginx/lua/testMessage_kafka.lua
  #創(chuàng )建目錄mkdir /opt/openresty/nginx/lua/
vim /opt/openresty/nginx/lua/testMessage_kafka.lua<br />#編輯內存如下:
  -- require需要resty.kafka.producer的lua腳本,沒(méi)有會(huì )報錯
local producer = require("resty.kafka.producer")
-- kafka的集群信息,單機也是可以的
local broker_list = {
{host = "192.168.3.215", port = 9092},
}
-- 定義最終kafka接受到的數據是怎樣的json格式
local log_json = {}
--增加read_body之后即可獲取到消息體,默認情況下可能會(huì )是nil
log_json["body"] = ngx.req.read_body()
log_json["body_data"] = ngx.req.get_body_data()
-- 定義kafka同步生產(chǎn)者,也可設置為異步 async
-- -- 注意?。?!當設置為異步時(shí),在測試環(huán)境需要修改batch_num,默認是200條,若大不到200條kafka端接受不到消息
-- -- encode()將log_json日志轉換為字符串
-- -- 發(fā)送日志消息,send配套之第一個(gè)參數topic:
-- -- 發(fā)送日志消息,send配套之第二個(gè)參數key,用于kafka路由控制:
-- -- key為nill(空)時(shí),一段時(shí)間向同一partition寫(xiě)入數據
-- -- 指定key,按照key的hash寫(xiě)入到對應的partition
-- -- batch_num修改為1方便測試
local bp = producer:new(broker_list, { producer_type = "async",batch_num = 1 })
-- local bp = producer:new(broker_list)
local cjson = require("cjson.safe")
local sendMsg = cjson.encode(log_json)
local ok, err = bp:send("testMessage",nil, sendMsg)
if not ok then
ngx.log(ngx.ERR, 'kafka send err:', err)
elseif ok then
ngx.say("the message send successful")
else
ngx.say("未知錯誤")
end
  5. 開(kāi)始服務(wù)操作:
  useradd nginx #創(chuàng )建用戶(hù)
passwd nginx #設置密碼
#設置openresty的所有者nginx
chown -R nginx:nginx /opt/openresty/
#啟動(dòng)服務(wù)
cd /opt/openresty/nginx/sbin
./nginx -c /opt/openresty/nginx/conf/nginx.conf
查看服務(wù):
ps -aux | grep nginx
nginx 2351 0.0 0.1 231052 46444 ? S Mar30 0:33 nginx: worker process
nginx 2352 0.0 0.1 233396 48540 ? S Mar30 0:35 nginx: worker process
nginx 2353 0.0 0.1 233396 48536 ? S Mar30 0:33 nginx: worker process
nginx 2354 0.0 0.1 232224 47464 ? S Mar30 0:34 nginx: worker process
nginx 2355 0.0 0.1 231052 46404 ? S Mar30 0:33 nginx: worker process
nginx 2356 0.0 0.1 232224 47460 ? S Mar30 0:34 nginx: worker process
nginx 2357 0.0 0.1 231052 46404 ? S Mar30 0:34 nginx: worker process
nginx 2358 0.0 0.1 232224 47484 ? S Mar30 0:34 nginx: worker process
root 7009 0.0 0.0 185492 2516 ? Ss Mar24 0:00 nginx: master process ./nginx -c /opt/openresty/nginx/conf/nginx.conf
查看端口:
netstat -anput | grep 8887
tcp 0 0 0.0.0.0:8887 0.0.0.0:* LISTEN 2351/nginx: worke
  看到上述過(guò)程,就可以證明服務(wù)正常運行
  6. 使用郵遞員發(fā)送發(fā)帖請求以進(jìn)行簡(jiǎn)單測試,以查看Kafka是否可以接受數據
  
  7.kafka消費數據:
  kafka-console-consumer --bootstrap-server 192.168.3.215:9092 --topic testMessage --from-beginning
<p>如果消耗了數據,則說(shuō)明配置成功. 如果未調整,則可以檢查與/var/log/nginx/message.access.log和/var/log/nginx/message.error.log相關(guān)的錯誤日志以進(jìn)行調整

官方客服QQ群

微信人工客服

QQ人工客服


線(xiàn)

最近中文字幕2019高清,亚洲人成高清在线播放,男生淦哭男生图片动漫有字,国产亚洲精品九九久在线观看,无码av专区丝袜专区