在实际应用中,Logstash进程会被氛围两个不同的角色。运行在应用服务器上的尽量减轻运行压力,只做读取和转发,这个角色叫做shipper运行在独立的服务器上完成数据解析处理,负责写入到Elasticsearch的角色,叫做Indexer5.1.1 读取redis 数据:zjtest7-frontend:/usr/local/logstash-2.3.4/config# ../bin/logstash -f redis.comf Settings: Default pipeline workers: 1Pipeline main started{ "message" => "Hello world", "tags" => [ [0] "_jsonparsefailure" ], "@version" => "1", "@timestamp" => "2016-08-19T06:26:12.854Z"}zjtest7-frontend:/usr/local/logstash-2.3.4/config# cat redis.comf input { redis { data_type =>"pattern_channel" key =>"logstash-*" host=>"192.168.32.67" port=>6379 password => "1234567" }}output { stdout { codec=>rubydebug{} }}采用list类型扩展Logstash:127.0.0.1:6379> PUBLISH logstash-list "Hello xxxxxx"(integer) 2127.0.0.1:6379> PUBLISH logstash-list "Hello yyyyy"zjtest7-frontend:/usr/local/logstash-2.3.4/config# ../bin/logstash -f redis.comf Settings: Default pipeline workers: 1Pipeline main started{ "message" => "Hello xxxxxx", "tags" => [ [0] "_jsonparsefailure" ], "@version" => "1", "@timestamp" => "2016-08-19T07:46:27.031Z"}{ "message" => "Hello yyyyy", "tags" => [ [0] "_jsonparsefailure" ], "@version" => "1", "@timestamp" => "2016-08-19T07:46:37.365Z"zjtest7-frontend:/usr/local/logstash-2.3.4/config# ../bin/logstash -f redis.comf Settings: Default pipeline workers: 1Pipeline main started{ "message" => "Hello xxxxxx", "tags" => [ [0] "_jsonparsefailure" ], "@version" => "1", "@timestamp" => "2016-08-19T07:46:26.964Z"}{ "message" => "Hello yyyyy", "tags" => [ [0] "_jsonparsefailure" ], "@version" => "1", "@timestamp" => "2016-08-19T07:46:37.362Z"}zjtest7-frontend:/usr/local/logstash-2.3.4/config# cat redis.comf input { redis { data_type =>"pattern_channel" key =>"logstash-list" host=>"192.168.32.67" port=>6379 password => "1234567" }}output { stdout { codec=>rubydebug{} }}两个终端同时启动logstash -f redis.conf 进程,结果会是两个终端都输出消息。这个时候,就需要用list 类型,在这种类型中,数据输入到Redis 服务器上暂存,Logstash 则连上Redis 服务器取走(BLPOP命令,所以只要logstash不堵塞,redis 服务器上也不会有数据堆积占用空间)数据。1.配置示例:zjtest7-frontend:/usr/local/logstash-2.3.4/config# cat redis.comf input { redis { data_type =>"list" key =>"logstash-list" host=>"192.168.32.67" port=>6379 password => "1234567" }}output { stdout { codec=>rubydebug{} }}这时候可以看到, 只有一个终端输出了结果连续RPUSH几次, 可以看到两个终端近乎各自输出一半条目。3.批量推送:RPUSH 支持batch 方式,修改Logstash 配置中的batch_count值,5.1.3 输出到Redis:1.配置示例zjtest7-frontend:/usr/local/logstash-2.3.4/config# cat inputredis.conf input {stdin {} } output { redis { data_type =>"channel" key=>"logstash-chan-%{+yyyy.MM.dd}" host=>"192.168.32.67" port=>6379 password => "1234567" }}127.0.0.1:6379> SUBSCRIBE logstash-chan-2016.08.19Reading messages... (press Ctrl-C to quit)1) "subscribe"2) "logstash-chan-2016.08.19"3) (integer) 11) "message"2) "logstash-chan-2016.08.19"3) "{\"message\":\"\",\"@version\":\"1\",\"@timestamp\":\"2016-08-19T08:27:07.190Z\",\"host\":\"0.0.0.0\"}"1) "message"2) "logstash-chan-2016.08.19"3) "{\"message\":\"\",\"@version\":\"1\",\"@timestamp\":\"2016-08-19T08:27:07.736Z\",\"host\":\"0.0.0.0\"}"1) "message"2) "logstash-chan-2016.08.19"3) "{\"message\":\"\",\"@version\":\"1\",\"@timestamp\":\"2016-08-19T08:27:07.772Z\",\"host\":\"0.0.0.0\"}"1) "message"2) "logstash-chan-2016.08.19"3) "{\"message\":\"\",\"@version\":\"1\",\"@timestamp\":\"2016-08-19T08:27:07.808Z\",\"host\":\"0.0.0.0\"}"1) "message"2) "logstash-chan-2016.08.19"3) "{\"message\":\"\",\"@version\":\"1\",\"@timestamp\":\"2016-08-19T08:27:07.844Z\",\"host\":\"0.0.0.0\"}"1) "message"2) "logstash-chan-2016.08.19"3) "{\"message\":\"\",\"@version\":\"1\",\"@timestamp\":\"2016-08-19T08:27:07.880Z\",\"host\":\"0.0.0.0\"}"1) "message"2) "logstash-chan-2016.08.19"3) "{\"message\":\"\",\"@version\":\"1\",\"@timestamp\":\"2016-08-19T08:27:07.916Z\",\"host\":\"0.0.0.0\"}"1) "message"2) "logstash-chan-2016.08.19"3) "{\"message\":\"hello world\",\"@version\":\"1\",\"@timestamp\":\"2016-08-19T08:27:10.486Z\",\"host\":\"0.0.0.0\"}"1) "message"2) "logstash-chan-2016.08.19"3) "{\"message\":\"what'sup scab\",\"@version\":\"1\",\"@timestamp\":\"2016-08-19T08:27:37.919Z\",\"host\":\"0.0.0.0\"}"