Bogdan Popa <bogdan@defn.io>
This package provides a client for Apache Kafka versions 0.11 and up. It is a work in progress, so expect breaking changes.
Clients transparently pool connections to brokers within a cluster. Connections are leased from the pool in order of least in-progress requests. Reconnections are handled transparently, and connection errors bubble up to the caller. Despite being thread-safe, clients may not be shared between consumers.
procedure
(make-client [#:bootstrap-host host
              #:bootstrap-port port
              #:sasl-mechanism&ctx sasl-ctx
              #:ssl-ctx ssl-ctx]) → client?
host : string?
port : (integer-in 0 65535)
sasl-ctx : (or/c #f (list/c 'plain string?) (list/c symbol? sasl-ctx-proc/c))
ssl-ctx : (or/c #f ssl-client-context?)
When a sasl-ctx is provided, it is used to authenticate the connection to the bootstrap host as well as any subsequent connections made to other nodes in the cluster.
When an ssl-ctx is provided, it is used to encrypt all connections.
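For example, a minimal sketch of connecting to a local broker (assuming the package is distributed as the kafka collection; the host and port values are illustrative):

  (require kafka)

  ;; Connect to a single bootstrap node; connections to the other
  ;; nodes in the cluster are pooled transparently.
  (define c
    (make-client #:bootstrap-host "127.0.0.1"
                 #:bootstrap-port 9092))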
procedure
(disconnect-all c) → void?
c : client?
value
sasl-ctx-proc/c : (-> string? (integer-in 0 65535) sasl-ctx?)
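As a sketch, a procedure satisfying sasl-ctx-proc/c can ignore its host and port arguments and produce a fresh SASL context per connection. The example below uses make-scram-client-ctx from Racket's sasl library; the 'SCRAM-SHA-256 mechanism symbol and the credentials are illustrative assumptions:

  (require sasl/scram)

  ;; Build a fresh SCRAM context for every connection the client opens.
  (define (scram-ctx host port)
    (make-scram-client-ctx 'sha-256 "example-user" "example-password"))

  (define authed-client
    (make-client #:sasl-mechanism&ctx (list 'SCRAM-SHA-256 scram-ctx)))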
Proxies act as intermediaries between clients and brokers.
procedure
(make-http-proxy host port) → proxy?
host : string?
port : (integer-in 0 65535)
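A sketch of routing a client's connections through a proxy, assuming make-client accepts a #:proxy argument:

  (define proxied-client
    (make-client #:bootstrap-host "broker.internal"
                 #:bootstrap-port 9092
                 #:proxy (make-http-proxy "proxy.example.com" 1080)))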
procedure
(exn:fail:kafka? v) → boolean?
v : any/c
procedure
(exn:fail:kafka:client? v) → boolean?
v : any/c
procedure
(exn:fail:kafka:server? v) → boolean?
v : any/c
procedure
(create-topics c t ...+) → CreatedTopics?
c : client?
t : CreateTopic?
When given a set of topics, some of them may be created successfully while others fail. It's up to the caller to inspect the error codes on the returned CreatedTopic values.
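For example, a sketch of creating a topic and inspecting the per-topic results, assuming the topics field of CreatedTopics holds a list and that a zero error code means success (make-CreateTopic is documented below):

  (define created
    (create-topics c (make-CreateTopic #:name "events"
                                       #:partitions 4)))
  (for ([t (in-list (CreatedTopics-topics created))])
    (unless (zero? (CreatedTopic-error-code t))
      (eprintf "failed to create ~a: ~a~n"
               (CreatedTopic-name t)
               (CreatedTopic-error-message t))))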
procedure
(delete-topics c t ...+) → DeletedTopics?
c : client?
t : string?
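Deletion reports per-topic errors the same way; a sketch under the same assumptions:

  (define deleted (delete-topics c "events" "logs"))
  (for ([t (in-list (DeletedTopics-topics deleted))])
    (unless (zero? (DeletedTopic-error-code t))
      (eprintf "failed to delete ~a: ~a~n"
               (DeletedTopic-name t)
               (DeletedTopic-error-message t))))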
procedure
(make-CreateTopic #:name name
                  #:partitions partitions
                  [#:replication-factor factor
                   #:assignments assignments]) → CreateTopic?
name : string?
partitions : exact-positive-integer?
struct
(struct CreatedTopics (topics))
struct
(struct CreatedTopic (error-code error-message name))
error-code : exact-nonnegative-integer?
error-message : (or/c #f string?)
name : string?
struct
(struct DeletedTopics (throttle-time-ms topics tags))
struct
(struct DeletedTopic (error-code error-message name uuid tags))
error-code : error-code/c
name : string?
Record results represent the results of publishing individual records.
struct
(struct RecordResult (topic partition))
topic : string?
partition : ProduceResponsePartition?
struct
(struct ProduceResponsePartition (id error-code offset))
id : exact-nonnegative-integer?
error-code : exact-nonnegative-integer?
offset : exact-nonnegative-integer?
Consumers form consumer groups to subscribe to topics and retrieve records. As the name implies, consumer groups group consumers together so that topic partitions may be spread out across the members of the group.
Consumers are not thread-safe.
procedure
(make-consumer client group-id topic ...+
               [#:reset-strategy strategy
                #:session-timeout-ms session-timeout-ms]) → consumer?
client : client?
group-id : string?
topic : string?
strategy : (or/c 'earliest 'latest)
session-timeout-ms : exact-nonnegative-integer?
The #:reset-strategy argument controls the consumer's initial offset for newly-assigned partitions. When this value is 'earliest, the consumer receives records starting from the beginning of each partition. When it is 'latest, it receives only records published after it subscribes to each topic.
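For example, a consumer that replays each newly-assigned partition from the beginning:

  (define consumer
    (make-consumer c "example-group" "events"
                   #:reset-strategy 'earliest))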
procedure
(consume-evt c [timeout-ms]) → evt?
c : consumer?
timeout-ms : exact-nonnegative-integer?
When a consumer leaves or joins the consumer group, the event synchronizes to a 'rebalance result. In that case, the consumer automatically re-joins the group and discards any uncommitted offsets. The associated data is a hash from topic names to hashes of partition ids to offsets. When a rebalance happens, take care not to commit any old offsets: you must sync a new consume-evt before making any calls to consumer-commit.
When either the timeout passes or new records become available on the broker, the event will synchronize to a 'records result whose associated data will be a vector of records.
More result types may be added in the future.
The timeout-ms argument controls how long the broker may wait before returning a response. If no records arrive between the time this function is called and the time the timeout passes, an empty vector of records is returned. The broker may not necessarily respect the timeout value, and may return immediately when there are no records.
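Putting this together, a sketch of a typical consume loop, assuming the event synchronizes to two values (the result type and its associated data) and using the record accessors documented below:

  (let loop ()
    (define-values (type data)
      (sync (consume-evt consumer 1000)))
    (case type
      [(records)
       ;; data is a vector of records
       (for ([r (in-vector data)])
         (printf "~a@~a: ~a~n"
                 (record-partition-id r)
                 (record-offset r)
                 (record-value r)))
       (consumer-commit consumer)
       (loop)]
      [(rebalance)
       ;; Never commit offsets obtained before the rebalance; loop to
       ;; sync a fresh consume-evt instead.
       (loop)]))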
procedure
(consumer-commit c) → void?
c : consumer?
Call this function after you have successfully processed a batch of records received from consume-evt. If you forget to call it, or if the consumer crashes between calling consume-evt and calling this function, another consumer in the group will eventually receive that same batch again.
procedure
(consumer-stop c) → void?
c : consumer?
Records represent individual key-value pairs on a topic.
procedure
(record-partition-id r) → exact-nonnegative-integer?
r : record?
procedure
(record-offset r) → exact-nonnegative-integer?
r : record?
procedure
(record-key r) → (or/c #f bytes?)
r : record?
procedure
(record-value r) → bytes?
r : record?
procedure
(record-headers r) → (hash/c string? bytes?)
r : record?
Consumers have several limitations at the moment, some of which will be addressed in future versions.
At the moment, only 'gzip is supported when producing records and 'gzip, 'lz4, 'snappy and 'zstd are supported when consuming records. Fetching a batch of records that is compressed using any other method will fail silently.
Only brokers that implement client-side assignment are supported (Apache Kafka versions 0.11 and up). At the moment, only the range and round-robin group assignment strategies are implemented.
Batches retrieved from the broker contain a CRC code for error detection, but the library does not validate these at the moment.
Producers publish data to one or more topics. They batch data internally by topic & partition, and they are thread-safe.
procedure
(make-producer c [#:acks acks
                  #:compression compression
                  #:flush-interval interval
                  #:max-batch-bytes max-bytes
                  #:max-batch-size max-size]) → producer?
c : client?
acks : (or/c 'none 'leader 'all)
compression : (or/c 'gzip)
interval : exact-positive-integer?
max-bytes : exact-positive-integer?
max-size : exact-positive-integer?
Data is batched internally by topic & partition. Within each batch, the data is compressed according to the #:compression method.
The producer automatically flushes its data every #:flush-interval milliseconds, whenever the total size of all of its batches exceeds #:max-batch-bytes, or whenever the total number of records contained in all of its batches exceeds #:max-batch-size, whichever happens first.
During a flush, calling produce on a producer blocks until the flush completes.
procedure
(produce p topic key value [#:partition partition]) → (evt/c RecordResult?)
p : producer?
topic : string?
key : (or/c #f bytes?)
value : (or/c #f bytes?)
partition : exact-nonnegative-integer?
Typically, you would call this function in a loop to produce a set of data, collect the result events, and then sync them to ensure the records have been written to the log.
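A sketch of that pattern, producing ten records and collecting their results after a flush:

  (define p (make-producer c))
  (define evts
    (for/list ([i (in-range 10)])
      (produce p "events"
               #"key"
               (string->bytes/utf-8 (number->string i)))))
  ;; Force a flush, then collect each record's RecordResult.
  (producer-flush p)
  (define results (map sync evts))
  (producer-stop p)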
procedure
(producer-flush p) → void?
p : producer?
procedure
(producer-stop p) → void?
p : producer?
Kafka supports snappy, lz4, and zstd compression in addition to gzip, but this library only supports gzip at the moment.