KafkaConfig

public class KafkaConfig

The configuration settings for a Kafka consumer/producer. These can be set either using the helper functions provided or by subscripting KafkaConfig with the configuration key.
Link to configuration keys and descriptions.

Usage Example:

let config = KafkaConfig()
config.groupId = "Kitura"
let consumer = try? KafkaConsumer(config: config)
  • The internal representation of the Kafka configuration.
    Values can be set by subscripting KafkaConfig directly.

    Declaration

    Swift

    public internal(set) var dictionary = [String: String]()
  • Initialize a KafkaConfig instance with default settings.

    Declaration

    Swift

    public init()
  • Directly set the configuration settings for librdkafka.
    Link to configuration keys and descriptions.

    Declaration

    Swift

    public subscript(key: String) -> String?
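As a rough illustration of the two ways of setting a value described above, the sketch below sets the client identifier once through the helper property and once through the subscript. It assumes the module is imported as `SwiftKafka` (the module name is not stated on this page) and that the subscript accepts the librdkafka property name `client.id`; the value "orders-service" is a made-up example.

```swift
import SwiftKafka  // assumed module name, not confirmed by this page

let config = KafkaConfig()

// Helper property documented on this page.
config.clientID = "orders-service"

// Assumed equivalent: the raw librdkafka key set through the subscript.
config["client.id"] = "orders-service"

// The subscript returns String?, so a key that was never set may come back as nil.
if let clientID = config["client.id"] {
    print("client.id = \(clientID)")
}
```

The helper properties give type checking (Int, Bool, dedicated structs), while the subscript is the fallback for keys that have no helper.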
  • Client identifier. Defaults to rdkafka.

    Declaration

    Swift

    public var clientID: String
  • Maximum Kafka protocol request message size. Defaults to 1000000.

    Declaration

    Swift

    public var messageMaxBytes: Int
  • Maximum size for message to be copied to buffer. Messages larger than this will be passed by reference (zero-copy) at the expense of larger iovecs. Defaults to 65535.

    Declaration

    Swift

    public var messageCopyMaxBytes: Int
  • Maximum Kafka protocol response message size. This serves as a safety precaution to avoid memory exhaustion in case of protocol hiccups. This value must be at least fetchMaxBytes + 512 to allow for protocol overhead; the value is adjusted automatically unless the configuration property is explicitly set. Defaults to 100000000.

    Declaration

    Swift

    public var receiveMessageMaxBytes: Int
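The size constraint above is simple arithmetic, and a small check can make it concrete. The helpers below are hypothetical (they are not part of KafkaConfig) and only restate the rule that the receive limit must be at least the fetch limit plus 512 bytes of protocol overhead.

```swift
/// Protocol overhead allowance quoted in the description above.
let protocolOverheadBytes = 512

/// Hypothetical helper: the smallest receive limit that satisfies
/// receiveMessageMaxBytes >= fetchMaxBytes + 512.
func minimumReceiveMessageMaxBytes(fetchMaxBytes: Int) -> Int {
    return fetchMaxBytes + protocolOverheadBytes
}

/// Returns true when an explicitly chosen receive limit leaves room for the overhead.
func isReceiveLimitValid(receiveMessageMaxBytes: Int, fetchMaxBytes: Int) -> Bool {
    return receiveMessageMaxBytes >= minimumReceiveMessageMaxBytes(fetchMaxBytes: fetchMaxBytes)
}

// Example: a 50 MiB fetch limit checked against the documented default of 100000000.
let fetchLimit = 50 * 1024 * 1024
print(minimumReceiveMessageMaxBytes(fetchMaxBytes: fetchLimit))
print(isReceiveLimitValid(receiveMessageMaxBytes: 100_000_000, fetchMaxBytes: fetchLimit))
```

A check like this is only needed when the property is set explicitly, since the value is otherwise adjusted automatically.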
  • Maximum number of in-flight requests per broker connection. This is a generic property applied to all broker communication, however it is primarily relevant to produce requests. In particular, note that other mechanisms limit the number of outstanding consumer fetch requests per broker to one. Defaults to 1000000.

    Declaration

    Swift

    public var maxInFlightRequestsPerConnection: Int
  • Non-topic request timeout in milliseconds. This is for metadata requests, etc. Defaults to 60000.

    Declaration

    Swift

    public var metadataRequestTimeoutMs: Int
  • Topic metadata refresh interval in milliseconds. The metadata is automatically refreshed on error and connect. Use -1 to disable the periodic refresh. Defaults to 300000.

    Declaration

    Swift

    public var topicMetadataRefreshIntervalMs: Int
  • Metadata cache max age. Defaults to topic.metadata.refresh.interval.ms * 3.

    Declaration

    Swift

    public var metadataMaxAgeMs: Int
  • When a topic loses its leader a new metadata request will be enqueued with this initial interval, exponentially increasing until the topic metadata has been refreshed. This is used to recover quickly from transitioning leader brokers. Defaults to 250.

    Declaration

    Swift

    public var topicMetadataRefreshFastIntervalMs: Int
  • Sparse metadata requests (consumes less network bandwidth). Defaults to true.

    Declaration

    Swift

    public var topicMetadataRefreshSparse: Bool
  • An array of debug contexts to enable. Defaults to an empty array.

    Declaration

    Swift

    public var debug: [DebugOptions]
  • The possible debug options that can be enabled.

    Declaration

    Swift

    public struct DebugOptions: CustomStringConvertible
  • Default timeout for network requests.
    Producer: ProduceRequests will use the lesser value of socket.timeout.ms and remaining message.timeout.ms for the first message in the batch.
    Consumer: FetchRequests will use fetch.wait.max.ms + socket.timeout.ms.
    Admin: Admin requests will use socket.timeout.ms.
    Defaults to 60000.

    Declaration

    Swift

    public var socketTimeoutMs: Int
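The three rules above can be summarised in a few lines. This is a minimal sketch of the arithmetic only; the `RequestKind` enum and the function name are hypothetical and do not exist in the library.

```swift
/// Hypothetical request categories matching the three cases in the description.
enum RequestKind {
    case produce(remainingMessageTimeoutMs: Int)
    case fetch(fetchWaitMaxMs: Int)
    case admin
}

/// Effective network timeout for a request, following the rules described above.
func effectiveTimeoutMs(for kind: RequestKind, socketTimeoutMs: Int = 60000) -> Int {
    switch kind {
    case .produce(let remaining):
        // Lesser of socket.timeout.ms and the remaining message.timeout.ms.
        return min(socketTimeoutMs, remaining)
    case .fetch(let fetchWaitMaxMs):
        // fetch.wait.max.ms + socket.timeout.ms.
        return fetchWaitMaxMs + socketTimeoutMs
    case .admin:
        return socketTimeoutMs
    }
}

print(effectiveTimeoutMs(for: .produce(remainingMessageTimeoutMs: 5000)))
print(effectiveTimeoutMs(for: .fetch(fetchWaitMaxMs: 500)))
print(effectiveTimeoutMs(for: .admin))
```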
  • Broker socket send buffer size. System default is used if 0. Defaults to 0.

    Declaration

    Swift

    public var socketSendBufferBytes: Int
  • Broker socket receive buffer size. System default is used if 0. Defaults to 0.

    Declaration

    Swift

    public var socketReceiveBufferBytes: Int
  • Enable TCP keep-alives (SO_KEEPALIVE) on broker sockets. Defaults to false.

    Declaration

    Swift

    public var socketKeepaliveEnable: Bool
  • Disable the Nagle algorithm (TCP_NODELAY) on broker sockets.

    Declaration

    Swift

    public var socketNagleDisable: Bool
  • Disconnect from broker when this number of send failures (e.g., timed out requests) is reached. Disable with 0. WARNING: It is highly recommended to leave this setting at its default value of 1 to prevent the client and broker from becoming desynchronized in case of request timeouts. NOTE: The connection is automatically re-established.

    Declaration

    Swift

    public var socketMaxFails: Int
  • Allowed broker IPAddressFamily (any, IPv4, IPv6). Defaults to any.

    Declaration

    Swift

    public var brokerAddressFamily: IPAddressFamily
  • The possible IP address family options that can be set.

    Declaration

    Swift

    public struct IPAddressFamily: CustomStringConvertible
  • The initial time to wait before reconnecting to a broker after the connection has been closed. The time is increased exponentially until reconnect.backoff.max.ms is reached. -25% to +50% jitter is applied to each reconnect backoff. A value of 0 disables the backoff and reconnects immediately. Defaults to 100.

    Declaration

    Swift

    public var reconnectBackoffMs: Int
  • The maximum time to wait before reconnecting to a broker after the connection has been closed. Defaults to 10000.

    Declaration

    Swift

    public var reconnectBackoffMaxMs: Int
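To show how reconnectBackoffMs and reconnectBackoffMaxMs interact, here is a minimal sketch of a capped exponential backoff with jitter. It rests on two assumptions that the descriptions above do not confirm: that the delay doubles on each attempt (the text says only that it increases exponentially), and that the jitter is applied after the cap. The function names are hypothetical.

```swift
/// Delay before jitter for the given reconnect attempt (0 = first reconnect).
/// Assumption: the delay doubles on each attempt until it reaches maxMs.
func baseBackoffMs(attempt: Int, initialMs: Int = 100, maxMs: Int = 10000) -> Int {
    if initialMs <= 0 { return 0 }  // a value of 0 disables the backoff
    var delay = initialMs
    for _ in 0..<max(attempt, 0) {
        if delay >= maxMs { break }
        delay *= 2
    }
    return min(delay, maxMs)
}

/// Applies the -25% to +50% jitter from the description to the base delay.
func jitteredBackoffMs(attempt: Int, initialMs: Int = 100, maxMs: Int = 10000) -> Int {
    let base = baseBackoffMs(attempt: attempt, initialMs: initialMs, maxMs: maxMs)
    let factor = Double.random(in: 0.75...1.5)
    return Int(Double(base) * factor)
}

// Print the un-jittered and jittered delay for the first few attempts.
for attempt in 0..<8 {
    print(attempt, baseBackoffMs(attempt: attempt), jitteredBackoffMs(attempt: attempt))
}
```

The jitter spreads reconnect attempts from many clients over time, which avoids all of them hitting a recovering broker at the same instant.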
  • The SecurityProtocol used to communicate with brokers.

    Declaration

    Swift

    public var securityProtocol: SecurityProtocol
  • The possible security protocols that can be used to communicate with brokers.

    Declaration

    Swift

    public struct SecurityProtocol: CustomStringConvertible
  • Path to client’s private key (PEM) used for authentication. Defaults to nil.

    Declaration

    Swift

    public var sslKeyLocation: String?
  • Private key passphrase. Defaults to nil.

    Declaration

    Swift

    public var sslKeyPassword: String?
  • Path to client’s public key (PEM) used for authentication. Defaults to nil.

    Declaration

    Swift

    public var sslCertificateLocation: String?
  • File or directory path to CA certificate(s) for verifying the broker’s key. Defaults to nil.

    Declaration

    Swift

    public var sslCaLocation: String?
  • Path to CRL for verifying broker’s certificate validity. Defaults to nil.

    Declaration

    Swift

    public var sslCrlLocation: String?
  • Path to client’s keystore (PKCS#12) used for authentication. Defaults to nil.

    Declaration

    Swift

    public var sslKeystoreLocation: String?
  • Client’s keystore (PKCS#12) password. Defaults to nil.

    Declaration

    Swift

    public var sslKeystorePassword: String?
  • SASL mechanism to use for authentication. Defaults to GSSAPI.

    Declaration

    Swift

    public var saslMechanism: SASLMechanism
  • The possible SASL mechanisms that can be used for authentication.

    Declaration

    Swift

    public struct SASLMechanism: CustomStringConvertible
  • SASL username for use with the PLAIN, SCRAM-SHA-256 and SCRAM-SHA-512 mechanisms. Defaults to nil.

    Declaration

    Swift

    public var saslUsername: String?
  • SASL password for use with the PLAIN, SCRAM-SHA-256 and SCRAM-SHA-512 mechanisms. Defaults to nil.

    Declaration

    Swift

    public var saslPassword: String?
  • Consumer Only. Client group id string. All clients sharing the same group.id belong to the same group and the messages will be split between them. Defaults to nil, meaning a random UUID String will be used as the groupId.

    Declaration

    Swift

    public var groupId: String?
  • Consumer Only. Client group session and failure detection timeout. The consumer sends periodic heartbeats (heartbeat.interval.ms) to indicate its liveness to the broker. If no heartbeats are received by the broker for a group member within the session timeout, the broker will remove the consumer from the group and trigger a rebalance. Defaults to 10000.

    Declaration

    Swift

    public var sessionTimeoutMs: Int
  • Consumer Only. Group session keepalive heartbeat interval. Defaults to 3000.

    Declaration

    Swift

    public var heartbeatIntervalMs: Int
  • Consumer Only. Maximum allowed time between calls to consume messages (e.g., rd_kafka_consumer_poll()) for high-level consumers. If this interval is exceeded the consumer is considered failed and the group will rebalance in order to reassign the partitions to another consumer group member. Warning: Offset commits may not be possible at this point. Note: It is recommended to set enable.auto.offset.store=false for long-time processing applications and then explicitly store offsets (using offsets_store()) after message processing, to make sure offsets are not auto-committed before processing has finished. The interval is checked two times per second. Defaults to 300000.

    Declaration

    Swift

    public var maxPollIntervalMs: Int
  • Consumer Only. Automatically and periodically commit offsets in the background. Note: setting this to false does not prevent the consumer from fetching previously committed start offsets. To circumvent this behaviour set specific start offsets per partition in the call to assign(). Defaults to true.

    Declaration

    Swift

    public var enableAutoCommit: Bool
  • Consumer Only. The frequency in milliseconds that the consumer offsets are committed (written) to offset storage. (0 = disable). Defaults to 5000.

    Declaration

    Swift

    public var autoCommitIntervalMs: Int
  • Consumer Only. Automatically store offset of last message provided to application. The offset store is an in-memory store of the next offset to (auto-)commit for each partition. Defaults to true.

    Declaration

    Swift

    public var enableAutoOffsetStore: Bool
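The recommendation given under maxPollIntervalMs for long-running message handlers can be expressed with the properties documented on this page. The fragment below is a sketch of such a configuration, assuming the module is imported as `SwiftKafka` (not stated on this page); the 10 minute poll interval is an arbitrary example value.

```swift
import SwiftKafka  // assumed module name, not confirmed by this page

let config = KafkaConfig()
config.groupId = "Kitura"

// Allow up to 10 minutes between polls for slow message handlers (default is 300000).
config.maxPollIntervalMs = 600000

// Keep periodic background commits, but stop storing offsets automatically,
// so that only offsets stored explicitly after processing are committed.
config.enableAutoCommit = true
config.autoCommitIntervalMs = 5000
config.enableAutoOffsetStore = false
```

How offsets are then stored explicitly depends on the consumer API, which this page does not cover.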
  • Consumer Only. Emit RD_KAFKA_RESP_ERR__PARTITION_EOF event whenever the consumer reaches the end of a partition. Defaults to true.

    Declaration

    Swift

    public var enablePartitionEOF: Bool
  • Producer only. When set to true, the producer will ensure that messages are successfully produced exactly once and in the original produce order. The following configuration properties are adjusted automatically (if not modified by the user) when idempotence is enabled: max.in.flight.requests.per.connection=5 (must be less than or equal to 5), retries=INT32_MAX (must be greater than 0), acks=all, queuing.strategy=fifo. Producer instantiation will fail if user-supplied configuration is incompatible. Defaults to false.

    Declaration

    Swift

    public var enableIdempotence: Bool
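Because producer creation fails when user-supplied values conflict with idempotence, it can help to check them up front. The sketch below is a hypothetical pre-flight check written in plain Swift; `ProducerSettings` and `idempotenceViolations` are illustrative names, not library API, and the queuing.strategy constraint is left out for brevity. It treats acks=all as -1, matching the requestRequiredAcks description on this page.

```swift
/// User-supplied producer values relevant to idempotence (illustrative type).
struct ProducerSettings {
    var maxInFlightRequestsPerConnection: Int
    var messageSendMaxRetries: Int
    var requestRequiredAcks: Int  // -1 means all in-sync replicas
}

/// Lists the constraints from the description that the settings violate.
func idempotenceViolations(_ settings: ProducerSettings) -> [String] {
    var violations: [String] = []
    if settings.maxInFlightRequestsPerConnection > 5 {
        violations.append("max.in.flight.requests.per.connection must be <= 5")
    }
    if settings.messageSendMaxRetries <= 0 {
        violations.append("retries must be > 0")
    }
    if settings.requestRequiredAcks != -1 {
        violations.append("acks must be all (-1)")
    }
    return violations
}

let compatible = ProducerSettings(maxInFlightRequestsPerConnection: 5,
                                  messageSendMaxRetries: 2,
                                  requestRequiredAcks: -1)
let incompatible = ProducerSettings(maxInFlightRequestsPerConnection: 10,
                                    messageSendMaxRetries: 0,
                                    requestRequiredAcks: 1)
print(idempotenceViolations(compatible))
print(idempotenceViolations(incompatible))
```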
  • Producer only. Maximum number of messages allowed on the producer queue. This queue is shared by all topics and partitions. Defaults to 100000.

    Declaration

    Swift

    public var queueBufferingMaxMessages: Int
  • Producer only. Maximum total size, in kilobytes, of all messages allowed on the producer queue. This queue is shared by all topics and partitions. This property has higher priority than queue.buffering.max.messages. Defaults to 1048576.

    Declaration

    Swift

    public var queueBufferingMaxKBytes: Int
  • Producer only. Delay in milliseconds to wait for messages in the producer queue to accumulate before constructing message batches (MessageSets) to transmit to brokers. A higher value allows larger and more effective (less overhead, improved compression) batches of messages to accumulate at the expense of increased message delivery latency. Defaults to 0.

    Declaration

    Swift

    public var queueBufferingMaxMs: Int
  • Producer only. How many times to retry sending a failing Message. Note: retrying may cause reordering unless enable.idempotence is set to true. Defaults to 2.

    Declaration

    Swift

    public var messageSendMaxRetries: Int
  • Producer only. This field indicates the number of acknowledgements the leader broker must receive from ISR brokers before responding to the request: 0=Broker does not send any response/ack to client, -1=Broker will block until message is committed by all in-sync replicas (ISRs). Defaults to -1.

    Declaration

    Swift

    public var requestRequiredAcks: Int
  • Producer only. The ack timeout of the producer request in milliseconds. This value is only enforced by the broker and relies on request.required.acks being != 0. Defaults to 5000.

    Declaration

    Swift

    public var requestTimeoutMs: Int
  • Producer only. Local message timeout in milliseconds. This value is only enforced locally and limits the time a produced message waits for successful delivery. A time of 0 is infinite. This is the maximum time Kitura-Kafka may use to deliver a message (including retries). A delivery error occurs when either the retry count or the message timeout is exceeded. Defaults to 300000.

    Declaration

    Swift

    public var messageTimeoutMs: Int
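The failure rule above combines two limits, the retry count (messageSendMaxRetries) and the local timeout (messageTimeoutMs). The function below is a hypothetical restatement of that rule using the documented defaults; it is not how the library tracks deliveries internally.

```swift
/// Hypothetical check mirroring the rule above: delivery fails once the retry
/// count or the local message timeout is exceeded. A timeout of 0 means no time limit.
func deliveryHasFailed(retriesUsed: Int,
                       elapsedMs: Int,
                       maxRetries: Int = 2,
                       messageTimeoutMs: Int = 300000) -> Bool {
    let retriesExceeded = retriesUsed > maxRetries
    let timeoutExceeded = messageTimeoutMs > 0 && elapsedMs > messageTimeoutMs
    return retriesExceeded || timeoutExceeded
}

// Two retries used after one second: still within both limits.
print(deliveryHasFailed(retriesUsed: 2, elapsedMs: 1000))
// No retries used, but the default timeout has passed.
print(deliveryHasFailed(retriesUsed: 0, elapsedMs: 300001))
```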
  • Consumer only. Action to take when there is no initial offset in the offset store or the desired offset is out of range: ‘smallest’, ‘earliest’, ‘beginning’: automatically reset the offset to the smallest offset; ‘largest’, ‘latest’, ‘end’: automatically reset the offset to the largest offset; ‘error’: trigger an error, which is retrieved by consuming messages. Defaults to largest.

    Declaration

    Swift

    public var autoOffsetReset: AutoResetOptions
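Several of the accepted values are aliases for the same behaviour. The sketch below groups them in plain Swift; `ResetBehaviour` and `resetBehaviour(for:)` are illustrative names and are not the library's AutoResetOptions type.

```swift
/// Illustrative enum; not the library's AutoResetOptions type.
enum ResetBehaviour: Equatable {
    case smallest  // reset to the smallest offset
    case largest   // reset to the largest offset
    case error     // surface an error to the consumer
}

/// Maps a raw option string to the behaviour it selects, or nil if unknown.
func resetBehaviour(for value: String) -> ResetBehaviour? {
    switch value {
    case "smallest", "earliest", "beginning":
        return .smallest
    case "largest", "latest", "end":
        return .largest
    case "error":
        return .error
    default:
        return nil
    }
}

for option in ["earliest", "end", "error", "unknown"] {
    print(option, String(describing: resetBehaviour(for: option)))
}
```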
  • A struct representing the possible actions to take when there is no initial offset in the offset store or the desired offset is out of range.

    Declaration

    Swift

    public struct AutoResetOptions: CustomStringConvertible