fluent-plugin-kafka 0.18.1 → 0.19.0

This diff represents the content of publicly available package versions that have been released to one of the supported registries. The information contained in this diff is provided for informational purposes only and reflects changes between package versions as they appear in their respective public registries.
checksums.yaml CHANGED
@@ -1,7 +1,7 @@
1
1
  ---
2
2
  SHA256:
3
- metadata.gz: 8b47d0dafccd1d84a2adcef4e9a0830b26998fd21c4095a974887ffa201fc64c
4
- data.tar.gz: 744716215237149802687884a5c306e4684f1d2e89959d13d04b64f3116a65f2
3
+ metadata.gz: b4a8c37b041fedc3f95046620413e5f7da437557fc19294439585fa89fd5b244
4
+ data.tar.gz: fe4c8cc8df6b8b5b105fbf709044e10752d8b64f321b5a4e290cb7e316e10fe1
5
5
  SHA512:
6
- metadata.gz: 0215e2229d272a24abef39f8179457a4585d582a388f8b03bc70a1d4a066d27422878c91277d5b7412b3ddd930716b337ff56a75e4fb8de00366063b7c47c52a
7
- data.tar.gz: 6f47fa553a9cf2cf2d55b725d5b8f562e6aef1c3ef92aee597bbf9f84aac7e0fb7fac5d67864d935c02b1dc5a890563a9b530b75ae47d3bac1763fbc021f20bc
6
+ metadata.gz: 2ff333ee092e0ffd653ab476acf8b2656b4ca59ea32d6dcc846eb8a174f69d98811272e8a735bfa3bdfac3d5b3753ce499adb5c8a215b3197310b1f4d822364e
7
+ data.tar.gz: ebf6cafbde9635cfc886ee4dce84495353b2d389e447e6188f6a88eb4ca11b19086ce5a12565bac550ef26936f424e4ec5a0fe908fad4b39bea984758ac44720
@@ -1,5 +1,6 @@
1
1
  name: Bug Report
2
2
  description: Create a report with a procedure for reproducing the bug
3
+ labels: "waiting-for-triage"
3
4
  body:
4
5
  - type: markdown
5
6
  attributes:
@@ -1,5 +1,6 @@
1
1
  name: Feature request
2
2
  description: Suggest an idea for this project
3
+ labels: "waiting-for-triage"
3
4
  body:
4
5
  - type: markdown
5
6
  attributes:
@@ -0,0 +1,6 @@
1
+ version: 2
2
+ updates:
3
+ - package-ecosystem: 'github-actions'
4
+ directory: '/'
5
+ schedule:
6
+ interval: 'weekly'
@@ -12,12 +12,15 @@ jobs:
12
12
  strategy:
13
13
  fail-fast: false
14
14
  matrix:
15
- ruby: [ '3.1', '3.0', '2.7', '2.6' ]
15
+ ruby: [ '3.2', '3.1', '3.0', '2.7' ]
16
16
  os:
17
17
  - ubuntu-latest
18
- name: Ruby ${{ matrix.ruby }} unit testing on ${{ matrix.os }}
18
+ rdkafka_versions:
19
+ - { min: '>= 0.6.0', max: '< 0.12.0' }
20
+ - { min: '>= 0.12.0', max: '>= 0.12.0' }
21
+ name: Ruby ${{ matrix.ruby }} unit testing on ${{ matrix.os }} with rdkafka gem version (min ${{ matrix.rdkafka_versions.min }} max ${{ matrix.rdkafka_versions.max }})
19
22
  steps:
20
- - uses: actions/checkout@v2
23
+ - uses: actions/checkout@v3
21
24
  - uses: ruby/setup-ruby@v1
22
25
  with:
23
26
  ruby-version: ${{ matrix.ruby }}
@@ -33,6 +36,8 @@ jobs:
33
36
  - name: unit testing
34
37
  env:
35
38
  CI: true
