EasyKafka Logo
Kafka Client for Unreal Engine 4/5
EasyKafka is a Kafka/Redpanda client subsystem for Unreal Engine. It supports producing and consuming records through Blueprint and C++.
- Windows x86_64
- Hololens 2 (Windows ARM64)
- Linux x86_64
- Linux ARM64
Link the plugin modules to your project through <YourModule>.Build.cs:
CppStandard = CppStandardVersion.Cpp17;//avoid using boost if(Target.Platform == UnrealTargetPlatform.HoloLens || Target.Platform == UnrealTargetPlatform.Win64) bUseRTTI = true; PrivateDependencyModuleNames.AddRange( new string[] { "EasyKafka", "KafkaLib", "KafkaConsumer", "KafkaProducer", "KafkaAdmin" });
PAY ATTENTION TO THE BLOCKING METHODS.
Create Consumer with default configuration:
#include "EasyKafkaSubsystem.h" TSharedPtr<FEasyKafkaModule> EasyKafka = GEngine->GetEngineSubsystem<UEasyKafkaSubsystem>()->GetEasyKafka(); EasyKafka->GetConsumer()->CreateConsumer(`<BOOTSTRAP_SERVERS_COMMA_SEPARATED>`, `<USERNAME>`, `<TOKEN/PASSWORD>`, (int)EKafkaLogLevel::ERR);
Create Consumer with configuration:
#include "EasyKafkaSubsystem.h" TSharedPtr<FEasyKafkaModule> EasyKafka = GEngine->GetEngineSubsystem<UEasyKafkaSubsystem>()->GetEasyKafka(); TMap<EKafkaConsumerConfig, FString> KafkaConfiguration = { {EKafkaConsumerConfig::CLIENT_ID,"34235"}, {EKafkaConsumerConfig::SOCKET_TIMEOUT_MS,"10000"} }; EasyKafka->GetConsumer()->CreateConsumer(`<BOOTSTRAP_SERVERS_COMMA_SEPARATED>`, `<USERNAME>`, `<TOKEN/PASSWORD>`, KafkaConfiguration, (int)EKafkaLogLevel::ERR);
Consume messages:
EasyKafka->GetConsumer()->OnNewMessage().AddLambda([](const TArray<FConsumerRecord>& Messages) { for (FConsumerRecord Message : Messages) { UE_LOG(LogTemp, Display, TEXT("New Message %s \n"), *Message.Value);//process messages } }); EasyKafka->GetConsumer()->Subscribe( { "topic", "topic1", "topic2" }); EasyKafka->GetConsumer()->StartConsuming();
ATTENTION: MAKE SURE TO COMMIT FROM THE CONSUMER RUNNABLE THREAD BEFORE PROCESSING RECORDS IF YOU DISABLED AUTOCOMMIT.
PAY ATTENTION TO THE BLOCKING METHODS.
Create Producer with default configuration:
#include "EasyKafkaSubsystem.h" TSharedPtr<FEasyKafkaModule> EasyKafka = GEngine->GetEngineSubsystem<UEasyKafkaSubsystem>()->GetEasyKafka(); EasyKafka->GetProducer()->CreateProducer(`<BOOTSTRAP_SERVERS_COMMA_SEPARATED>`, `<USERNAME>`, `<TOKEN/PASSWORD>`, (int)EKafkaLogLevel::ERR);
Create Producer with configuration:
#include "EasyKafkaSubsystem.h" TSharedPtr<FEasyKafkaModule> EasyKafka = GEngine->GetEngineSubsystem<UEasyKafkaSubsystem>()->GetEasyKafka(); TMap<EKafkaProducerConfig, FString> KafkaConfiguration = { {EKafkaProducerConfig::MESSAGE_TIMEOUT_MS,"5000"}, {EKafkaProducerConfig::REQUEST_TIMEOUT_MS,"5000"} }; EasyKafka->GetProducer()->CreateProducer(`<BOOTSTRAP_SERVERS_COMMA_SEPARATED>`, `<USERNAME>`, `<TOKEN/PASSWORD>`, KafkaConfiguration, (int)EKafkaLogLevel::ERR);
Callback invoked when a record is produced or fails to be produced:
EasyKafka->GetProducer()->OnProduce().AddLambda([](const FProducerCallback& Callback) { if (Callback.bError) { UE_LOG(LogTemp, Error, TEXT("Error producing recordId: %d \nError Message: %s\n"), Callback.RecordMetadata.RecordId, *Callback.ErrorMessage); } else { UE_LOG(LogTemp, Display, TEXT("RecordId: %d produced.\n"), Callback.RecordMetadata.RecordId); } });
Produce a record asynchronously:
EasyKafka->GetProducer()->ProduceRecord('<TOPIC>', '<RECORD_VALUE>'); /* More control over your record, such as headers, Id... */ FProducerRecord record; record.Key = "key"; record.Topic = "topic"; record.Value = "value"; record.Id = 2312;//Unique id to identify this record in the OnProduce callback; record.Headers = FRecordHeader( { {"KeyOne","ValueOne"}, {"KeyTwo","ValueTwo"} }); EasyKafka->GetProducer()->ProduceRecord(record);
ALL THE METHODS ARE BLOCKING; ASYNC VERSIONS ARE TO BE ADDED.
Create Admin with default configuration:
#include "EasyKafkaSubsystem.h" TSharedPtr<FEasyKafkaModule> EasyKafka = GEngine->GetEngineSubsystem<UEasyKafkaSubsystem>()->GetEasyKafka(); EasyKafka->GetAdmin()->CreateAdmin(`<BOOTSTRAP_SERVERS_COMMA_SEPARATED>`, `<USERNAME>`, `<TOKEN/PASSWORD>`, (int)EKafkaLogLevel::ERR);
Create Admin with configuration:
#include "EasyKafkaSubsystem.h" TSharedPtr<FEasyKafkaModule> EasyKafka = GEngine->GetEngineSubsystem<UEasyKafkaSubsystem>()->GetEasyKafka(); TMap<EKafkaAdminConfig, FString> KafkaConfiguration = { {EKafkaAdminConfig::SOCKET_TIMEOUT_MS,"10000"} }; EasyKafka->GetAdmin()->CreateAdmin(`<BOOTSTRAP_SERVERS_COMMA_SEPARATED>`, `<USERNAME>`, `<TOKEN/PASSWORD>`, KafkaConfiguration, (int)EKafkaLogLevel::ERR);
Simple Admin request example:
const TArray<FString> TopicsToDelete = { "Topic1Name", "Topic2Name" }; FAdminRequestResult Result = EasyKafka->GetAdmin()->DeleteTopics(TopicsToDelete); if (Result.bError) { UE_LOG(LogTemp, Error, TEXT("Error deleting topics: %s\n"), *Result.ErrorMessage); }
Give us a ⭐️!