fluent-plugin-kafka 0.18.1 → 0.19.0
This diff represents the content of publicly available package versions that have been released to one of the supported registries. The information contained in this diff is provided for informational purposes only and reflects changes between package versions as they appear in their respective public registries.
- checksums.yaml +4 -4
- data/.github/ISSUE_TEMPLATE/bug_report.yaml +1 -0
- data/.github/ISSUE_TEMPLATE/feature_request.yaml +1 -0
- data/.github/dependabot.yml +6 -0
- data/.github/workflows/linux.yml +8 -3
- data/.github/workflows/stale-actions.yml +5 -3
- data/ChangeLog +6 -0
- data/Gemfile +1 -1
- data/README.md +26 -2
- data/examples/README.md +3 -0
- data/examples/out_kafka2/dynamic_topic_based_on_tag.conf +32 -0
- data/examples/out_kafka2/protobuf-formatter.conf +23 -0
- data/examples/out_kafka2/record_key.conf +31 -0
- data/fluent-plugin-kafka.gemspec +2 -2
- data/lib/fluent/plugin/in_kafka.rb +3 -0
- data/lib/fluent/plugin/in_kafka_group.rb +3 -0
- data/lib/fluent/plugin/kafka_plugin_util.rb +15 -0
- data/lib/fluent/plugin/out_kafka2.rb +38 -15
- data/lib/fluent/plugin/out_rdkafka2.rb +28 -1
- metadata +9 -4
checksums.yaml
CHANGED
@@ -1,7 +1,7 @@
|
|
1
1
|
---
|
2
2
|
SHA256:
|
3
|
-
metadata.gz:
|
4
|
-
data.tar.gz:
|
3
|
+
metadata.gz: b4a8c37b041fedc3f95046620413e5f7da437557fc19294439585fa89fd5b244
|
4
|
+
data.tar.gz: fe4c8cc8df6b8b5b105fbf709044e10752d8b64f321b5a4e290cb7e316e10fe1
|
5
5
|
SHA512:
|
6
|
-
metadata.gz:
|
7
|
-
data.tar.gz:
|
6
|
+
metadata.gz: 2ff333ee092e0ffd653ab476acf8b2656b4ca59ea32d6dcc846eb8a174f69d98811272e8a735bfa3bdfac3d5b3753ce499adb5c8a215b3197310b1f4d822364e
|
7
|
+
data.tar.gz: ebf6cafbde9635cfc886ee4dce84495353b2d389e447e6188f6a88eb4ca11b19086ce5a12565bac550ef26936f424e4ec5a0fe908fad4b39bea984758ac44720
|
data/.github/workflows/linux.yml
CHANGED
@@ -12,12 +12,15 @@ jobs:
|
|
12
12
|
strategy:
|
13
13
|
fail-fast: false
|
14
14
|
matrix:
|
15
|
-
ruby: [ '3.
|
15
|
+
ruby: [ '3.2', '3.1', '3.0', '2.7' ]
|
16
16
|
os:
|
17
17
|
- ubuntu-latest
|
18
|
-
|
18
|
+
rdkafka_versions:
|
19
|
+
- { min: '>= 0.6.0', max: '< 0.12.0' }
|
20
|
+
- { min: '>= 0.12.0', max: '>= 0.12.0' }
|
21
|
+
name: Ruby ${{ matrix.ruby }} unit testing on ${{ matrix.os }} with rdkafka gem version (min ${{ matrix.rdkafka_versions.min }} max ${{ matrix.rdkafka_versions.max }})
|
19
22
|
steps:
|
20
|
-
- uses: actions/checkout@
|
23
|
+
- uses: actions/checkout@v3
|
21
24
|
- uses: ruby/setup-ruby@v1
|
22
25
|
with:
|
23
26
|
ruby-version: ${{ matrix.ruby }}
|
@@ -33,6 +36,8 @@ jobs:
|
|
33
36
|
- name: unit testing
|
34
37
|
env:
|
35
38
|
CI: true
|
39
|
+
RDKAFKA_VERSION_MIN_RANGE: ${{ matrix.rdkafka_versions.min }}
|
40
|
+
RDKAFKA_VERSION_MAX_RANGE: ${{ matrix.rdkafka_versions.max }}
|
36
41
|
run: |
|
37
42
|
sudo ./ci/prepare-kafka-server.sh
|
38
43
|
gem install bundler rake
|
@@ -7,7 +7,7 @@ jobs:
|
|
7
7
|
stale:
|
8
8
|
runs-on: ubuntu-latest
|
9
9
|
steps:
|
10
|
-
- uses: actions/stale@
|
10
|
+
- uses: actions/stale@v8
|
11
11
|
with:
|
12
12
|
repo-token: ${{ secrets.GITHUB_TOKEN }}
|
13
13
|
days-before-stale: 90
|
@@ -18,5 +18,7 @@ jobs:
|
|
18
18
|
close-pr-message: "This PR was automatically closed because of stale in 30 days"
|
19
19
|
stale-pr-label: "stale"
|
20
20
|
stale-issue-label: "stale"
|
21
|
-
exempt-issue-labels: "bug,enhancement,help wanted"
|
22
|
-
exempt-pr-labels: "bug,enhancement,help wanted"
|
21
|
+
exempt-issue-labels: "bug,enhancement,help wanted,waiting-for-triage"
|
22
|
+
exempt-pr-labels: "bug,enhancement,help wanted,waiting-for-triage"
|
23
|
+
exempt-all-assignees: true
|
24
|
+
exempt-all-milestones: true
|
data/ChangeLog
CHANGED
@@ -1,3 +1,9 @@
|
|
1
|
+
Release 0.19.0 - 2023/04/26
|
2
|
+
* out_kafka2: Add support for AWS IAM authentication
|
3
|
+
* in_kafka, in_kafka_group, out_kafka2: Add support for ssl client cert key password
|
4
|
+
* out_rdkafka2: Mask `ssl_client_cert_key_password` on dumping it to log
|
5
|
+
* out_rdkafka2: Support rdkafka-ruby 0.12
|
6
|
+
|
1
7
|
Release 0.18.1 - 2022/08/17
|
2
8
|
* out_kafka2: Fix a bug that it doesn't respect `chunk_limit_records` and `chunk_limit_size`
|
3
9
|
|
data/Gemfile
CHANGED
data/README.md
CHANGED
@@ -36,6 +36,7 @@ If you want to use zookeeper related parameters, you also need to install zookee
|
|
36
36
|
- ssl_ca_cert
|
37
37
|
- ssl_client_cert
|
38
38
|
- ssl_client_cert_key
|
39
|
+
- ssl_client_cert_key_password
|
39
40
|
- ssl_ca_certs_from_system
|
40
41
|
|
41
42
|
Set path to SSL related files. See [Encryption and Authentication using SSL](https://github.com/zendesk/ruby-kafka#encryption-and-authentication-using-ssl) for more detail.
|
@@ -187,6 +188,10 @@ If `ruby-kafka` doesn't fit your kafka environment, check `rdkafka2` plugin inst
|
|
187
188
|
@type kafka2
|
188
189
|
|
189
190
|
brokers <broker1_host>:<broker1_port>,<broker2_host>:<broker2_port>,.. # Set brokers directly
|
191
|
+
|
192
|
+
# Kafka topic, placeholders are supported. Chunk keys are required in the Buffer section in order for placeholders
|
193
|
+
# to work.
|
194
|
+
topic (string) :default => nil
|
190
195
|
topic_key (string) :default => 'topic'
|
191
196
|
partition_key (string) :default => 'partition'
|
192
197
|
partition_key_key (string) :default => 'partition_key'
|
@@ -208,6 +213,16 @@ If `ruby-kafka` doesn't fit your kafka environment, check `rdkafka2` plugin inst
|
|
208
213
|
partitioner_hash_function (enum) (crc32|murmur2) :default => 'crc32'
|
209
214
|
share_producer (bool) :default => false
|
210
215
|
|
216
|
+
# If you intend to rely on AWS IAM auth to MSK with long lived credentials
|
217
|
+
# https://docs.aws.amazon.com/msk/latest/developerguide/iam-access-control.html
|
218
|
+
#
|
219
|
+
# For AWS STS support, see status in
|
220
|
+
# - https://github.com/zendesk/ruby-kafka/issues/944
|
221
|
+
# - https://github.com/zendesk/ruby-kafka/pull/951
|
222
|
+
sasl_aws_msk_iam_access_key_id (string) :default => nil
|
223
|
+
sasl_aws_msk_iam_secret_key_id (string) :default => nil
|
224
|
+
sasl_aws_msk_iam_aws_region (string) :default => nil
|
225
|
+
|
211
226
|
<format>
|
212
227
|
@type (json|ltsv|msgpack|attr:<record name>|<formatter name>) :default => json
|
213
228
|
</format>
|
@@ -243,13 +258,12 @@ ruby-kafka's log is routed to fluentd log so you can see ruby-kafka's log in flu
|
|
243
258
|
|
244
259
|
Supports following ruby-kafka's producer options.
|
245
260
|
|
246
|
-
- max_send_retries - default:
|
261
|
+
- max_send_retries - default: 2 - Number of times to retry sending of messages to a leader.
|
247
262
|
- required_acks - default: -1 - The number of acks required per request. If you need flush performance, set lower value, e.g. 1, 2.
|
248
263
|
- ack_timeout - default: nil - How long the producer waits for acks. The unit is seconds.
|
249
264
|
- compression_codec - default: nil - The codec the producer uses to compress messages.
|
250
265
|
- max_send_limit_bytes - default: nil - Max byte size to send message to avoid MessageSizeTooLarge. For example, if you set 1000000(message.max.bytes in kafka), Message more than 1000000 bytes will be dropped.
|
251
266
|
- discard_kafka_delivery_failed - default: false - discard the record where [Kafka::DeliveryFailed](http://www.rubydoc.info/gems/ruby-kafka/Kafka/DeliveryFailed) occurred
|
252
|
-
- monitoring_list - default: [] - library to be used to monitor. statsd and datadog are supported
|
253
267
|
|
254
268
|
If you want to know about detail of monitoring, see also https://github.com/zendesk/ruby-kafka#monitoring
|
255
269
|
|
@@ -420,6 +434,16 @@ Support of fluentd v0.12 has ended. `kafka_buffered` will be an alias of `kafka2
|
|
420
434
|
monitoring_list (array) :default => []
|
421
435
|
</match>
|
422
436
|
|
437
|
+
`kafka_buffered` supports the following `ruby-kafka` parameters:
|
438
|
+
|
439
|
+
- max_send_retries - default: 2 - Number of times to retry sending of messages to a leader.
|
440
|
+
- required_acks - default: -1 - The number of acks required per request. If you need flush performance, set lower value, e.g. 1, 2.
|
441
|
+
- ack_timeout - default: nil - How long the producer waits for acks. The unit is seconds.
|
442
|
+
- compression_codec - default: nil - The codec the producer uses to compress messages.
|
443
|
+
- max_send_limit_bytes - default: nil - Max byte size to send message to avoid MessageSizeTooLarge. For example, if you set 1000000(message.max.bytes in kafka), Message more than 1000000 bytes will be dropped.
|
444
|
+
- discard_kafka_delivery_failed - default: false - discard the record where [Kafka::DeliveryFailed](http://www.rubydoc.info/gems/ruby-kafka/Kafka/DeliveryFailed) occurred
|
445
|
+
- monitoring_list - default: [] - library to be used to monitor. statsd and datadog are supported
|
446
|
+
|
423
447
|
`kafka_buffered` has two additional parameters:
|
424
448
|
|
425
449
|
- kafka_agg_max_bytes - default: 4096 - Maximum value of total message size to be included in one batch transmission.
|
data/examples/README.md
ADDED
@@ -0,0 +1,32 @@
|
|
1
|
+
<source>
|
2
|
+
@type sample
|
3
|
+
sample {"hello": "world"}
|
4
|
+
rate 7000
|
5
|
+
tag sample.hello.world
|
6
|
+
</source>
|
7
|
+
|
8
|
+
<match sample.**>
|
9
|
+
@type kafka2
|
10
|
+
|
11
|
+
brokers "broker:29092"
|
12
|
+
|
13
|
+
# Writes to topic `events.sample.hello.world`
|
14
|
+
topic "events.${tag}"
|
15
|
+
|
16
|
+
# Writes to topic `hello.world`
|
17
|
+
# topic "${tag[1]}.${tag[2]}"
|
18
|
+
|
19
|
+
<format>
|
20
|
+
@type json
|
21
|
+
</format>
|
22
|
+
|
23
|
+
<buffer tag>
|
24
|
+
flush_at_shutdown true
|
25
|
+
flush_mode interval
|
26
|
+
flush_interval 1s
|
27
|
+
chunk_limit_size 3MB
|
28
|
+
chunk_full_threshold 1
|
29
|
+
total_limit_size 1024MB
|
30
|
+
overflow_action block
|
31
|
+
</buffer>
|
32
|
+
</match>
|
@@ -0,0 +1,23 @@
|
|
1
|
+
<source>
|
2
|
+
@type sample
|
3
|
+
sample {"hello": "world", "some_record":{"event":"message"}}
|
4
|
+
rate 7000
|
5
|
+
tag sample.hello.world
|
6
|
+
</source>
|
7
|
+
|
8
|
+
<match sample.**>
|
9
|
+
@type kafka2
|
10
|
+
|
11
|
+
brokers "broker:29092"
|
12
|
+
|
13
|
+
record_key "some_record"
|
14
|
+
default_topic "events"
|
15
|
+
|
16
|
+
<format>
|
17
|
+
# requires the fluent-plugin-formatter-protobuf gem
|
18
|
+
# see its docs for full usage
|
19
|
+
@type protobuf
|
20
|
+
class_name SomeRecord
|
21
|
+
include_paths ["/opt/fluent-plugin-formatter-protobuf/some_record_pb.rb"]
|
22
|
+
</format>
|
23
|
+
</match>
|
@@ -0,0 +1,31 @@
|
|
1
|
+
<source>
|
2
|
+
@type sample
|
3
|
+
sample {"hello": "world", "some_record":{"event":"message"}}
|
4
|
+
rate 7000
|
5
|
+
tag sample.hello.world
|
6
|
+
</source>
|
7
|
+
|
8
|
+
<match sample.**>
|
9
|
+
@type kafka2
|
10
|
+
|
11
|
+
brokers "broker:29092"
|
12
|
+
|
13
|
+
# {"event": "message"} will be formatted and sent to Kafka
|
14
|
+
record_key "some_record"
|
15
|
+
|
16
|
+
default_topic "events"
|
17
|
+
|
18
|
+
<format>
|
19
|
+
@type json
|
20
|
+
</format>
|
21
|
+
|
22
|
+
<buffer>
|
23
|
+
flush_at_shutdown true
|
24
|
+
flush_mode interval
|
25
|
+
flush_interval 1s
|
26
|
+
chunk_limit_size 3MB
|
27
|
+
chunk_full_threshold 1
|
28
|
+
total_limit_size 1024MB
|
29
|
+
overflow_action block
|
30
|
+
</buffer>
|
31
|
+
</match>
|
data/fluent-plugin-kafka.gemspec
CHANGED
@@ -13,12 +13,12 @@ Gem::Specification.new do |gem|
|
|
13
13
|
gem.test_files = gem.files.grep(%r{^(test|spec|features)/})
|
14
14
|
gem.name = "fluent-plugin-kafka"
|
15
15
|
gem.require_paths = ["lib"]
|
16
|
-
gem.version = '0.
|
16
|
+
gem.version = '0.19.0'
|
17
17
|
gem.required_ruby_version = ">= 2.1.0"
|
18
18
|
|
19
19
|
gem.add_dependency "fluentd", [">= 0.10.58", "< 2"]
|
20
20
|
gem.add_dependency 'ltsv'
|
21
|
-
gem.add_dependency 'ruby-kafka', '>= 1.
|
21
|
+
gem.add_dependency 'ruby-kafka', '>= 1.5.0', '< 2'
|
22
22
|
gem.add_development_dependency "rake", ">= 0.9.2"
|
23
23
|
gem.add_development_dependency "test-unit", ">= 3.0.8"
|
24
24
|
gem.add_development_dependency "test-unit-rr", "~> 1.0"
|
@@ -200,16 +200,19 @@ class Fluent::KafkaInput < Fluent::Input
|
|
200
200
|
if @scram_mechanism != nil && @username != nil && @password != nil
|
201
201
|
@kafka = Kafka.new(seed_brokers: @brokers, client_id: @client_id, logger: logger, ssl_ca_cert_file_path: @ssl_ca_cert,
|
202
202
|
ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key),
|
203
|
+
ssl_client_cert_key_password: @ssl_client_cert_key_password,
|
203
204
|
ssl_ca_certs_from_system: @ssl_ca_certs_from_system, sasl_scram_username: @username, sasl_scram_password: @password,
|
204
205
|
sasl_scram_mechanism: @scram_mechanism, sasl_over_ssl: @sasl_over_ssl, ssl_verify_hostname: @ssl_verify_hostname)
|
205
206
|
elsif @username != nil && @password != nil
|
206
207
|
@kafka = Kafka.new(seed_brokers: @brokers, client_id: @client_id, logger: logger, ssl_ca_cert_file_path: @ssl_ca_cert,
|
207
208
|
ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key),
|
209
|
+
ssl_client_cert_key_password: @ssl_client_cert_key_password,
|
208
210
|
ssl_ca_certs_from_system: @ssl_ca_certs_from_system,sasl_plain_username: @username, sasl_plain_password: @password,
|
209
211
|
sasl_over_ssl: @sasl_over_ssl, ssl_verify_hostname: @ssl_verify_hostname)
|
210
212
|
else
|
211
213
|
@kafka = Kafka.new(seed_brokers: @brokers, client_id: @client_id, logger: logger, ssl_ca_cert_file_path: @ssl_ca_cert,
|
212
214
|
ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key),
|
215
|
+
ssl_client_cert_key_password: @ssl_client_cert_key_password,
|
213
216
|
ssl_ca_certs_from_system: @ssl_ca_certs_from_system, sasl_gssapi_principal: @principal, sasl_gssapi_keytab: @keytab,
|
214
217
|
ssl_verify_hostname: @ssl_verify_hostname)
|
215
218
|
end
|
@@ -188,16 +188,19 @@ class Fluent::KafkaGroupInput < Fluent::Input
|
|
188
188
|
if @scram_mechanism != nil && @username != nil && @password != nil
|
189
189
|
@kafka = Kafka.new(seed_brokers: @brokers, client_id: @client_id, logger: logger, connect_timeout: @connect_timeout, socket_timeout: @socket_timeout, ssl_ca_cert_file_path: @ssl_ca_cert,
|
190
190
|
ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key),
|
191
|
+
ssl_client_cert_key_password: @ssl_client_cert_key_password,
|
191
192
|
ssl_ca_certs_from_system: @ssl_ca_certs_from_system, sasl_scram_username: @username, sasl_scram_password: @password,
|
192
193
|
sasl_scram_mechanism: @scram_mechanism, sasl_over_ssl: @sasl_over_ssl, ssl_verify_hostname: @ssl_verify_hostname)
|
193
194
|
elsif @username != nil && @password != nil
|
194
195
|
@kafka = Kafka.new(seed_brokers: @brokers, client_id: @client_id, logger: logger, connect_timeout: @connect_timeout, socket_timeout: @socket_timeout, ssl_ca_cert_file_path: @ssl_ca_cert,
|
195
196
|
ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key),
|
197
|
+
ssl_client_cert_key_password: @ssl_client_cert_key_password,
|
196
198
|
ssl_ca_certs_from_system: @ssl_ca_certs_from_system, sasl_plain_username: @username, sasl_plain_password: @password,
|
197
199
|
sasl_over_ssl: @sasl_over_ssl, ssl_verify_hostname: @ssl_verify_hostname)
|
198
200
|
else
|
199
201
|
@kafka = Kafka.new(seed_brokers: @brokers, client_id: @client_id, logger: logger, connect_timeout: @connect_timeout, socket_timeout: @socket_timeout, ssl_ca_cert_file_path: @ssl_ca_cert,
|
200
202
|
ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key),
|
203
|
+
ssl_client_cert_key_password: @ssl_client_cert_key_password,
|
201
204
|
ssl_ca_certs_from_system: @ssl_ca_certs_from_system, sasl_gssapi_principal: @principal, sasl_gssapi_keytab: @keytab,
|
202
205
|
ssl_verify_hostname: @ssl_verify_hostname)
|
203
206
|
end
|
@@ -1,5 +1,18 @@
|
|
1
1
|
module Fluent
|
2
2
|
module KafkaPluginUtil
|
3
|
+
module AwsIamSettings
|
4
|
+
def self.included(klass)
|
5
|
+
klass.instance_eval do
|
6
|
+
config_param :sasl_aws_msk_iam_access_key_id, :string, :default => nil, secret: true,
|
7
|
+
desc: "AWS access key Id for IAM authentication to MSK."
|
8
|
+
config_param :sasl_aws_msk_iam_secret_key_id, :string, :default => nil, secret: true,
|
9
|
+
desc: "AWS access key secret for IAM authentication to MSK."
|
10
|
+
config_param :sasl_aws_msk_iam_aws_region, :string, :default => nil,
|
11
|
+
desc: "AWS region for IAM authentication to MSK."
|
12
|
+
end
|
13
|
+
end
|
14
|
+
end
|
15
|
+
|
3
16
|
module SSLSettings
|
4
17
|
def self.included(klass)
|
5
18
|
klass.instance_eval {
|
@@ -10,6 +23,8 @@ module Fluent
|
|
10
23
|
:desc => "a PEM encoded client cert to use with and SSL connection. Must be used in combination with ssl_client_cert_key."
|
11
24
|
config_param :ssl_client_cert_key, :string, :default => nil,
|
12
25
|
:desc => "a PEM encoded client cert key to use with and SSL connection. Must be used in combination with ssl_client_cert."
|
26
|
+
config_param :ssl_client_cert_key_password, :string, :default => nil, secret: true,
|
27
|
+
:desc => "a PEM encoded client cert key password to use with SSL connection."
|
13
28
|
config_param :ssl_client_cert_chain, :string, :default => nil,
|
14
29
|
:desc => "an extra PEM encoded cert to use with and SSL connection."
|
15
30
|
config_param :ssl_ca_certs_from_system, :bool, :default => false,
|
@@ -95,6 +95,7 @@ DESC
|
|
95
95
|
config_set_default :@type, 'json'
|
96
96
|
end
|
97
97
|
|
98
|
+
include Fluent::KafkaPluginUtil::AwsIamSettings
|
98
99
|
include Fluent::KafkaPluginUtil::SSLSettings
|
99
100
|
include Fluent::KafkaPluginUtil::SaslSettings
|
100
101
|
|
@@ -113,25 +114,47 @@ DESC
|
|
113
114
|
def refresh_client(raise_error = true)
|
114
115
|
begin
|
115
116
|
logger = @get_kafka_client_log ? log : nil
|
117
|
+
use_long_lived_aws_credentials = @sasl_aws_msk_iam_access_key_id != nil && @sasl_aws_msk_iam_secret_key_id != nil
|
116
118
|
if @scram_mechanism != nil && @username != nil && @password != nil
|
117
|
-
|
118
|
-
|
119
|
-
|
120
|
-
|
121
|
-
|
119
|
+
sasl_params = {
|
120
|
+
sasl_scram_username: @username,
|
121
|
+
sasl_scram_password: @password,
|
122
|
+
sasl_scram_mechanism: @scram_mechanism,
|
123
|
+
}
|
122
124
|
elsif @username != nil && @password != nil
|
123
|
-
|
124
|
-
|
125
|
-
|
126
|
-
|
127
|
-
|
125
|
+
sasl_params = {
|
126
|
+
sasl_plain_username: @username,
|
127
|
+
sasl_plain_password: @password,
|
128
|
+
}
|
129
|
+
elsif use_long_lived_aws_credentials
|
130
|
+
sasl_params = {
|
131
|
+
sasl_aws_msk_iam_access_key_id: @sasl_aws_msk_iam_access_key_id,
|
132
|
+
sasl_aws_msk_iam_secret_key_id: @sasl_aws_msk_iam_secret_key_id,
|
133
|
+
sasl_aws_msk_iam_aws_region: @sasl_aws_msk_iam_aws_region,
|
134
|
+
}
|
128
135
|
else
|
129
|
-
|
130
|
-
|
131
|
-
|
132
|
-
|
133
|
-
partitioner: Kafka::Partitioner.new(hash_function: @partitioner_hash_function))
|
136
|
+
sasl_params = {
|
137
|
+
sasl_gssapi_principal: @principal,
|
138
|
+
sasl_gssapi_keytab: @keytab,
|
139
|
+
}
|
134
140
|
end
|
141
|
+
@kafka = Kafka.new(
|
142
|
+
seed_brokers: @seed_brokers,
|
143
|
+
client_id: @client_id,
|
144
|
+
logger: logger,
|
145
|
+
connect_timeout: @connect_timeout,
|
146
|
+
socket_timeout: @socket_timeout,
|
147
|
+
ssl_ca_cert_file_path: @ssl_ca_cert,
|
148
|
+
ssl_client_cert: read_ssl_file(@ssl_client_cert),
|
149
|
+
ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key),
|
150
|
+
ssl_client_cert_key_password: @ssl_client_cert_key_password,
|
151
|
+
ssl_client_cert_chain: read_ssl_file(@ssl_client_cert_chain),
|
152
|
+
ssl_ca_certs_from_system: @ssl_ca_certs_from_system,
|
153
|
+
ssl_verify_hostname: @ssl_verify_hostname,
|
154
|
+
resolve_seed_brokers: @resolve_seed_brokers,
|
155
|
+
partitioner: Kafka::Partitioner.new(hash_function: @partitioner_hash_function),
|
156
|
+
sasl_over_ssl: @sasl_over_ssl,
|
157
|
+
**sasl_params)
|
135
158
|
log.info "initialized kafka producer: #{@client_id}"
|
136
159
|
rescue Exception => e
|
137
160
|
if raise_error # During startup, error should be reported to engine and stop its phase for safety.
|
@@ -5,9 +5,37 @@ require 'fluent/plugin/kafka_plugin_util'
|
|
5
5
|
|
6
6
|
require 'rdkafka'
|
7
7
|
|
8
|
+
# This is required for `rdkafka` version >= 0.12.0
|
9
|
+
# Overriding the close method in order to provide a time limit for when it should be forcibly closed
|
10
|
+
class Rdkafka::Producer::Client
|
11
|
+
# return false if producer is forcefully closed, otherwise return true
|
12
|
+
def close(timeout=nil)
|
13
|
+
return unless @native
|
14
|
+
|
15
|
+
# Indicate to polling thread that we're closing
|
16
|
+
@polling_thread[:closing] = true
|
17
|
+
# Wait for the polling thread to finish up
|
18
|
+
thread = @polling_thread.join(timeout)
|
19
|
+
|
20
|
+
Rdkafka::Bindings.rd_kafka_destroy(@native)
|
21
|
+
|
22
|
+
@native = nil
|
23
|
+
|
24
|
+
return !thread.nil?
|
25
|
+
end
|
26
|
+
end
|
27
|
+
|
8
28
|
class Rdkafka::Producer
|
9
29
|
# return false if producer is forcefully closed, otherwise return true
|
10
30
|
def close(timeout = nil)
|
31
|
+
rdkafka_version = Rdkafka::VERSION || '0.0.0'
|
32
|
+
# Rdkafka version >= 0.12.0 changed its internals
|
33
|
+
if Gem::Version::create(rdkafka_version) >= Gem::Version.create('0.12.0')
|
34
|
+
ObjectSpace.undefine_finalizer(self)
|
35
|
+
|
36
|
+
return @client.close(timeout)
|
37
|
+
end
|
38
|
+
|
11
39
|
@closing = true
|
12
40
|
# Wait for the polling thread to finish up
|
13
41
|
# If the broker isn't alive, the thread doesn't exit
|
@@ -95,7 +123,6 @@ DESC
|
|
95
123
|
config_param :max_enqueue_bytes_per_second, :size, :default => nil, :desc => 'The maximum number of enqueueing bytes per second'
|
96
124
|
|
97
125
|
config_param :service_name, :string, :default => nil, :desc => 'Used for sasl.kerberos.service.name'
|
98
|
-
config_param :ssl_client_cert_key_password, :string, :default => nil, :desc => 'Used for ssl.key.password'
|
99
126
|
|
100
127
|
config_section :buffer do
|
101
128
|
config_set_default :chunk_keys, ["topic"]
|
metadata
CHANGED
@@ -1,7 +1,7 @@
|
|
1
1
|
--- !ruby/object:Gem::Specification
|
2
2
|
name: fluent-plugin-kafka
|
3
3
|
version: !ruby/object:Gem::Version
|
4
|
-
version: 0.
|
4
|
+
version: 0.19.0
|
5
5
|
platform: ruby
|
6
6
|
authors:
|
7
7
|
- Hidemasa Togashi
|
@@ -9,7 +9,7 @@ authors:
|
|
9
9
|
autorequire:
|
10
10
|
bindir: bin
|
11
11
|
cert_chain: []
|
12
|
-
date:
|
12
|
+
date: 2023-04-26 00:00:00.000000000 Z
|
13
13
|
dependencies:
|
14
14
|
- !ruby/object:Gem::Dependency
|
15
15
|
name: fluentd
|
@@ -51,7 +51,7 @@ dependencies:
|
|
51
51
|
requirements:
|
52
52
|
- - ">="
|
53
53
|
- !ruby/object:Gem::Version
|
54
|
-
version: 1.
|
54
|
+
version: 1.5.0
|
55
55
|
- - "<"
|
56
56
|
- !ruby/object:Gem::Version
|
57
57
|
version: '2'
|
@@ -61,7 +61,7 @@ dependencies:
|
|
61
61
|
requirements:
|
62
62
|
- - ">="
|
63
63
|
- !ruby/object:Gem::Version
|
64
|
-
version: 1.
|
64
|
+
version: 1.5.0
|
65
65
|
- - "<"
|
66
66
|
- !ruby/object:Gem::Version
|
67
67
|
version: '2'
|
@@ -146,6 +146,7 @@ files:
|
|
146
146
|
- ".github/ISSUE_TEMPLATE/bug_report.yaml"
|
147
147
|
- ".github/ISSUE_TEMPLATE/config.yml"
|
148
148
|
- ".github/ISSUE_TEMPLATE/feature_request.yaml"
|
149
|
+
- ".github/dependabot.yml"
|
149
150
|
- ".github/workflows/linux.yml"
|
150
151
|
- ".github/workflows/stale-actions.yml"
|
151
152
|
- ".gitignore"
|
@@ -155,6 +156,10 @@ files:
|
|
155
156
|
- README.md
|
156
157
|
- Rakefile
|
157
158
|
- ci/prepare-kafka-server.sh
|
159
|
+
- examples/README.md
|
160
|
+
- examples/out_kafka2/dynamic_topic_based_on_tag.conf
|
161
|
+
- examples/out_kafka2/protobuf-formatter.conf
|
162
|
+
- examples/out_kafka2/record_key.conf
|
158
163
|
- fluent-plugin-kafka.gemspec
|
159
164
|
- lib/fluent/plugin/in_kafka.rb
|
160
165
|
- lib/fluent/plugin/in_kafka_group.rb
|