39
+ RDKAFKA_VERSION_MIN_RANGE: ${{ matrix.rdkafka_versions.min }}
40
+ RDKAFKA_VERSION_MAX_RANGE: ${{ matrix.rdkafka_versions.max }}
36
41
  run: |
37
42
  sudo ./ci/prepare-kafka-server.sh
38
43
  gem install bundler rake
@@ -7,7 +7,7 @@ jobs:
7
7
  stale:
8
8
  runs-on: ubuntu-latest
9
9
  steps:
10
- - uses: actions/stale@v3
10
+ - uses: actions/stale@v8
11
11
  with:
12
12
  repo-token: ${{ secrets.GITHUB_TOKEN }}
13
13
  days-before-stale: 90
@@ -18,5 +18,7 @@ jobs:
18
18
  close-pr-message: "This PR was automatically closed because of stale in 30 days"
19
19
  stale-pr-label: "stale"
20
20
  stale-issue-label: "stale"
21
- exempt-issue-labels: "bug,enhancement,help wanted"
22
- exempt-pr-labels: "bug,enhancement,help wanted"
21
+ exempt-issue-labels: "bug,enhancement,help wanted,waiting-for-triage"
22
+ exempt-pr-labels: "bug,enhancement,help wanted,waiting-for-triage"
23
+ exempt-all-assignees: true
24
+ exempt-all-milestones: true
data/ChangeLog CHANGED
@@ -1,3 +1,9 @@
1
+ Release 0.19.0 - 2023/04/26
2
+ * out_kafka2: Add support for AWS IAM authentication
3
+ * in_kafka, in_kafka_group, out_kafka2: Add support for ssl client cert key password
4
+ * out_rdkafka2: Mask `ssl_client_cert_key_password` on dumping it to log
5
+ * out_rdkafka2: Support rdkafka-ruby 0.12
6
+
1
7
  Release 0.18.1 - 2022/08/17
2
8
  * out_kafka2: Fix a bug that it doesn't respect `chunk_limit_records` and `chunk_limit_size`
3
9
 
data/Gemfile CHANGED
@@ -3,4 +3,4 @@ source 'https://rubygems.org'
3
3
  # Specify your gem's dependencies in fluent-plugin-kafka.gemspec
4
4
  gemspec
5
5
 
6
- gem 'rdkafka', '>= 0.6.0' if ENV["USE_RDKAFKA"]
6
+ gem 'rdkafka', ENV['RDKAFKA_VERSION_MIN_RANGE'], ENV['RDKAFKA_VERSION_MAX_RANGE'] if ENV['USE_RDKAFKA']
data/README.md CHANGED
@@ -36,6 +36,7 @@ If you want to use zookeeper related parameters, you also need to install zookee
36
36
  - ssl_ca_cert
37
37
  - ssl_client_cert
38
38
  - ssl_client_cert_key
39
+ - ssl_client_cert_key_password
39
40
  - ssl_ca_certs_from_system
40
41
 
