Draft: Add auto-generated plugins to import data to Splitgraph from all supported data sources (Airbyte, Singer taps, etc.), and add export plugin to export from Splitgraph to Seafowl via Splitgraph GraphQL API (#20)
...n and start adapting it
...lugin` Keep interface and behavior exactly the same; tests and typechecks now pass, and CSV-specific behavior is contained only in `SplitgraphImportCSVPlugin`.
...this.DerivedClass` In the `withOptions` builder method, which is defined in the abstract base class but which returns an instance of the derived class (with an obviously unknown name), return an instance of the derived class by referencing its constructor with `Object.getPrototypeOf(this).constructor`, instead of forcing the derived class to store a reference to itself in a property variable. This is arguably a less hacky method, but still depends on the assumption that the derived class does not override the constructor (but if it does, it can always override the builder method as well).
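As a minimal sketch (class names here are hypothetical, not the actual madatdata source), the pattern looks roughly like this:

```typescript
// Sketch of the builder pattern described above (class names are hypothetical).
// `withOptions` lives on the abstract base class, but returns an instance of
// whichever derived class it was called on, by recovering that class's
// constructor from the prototype chain.
abstract class BasePlugin {
  constructor(public readonly opts: Record<string, any> = {}) {}

  withOptions(opts: Record<string, any>): this {
    // Assumes the derived class does not override the constructor signature;
    // if it does, it should override `withOptions` as well.
    const DerivedClass = Object.getPrototypeOf(this).constructor;
    return new DerivedClass({ ...this.opts, ...opts });
  }
}

class CsvPlugin extends BasePlugin {}

const plugin = new CsvPlugin({ delimiter: "," });
const updated = plugin.withOptions({ quote: '"' });
// `updated` is a CsvPlugin (not a bare BasePlugin), with merged options
```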
Some functions were missing any visibility annotation, and they are used only in derived classes in order for methods in the base class to call them, so there is no need for them to be public (which is the default when there is no annotation).
Instead of just `Csv`, use e.g. `CsvTableParamsSchema`, which makes them globally unique and also unique within each plugin (which has a generated interface for `TableParamsSchema`, `ParamsSchema`, and `CredentialsSchema`).
I was hoping this would also upgrade Babel to latest, fixing a bug where it throws a syntax error on the TypeScript 5.0 feature of `const` type parameters, which would allow using them in files with GQL queries, but alas it does not fix it. Still, upgrade those packages.
This allows returning an interface from a factory function for creating classes (so that autogenerated code can simply call that factory function instead of implementing a class itself), where the static property of PluginName is known and inferrable ahead of time.
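A rough sketch of that factory idea (all names here are hypothetical, not the library's actual API):

```typescript
// Sketch of the factory-function idea described above (names hypothetical).
// The returned constructor carries its plugin name as a statically known
// string literal type, so generated code can call the factory instead of
// hand-writing a class.
interface ImportPluginConstructor<PluginName extends string> {
  readonly pluginName: PluginName;
  new (): { importData(): Promise<void> };
}

function makeImportPlugin<PluginName extends string>(
  name: PluginName
): ImportPluginConstructor<PluginName> {
  return class {
    static readonly pluginName = name;
    async importData(): Promise<void> {
      // generated plugin body would go here
    }
  };
}

// `AirbyteGithubImportPlugin.pluginName` is inferred as the literal type
// "airbyte-github", not just `string`
const AirbyteGithubImportPlugin = makeImportPlugin("airbyte-github");
```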
...r auto-generated plugins This doesn't actually work for auto-generated plugins yet, but it's almost there. Basically the auto-generated code just needs to call this function to create a class that can be instantiated just like SplitgraphImportCsvPlugin.
...td plugin This is currently manually created, but the idea is that a file like this will be auto-generated for each plugin. WIP
...thub If environment variable `VITE_TEST_GITHUB_PAT_SECRET` is defined, then run an integration test which ingests data from the Seafowl GitHub repository into a Splitgraph repo `madatdata-test-github-ingestion` under the namespace of the username associated with the integration test `API_KEY` and `API_SECRET`. Technically, it's hardcoded to be disabled right now so hot reload testing doesn't spam ingestion, but it works, e.g. see this repository which was ingested with the new code in `db-splitgraph.test.ts`: https://www.splitgraph.com/miles/madatdata-test-github-ingestion/latest/-/tables
...r its plugin directory Actually auto-generate the class for the plugin (as a script that calls a factory function to create the class) in the `plugin.ts` file within each plugin directory.
Same as recent change that added it to `ImportPlugin<PluginName>`
...rtPlugin` in `base-import-plugin`
...gin` base class Adopt the same pattern used for import plugin inheritance, so that we can create a plugin for exporting to Seafowl (as opposed to exporting to a parquet/csv file, like the current plugin), while re-using most of the code, particularly regarding waiting for jobs.
...o Seafowl We already supported "manually" exporting from Splitgraph to Seafowl by first exporting a `.parquet` file via the existing `ExportQueryPlugin`, and then importing it into Seafowl with `SeafowlImportFilePlugin`. However, the Splitgraph API supports the more robust use case of exporting one or multiple queries, tables, and VDBs from Splitgraph to Seafowl, via the GraphQL mutation `exportToSeafowl`, which is available with an authenticated request to the GraphQL API. This commit adds a plugin that calls that API with one or multiple queries, tables, or VDBs (with the types being fairly raw and maybe a bit cumbersome, albeit still equipped with full autocomplete and sufficient TypeDoc annotations), and then waits for the tasks to resolve before returning an object containing the result, including a list of passed jobs and, if applicable, a list of any failed jobs. Since each export target creates a separate job, we need to wait for them separately, and each can fail or pass independently of the others. The result type is a bit of a mess, but it doesn't actually matter much, since you don't really need to do anything with it: assuming `error` is `null`, the export was successful and you can proceed by simply querying the resultant data in Seafowl.
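The "wait for each job separately" shape can be sketched like this (a simplification with hypothetical types and names, not the plugin's actual implementation):

```typescript
// Simplified sketch (hypothetical types): each export target is a separate
// job that can pass or fail independently, so await all of them and partition
// the outcomes into passed and failed lists.
type JobResult = { taskId: string; status: "SUCCESS" | "FAILURE" };

async function waitForAllExportJobs(jobs: Array<Promise<JobResult>>): Promise<{
  passedJobs: JobResult[];
  failedJobs: JobResult[];
  error: string | null;
}> {
  const settled = await Promise.all(jobs);
  const passedJobs = settled.filter((job) => job.status === "SUCCESS");
  const failedJobs = settled.filter((job) => job.status === "FAILURE");
  return {
    passedJobs,
    failedJobs,
    // `null` error means every job passed and the data is queryable in Seafowl
    error: failedJobs.length > 0 ? `${failedJobs.length} job(s) failed` : null,
  };
}
```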
...f-hosted Seafowl instance
Keep this separate from the existing Seafowl integration tests, and
add new environment variables `VITE_TEST_SEAFOWL_EXPORT_DEST_{URL,DBNAME,SECRET}`
for the "export target" Seafowl instance to use in Seafowl integration tests. If
all of them are set, then the integration test will execute.
Write a simple test that exporting a single query works as expected. Note
this doesn't actually query the resulting data in the Seafowl instance; it's
just a start of a "good enough" integration test for this functionality. More
notably, it also doesn't test exporting tables, or vdbs, or exporting multiple
queries/vdbs/tables, or failure modes; future robust testing should include
mocked responses for unit testing that covers the "wait for all tasks" logic
which is most likely to have one (or a few) bugs lurking in it.
Use `splitgraph-` prefix where applicable, and be more accurate about export destination, e.g. `SplitgraphExportQueryToFilePlugin` instead of `ExportQueryPlugin`
Allow starting a task, getting a taskId, and not waiting for it, so that the caller can later check whether it's completed (the point of this is that a Vercel server function can do the checking, and the polling can be triggered from the client).
When `defer: true`, the returned `taskIds` value will include a
dictionary of taskIds as returned by the GraphQL API with `{ tables, queries, vdbs }`,
and the consumer can take each of these individual IDs and check them individually
(the idea being that a UI might render e.g. each table separately and then check
for their status independently). There's not really a need for a bulk check,
since it would ultimately amount to the same requests anyway; it's just a
question of whether the Vercel backend or the client does the batching.
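To illustrate, here is a hypothetical sketch of fanning out over that dictionary (the field shapes are assumed from the description above, and the real payload may differ):

```typescript
// Hypothetical shape of the deferred `taskIds` dictionary returned by the
// GraphQL API, and a helper that flattens it into individually checkable IDs,
// e.g. so a UI can render and poll each table's status on its own.
interface SeafowlExportTaskIds {
  tables: Array<{ jobId: string }>;
  queries: Array<{ jobId: string }>;
  vdbs: Array<{ jobId: string }>;
}

function flattenTaskIds(taskIds: SeafowlExportTaskIds): string[] {
  return [...taskIds.tables, ...taskIds.queries, ...taskIds.vdbs].map(
    (task) => task.jobId
  );
}
```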
milesrichardson commented on May 22, 2023:
Some stuff in this PR, mainly around refactoring and adding features to import and export plugins:
There is now an import plugin for all 100+ data sources supported by Splitgraph (Airbyte connectors, Singer taps, etc.)
Plugins can be "installed" by including them in the list of plugins passed to the constructor of the database (e.g. `new SeafowlDb({ plugins: [...] })` or `new SplitgraphDb({ plugins: [...] })`, though usually these are called indirectly through helper functions like `makeSplitgraphHTTPContext`):
```ts
const { db } = makeSplitgraphHTTPContext({
  plugins: [new AirbyteGithubImportPlugin()],
  authenticatedCredential: {
    apiKey: credential.apiKey,
    apiSecret: credential.apiSecret,
    anonymous: false,
  },
});
```
Each plugin has a name, in this case `airbyte-github`, which is then specified as the first parameter to `db.exportData`, `db.importData`, or `db.pollDeferredTask` (see below). The rest of the parameters to that function are defined by the function implementation of the plugin and are typechecked/auto-completed accordingly.
For the auto-generated plugins, credentials and params are auto-completed and typechecked via auto-generated types which are created from the JSONSchema returned by the Splitgraph API.
For example, import a GitHub repository into a Splitgraph repository using the `airbyte-github` plugin:
```ts
// (helper function that adds `new AirbyteGithubImportPlugin()` to the list of plugins)
const db = createRealDb();

// You need to be authenticated to Splitgraph to use this plugin.
// Get the token, to get your username, so that you can import to a new
// repository in your namespace.
const { username: namespace } = await fetchToken(db);

await db.importData(
  "airbyte-github",
  {
    credentials: {
      credentials: {
        personal_access_token: GITHUB_PAT_SECRET,
      },
    },
    params: {
      repository: "splitgraph/seafowl",
      start_date: "2021-06-01T00:00:00Z",
    },
  },
  {
    namespace: namespace,
    repository: "madatdata-test-github-ingestion",
    tables: [
      {
        name: "stargazers",
        options: {
          airbyte_cursor_field: ["starred_at"],
          airbyte_primary_key_field: [],
        },
        schema: [],
      },
    ],
  }
);
```
If the repository does not exist yet, then it will be created. Otherwise, a new image will be created with the imported data. This uses the `StartExternalRepositoryLoad` mutation from the Splitgraph GraphQL API.
Splitgraph import and export plugins can be "deferred" with the `{ defer: true }` parameter
By default, `importData` and `exportData` will create the import/export jobs via the Splitgraph API, and then wait for them to complete by polling the job status. However, this might not always be desirable. For example, perhaps you are submitting the job from the server side because it requires a secret (like your Splitgraph `API_SECRET`, or your credential for whatever upstream data source you're importing, like a personal access token for `airbyte-github`). In this case, you don't want the server-side request to block until the ingestion job is complete, because it would likely time out or hit the limits of your Edge Function™ vendor.
Now you can pass `{ defer: true }` to the `importData` and `exportData` functions, and they will return immediately with a serialized task description containing a `taskId` (or multiple `taskId`s in the case of an export of table(s)/quer(y)(ies)/vdb(s) to Seafowl). Then, you can call `pollDeferredTask` with a `taskId` parameter to check whether the job has completed yet. The idea is that you would have one Edge Function™ to submit the ingestion job, and another Edge Function™ to check whether a `taskId` has completed yet (effectively acting as a relatively dumb proxy to the Splitgraph API). Then the client side doesn't need any secrets, and it can poll the edge function with reasonable granularity.
Note: For Seafowl, we still have a `SeafowlImportFilePlugin`, which "imports" a file to Seafowl by uploading it to the multi-part upload endpoint, but this plugin is not deferrable because it actually does the work of uploading the file directly to Seafowl. If you want a deferred export to Seafowl, you probably want to go through Splitgraph first (by uploading the file to Splitgraph, and then using Splitgraph to export to Seafowl; since the actual export happens on the Splitgraph backend, it's deferrable).
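The client-side polling described above can be sketched as a simple loop (the helper name, interval, and attempt cap here are illustrative, not part of the library):

```typescript
// Hypothetical client-side polling loop: the client repeatedly hits an edge
// function that proxies `pollDeferredTask`, and stops once the task completes.
async function pollUntilComplete(
  poll: () => Promise<{ completed: boolean }>,
  intervalMs = 2000,
  maxAttempts = 30
): Promise<boolean> {
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    if ((await poll()).completed) {
      return true;
    }
    // wait before polling again, to keep the granularity reasonable
    await new Promise((resolve) => setTimeout(resolve, intervalMs));
  }
  return false; // gave up; caller can surface a timeout to the user
}
```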
Example: Defer export of query result from Splitgraph to Seafowl
Execute a query on Splitgraph and export its result to a table in a Seafowl DB. This uses the Splitgraph API and can also export multiple tables, vdbs, or queries at once. A separate job with its own taskId is created and returned for each.
(Notice the `{ defer: true }` in the last parameter)
```ts
// Export the result of this query from Splitgraph to a Seafowl instance
const queryToExport = `SELECT a as int_val, string_agg(random()::text, '') as text_val
FROM generate_series(1, 5) a, generate_series(1, 50) b GROUP BY a ORDER BY a;`;

const res = await db.exportData(
  "export-to-seafowl",
  {
    queries: [
      {
        source: {
          query: queryToExport,
        },
        destination: {
          schema: "madatdata_testing",
          table: destTable,
        },
      },
    ],
  },
  {
    seafowlInstance: {
      selfHosted: {
        url: SEAFOWL_DEST_URL,
        dbname: SEAFOWL_DEST_DBNAME,
        secret: SEAFOWL_DEST_SECRET,
      },
    },
  },
  { defer: true }
);
```
Then, check if it's completed yet (in this example it hasn't; mind the messy types):
```ts
const startedTask = await db.pollDeferredTask("export-to-seafowl", {
  taskId: taskId as string,
});
expect(startedTask.completed).toBe(false);
```
Example: Defer export of Splitgraph query to Parquet file
Execute a query on Splitgraph and export its result to a Parquet file at a publicly accessible, signed temporary URL that will expire in 24 hours:
```ts
const { taskId, response } = await db.exportData(
  "export-query-to-file",
  {
    query: `SELECT a as int_val, string_agg(random()::text, '') as text_val
FROM generate_series(1, 5) a, generate_series(1, 50) b GROUP BY a ORDER BY a;`,
    vdbId: "ddn",
  },
  {
    format: "parquet",
    filename: "random-series",
  },
  { defer: true }
);
```
Check whether the export is complete:
```ts
const startedTask = await db.pollDeferredTask("export-query-to-file", {
  taskId: taskId as string,
});
expect(startedTask.completed).toBe(false);
```
Once it's completed, the URL of the exported file will be available (just like it would be in the return value of `exportData` when called without `{ defer: true }`):
```ts
const shouldBeCompletedTask = await db.pollDeferredTask(
  "export-query-to-file",
  { taskId: response.taskId }
);
const parquetFileURL = shouldBeCompletedTask.response?.output.url;
```
Example: Defer import of uploaded CSV to table on Splitgraph DDN
Upload a CSV to object storage (using a pre-signed upload URL from Splitgraph), then provide that URL to the Splitgraph API to "import" the CSV as a table on the Splitgraph DDN.
(Each new export creates an image. You can see the results of the integration testing in my public repo at https://splitgraph.com/miles/dunno )
```ts
const { response, info, taskId } = await db.importData(
  "csv",
  { data: Buffer.from(`name;candies\r\nBob;5\r\nAlice;10`) },
  {
    tableName: `irrelevant-${randSuffix()}`,
    namespace,
    repository: "dunno",
    tableParams: {
      delimiter: ";",
    },
  },
  { defer: true }
);
```
Check whether it's complete. In this case you also need to provide a namespace and repository, but those parameters are typechecked (again, ignore the `as string` 👀):
```ts
const startedTask = await db.pollDeferredTask("csv", {
  taskId: taskId as string,
  namespace,
  repository: "dunno",
});
expect(startedTask.completed).toBe(false); // we just started it, so it's not completed yet :)
```
... to avoid fixing init ergonomics
... quickly The test defers an export task, and then checks its status, which it expects to still be pending. Previously, a query exporting 5 rows to parquet was completing quickly enough that the test would sometimes fail because it expected the task to still be pending but it was complete. Export a query of 10,000 random rows instead, to make it take a bit longer...
...s create a single job Instead of receiving a task ID for each exported table from the request, we now receive a task ID representing the job of exporting all the tables.
If a function is passed as the query parameter, instead of a string, then call it to create the query. If an abort signal is passed, call it in the cleanup function of the hook. If it's not passed, then create one by default and call that. This solves an issue where first render could be empty if the query wasn't ready yet (e.g. some interpolated value like table name came from the URL which hasn't been parsed), and the abort signal implements the correct behavior for unpredictable mounting sequences (you're not supposed to rely on `useEffect` only executing once).
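Outside of React, the two behaviors described above can be sketched like this (helper names are hypothetical; the hook's actual internals may differ):

```typescript
// A query can be a string, or a function producing one; a function returning
// `null` signals "not ready yet" (e.g. URL params not parsed), so no query
// should be sent on that render.
type QueryInput = string | (() => string | null);

function resolveQuery(query: QueryInput): string | null {
  return typeof query === "function" ? query() : query;
}

// Use the caller's AbortSignal if provided; otherwise create one, so the
// hook's cleanup function can always cancel an in-flight request even with
// unpredictable mounting sequences.
function ensureAbortSignal(provided?: AbortSignal): {
  signal: AbortSignal;
  abort: () => void;
} {
  if (provided) {
    // caller owns this signal, so our abort() is a no-op
    return { signal: provided, abort: () => {} };
  }
  const controller = new AbortController();
  return { signal: controller.signal, abort: () => controller.abort() };
}
```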
... that shouldn't be released in next version)
Migrate pull request from: splitgraph/madatdata#21 into its own repo, using `git-filter-repo` to include only commits from subdirectory `examples/nextjs-import-airbyte-github-export-seafowl/` ref: https://github.com/newren/git-filter-repo This commit adds the Yarn files necessary for running the example in an isolated repo (as opposed to as one of multiple examples in a shared multi-workspace `examples`), points the dependencies to `canary` versions (which reflect versions in splitgraph/madatdata#20), and also updates the readme with information for running in local development.
WIP
Example (initialization part is pseudocode as it needs to be cleaned up)
Type-checking and autocomplete work based on which plugin is specified in the first parameter:

*(screenshot of editor autocompletion)*
Result: https://www.splitgraph.com/miles/madatdata-test-github-ingestion/latest/-/tables