SandStorm是一个Linux下,简易的分布式实时处理系统。
整个分布式系统中,节点分为两种,一种是 President节点(Master节点),一种是Manager节点(Slave节点)。Manager节点负责执行 Executor任务。如下图所示
Executor又主要分为两种类型:
- SpoutExecutor 消息源执行器
- BoltExecutor 消息处理单元执行器
这个计算模型是参考自Storm分布式处理系统的:
Spout源源不断的产生消息,然后Spout会将数据传输给负责处理数据的BoltExecutor的Manager节点负责处理。
整个系统可以实时不断的运作,有别于MapReduce的批作业处理系统。
SandStorm底层网络是基于Epoll异步事件通知模型实现的,可以应对大量客户端请求并发访问的情况。
程序使用CMake(version >=3.9)进行构建,所以只需进入项目目录,执行
cmake
执行cmake命令就可以编译出makefile文件,然后执行
make
用make命令进行编译。
系统通过加载用户编写的任务程序来进行任务处理。
用户编写的整个任务模型称为Topology,一个用户编写的Topology中,需要实现Bolt和Spout两种组件。
以下以wordcount程序为例,看系统的简单使用。
编写HelloWorldSpout.cpp用以产生数据源,源源不断产生数据。
void HelloWorldSpout::Prepare(std::shared_ptr<sandstorm::collector::OutputCollector> outputCollector) { _outputCollector = outputCollector; _words = SplitString("Hello world there are some words we generate new sentence randomly", ' '); } void HelloWorldSpout::Cleanup() { } std::vector<std::string> HelloWorldSpout::DeclareFields() { return {"sentence"}; } void HelloWorldSpout::NextTuple() { static int32_t id = 0; int64_t currentMicroseconds = 0; ++id; std::vector<std::string> words(5); for (int32_t i = 0; i < 5; i++) { words[i] = _words[rand() % _words.size()]; } std::string sentence = JoinString(words); _outputCollector->Emit({ sentence, currentMicroseconds, id }); }
编写SplitSentenceBolt消息处理器,用来分解句子成单词。
void SplitSentenceBolt::Prepare(std::shared_ptr<sandstorm::collector::OutputCollector> outputCollector) { _outputCollector = outputCollector; } void SplitSentenceBolt::Cleanup() { } std::vector<std::string> SplitSentenceBolt::DeclareFields() { return {"word"}; } void SplitSentenceBolt::Execute(const sandstorm::base::Tuple &tuple) { std::string sentence = tuple[0].GetStringValue(); int64_t sourceMicroseconds = tuple[1].GetInt64Value(); int32_t id = tuple[2].GetInt32Value(); std::vector<std::string> words = SplitString(sentence, ' '); for (const std::string &word : words) { _outputCollector->Emit({word, sourceMicroseconds, id}); } }
编写WorldCountBolt消息处理器,用来统计单词。
void WordCountBolt::Prepare(std::shared_ptr<sandstorm::collector::OutputCollector> outputCollector) { _outputCollector = outputCollector; } void WordCountBolt::Cleanup() { } std::vector<std::string> WordCountBolt::DeclareFields() { return {"word", "count"}; } void WordCountBolt::Execute(const sandstorm::base::Tuple &tuple) { std::string word = tuple[0].GetStringValue(); int64_t sourceMicroseconds = tuple[1].GetInt64Value(); int32_t id = tuple[2].GetInt32Value(); auto wordCountIterator = _wordCounts.find(word); if (wordCountIterator == _wordCounts.end()) { _wordCounts.insert({word, 0}); wordCountIterator = _wordCounts.find(word); } wordCountIterator->second++; std::cout << word << ' ' << wordCountIterator->second << std::endl; _outputCollector->Emit({word, wordCountIterator->second}); int64_t currentMicroseconds = 0; }
最后我们要编写出Topology来封装整个任务模型
WorldCountTopology.cpp
sandstorm::topology::Topology *GetTopology() { sandstorm::topology::Topology *topology = new sandstorm::topology::Topology("word-count-topology"); topology->SetSpout("hello-world-spout", new HelloWorldSpout) .ParallismHint(1); topology->SetBolt("split-sentence-bolt", new SplitSentenceBolt) .Random("hello-world-spout") .ParallismHint(3); topology->SetBolt("word-count-bolt", new WordCountBolt) .Random("split-sentence-bolt") .ParallismHint(2); return topology; }
以上代码都在代码的Sample目录下,可以进行查看。