41
42
  Set path to SSL related files. See [Encryption and Authentication using SSL](https://github.com/zendesk/ruby-kafka#encryption-and-authentication-using-ssl) for more detail.
@@ -187,6 +188,10 @@ If `ruby-kafka` doesn't fit your kafka environment, check `rdkafka2` plugin inst
187
188
  @type kafka2
188
189
 
189
190
  brokers <broker1_host>:<broker1_port>,<broker2_host>:<broker2_port>,.. # Set brokers directly
191
+
192
+ # Kafka topic, placeholders are supported. Chunk keys are required in the Buffer section in order for placeholders
193
+ # to work.
194
+ topic (string) :default => nil
190
195
  topic_key (string) :default => 'topic'
191
196
  partition_key (string) :default => 'partition'
192
197
  partition_key_key (string) :default => 'partition_key'
@@ -208,6 +213,16 @@ If `ruby-kafka` doesn't fit your kafka environment, check `rdkafka2` plugin inst
208
213
  partitioner_hash_function (enum) (crc32|murmur2) :default => 'crc32'
209
214
  share_producer (bool) :default => false
210
215
 
216
+ # If you intend to rely on AWS IAM auth to MSK with long lived credentials
217
+ # https://docs.aws.amazon.com/msk/latest/developerguide/iam-access-control.html
218
+ #
219
+ # For AWS STS support, see status in
220
+ # - https://github.com/zendesk/ruby-kafka/issues/944
221
+ # - https://github.com/zendesk/ruby-kafka/pull/951
222
+ sasl_aws_msk_iam_access_key_id (string) :default => nil
223
+ sasl_aws_msk_iam_secret_key_id (string) :default => nil
224
+ sasl_aws_msk_iam_aws_region (string) :default => nil
225
+
211
226
  <format>
212
227
  @type (json|ltsv|msgpack|attr:<record name>|<formatter name>) :default => json
213
228
  </format>
@@ -243,13 +258,12 @@ ruby-kafka's log is routed to fluentd log so you can see ruby-kafka's log in flu
243
258
 
244
259
  Supports following ruby-kafka's producer options.
245
260
 
246
- - max_send_retries - default: 1 - Number of times to retry sending of messages to a leader.
261
+ - max_send_retries - default: 2 - Number of times to retry sending of messages to a leader.
247
262
  - required_acks - default: -1 - The number of acks required per request. If you need flush performance, set lower value, e.g. 1, 2.
248
263
  - ack_timeout - default: nil - How long the producer waits for acks. The unit is seconds.
249
264
  - compression_codec - default: nil - The codec the producer uses to compress messages.
250
265
  - max_send_limit_bytes - default: nil - Max byte size to send message to avoid MessageSizeTooLarge. For example, if you set 1000000(message.max.bytes in kafka), messages larger than 1000000 bytes will be dropped.
251
266
  - discard_kafka_delivery_failed - default: false - discard the record where [Kafka::DeliveryFailed](http://www.rubydoc.info/gems/ruby-kafka/Kafka/DeliveryFailed) occurred
252
- - monitoring_list - default: [] - library to be used to monitor. statsd and datadog are supported
253
267
 
254
268
  If you want to know about detail of monitoring, see also https://github.com/zendesk/ruby-kafka#monitoring
255
269
 
@@ -420,6 +434,16 @@ Support of fluentd v0.12 has ended. `kafka_buffered` will be an alias of `kafka2
420
434
  monitoring_list (array) :default => []
421
435
  </match>
422
436
 
437
+ `kafka_buffered` supports the following `ruby-kafka` parameters:
438
+
439
+ - max_send_retries - default: 2 - Number of times to retry sending of messages to a leader.
440
+ - required_acks - default: -1 - The number of acks required per request. If you need flush performance, set lower value, e.g. 1, 2.
441
+ - ack_timeout - default: nil - How long the producer waits for acks. The unit is seconds.
442
+ - compression_codec - default: nil - The codec the producer uses to compress messages.
443
+ - max_send_limit_bytes - default: nil - Max byte size to send message to avoid MessageSizeTooLarge. For example, if you set 1000000(message.max.bytes in kafka), messages larger than 1000000 bytes will be dropped.
444
+ - discard_kafka_delivery_failed - default: false - discard the record where [Kafka::DeliveryFailed](http://www.rubydoc.info/gems/ruby-kafka/Kafka/DeliveryFailed) occurred
445
+ - monitoring_list - default: [] - library to be used to monitor. statsd and datadog are supported
446
+
423
447
  `kafka_buffered` has two additional parameters:
424
448
 
425
449
  - kafka_agg_max_bytes - default: 4096 - Maximum value of total message size to be included in one batch transmission.
@@ -0,0 +1,3 @@
1
+ # Examples
2
+
3
+ This directory contains example Fluentd config for this plugin
@@ -0,0 +1,32 @@
1
+ <source>
2
+ @type sample
3
+ sample {"hello": "world"}
4
+ rate 7000
5
+ tag sample.hello.world
6
+ </source>
7
+
8
+ <match sample.**>
9
+ @type kafka2
10
+
11
+ brokers "broker:29092"
12
+
13
+ # Writes to topic `events.sample.hello.world`
14
+ topic "events.${tag}"
15
+
16
+ # Writes to topic `hello.world`
17
+ # topic "${tag[1]}.${tag[2]}"
18
+
19
+ <format>
20
+ @type json
21
+ </format>
22
+
23
+ <buffer tag>
24
+ flush_at_shutdown true
25
+ flush_mode interval
26
+ flush_interval 1s
27
+ chunk_limit_size 3MB
28
+ chunk_full_threshold 1
29
+ total_limit_size 1024MB
30
+ overflow_action block
31
+ </buffer>
32
+ </match>
@@ -0,0 +1,23 @@
1
+ <source>
2
+ @type sample
3
+ sample {"hello": "world", "some_record":{"event":"message"}}
4
+ rate 7000
5
+ tag sample.hello.world
6
+ </source>
7
+
8
+ <match sample.**>
9
+ @type kafka2
10
+
11
+ brokers "broker:29092"
12
+
13
+ record_key "some_record"
14
+ default_topic "events"
15
+
16
+ <format>
17
+ # requires the fluent-plugin-formatter-protobuf gem
18
+ # see its docs for full usage
19
+ @type protobuf
20
+ class_name SomeRecord
21
+ include_paths ["/opt/fluent-plugin-formatter-protobuf/some_record_pb.rb"]
22
+ </format>
23
+ </match>
@@ -0,0 +1,31 @@
1
+ <source>
2
+ @type sample
3
+ sample {"hello": "world", "some_record":{"event":"message"}}
4
+ rate 7000
5
+ tag sample.hello.world
6
+ </source>
7
+
8
+ <match sample.**>
9
+ @type kafka2
10
+
11
+ brokers "broker:29092"
12
+
13
+ # {"event": "message"} will be formatted and sent to Kafka
14
+ record_key "some_record"
15
+
16
+ default_topic "events"
17
+
18
+ <format>
19
+ @type json
20
+ </format>
21
+
22
+ <buffer>
23
+ flush_at_shutdown true
24
+ flush_mode interval
25
+ flush_interval 1s
26
+ chunk_limit_size 3MB
27
+ chunk_full_threshold 1
28
+ total_limit_size 1024MB
29
+ overflow_action block
30
+ </buffer>
31
+ </match>
@@ -13,12 +13,12 @@ Gem::Specification.new do |gem|
13
13
  gem.test_files = gem.files.grep(%r{^(test|spec|features)/})
14
14
  gem.name = "fluent-plugin-kafka"
15
15
  gem.require_paths = ["lib"]
16
- gem.version = '0.18.1'
16
+ gem.version = '0.19.0'
17
17
  gem.required_ruby_version = ">= 2.1.0"
18
18
 
19
19
  gem.add_dependency "fluentd", [">= 0.10.58", "< 2"]
20
20
  gem.add_dependency 'ltsv'
21
- gem.add_dependency 'ruby-kafka', '>= 1.4.0', '< 2'
21
+ gem.add_dependency 'ruby-kafka', '>= 1.5.0', '< 2'
22
22
  gem.add_development_dependency "rake", ">= 0.9.2"
23
23
  gem.add_development_dependency "test-unit", ">= 3.0.8"
24
24
  gem.add_development_dependency "test-unit-rr", "~> 1.0"
@@ -200,16 +200,19 @@ class Fluent::KafkaInput < Fluent::Input
200
200
  if @scram_mechanism != nil && @username != nil && @password != nil
201
201
  @kafka = Kafka.new(seed_brokers: @brokers, client_id: @client_id, logger: logger, ssl_ca_cert_file_path: @ssl_ca_cert,
202
202
  ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key),
203
+ ssl_client_cert_key_password: @ssl_client_cert_key_password,
203
204
  ssl_ca_certs_from_system: @ssl_ca_certs_from_system, sasl_scram_username: @username, sasl_scram_password: @password,
204
205
  sasl_scram_mechanism: @scram_mechanism, sasl_over_ssl: @sasl_over_ssl, ssl_verify_hostname: @ssl_verify_hostname)
205
206
  elsif @username != nil && @password != nil
206
207
  @kafka = Kafka.new(seed_brokers: @brokers, client_id: @client_id, logger: logger, ssl_ca_cert_file_path: @ssl_ca_cert,
207
208
  ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key),
209
+ ssl_client_cert_key_password: @ssl_client_cert_key_password,
208
210
  ssl_ca_certs_from_system: @ssl_ca_certs_from_system,sasl_plain_username: @username, sasl_plain_password: @password,
209
211
  sasl_over_ssl: @sasl_over_ssl, ssl_verify_hostname: @ssl_verify_hostname)
210
212
  else
211
213
  @kafka = Kafka.new(seed_brokers: @brokers, client_id: @client_id, logger: logger, ssl_ca_cert_file_path: @ssl_ca_cert,
212
214
  ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key),
215
+ ssl_client_cert_key_password: @ssl_client_cert_key_password,
213
216
  ssl_ca_certs_from_system: @ssl_ca_certs_from_system, sasl_gssapi_principal: @principal, sasl_gssapi_keytab: @keytab,
214
217
  ssl_verify_hostname: @ssl_verify_hostname)
215
218
  end
@@ -188,16 +188,19 @@ class Fluent::KafkaGroupInput < Fluent::Input
188
188
  if @scram_mechanism != nil && @username != nil && @password != nil
189
189
  @kafka = Kafka.new(seed_brokers: @brokers, client_id: @client_id, logger: logger, connect_timeout: @connect_timeout, socket_timeout: @socket_timeout, ssl_ca_cert_file_path: @ssl_ca_cert,
190
190
  ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key),
191
+ ssl_client_cert_key_password: @ssl_client_cert_key_password,
191
192
  ssl_ca_certs_from_system: @ssl_ca_certs_from_system, sasl_scram_username: @username, sasl_scram_password: @password,
192
193
  sasl_scram_mechanism: @scram_mechanism, sasl_over_ssl: @sasl_over_ssl, ssl_verify_hostname: @ssl_verify_hostname)
193
194
  elsif @username != nil && @password != nil
194
195
  @kafka = Kafka.new(seed_brokers: @brokers, client_id: @client_id, logger: logger, connect_timeout: @connect_timeout, socket_timeout: @socket_timeout, ssl_ca_cert_file_path: @ssl_ca_cert,
195
196
  ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key),
197
+ ssl_client_cert_key_password: @ssl_client_cert_key_password,
196
198
  ssl_ca_certs_from_system: @ssl_ca_certs_from_system, sasl_plain_username: @username, sasl_plain_password: @password,
197
199
  sasl_over_ssl: @sasl_over_ssl, ssl_verify_hostname: @ssl_verify_hostname)
198
200
  else
199
201
  @kafka = Kafka.new(seed_brokers: @brokers, client_id: @client_id, logger: logger, connect_timeout: @connect_timeout, socket_timeout: @socket_timeout, ssl_ca_cert_file_path: @ssl_ca_cert,
200
202
  ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key),
203
+ ssl_client_cert_key_password: @ssl_client_cert_key_password,
201
204
  ssl_ca_certs_from_system: @ssl_ca_certs_from_system, sasl_gssapi_principal: @principal, sasl_gssapi_keytab: @keytab,
202
205
  ssl_verify_hostname: @ssl_verify_hostname)
203
206
  end
@@ -1,5 +1,18 @@
1
1
  module Fluent
2
2
  module KafkaPluginUtil
3
+ module AwsIamSettings
4
+ def self.included(klass)
5
+ klass.instance_eval do
6
+ config_param :sasl_aws_msk_iam_access_key_id, :string, :default => nil, secret: true,
7
+ desc: "AWS access key Id for IAM authentication to MSK."
8
+ config_param :sasl_aws_msk_iam_secret_key_id, :string, :default => nil, secret: true,
9
+ desc: "AWS access key secret for IAM authentication to MSK."
10
+ config_param :sasl_aws_msk_iam_aws_region, :string, :default => nil,
11
+ desc: "AWS region for IAM authentication to MSK."
12
+ end
13
+ end
14
+ end
15
+
3
16
  module SSLSettings
4
17
  def self.included(klass)
5
18
  klass.instance_eval {
@@ -10,6 +23,8 @@ module Fluent
10
23
  :desc => "a PEM encoded client cert to use with and SSL connection. Must be used in combination with ssl_client_cert_key."
11
24
  config_param :ssl_client_cert_key, :string, :default => nil,
12
25
  :desc => "a PEM encoded client cert key to use with and SSL connection. Must be used in combination with ssl_client_cert."
26
+ config_param :ssl_client_cert_key_password, :string, :default => nil, secret: true,
27
+ :desc => "a PEM encoded client cert key password to use with SSL connection."
13
28
  config_param :ssl_client_cert_chain, :string, :default => nil,
14
29
  :desc => "an extra PEM encoded cert to use with and SSL connection."
15
30
  config_param :ssl_ca_certs_from_system, :bool, :default => false,
@@ -95,6 +95,7 @@ DESC
95
95
  config_set_default :@type, 'json'
96
96
  end
97
97
 
98
+ include Fluent::KafkaPluginUtil::AwsIamSettings
98
99
  include Fluent::KafkaPluginUtil::SSLSettings
99
100
  include Fluent::KafkaPluginUtil::SaslSettings
100
101
 
@@ -113,25 +114,47 @@ DESC
113
114
  def refresh_client(raise_error = true)
114
115
  begin
115
116
  logger = @get_kafka_client_log ? log : nil
117
+ use_long_lived_aws_credentials = @sasl_aws_msk_iam_access_key_id != nil && @sasl_aws_msk_iam_secret_key_id != nil
116
118
  if @scram_mechanism != nil && @username != nil && @password != nil
117
- @kafka = Kafka.new(seed_brokers: @seed_brokers, client_id: @client_id, logger: logger, connect_timeout: @connect_timeout, socket_timeout: @socket_timeout, ssl_ca_cert_file_path: @ssl_ca_cert,
118
- ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key), ssl_client_cert_chain: read_ssl_file(@ssl_client_cert_chain),
119
- ssl_ca_certs_from_system: @ssl_ca_certs_from_system, sasl_scram_username: @username, sasl_scram_password: @password,
120
- sasl_scram_mechanism: @scram_mechanism, sasl_over_ssl: @sasl_over_ssl, ssl_verify_hostname: @ssl_verify_hostname, resolve_seed_brokers: @resolve_seed_brokers,
121
- partitioner: Kafka::Partitioner.new(hash_function: @partitioner_hash_function))
119
+ sasl_params = {
120
+ sasl_scram_username: @username,
121
+ sasl_scram_password: @password,
122
+ sasl_scram_mechanism: @scram_mechanism,
123
+ }
122
124
  elsif @username != nil && @password != nil
123
- @kafka = Kafka.new(seed_brokers: @seed_brokers, client_id: @client_id, logger: logger, connect_timeout: @connect_timeout, socket_timeout: @socket_timeout, ssl_ca_cert_file_path: @ssl_ca_cert,
124
- ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key), ssl_client_cert_chain: read_ssl_file(@ssl_client_cert_chain),
125
- ssl_ca_certs_from_system: @ssl_ca_certs_from_system, sasl_plain_username: @username, sasl_plain_password: @password, sasl_over_ssl: @sasl_over_ssl,
126
- ssl_verify_hostname: @ssl_verify_hostname, resolve_seed_brokers: @resolve_seed_brokers,
127
- partitioner: Kafka::Partitioner.new(hash_function: @partitioner_hash_function))
125
+ sasl_params = {
126
+ sasl_plain_username: @username,
127
+ sasl_plain_password: @password,
128
+ }
129
+ elsif use_long_lived_aws_credentials
130
+ sasl_params = {
131
+ sasl_aws_msk_iam_access_key_id: @sasl_aws_msk_iam_access_key_id,
132
+ sasl_aws_msk_iam_secret_key_id: @sasl_aws_msk_iam_secret_key_id,
133
+ sasl_aws_msk_iam_aws_region: @sasl_aws_msk_iam_aws_region,
134
+ }
128
135
  else
129
- @kafka = Kafka.new(seed_brokers: @seed_brokers, client_id: @client_id, logger: logger, connect_timeout: @connect_timeout, socket_timeout: @socket_timeout, ssl_ca_cert_file_path: @ssl_ca_cert,
130
- ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key), ssl_client_cert_chain: read_ssl_file(@ssl_client_cert_chain),
131
- ssl_ca_certs_from_system: @ssl_ca_certs_from_system, sasl_gssapi_principal: @principal, sasl_gssapi_keytab: @keytab, sasl_over_ssl: @sasl_over_ssl,
132
- ssl_verify_hostname: @ssl_verify_hostname, resolve_seed_brokers: @resolve_seed_brokers,
133
- partitioner: Kafka::Partitioner.new(hash_function: @partitioner_hash_function))
136
+ sasl_params = {
137
+ sasl_gssapi_principal: @principal,
138
+ sasl_gssapi_keytab: @keytab,
139
+ }
134
140
  end
141
+ @kafka = Kafka.new(
142
+ seed_brokers: @seed_brokers,
143
+ client_id: @client_id,
144
+ logger: logger,
145
+ connect_timeout: @connect_timeout,
146
+ socket_timeout: @socket_timeout,
147
+ ssl_ca_cert_file_path: @ssl_ca_cert,
148
+ ssl_client_cert: read_ssl_file(@ssl_client_cert),
149
+ ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key),
150
+ ssl_client_cert_key_password: @ssl_client_cert_key_password,
151
+ ssl_client_cert_chain: read_ssl_file(@ssl_client_cert_chain),
152
+ ssl_ca_certs_from_system: @ssl_ca_certs_from_system,
153
+ ssl_verify_hostname: @ssl_verify_hostname,
154
+ resolve_seed_brokers: @resolve_seed_brokers,
155
+ partitioner: Kafka::Partitioner.new(hash_function: @partitioner_hash_function),
156
+ sasl_over_ssl: @sasl_over_ssl,
157
+ **sasl_params)
135
158
  log.info "initialized kafka producer: #{@client_id}"
136
159
  rescue Exception => e
137
160
  if raise_error # During startup, error should be reported to engine and stop its phase for safety.
@@ -5,9 +5,37 @@ require 'fluent/plugin/kafka_plugin_util'
5
5
 
6
6
  require 'rdkafka'
7
7
 
8
+ # This is required for `rdkafka` version >= 0.12.0
9
+ # Overriding the close method in order to provide a time limit for when it should be forcibly closed
10
+ class Rdkafka::Producer::Client
11
+ # return false if producer is forcefully closed, otherwise return true
12
+ def close(timeout=nil)
13
+ return unless @native
14
+
15
+ # Indicate to polling thread that we're closing
16
+ @polling_thread[:closing] = true
17
+ # Wait for the polling thread to finish up
18
+ thread = @polling_thread.join(timeout)
19
+
20
+ Rdkafka::Bindings.rd_kafka_destroy(@native)
21
+
22
+ @native = nil
23
+
24
+ return !thread.nil?
25
+ end
26
+ end
27
+
8
28
  class Rdkafka::Producer
9
29
  # return false if producer is forcefully closed, otherwise return true
10
30
  def close(timeout = nil)
31
+ rdkafka_version = Rdkafka::VERSION || '0.0.0'
32
+ # Rdkafka version >= 0.12.0 changed its internals
33
+ if Gem::Version::create(rdkafka_version) >= Gem::Version.create('0.12.0')
34
+ ObjectSpace.undefine_finalizer(self)
35
+
36
+ return @client.close(timeout)
37
+ end
38
+
11
39
  @closing = true
12
40
  # Wait for the polling thread to finish up
13
41
  # If the broker isn't alive, the thread doesn't exit
@@ -95,7 +123,6 @@ DESC
95
123
  config_param :max_enqueue_bytes_per_second, :size, :default => nil, :desc => 'The maximum number of enqueueing bytes per second'
96
124
 
97
125
  config_param :service_name, :string, :default => nil, :desc => 'Used for sasl.kerberos.service.name'
98
- config_param :ssl_client_cert_key_password, :string, :default => nil, :desc => 'Used for ssl.key.password'
99
126
 
100
127
  config_section :buffer do
101
128
  config_set_default :chunk_keys, ["topic"]
metadata CHANGED
@@ -1,7 +1,7 @@
1
1
  --- !ruby/object:Gem::Specification
2
2
  name: fluent-plugin-kafka
3
3
  version: !ruby/object:Gem::Version
4
- version: 0.18.1
4
+ version: 0.19.0
5
5
  platform: ruby
6
6
  authors:
7
7
  - Hidemasa Togashi
@@ -9,7 +9,7 @@ authors:
9
9
  autorequire:
10
10
  bindir: bin
11
11
  cert_chain: []
12
- date: 2022-08-17 00:00:00.000000000 Z
12
+ date: 2023-04-26 00:00:00.000000000 Z
13
13
  dependencies:
14
14
  - !ruby/object:Gem::Dependency
15
15
  name: fluentd
@@ -51,7 +51,7 @@ dependencies:
51
51
  requirements:
52
52
  - - ">="
53
53
  - !ruby/object:Gem::Version
54
- version: 1.4.0
54
+ version: 1.5.0
55
55
  - - "<"
56
56
  - !ruby/object:Gem::Version
57
57
  version: '2'
@@ -61,7 +61,7 @@ dependencies:
61
61
  requirements:
62
62
  - - ">="
63
63
  - !ruby/object:Gem::Version
64
- version: 1.4.0
64
+ version: 1.5.0
65
65
  - - "<"
66
66
  - !ruby/object:Gem::Version
67
67
  version: '2'
@@ -146,6 +146,7 @@ files:
146
146
  - ".github/ISSUE_TEMPLATE/bug_report.yaml"
147
147
  - ".github/ISSUE_TEMPLATE/config.yml"
148
148
  - ".github/ISSUE_TEMPLATE/feature_request.yaml"
149
+ - ".github/dependabot.yml"
149
150
  - ".github/workflows/linux.yml"
150
151
  - ".github/workflows/stale-actions.yml"
151
152
  - ".gitignore"
@@ -155,6 +156,10 @@ files:
155
156
  - README.md
156
157
  - Rakefile
157
158
  - ci/prepare-kafka-server.sh
159
+ - examples/README.md
160
+ - examples/out_kafka2/dynamic_topic_based_on_tag.conf
161
+ - examples/out_kafka2/protobuf-formatter.conf
162
+ - examples/out_kafka2/record_key.conf
158
163
  - fluent-plugin-kafka.gemspec
159
164
  - lib/fluent/plugin/in_kafka.rb
160
165
  - lib/fluent/plugin/in_kafka_group.rb