Expand Up 
  
 @@ -24,6 +24,7 @@ 
  
 
  
  
 
 #include "src/errors.h"  
 
 
  
  
 
 #include "src/config.h"  
 
 
  
  
 
 #include "src/callbacks.h"  
 
 
 
  
 
 #include "src/workers.h"  
 
 
  
  
 
  
 
 
  
  
 
 namespace NodeKafka {  
 
 
  
  
 
  
 
 
 
 
 
  
 
  
 Expand Down 
 
 
  
 
  
 Expand Up 
  
 @@ -427,8 +428,9 @@ template <class T> class Connection : public Napi::ObjectWrap<T> { 
  
 
  
  
 
  Napi::FunctionReference* callback = new Napi::FunctionReference();  
 
 
  
  
 
  *callback = Napi::Persistent(cb);  
 
 
  
  
 
  
 
 
  
 
 
  Napi::AsyncWorker::Queue(new Workers::ConnectionMetadata(  
 
 
  
 
 
 							 callback, obj, topic, timeout_ms, allTopics));  
 
 
 
  
 
  Napi::AsyncWorker *worker = new Workers::ConnectionMetadata(  
 
 
 
  
 
  callback, obj, topic, timeout_ms, allTopics);  
 
 
 
  
 
  worker->Queue();  
 
 
  
  
 
  
 
 
  
  
 
  return env.Null();  
 
 
  
  
 
  }  
 
 
 
 
 
  
 
  
 Expand All 
  
 @@ -446,24 +448,16 @@ template <class T> class Connection : public Napi::ObjectWrap<T> { 
  
 
  
  
 
  std::vector<RdKafka::TopicPartition *> toppars =  
 
 
  
  
 
  Conversion::TopicPartition::FromV8Array(info[0].As<Napi::Array>());  
 
 
  
  
 
  
 
 
  
 
 
  int timeout_ms;  
 
 
  
 
 
  Napi::Maybe<uint32_t> maybeTimeout =  
 
 
  
 
 
  info[1].As<Napi::Number>(.As<Napi::Number>().Uint32Value());  
 
 
  
 
 
  
 
 
  
 
 
  if (maybeTimeout.IsNothing()) {  
 
 
  
 
 
  timeout_ms = 1000;  
 
 
  
 
 
  } else {  
 
 
  
 
 
  timeout_ms = static_cast<int>(maybeTimeout);  
 
 
  
 
 
  }  
 
 
 
  
 
  int timeout_ms = info[1].As<Napi::Number>().Int32Value();  
 
 
  
  
 
  
 
 
  
  
 
  Napi::Function cb = info[2].As<Napi::Function>();  
 
 
  
  
 
  Napi::FunctionReference callback = Napi::Persistent(cb);  
 
 
  
  
 
  
 
 
  
  
 
  Connection* handle = this;  
 
 
  
  
 
  
 
 
  
 
 
  Napi::AsyncQueueWorker (  
 
 
  
 
 
  new Workers::Handle::OffsetsForTimes( callback, handle,  
 
 
  
 
 
 								toppars, timeout_ms) ); 
 
 
 
  
 
  Napi::AsyncWorker *worker = new Workers::Handle::OffsetsForTimes (  
 
 
 
  
 
  callback, handle, toppars, timeout_ms);   
 
 
 
  
 
  worker->Queue( ); 
 
 
  
  
 
  
 
 
  
  
 
  return env.Null();  
 
 
  
  
 
  }  
 
 
 
 
 
  
 
  
 Expand All 
  
 @@ -472,12 +466,12 @@ template <class T> class Connection : public Napi::ObjectWrap<T> { 
  
 
  
  
 
  Napi::Env env = info.Env();  
 
 
  
  
 
  Napi::HandleScope scope(env);  
 
 
  
  
 
  
 
 
  
 
 
  Connection* obj = ObjectWrap::Unwrap<Connection>(info.This()) ;  
 
 
 
  
 
  Connection* obj = this ;  
 
 
  
  
 
  
 
 
  
  
 
  if (!info[0].IsString()) {  
 
 
  
  
 
  Napi::Error::New(env, "1st parameter must be a topic string").ThrowAsJavaScriptException();  
 
 
  
  
 
  ;  
 
 
  
 
 
  return;  
 
 
 
  
 
  return env.Null() ;  
 
 
  
  
 
  }  
 
 
  
  
 
  
 
 
  
  
 
  if (!info[1].IsNumber()) {  
 
 
 
 
 
  
 
  
 Expand All 
  
 @@ -496,9 +490,9 @@ template <class T> class Connection : public Napi::ObjectWrap<T> { 
  
 
  
  
 
  }  
 
 
  
  
 
  
 
 
  
  
 
  // Get string pointer for the topic name  
 
 
  
 
 
  std::string topicUTF8 = info[0].As<Napi::String>(.To<Napi::String>() );  
 
 
 
  
 
  std::string topicUTF8 = info[0].ToString().Utf8Value( );  
 
 
  
  
 
  // The first parameter is the topic  
 
 
  
 
 
  std::string topic_name(* topicUTF8);  
 
 
 
  
 
  std::string topic_name(topicUTF8);  
 
 
  
  
 
  
 
 
  
  
 
  // Second parameter is the partition  
 
 
  
  
 
  int32_t partition = info[1].As<Napi::Number>().Int32Value();  
 
 
 
 
 
  
 
  
 Expand All 
  
 @@ -508,10 +502,12 @@ template <class T> class Connection : public Napi::ObjectWrap<T> { 
  
 
  
  
 
  
 
 
  
  
 
  // Fourth parameter is the callback  
 
 
  
  
 
  Napi::Function cb = info[3].As<Napi::Function>();  
 
 
  
 
 
  Napi::FunctionReference *callback = new Napi::FunctionReference(cb);  
 
 
 
  
 
  Napi::FunctionReference *callback = new Napi::FunctionReference();  
 
 
 
  
 
  callback->Reset(cb);  
 
 
  
  
 
  
 
 
  
 
 
  Napi::AsyncQueueWorker(new Workers::ConnectionQueryWatermarkOffsets(  
 
 
  
 
 
 									callback, obj, topic_name, partition, timeout_ms));  
 
 
 
  
 
  Napi::AsyncWorker *worker = new Workers::ConnectionQueryWatermarkOffsets(  
 
 
 
  
 
  callback, obj, topic_name, partition, timeout_ms);  
 
 
 
  
 
  worker->Queue();  
 
 
  
  
 
  
 
 
  
  
 
  return env.Null();  
 
 
  
  
 
  }  
 
 
 
 
 
  
 
  
 Expand All 
  
 @@ -529,21 +525,19 @@ template <class T> class Connection : public Napi::ObjectWrap<T> { 
  
 
  
  
 
  }  
 
 
  
  
 
  
 
 
  
  
 
  // Get string pointer for the username  
 
 
  
 
 
  std::string usernameUTF8 = info[0].As<Napi::String>(.To<Napi::String>() );  
 
 
 
  
 
  std::string usernameUTF8 = info[0].As<Napi::String>().Utf8Value( );  
 
 
  
  
 
  // The first parameter is the username  
 
 
  
 
 
  std::string username(* usernameUTF8);  
 
 
 
  
 
  std::string username(usernameUTF8);  
 
 
  
  
 
  
 
 
  
  
 
  // Get string pointer for the password  
 
 
  
 
 
  std::string passwordUTF8 = info[1].As<Napi::String>(.To<Napi::String>() );  
 
 
 
  
 
  std::string passwordUTF8 = info[1].As<Napi::String>().Utf8Value( );  
 
 
  
  
 
  // The first parameter is the password  
 
 
  
 
 
  std::string password(* passwordUTF8);  
 
 
 
  
 
  std::string password(passwordUTF8);  
 
 
  
  
 
  
 
 
  
 
 
  Connection* obj = ObjectWrap::Unwrap<Connection>(info.This());  
 
 
  
 
 
  Baton b = obj->SetSaslCredentials(username, password);  
 
 
 
  
 
  Baton b = this->SetSaslCredentials(username, password);  
 
 
  
  
 
  
 
 
  
  
 
  if (b.err() != RdKafka::ERR_NO_ERROR) {  
 
 
  
 
 
  Napi::Value errorObject = b.ToObject();  
 
 
  
 
 
  Napi::Error::New(env, errorObject).ThrowAsJavaScriptException();  
 
 
 
  
 
  b.ToError(env).ThrowAsJavaScriptException();  
 
 
  
  
 
  return env.Null();  
 
 
  
  
 
  }  
 
 
  
  
 
  
 
 
 
 
 
  
 
  
 Expand All 
  
 @@ -563,16 +557,16 @@ template <class T> class Connection : public Napi::ObjectWrap<T> { 
  
 
  
  
 
  Napi::Error::New(env, "Need to specify a callbacks object").ThrowAsJavaScriptException();  
 
 
  
  
 
  return env.Null();  
 
 
  
  
 
  }  
 
 
  
 
 
  v8::Local<v8::Context> context = Napi::GetCurrentContext();  
 
 
  
 
 
  Connection* obj = ObjectWrap::Unwrap<Connection>(info.This());  
 
 
 
  
 
  
 
 
 
  
 
  Connection* obj = this;  
 
 
 
  
 
  
 
 
 
  
 
  const bool add = info[0].As<Napi::Boolean>().Value();  
 
 
  
  
 
  
 
 
  
 
 
  const bool add = info[0].As<Napi::Boolean>().Value().ToChecked();  
 
 
  
 
 
  Napi::Object configs_object =  
 
 
  
 
 
  info[1].ToObject(context);  
 
 
 
  
 
  Napi::Object configs_object = info[1].ToObject();  
 
 
  
  
 
  Napi::Array configs_property_names =  
 
 
  
 
 
  configs_object->GetOwnPropertyNames(context );  
 
 
 
  
 
  configs_object.GetPropertyNames( );  
 
 
  
  
 
  
 
 
  
 
 
  for (unsigned int j = 0; j < configs_property_names-> Length(); ++j) {  
 
 
 
  
 
  for (unsigned int j = 0; j < configs_property_names. Length(); ++j) {  
 
 
  
  
 
  std::string configs_string_key;  
 
 
  
  
 
  
 
 
  
  
 
  Napi::Value configs_key =  
 
 
 
 
 
  
 
  
 Expand All 
  
 @@ -583,7 +577,7 @@ template <class T> class Connection : public Napi::ObjectWrap<T> { 
  
 
  
  
 
  int config_type = 0;  
 
 
  
  
 
  if (configs_value.IsObject() && configs_key.IsString()) {  
 
 
  
  
 
 	std::string configs_utf8_key = configs_key.As<Napi::String>();  
 
 
  
 
 
 	configs_string_key = std::string(* configs_utf8_key);  
 
 
 
  
 
 	configs_string_key = std::string(configs_utf8_key);  
 
 
  
  
 
 	if (configs_string_key.compare("global") == 0) {  
 
 
  
  
 
  config_type = 1;  
 
 
  
  
 
 	} else if (configs_string_key.compare("topic") == 0) {  
 
 
 
 
 
  
 
  
 Expand All 
  
 @@ -598,11 +592,11 @@ template <class T> class Connection : public Napi::ObjectWrap<T> { 
  
 
  
  
 
  }  
 
 
  
  
 
  
 
 
  
  
 
  Napi::Object object =  
 
 
  
 
 
 	configs_value-> ToObject(context );  
 
 
 
  
 
 	configs_value. ToObject();  
 
 
  
  
 
  Napi::Array property_names =  
 
 
  
 
 
 	object->GetOwnPropertyNames(context );  
 
 
 
  
 
 	object.GetPropertyNames( );  
 
 
  
  
 
  
 
 
  
 
 
  for (unsigned int i = 0; i < property_names-> Length(); ++i) {  
 
 
 
  
 
  for (unsigned int i = 0; i < property_names. Length(); ++i) {  
 
 
  
  
 
 	std::string errstr;  
 
 
  
  
 
 	std::string string_key;  
 
 
  
  
 
  
 
 
 
 
 
  
 
  
 Expand All 
  
 @@ -611,12 +605,12 @@ template <class T> class Connection : public Napi::ObjectWrap<T> { 
  
 
  
  
 
  
 
 
  
  
 
 	if (key.IsString()) {  
 
 
  
  
 
 	 std::string utf8_key = key.As<Napi::String>();  
 
 
  
 
 
 	 string_key = std::string(* utf8_key);  
 
 
 
  
 
 	 string_key = std::string(utf8_key);  
 
 
  
  
 
 	} else {  
 
 
  
  
 
 	 continue;  
 
 
  
  
 
 	}  
 
 
  
  
 
  
 
 
  
 
 
 	if (value-> IsFunction()) {  
 
 
 
  
 
 	if (value. IsFunction()) {  
 
 
  
  
 
 	 Napi::Function cb = value.As<Napi::Function>();  
 
 
  
  
 
 	 switch (config_type) {  
 
 
  
  
 
  case 1:  
 
 
 
 
 
  
 
  
 Expand All 
  
 @@ -641,7 +635,7 @@ template <class T> class Connection : public Napi::ObjectWrap<T> { 
  
 
  
  
 
  }  
 
 
  
  
 
  }  
 
 
  
  
 
  
 
 
  
 
 
  return env.True( );  
 
 
 
  
 
  return Napi::Boolean::From(env, true );  
 
 
  
  
 
  }  
 
 
  
  
 
  
 
 
  
  
 
  Napi::Value NodeSetOAuthBearerToken(const Napi::CallbackInfo &info) {  
 
 
 
 
 
  
 
  
 Expand All 
  
 @@ -661,60 +655,57 @@ template <class T> class Connection : public Napi::ObjectWrap<T> { 
  
 
  
  
 
  return env.Null();  
 
 
  
  
 
  }  
 
 
  
  
 
  
 
 
  
 
 
  if (!info[3].IsNullOrUndefined () && !info[3].IsArray()) {  
 
 
 
  
 
  if (!info[3].IsNull() && !info[3].IsUndefined () && !info[3].IsArray()) {  
 
 
  
  
 
  Napi::Error::New(env, "4th parameter must be an extensions array or null").ThrowAsJavaScriptException();  
 
 
  
  
 
  return env.Null();  
 
 
  
  
 
  }  
 
 
  
  
 
  
 
 
  
  
 
  // Get string pointer for the token  
 
 
  
 
 
  std::string tokenUtf8 = info[0].As<Napi::String>(.To<Napi::String>() );  
 
 
  
 
 
  std::string token(* tokenUtf8);  
 
 
 
  
 
  std::string tokenUtf8 = info[0].As<Napi::String>().Utf8Value( );  
 
 
 
  
 
  std::string token(tokenUtf8);  
 
 
  
  
 
  
 
 
  
  
 
  // Get the lifetime_ms  
 
 
  
  
 
  int64_t lifetime_ms = info[1].As<Napi::Number>().Int64Value();  
 
 
  
  
 
  
 
 
  
  
 
  // Get string pointer for the principal_name  
 
 
  
  
 
  std::string principal_nameUtf8 =   
 
 
  
 
 
  info[2].As<Napi::String>(.To<Napi::String>() );  
 
 
  
 
 
  std::string principal_name(* principal_nameUtf8);  
 
 
 
  
 
  info[2].As<Napi::String>().Utf8Value( );  
 
 
 
  
 
  std::string principal_name(principal_nameUtf8);  
 
 
  
  
 
  
 
 
  
  
 
  // Get the extensions (if any)  
 
 
  
  
 
  std::list<std::string> extensions;  
 
 
  
 
 
  if (!info[3].IsNullOrUndefined ()) {  
 
 
 
  
 
  if (!info[3].IsNull() && !info[3].IsUndefined ()) {  
 
 
  
  
 
  Napi::Array extensionsArray = info[3].As<Napi::Array>();  
 
 
  
  
 
  extensions = v8ArrayToStringList(extensionsArray);  
 
 
  
  
 
  }  
 
 
  
  
 
  
 
 
  
 
 
  Connection* obj = ObjectWrap::Unwrap<Connection>(info.This()) ;  
 
 
 
  
 
  Connection* obj = this ;  
 
 
  
  
 
  Baton b =  
 
 
  
  
 
  obj->SetOAuthBearerToken(token, lifetime_ms, principal_name, extensions);  
 
 
  
  
 
  
 
 
  
  
 
  if (b.err() != RdKafka::ERR_NO_ERROR) {  
 
 
  
 
 
  Napi::Value errorObject = b.ToObject();  
 
 
  
 
 
  Napi::Error::New(env, errorObject).ThrowAsJavaScriptException();  
 
 
 
  
 
  b.ToError(env).ThrowAsJavaScriptException();  
 
 
  
  
 
  return env.Null();  
 
 
  
  
 
  }  
 
 
  
  
 
  
 
 
  
  
 
  return env.Null();  
 
 
  
  
 
  }  
 
 
  
  
 
  
 
 
  
 
 
  static  Napi::Value NodeSetOAuthBearerTokenFailure(const Napi::CallbackInfo &info) {  
 
 
 
  
 
  Napi::Value NodeSetOAuthBearerTokenFailure(const Napi::CallbackInfo &info) {  
 
 
  
  
 
  Napi::Env env = info.Env();  
 
 
  
  
 
  if (!info[0].IsString()) {  
 
 
  
  
 
  Napi::Error::New(env, "1st parameter must be an error string").ThrowAsJavaScriptException();  
 
 
  
  
 
  return env.Null();  
 
 
  
  
 
  }  
 
 
  
  
 
  
 
 
  
  
 
  // Get string pointer for the error string  
 
 
  
 
 
  std::string errstrUtf8 = info[0].As<Napi::String>(.To<Napi::String>() );  
 
 
  
 
 
  std::string errstr(* errstrUtf8);  
 
 
 
  
 
  std::string errstrUtf8 = info[0].As<Napi::String>().Utf8Value( );  
 
 
 
  
 
  std::string errstr(errstrUtf8);  
 
 
  
  
 
  
 
 
  
 
 
  Connection* obj = ObjectWrap::Unwrap<Connection>(info.This());  
 
 
  
 
 
  Baton b = obj->SetOAuthBearerTokenFailure(errstr);  
 
 
 
  
 
  Baton b = this->SetOAuthBearerTokenFailure(errstr);  
 
 
  
  
 
  
 
 
  
  
 
  if (b.err() != RdKafka::ERR_NO_ERROR) {  
 
 
  
 
 
  Napi::Value errorObject = b.ToObject();  
 
 
  
 
 
  Napi::Error::New(env, errorObject).ThrowAsJavaScriptException();  
 
 
 
  
 
  b.ToError(env).ThrowAsJavaScriptException();  
 
 
  
  
 
  return env.Null();  
 
 
  
  
 
  }  
 
 
  
  
 
  
 
 
 
 
 
  
 
  
 Expand All 
  
 @@ -723,11 +714,9 @@ template <class T> class Connection : public Napi::ObjectWrap<T> { 
  
 
  
  
 
  
 
 
  
  
 
  Napi::Value NodeName(const Napi::CallbackInfo &info) {  
 
 
  
  
 
  Napi::Env env = info.Env();  
 
 
  
 
 
  Connection* obj = ObjectWrap::Unwrap<Connection>(info.This());  
 
 
  
 
 
  std::string name = obj->Name();  
 
 
  
 
 
  return Napi::New(env, name);  
 
 
  
 
 
  }  
 
 
  
 
 
  
 
 
 
  
 
  
 
 
 
  
 
  return Napi::String::From(env, this->Name());  
 
 
 
  
 
  }   
 
 
  
  
 
 };  
 
 
  
  
 
  
 
 
  
  
 
 } // namespace NodeKafka  
 
 
 
 
 
  
 
  
 Expand Down