Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit d5e78ee

Browse files
WIP: Multiquery, Timestamp Support, Test Refactor (#6)
* Multiquery, Timestamp Support, Much improved Integration Tests
1 parent 129e2ed commit d5e78ee

File tree

13 files changed

+824
-438
lines changed

13 files changed

+824
-438
lines changed

‎.dockerignore‎

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
target/

‎CHANGELOG.md‎

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
# Changelog
2+
3+
All notable changes to this project will be documented in this file.
4+
5+
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
6+
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
7+
8+
## [Unreleased]
9+
10+
## [0.0.3] - 2019年07月14日
11+
12+
### Added
13+
14+
- Possibility to run multiple queries in one. See the Integration Tests in `tests/integration_tests.rs` for examples.
15+
- Ability to specify Timestamp for write queries
16+
17+
### Changed
18+
19+
- You now have to borrow a query when passing it to the `query` method
20+
21+
## [0.0.2] - 2019年07月23日
22+
23+
### Changed
24+
25+
- URLEncode Query before sending it to InfluxDB, which caused some empty returns (#5)
26+
- Improved Test Coverage: There's now even more tests verifying correctness of the crate (#5)
27+
- It's no longer necessary to supply a wildcard generic when working with serde*integration: `client.json_query::<Weather>(query)` instead of `client.json_query::<Weather, *>(query)`
28+
29+
[unreleased]: https://github.com/Empty2k12/influxdb-rust/compare/v0.0.3...HEAD
30+
[0.0.3]: https://github.com/Empty2k12/influxdb-rust/compare/v0.0.2...v0.0.3
31+
[0.0.2]: https://github.com/Empty2k12/influxdb-rust/releases/tag/v0.0.2

‎Cargo.lock‎

Lines changed: 284 additions & 230 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

‎Cargo.toml‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "influxdb"
3-
version = "0.0.2"
3+
version = "0.0.3"
44
authors = ["Gero Gerke <11deutron11@gmail.com>"]
55
edition = "2018"
66
description = "InfluxDB Driver for Rust"

‎Dockerfile‎

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
FROM xd009642/tarpaulin
2+
3+
RUN wget https://dl.influxdata.com/influxdb/releases/influxdb_1.7.6_amd64.deb
4+
RUN dpkg -i influxdb_1.7.6_amd64.deb
5+
RUN INFLUXDB_HTTP_BIND_ADDRESS=9999 influxd > $HOME/influx.log 2>&1 &
6+
7+
WORKDIR /volume
8+
9+
CMD cargo build && cargo tarpaulin

‎README.md‎

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -37,28 +37,27 @@ Pull requests are always welcome.
3737

3838
- Reading and Writing to InfluxDB
3939
- Optional Serde Support for Deserialization
40+
- Running multiple queries in one request (e.g. `SELECT * FROM weather_berlin; SELECT * FROM weather_london`)
4041

4142
## Planned Features
4243

43-
- Running multiple queries in one request (e.g. `SELECT * FROM weather_berlin; SELECT * FROM weather_london`)
4444
- Read Query Builder instead of supplying raw queries
4545
- Authentication against InfluxDB
46-
- Methods for setting time and time precision in a query
46+
- `#[derive(InfluxDbWritable)]`
4747

4848
## Quickstart
4949

5050
Add the following to your `Cargo.toml`
5151

5252
```toml
53-
influxdb = "0.0.2"
53+
influxdb = "0.0.3"
5454
```
5555

5656
For an example with using Serde deserialization, please refer to [serde_integration](crate::integrations::serde_integration)
5757

5858
```rust
59-
use influxdb::query::InfluxDbQuery;
59+
use influxdb::query::{InfluxDbQuery, Timestamp};
6060
use influxdb::client::InfluxDbClient;
61-
use serde::Deserialize;
6261
use tokio::runtime::current_thread::Runtime;
6362

6463
// Create a InfluxDbClient with URL `http://localhost:8086`
@@ -68,7 +67,7 @@ let client = InfluxDbClient::new("http://localhost:8086", "test");
6867
// Let's write something to InfluxDB. First we're creating a
6968
// InfluxDbWriteQuery to write some data.
7069
// This creates a query which writes a new measurement into a series called `weather`
71-
let write_query = InfluxDbQuery::write_query("weather")
70+
let write_query = InfluxDbQuery::write_query(Timestamp::NOW, "weather")
7271
.add_field("temperature", 82);
7372

7473
// Since this library is async by default, we're going to need a Runtime,
@@ -78,14 +77,14 @@ let mut rt = Runtime::new().expect("Unable to create a runtime");
7877

7978
// To actually submit the data to InfluxDB, the `block_on` method can be used to
8079
// halt execution of our program until it has been completed.
81-
let write_result = rt.block_on(client.query(write_query));
80+
let write_result = rt.block_on(client.query(&write_query));
8281
assert!(write_result.is_ok(), "Write result was not okay");
8382

8483
// Reading data is as simple as writing. First we need to create a query
8584
let read_query = InfluxDbQuery::raw_read_query("SELECT _ FROM weather");
8685

8786
// Again, we're blocking until the request is done
88-
let read_result = rt.block_on(client.query(read_query));
87+
let read_result = rt.block_on(client.query(&read_query));
8988

9089
assert!(read_result.is_ok(), "Read result was not ok");
9190

@@ -101,4 +100,4 @@ in the repository.
101100

102101
[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)
103102

104-
@ 2019 Gero Gerke, All rights reserved.
103+
@ 2019 Gero Gerke, All rights reserved.

‎src/client/mod.rs‎

Lines changed: 33 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,14 @@ use reqwest::r#async::{Client, Decoder};
2121
use std::mem;
2222

2323
use crate::error::InfluxDbError;
24-
use crate::query::{InfluxDbQuery, QueryType};
24+
use crate::query::read_query::InfluxDbReadQuery;
25+
use crate::query::write_query::InfluxDbWriteQuery;
26+
use crate::query::InfluxDbQuery;
2527

2628
use url::form_urlencoded;
2729

30+
use std::any::Any;
31+
2832
/// Internal Representation of a Client
2933
pub struct InfluxDbClient {
3034
url: String,
@@ -108,21 +112,20 @@ impl InfluxDbClient {
108112
///
109113
/// ```rust
110114
/// use influxdb::client::InfluxDbClient;
111-
/// use influxdb::query::InfluxDbQuery;
115+
/// use influxdb::query::{InfluxDbQuery, Timestamp};
112116
///
113117
/// let client = InfluxDbClient::new("http://localhost:8086", "test");
114118
/// let _future = client.query(
115-
/// InfluxDbQuery::write_query("weather")
119+
/// &InfluxDbQuery::write_query(Timestamp::NOW, "weather")
116120
/// .add_field("temperature", 82)
117121
/// );
118122
/// ```
119-
pub fn query<Q>(&self, q: Q) -> Box<dyn Future<Item = String, Error = InfluxDbError>>
123+
pub fn query<Q>(&self, q: &Q) -> Box<dyn Future<Item = String, Error = InfluxDbError>>
120124
where
121-
Q: InfluxDbQuery,
125+
Q: Any + InfluxDbQuery,
122126
{
123127
use futures::future;
124128

125-
let q_type = q.get_type();
126129
let query = match q.build() {
127130
Err(err) => {
128131
let error = InfluxDbError::InvalidQueryError {
@@ -133,35 +136,38 @@ impl InfluxDbClient {
133136
Ok(query) => query,
134137
};
135138

136-
let client = match q_type {
137-
QueryType::ReadQuery => {
138-
let read_query = query.get();
139-
let encoded: String = form_urlencoded::Serializer::new(String::new())
140-
.append_pair("db", self.database_name())
141-
.append_pair("q", &read_query)
142-
.finish();
143-
let http_query_string = format!(
144-
"{url}/query?{encoded}",
145-
url = self.database_url(),
146-
encoded = encoded
147-
);
148-
149-
if read_query.contains("SELECT") || read_query.contains("SHOW") {
150-
Client::new().get(http_query_string.as_str())
151-
} else {
152-
Client::new().post(http_query_string.as_str())
153-
}
139+
let any_value = q as &dyn Any;
140+
141+
let client = if let Some(_) = any_value.downcast_ref::<InfluxDbReadQuery>() {
142+
let read_query = query.get();
143+
let encoded: String = form_urlencoded::Serializer::new(String::new())
144+
.append_pair("db", self.database_name())
145+
.append_pair("q", &read_query)
146+
.finish();
147+
let http_query_string = format!(
148+
"{url}/query?{encoded}",
149+
url = self.database_url(),
150+
encoded = encoded
151+
);
152+
if read_query.contains("SELECT") || read_query.contains("SHOW") {
153+
Client::new().get(http_query_string.as_str())
154+
} else {
155+
Client::new().post(http_query_string.as_str())
154156
}
155-
QueryType::WriteQuery => Client::new()
157+
} else if let Some(write_query) = any_value.downcast_ref::<InfluxDbWriteQuery>() {
158+
Client::new()
156159
.post(
157160
format!(
158-
"{url}/write?db={db}",
161+
"{url}/write?db={db}{precision_str}",
159162
url = self.database_url(),
160163
db = self.database_name(),
164+
precision_str = write_query.get_precision_modifier()
161165
)
162166
.as_str(),
163167
)
164-
.body(query.get()),
168+
.body(query.get())
169+
} else {
170+
unreachable!()
165171
};
166172

167173
Box::new(

‎src/integrations/serde_integration.rs‎

Lines changed: 46 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,14 @@
77
//! `name`, InfluxDB provides alongside query results.
88
//!
99
//! ```rust,no_run
10-
//! use influxdb::query::InfluxDbQuery;
10+
//! use futures::prelude::*;
1111
//! use influxdb::client::InfluxDbClient;
12+
//! use influxdb::query::InfluxDbQuery;
1213
//! use serde::Deserialize;
1314
//!
1415
//! #[derive(Deserialize)]
1516
//! struct WeatherWithoutCityName {
16-
//! temperature: i32
17+
//! temperature: i32,
1718
//! }
1819
//!
1920
//! #[derive(Deserialize)]
@@ -24,16 +25,26 @@
2425
//!
2526
//! let mut rt = tokio::runtime::current_thread::Runtime::new().unwrap();
2627
//! let client = InfluxDbClient::new("http://localhost:8086", "test");
27-
//! let query = InfluxDbQuery::raw_read_query("SELECT temperature FROM /weather_[a-z]*$/ WHERE time > now() - 1m ORDER BY DESC");
28-
//! let _result = rt.block_on(client.json_query::<WeatherWithoutCityName>(query))
28+
//! let query = InfluxDbQuery::raw_read_query(
29+
//! "SELECT temperature FROM /weather_[a-z]*$/ WHERE time > now() - 1m ORDER BY DESC",
30+
//! );
31+
//! let _result = rt
32+
//! .block_on(client.json_query(query))
33+
//! .map(|mut db_result| db_result.deserialize_next::<WeatherWithoutCityName>())
2934
//! .map(|it| {
3035
//! it.map(|series_vec| {
3136
//! series_vec
37+
//! .series
3238
//! .into_iter()
3339
//! .map(|mut city_series| {
34-
//! let city_name = city_series.name.split("_").collect::<Vec<&str>>().remove(2);
35-
//! Weather { weather: city_series.values.remove(0), city_name: city_name.to_string() }
36-
//! }).collect::<Vec<Weather>>()
40+
//! let city_name =
41+
//! city_series.name.split("_").collect::<Vec<&str>>().remove(2);
42+
//! Weather {
43+
//! weather: city_series.values.remove(0),
44+
//! city_name: city_name.to_string(),
45+
//! }
46+
//! })
47+
//! .collect::<Vec<Weather>>()
3748
//! })
3849
//! });
3950
//! ```
@@ -56,6 +67,8 @@ use crate::query::InfluxDbQuery;
5667

5768
use url::form_urlencoded;
5869

70+
use futures::future::Either;
71+
5972
#[derive(Deserialize)]
6073
#[doc(hidden)]
6174
struct _DatabaseError {
@@ -64,14 +77,30 @@ struct _DatabaseError {
6477

6578
#[derive(Deserialize, Debug)]
6679
#[doc(hidden)]
67-
pub struct DatabaseQueryResult<T> {
68-
pub results: Vec<InfluxDbReturn<T>>,
80+
pub struct DatabaseQueryResult {
81+
pub results: Vec<serde_json::Value>,
82+
}
83+
84+
impl DatabaseQueryResult {
85+
pub fn deserialize_next<T: 'static>(
86+
&mut self,
87+
) -> impl Future<Item = InfluxDbReturn<T>, Error = InfluxDbError>
88+
where
89+
T: DeserializeOwned,
90+
{
91+
match serde_json::from_value::<InfluxDbReturn<T>>(self.results.remove(0)) {
92+
Ok(item) => futures::future::result(Ok(item)),
93+
Err(err) => futures::future::err(InfluxDbError::DeserializationError {
94+
error: format!("could not deserialize: {}", err),
95+
}),
96+
}
97+
}
6998
}
7099

71100
#[derive(Deserialize, Debug)]
72101
#[doc(hidden)]
73102
pub struct InfluxDbReturn<T> {
74-
pub series: Option<Vec<InfluxDbSeries<T>>>,
103+
pub series: Vec<InfluxDbSeries<T>>,
75104
}
76105

77106
#[derive(Deserialize, Debug)]
@@ -82,13 +111,10 @@ pub struct InfluxDbSeries<T> {
82111
}
83112

84113
impl InfluxDbClient {
85-
pub fn json_query<T:'static>(
114+
pub fn json_query(
86115
&self,
87116
q: InfluxDbReadQuery,
88-
) -> Box<dyn Future<Item = Option<Vec<InfluxDbSeries<T>>>, Error = InfluxDbError>>
89-
where
90-
T: DeserializeOwned,
91-
{
117+
) -> impl Future<Item = DatabaseQueryResult, Error = InfluxDbError> {
92118
use futures::future;
93119

94120
let query = q.build().unwrap();
@@ -113,13 +139,11 @@ impl InfluxDbClient {
113139
"Only SELECT and SHOW queries supported with JSON deserialization",
114140
),
115141
};
116-
return Box::new(
117-
future::err::<Option<Vec<InfluxDbSeries<T>>>, InfluxDbError>(error),
118-
);
142+
return Either::B(future::err::<DatabaseQueryResult, InfluxDbError>(error));
119143
}
120144
};
121145

122-
Box::new(
146+
Either::A(
123147
client
124148
.send()
125149
.and_then(|mut res| {
@@ -137,9 +161,9 @@ impl InfluxDbClient {
137161
});
138162
} else {
139163
// Json has another structure, let's try actually parsing it to the type we're deserializing
140-
let from_slice = serde_json::from_slice::<DatabaseQueryResult<T>>(&body);
164+
let from_slice = serde_json::from_slice::<DatabaseQueryResult>(&body);
141165

142-
let mutdeserialized = match from_slice {
166+
let deserialized = match from_slice {
143167
Ok(deserialized) => deserialized,
144168
Err(err) => {
145169
return futures::future::err(InfluxDbError::DeserializationError {
@@ -148,7 +172,7 @@ impl InfluxDbClient {
148172
}
149173
};
150174

151-
return futures::future::result(Ok(deserialized.results.remove(0).series));
175+
return futures::future::result(Ok(deserialized));
152176
}
153177
}),
154178
)

0 commit comments

Comments
(0)

AltStyle によって変換されたページ (->オリジナル) /