KafkaConsumer

public class KafkaConsumer: KafkaClient

A KafkaClient for consuming messages on a topic from a broker.

Usage Example:

do {
    let config = KafkaConfig()
    config.groupId = "Kitura"
    config.autoOffsetReset = .beginning
    let consumer = try KafkaConsumer(config: config)
    consumer.connect(brokers: "localhost:9092")
    try consumer.subscribe(topics: ["test"])
    let records = consumer.poll()
    print(records)
} catch {
    print("Error creating consumer: \(error)")
}
  • Create a new KafkaConsumer. If a groupID is not provided in the config, a random UUID will be used.

    Throws

    A KafkaError if the provided KafkaConfig is invalid.

    Declaration

    Swift

    public init(config: KafkaConfig = KafkaConfig()) throws

    Parameters

    config

    The KafkaConfig that will configure your Kafka consumer.

  • Subscribe to one or more topics. The consumer will be assigned partitions based on its consumer group. The subscribe() method is asynchronouswhich and will returns immediately. Background threads will (re)join the group, wait for group rebalance, issue any registered rebalance_cb, assign() the assigned partitions, and then start fetching messages. This cycle may take up to session.timeout.ms * 2 or more to complete.

    Throws

    A KafkaError if the consumer fails to subscribe the the topic.

    Declaration

    Swift

    public func subscribe(topics: [String]) throws

    Parameters

    topics

    An array of Kafka topics to subscribe to.

  • Assign this consumer to a single topic with a specified partition to consume from. The offset to begin consuming from can also be specified, otherwise the consumer default to the end of the current messages.

    Throws

    A KafkaError if the consumer fails to subscribe the the topic.

    Declaration

    Swift

    public func assign(topic: String, partition: Int, offset: Int = Int(RD_KAFKA_OFFSET_END)) throws

    Parameters

    topic

    The Kafka topic to subscribe to.

    partition

    The topic partition to consume from.

    offset

    The topic offset to begin consuming from. Defaults to -1 representing the end of current messages.

  • Consume messages from the topic you are subscribed to. The messages will be consumed from your last call to poll. This function will block for at most timeout seconds.

    Declaration

    Swift

    public func poll(timeout: TimeInterval = 1) throws -> [KafkaConsumerRecord]

    Parameters

    timeout

    The maximum TimeInterval in seconds that this call will block for while consuming messages.

    Return Value

    An array of KafkaConsumerRecord that have been consumed.

  • Commit the offsets for the current partition assignment on the broker.
    This marks the records since the last poll as processed so they will not be reassigned during a rebalance. If you are using commitSync(), enableAutoCommit on KafkaConfig should be set to false.

    Declaration

    Swift

    public func commitSync() throws
  • Close down the KafkaConsumer. This call will block until the consumer has revoked its assignment. The maximum blocking time is roughly limited to session.timeout.ms. If you don’t call this function, the consumer will be closed when it is deinitialized.

    Declaration

    Swift

    public func close() throws