KafkaConsumer
public class KafkaConsumer: KafkaClient
A KafkaClient for consuming messages on a topic from a broker.
Usage Example:
do {
let config = KafkaConfig()
config.groupId = "Kitura"
config.autoOffsetReset = .beginning
let consumer = try KafkaConsumer(config: config)
consumer.connect(brokers: "localhost:9092")
try consumer.subscribe(topics: ["test"])
let records = consumer.poll()
print(records)
} catch {
print("Error creating consumer: \(error)")
}
-
Create a new
KafkaConsumer. If a groupID is not provided in the config, a random UUID will be used.Declaration
Swift
public init(config: KafkaConfig = KafkaConfig()) throwsParameters
configThe
KafkaConfigthat will configure your Kafka consumer. -
Subscribe to one or more topics. The consumer will be assigned partitions based on its consumer group. The subscribe() method is asynchronouswhich and will returns immediately. Background threads will (re)join the group, wait for group rebalance, issue any registered rebalance_cb, assign() the assigned partitions, and then start fetching messages. This cycle may take up to session.timeout.ms * 2 or more to complete.
Throws
AKafkaErrorif the consumer fails to subscribe the the topic.Declaration
Swift
public func subscribe(topics: [String]) throwsParameters
topicsAn array of Kafka topics to subscribe to.
-
Assign this consumer to a single topic with a specified partition to consume from. The offset to begin consuming from can also be specified, otherwise the consumer default to the end of the current messages.
Throws
AKafkaErrorif the consumer fails to subscribe the the topic.Declaration
Swift
public func assign(topic: String, partition: Int, offset: Int = Int(RD_KAFKA_OFFSET_END)) throwsParameters
topicThe Kafka topic to subscribe to.
partitionThe topic partition to consume from.
offsetThe topic offset to begin consuming from. Defaults to -1 representing the end of current messages.
-
Consume messages from the topic you are subscribed to. The messages will be consumed from your last call to poll. This function will block for at most timeout seconds.
Declaration
Swift
public func poll(timeout: TimeInterval = 1) throws -> [KafkaConsumerRecord]Parameters
timeoutThe maximum
TimeIntervalin seconds that this call will block for while consuming messages.Return Value
An array of
KafkaConsumerRecordthat have been consumed. -
Commit the offsets for the current partition assignment on the broker.
This marks the records since the last poll as processed so they will not be reassigned during a rebalance. If you are usingcommitSync(),enableAutoCommitonKafkaConfigshould be set to false.Declaration
Swift
public func commitSync() throws -
Close down the
KafkaConsumer. This call will block until the consumer has revoked its assignment. The maximum blocking time is roughly limited to session.timeout.ms. If you don’t call this function, the consumer will be closed when it is deinitialized.Declaration
Swift
public func close() throws
KafkaConsumer Class Reference