This week I started looking at our core data engineering platform. I have previously worked with Spotify’s Luigi, but a while back word got out that Spotify was moving away from it internally in favor of Lyft’s Flyte, so depending on Luigi for the new product did not seem like a good idea. Eventually I came upon Dagster, a second-generation Python scheduling and orchestration framework. Importantly, it is Kubernetes-friendly and has lots of interesting plugins for various data targets, for notifications (e.g. PagerDuty, Slack) and more. It looked worth a spike to see whether it is a fit for us. Note that the spike is still underway, so for today’s geek out we will just get a flavor of Dagster; there is extensive documentation if you want to dive in further.
Some weeks ago I attended a great Blockchain NYC session about Moonstream, an open source blockchain indexer, so for the spike I decided to combine the two: write a Dagster data pipeline that loads data using Moonstream’s beta Python client and then dumps it out to a Parquet file. For the dataset itself I chose Uniswap V2 trades, a useful dataset for monitoring DeFi activity. The whole pipeline would look something like this when done:
Once a day Dagster would fire a run, and then we’d hit the Moonstream API to load the data. To keep things simple for today’s example, though, we’ll focus on loading to a Parquet file on disk, using all the core elements of Dagster to structure it nicely; we’ll add the cloud services and the production deployment pieces later.
But before we get to code, let’s look at some concepts.
Moonstream is a multi-chain analytics platform. Its core is a set of indexers that scan Ethereum, Solana and Polygon (soon) and make the indexed data available via a Web interface and, in beta, a Python API, which we will use here.
To get started you need to create a Subscription. This can be thought of as a standing query on the indexer with a limited set of filters. For the purposes of this post we will create one that references the main Uniswap V2 smart contract address on the Ethereum blockchain, as our objective is to query it for the last 24 hours of trades on the Uniswap DEX.
Once you have a subscription you can start receiving data via a Stream, which comes in the form of a series of Events. Moonstream is open source and still very new, so unlike many of the more developed block explorers and blockchain analytics tools out there, its Web GUI does not currently provide much detail on each event. However, you can grab the transaction hash and drop it into Etherscan to see the details quickly:
Dagster is a Python framework that lets you write small pieces of Python code representing elements of a data pipeline, and then stitch them together into sophisticated DAG structures with schedules attached, so that the whole graph of tasks gets run in the correct sequence. A DAG (directed acyclic graph) can be thought of as a set of related nodes with arrows connecting them to illustrate their relationships, where following the arrows never leads back to a node you have already visited; that lack of cycles is what guarantees a valid execution order exists. In Dagster’s case the relationships are data dependencies, but the idea of a graph is general: a graph could just as well represent interpersonal relationships on a platform like LinkedIn, i.e., a social graph, although such graphs usually do contain cycles.
In Dagster the lowest-level nodes are called Operators (ops), and they can be assembled into Graphs, which are encapsulated as runnable Jobs. This separation between Jobs and Graphs is not strictly required (you can build a Job out of a single Operator if you want), but it is important for more complex setups: a Job can be thought of as a particular instantiation of a Graph, recognizing that the patterns for connecting together the parts of a data loading pipeline are often shared across multiple loaders, just with different configurations.
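To make the "correct sequence" idea concrete, here is a small plain-Python sketch (not Dagster code) that uses the standard library's graphlib to order a toy pipeline. The task names are hypothetical; the point is that a topological order runs each task only after everything it depends on, and that a cycle makes such an order impossible.

```python
from graphlib import CycleError, TopologicalSorter

# Toy pipeline DAG (hypothetical task names): each task maps to the set of
# tasks whose outputs it needs.
pipeline = {
    "fetch_events": set(),
    "fetch_token_metadata": {"fetch_events"},
    "enrich_events": {"fetch_events", "fetch_token_metadata"},
    "validate": {"enrich_events"},
    "write_parquet": {"validate"},
}

# static_order() yields every task after all of its dependencies.
run_order = list(TopologicalSorter(pipeline).static_order())
print(run_order)

# An arrow from write_parquet back to fetch_events creates a cycle, so no
# valid execution order exists and graphlib refuses to produce one.
cyclic = {**pipeline, "fetch_events": {"write_parquet"}}
try:
    list(TopologicalSorter(cyclic).static_order())
    cycle_detected = False
except CycleError:
    cycle_detected = True
```

In this toy graph the order happens to be unique; in wider graphs, independent branches could run in either order (or in parallel), which is exactly the freedom an orchestrator exploits.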
Because Dagster aims to rely on plain Python code as much as possible, the process of chaining together operators in jobs can be achieved by simply piping the output of one method call into another. These jobs can in turn be run in multiple ways: programmatically, via a Python API; from the command line, with the Dagster CLI; or interactively in the Dagit Web UI.
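Because ops are plain functions, chaining them is essentially function composition. The sketch below is a plain-Python analogy rather than Dagster's API (the function names and the use of functools.partial are our own illustration): the "graph" wires functions together by piping outputs into inputs, and each "job" is that same graph bound to a particular configuration.

```python
from functools import partial

# Plain-Python analogy, not Dagster's API: each "op" is an ordinary function.
def fetch(window_hours: int) -> list[int]:
    # Stand-in for a download: one fake record per hour in the window.
    return list(range(window_hours))

def transform(records: list[int], scale: int) -> list[int]:
    return [r * scale for r in records]

def summarize(records: list[int]) -> int:
    return sum(records)

# The "graph": the output of each call is piped straight into the next one.
def pipeline_graph(window_hours: int, scale: int) -> int:
    return summarize(transform(fetch(window_hours), scale))

# Each "job" is the same graph bound to one particular configuration.
daily_job = partial(pipeline_graph, window_hours=24, scale=1)
smoke_test_job = partial(pipeline_graph, window_hours=2, scale=10)

print(daily_job(), smoke_test_job())  # 276 10
```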
For the purposes of this code we do not want to rely on manually creating a subscription in the Moonstream GUI, so we create it programmatically via the create_subscription() call on the Moonstream client, and then delete it with delete_subscription() when the code completes. Note that the example shown is not ideal in terms of error handling: the clean-up step should execute regardless of whether the download succeeds or fails, but Dagster has no formal notion of a clean-up block, so you need to handle this yourself by wrapping the code in a try/finally.
Once we have a subscription we can compute UNIX epoch times for now and for 24 hours ago, which will be the inputs to the events() call. Moonstream does not yet support Python generators for iterating over arbitrary time windows, so we need to break up the query ourselves and then assemble the pieces into the final output data. A word of caution: Moonstream’s client does not retry HTTP requests internally, so if there are transient service errors the entire job will bail out with an exception; we will deal with this when we create the MoonstreamClient resource below.
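One way to get the try/finally behavior without cluttering the op body is to wrap the subscription's lifetime in a context manager. This is a minimal sketch: FakeMoonstreamClient is a hypothetical in-memory stand-in, and its create_subscription()/delete_subscription() signatures are assumptions, not the real Moonstream client's. The address is a placeholder.

```python
from contextlib import contextmanager

class FakeMoonstreamClient:
    """Hypothetical in-memory stand-in; the real client's signatures may differ."""

    def __init__(self) -> None:
        self.active: set[str] = set()
        self._counter = 0

    def create_subscription(self, address: str) -> str:
        self._counter += 1
        subscription_id = f"sub-{self._counter}"
        self.active.add(subscription_id)
        return subscription_id

    def delete_subscription(self, subscription_id: str) -> None:
        self.active.discard(subscription_id)

@contextmanager
def temporary_subscription(client, address: str):
    subscription_id = client.create_subscription(address)
    try:
        yield subscription_id
    finally:
        # Runs whether the body completed normally or raised.
        client.delete_subscription(subscription_id)

client = FakeMoonstreamClient()
try:
    with temporary_subscription(client, "0xPLACEHOLDER_ADDRESS") as sub_id:
        was_active = sub_id in client.active
        raise RuntimeError("simulated download failure")
except RuntimeError:
    pass

print(was_active, client.active)  # True set()
```

The exception still propagates to the caller (so Dagster would mark the run as failed), but the subscription is gone either way.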
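Breaking up the query could look something like the following sketch, which splits the last 24 hours into consecutive, non-overlapping windows. The one-hour chunk size and the time_windows() helper are our own assumptions; each (start, end) pair would feed one events() call, with the results concatenated afterwards.

```python
import time

def time_windows(end_ts: int, lookback_seconds: int = 24 * 3600,
                 chunk_seconds: int = 3600) -> list[tuple[int, int]]:
    """Split [end_ts - lookback_seconds, end_ts) into consecutive (start, end) chunks."""
    windows = []
    start = end_ts - lookback_seconds
    while start < end_ts:
        end = min(start + chunk_seconds, end_ts)
        windows.append((start, end))
        start = end
    return windows

now = int(time.time())
windows = time_windows(now)  # 24 one-hour windows covering the last day
# Each window would drive one events() call; the results are then concatenated.
print(len(windows), windows[0][0] == now - 24 * 3600, windows[-1][1] == now)
```

Because each window ends exactly where the next begins, no event is fetched twice and none is skipped, provided the API treats the end bound as exclusive (an assumption worth checking against the client).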
The initial version of the code looks like this; there’s a lot here, so we will unpack it piece by piece. First, there are two functions, one decorated with @op and the other with @graph. This is a key feature of Dagster: it uses decorators to identify the sub-components of your data loading job. In a more complex example the graph would of course have multiple operators, but for simplicity we put the main logic into a single operator.
The second important point, which we will cover more in the next section, is that we have declared some required_resource_keys. This is effectively Dagster’s dependency injection mechanism: if those keys are not available, the job will fail; if they are, the corresponding resources can be retrieved from the context. For instance, we can access the Moonstream client resource shown later via context.resources.moonstream_client, using that same key.
The final important element is that we declare an Out object in the decorator, indicating that the output should be handled by an injected IOManager, parquet_io_manager, and has the declared type UniswapV2TradeDataFrame. We will look at this code below as well, but for now the key thing to understand is that Dagster knows the type of data we are going to return, and also lets us inject custom code to capture that output and do something with it.
The rest of the code just queries events in a range and then enriches them a bit with data downloaded from Infura and Etherscan using other resources. Note this code still needs a lot of work: all we do here is extract the smart contract swap call and the from and to tokens being swapped. Importantly, we don’t yet parse out the token quantities exchanged or their values at the time of the swap; a real-world data pipeline loading Uniswap trade data would of course require all that enrichment as well.
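The dependency injection idea can be illustrated with a toy runner in plain Python. This is an analogy only, not how Dagster implements it: run_with_resources() and MissingResourceError are hypothetical names, and the fake client is a stand-in. The runner refuses to execute a function whose required keys are missing, and otherwise exposes the resources on a context object under the same keys.

```python
from types import SimpleNamespace

class MissingResourceError(KeyError):
    """Raised when a function's required resources were not provided."""

def run_with_resources(fn, required_keys, available):
    # Fail fast if any required key is missing, like a job that cannot start.
    missing = set(required_keys) - set(available)
    if missing:
        raise MissingResourceError(sorted(missing))
    # Expose only the requested resources, under the same keys.
    resources = SimpleNamespace(**{key: available[key] for key in required_keys})
    return fn(SimpleNamespace(resources=resources))

def count_events(context):
    return len(context.resources.moonstream_client.events())

# Hypothetical stand-in for the Moonstream client resource.
fake_client = SimpleNamespace(events=lambda: ["event-1", "event-2", "event-3"])

event_count = run_with_resources(count_events, {"moonstream_client"},
                                 {"moonstream_client": fake_client})
try:
    run_with_resources(count_events, {"moonstream_client"}, {})
    missing_detected = False
except MissingResourceError:
    missing_detected = True

print(event_count, missing_detected)  # 3 True
```

The payoff is the same as in Dagster: the function never constructs its own client, so a test can hand it a fake while production hands it the real thing.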
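As a sketch of the extraction step, here is how the swap method and the from/to tokens could be pulled out of a decoded router call. We assume the subscription targets Uniswap V2's Router02 contract and a hypothetical decoded-call shape of {"name": ..., "args": {...}}; each Router02 swap function takes a path array whose first element is the token sold and whose last element is the token bought.

```python
from typing import Optional

# Swap functions on Uniswap V2's Router02 contract.
SWAP_METHODS = frozenset({
    "swapExactTokensForTokens",
    "swapTokensForExactTokens",
    "swapExactETHForTokens",
    "swapTokensForExactETH",
    "swapExactTokensForETH",
    "swapETHForExactTokens",
    "swapExactTokensForTokensSupportingFeeOnTransferTokens",
    "swapExactETHForTokensSupportingFeeOnTransferTokens",
    "swapExactTokensForETHSupportingFeeOnTransferTokens",
})

def extract_swap(decoded_call: dict) -> Optional[dict]:
    """Return swap method and from/to tokens, or None for non-swap calls.

    Assumes a hypothetical decoded-call shape: {"name": str, "args": {...}}.
    """
    name = decoded_call.get("name")
    if name not in SWAP_METHODS:
        return None  # e.g. addLiquidity / removeLiquidity calls
    path = decoded_call["args"]["path"]
    # path[0] is the token sold, path[-1] the token bought; any middle
    # entries are intermediate hops through other pairs.
    return {"swap_method": name, "from_token": path[0], "to_token": path[-1]}

example = {"name": "swapExactTokensForTokens",
           "args": {"path": ["0xTokenA", "0xTokenB", "0xTokenC"]}}
print(extract_swap(example))
```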
Using configurable resources
One thing you probably noticed above is that all input I/O (downloading of data) is done indirectly, through resource objects. But what’s a resource? A resource is any Python object defined by the job, and each resource can optionally have its own custom configuration block in JSON. This is important, because it lets us inject API keys and other secrets, and vary them by environment.
To do this, we need two pieces: a class containing the resource logic we want, in this case LiveMoonstreamClient, and a factory function decorated with @resource that declares our resource and the configuration key or keys it requires; in this case we just need the Moonstream API key. Note that we have also addressed retries and throttling here by using tenacity and token-bucket to gate our calls to the Moonstream API, so even though the underlying client doesn’t retry and allows unlimited calls, we can enforce both retries and rate limits in the resource.
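For readers who want to see what that gating involves without the two libraries, here is a standard-library sketch of the same idea: a token bucket that blocks until a call is allowed, plus retries with exponential backoff and jitter on transient errors. TokenBucket, call_with_retry() and RateLimitedClient are hypothetical names, the limits are illustrative, and this is not the tenacity or token-bucket API.

```python
import random
import time

class TokenBucket:
    """Allow bursts of up to `capacity` calls, refilling at `rate` tokens per second."""

    def __init__(self, rate: float, capacity: int) -> None:
        self.rate = rate
        self.capacity = capacity
        self.tokens = float(capacity)
        self.updated = time.monotonic()

    def acquire(self) -> None:
        """Block until a token is available, then consume it."""
        while True:
            now = time.monotonic()
            # Refill in proportion to elapsed time, capped at capacity.
            self.tokens = min(self.capacity, self.tokens + (now - self.updated) * self.rate)
            self.updated = now
            if self.tokens >= 1:
                self.tokens -= 1
                return
            time.sleep((1 - self.tokens) / self.rate)

def call_with_retry(fn, attempts=5, base_delay=0.5,
                    retry_on=(ConnectionError, TimeoutError)):
    """Call fn(), retrying transient errors with exponential backoff and jitter."""
    for attempt in range(attempts):
        try:
            return fn()
        except retry_on:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the last error
            time.sleep(base_delay * (2 ** attempt) * random.uniform(0.5, 1.0))

class RateLimitedClient:
    """Hypothetical wrapper: every attempt waits for a token, then calls the client."""

    def __init__(self, client, bucket: TokenBucket, base_delay: float = 0.5) -> None:
        self._client = client
        self._bucket = bucket
        self._base_delay = base_delay

    def events(self, *args, **kwargs):
        def attempt():
            self._bucket.acquire()
            return self._client.events(*args, **kwargs)
        return call_with_retry(attempt, base_delay=self._base_delay)

class FlakyClient:
    """Stand-in client that fails `failures` times before succeeding."""

    def __init__(self, failures: int) -> None:
        self.failures = failures
        self.calls = 0

    def events(self):
        self.calls += 1
        if self.calls <= self.failures:
            raise ConnectionError("transient service error")
        return ["event"]

flaky = FlakyClient(failures=2)
client = RateLimitedClient(flaky, TokenBucket(rate=100.0, capacity=5), base_delay=0.01)
result = client.events()
print(result, flaky.calls)  # ['event'] 3
```

Putting the token acquisition inside each attempt means retries also count against the rate limit, which keeps a burst of failures from hammering the API.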
Adding DQ checks with dagster_pandas
As noted above, one of Dagster’s great features is that inputs and outputs are type-safe, even if the underlying representation (like a Pandas DataFrame) is not normally strongly typed. This allows Dagster to validate your data after it’s generated, enforcing data quality before the data gets piped to the next stage, which helps make our data pipelines robust. For instance, when we declared our UniswapV2TradeDataFrame type above we required a categorical_column called swap_method and limited its values to the Uniswap V2 smart contract swap methods; if the code has a bug and accidentally includes calls to, say, add or remove liquidity, the job will fail. This brittleness can be a good thing: if other parts of our system depend on the generated data having a certain shape and composition, we can identify and stop problems upstream, before they potentially corrupt the system’s business logic.
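Stripped of the dagster_pandas machinery, the categorical check boils down to something like the following plain-Python sketch. check_categorical_column() and DataQualityError are hypothetical names, rows are plain dicts rather than a DataFrame, and the allowed values are the swap functions on Uniswap V2's Router02 contract.

```python
ALLOWED_SWAP_METHODS = frozenset({
    "swapExactTokensForTokens",
    "swapTokensForExactTokens",
    "swapExactETHForTokens",
    "swapTokensForExactETH",
    "swapExactTokensForETH",
    "swapETHForExactTokens",
    "swapExactTokensForTokensSupportingFeeOnTransferTokens",
    "swapExactETHForTokensSupportingFeeOnTransferTokens",
    "swapExactTokensForETHSupportingFeeOnTransferTokens",
})

class DataQualityError(ValueError):
    """Raised when generated data violates an expectation."""

def check_categorical_column(rows: list[dict], column: str, allowed: frozenset) -> list[dict]:
    """Fail if any row lacks `column` or holds a value outside `allowed`."""
    unexpected = sorted({repr(row.get(column)) for row in rows if row.get(column) not in allowed})
    if unexpected:
        raise DataQualityError(f"{column} has unexpected values: {', '.join(unexpected)}")
    return rows  # pass the data through unchanged when it is valid

good_rows = [{"swap_method": "swapExactTokensForTokens"},
             {"swap_method": "swapExactETHForTokens"}]
bad_rows = good_rows + [{"swap_method": "addLiquidity"}]

checked = check_categorical_column(good_rows, "swap_method", ALLOWED_SWAP_METHODS)
try:
    check_categorical_column(bad_rows, "swap_method", ALLOWED_SWAP_METHODS)
    error_message = ""
except DataQualityError as exc:
    error_message = str(exc)

print(error_message)  # swap_method has unexpected values: 'addLiquidity'
```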
Writing out Parquet with an IOManager
Dagster also abstracts out I/O for op inputs and outputs, typically generated files such as Parquet files. For our application we want to take the DataFrame with Uniswap V2 trade data and simply write it out as Parquet on disk. We can inject an IOManager that takes care of that for us and, following the usual Dagster pattern, have one or more factory functions decorated with @io_manager that produce different configurations and, where required, inject required resources.
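The IOManager pattern itself is small: one method persists an op's output, another reads it back for downstream inputs. The sketch below mimics that shape in plain Python rather than subclassing Dagster's IOManager and, to stay dependency-free, writes JSON Lines instead of Parquet; with pandas and pyarrow installed, handle_output() would instead call DataFrame.to_parquet(). The class, the factory function and the file layout are all hypothetical.

```python
import json
import tempfile
from pathlib import Path

class LocalJsonLinesIOManager:
    """Hypothetical IOManager-shaped class (not a Dagster subclass)."""

    def __init__(self, base_dir: str) -> None:
        self.base_dir = Path(base_dir)

    def _path(self, output_name: str) -> Path:
        return self.base_dir / f"{output_name}.jsonl"

    def handle_output(self, output_name: str, rows: list[dict]) -> None:
        # Called with an op's output: persist it. A Parquet version would
        # call DataFrame.to_parquet() here instead.
        self.base_dir.mkdir(parents=True, exist_ok=True)
        with self._path(output_name).open("w", encoding="utf-8") as f:
            for row in rows:
                f.write(json.dumps(row) + "\n")

    def load_input(self, output_name: str) -> list[dict]:
        # Called for a downstream input: read the persisted output back.
        with self._path(output_name).open(encoding="utf-8") as f:
            return [json.loads(line) for line in f]

def local_io_manager(config: dict) -> LocalJsonLinesIOManager:
    """Factory that builds a manager from a config dict, e.g. per environment."""
    return LocalJsonLinesIOManager(config["base_dir"])

trades = [{"swap_method": "swapExactTokensForTokens", "from_token": "0xTokenA", "to_token": "0xTokenC"}]
with tempfile.TemporaryDirectory() as tmp_dir:
    manager = local_io_manager({"base_dir": tmp_dir})
    manager.handle_output("uniswap_v2_trades", trades)
    round_trip = manager.load_input("uniswap_v2_trades")

print(round_trip == trades)  # True
```

Keeping the storage logic in one place means the op that builds the trades never needs to know where, or in what format, they end up.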
Tying it all together with Jobs & Repositories
As covered above in concepts, Dagster distinguishes between a graph, which is kind of like a template for a job, and the job definition itself. A Job is a specific instance of a graph with a particular configuration. Similarly, a Repository is the next level up, a specific collection of Jobs to be deployed together. Like everything else in Dagster, they are all set up in annotated code:
Here we use the to_job() method to convert our graphs into Dagster job definitions. This is the point where we actually provide the resources and the configs, though note that in the partitioned job case we can also inject other factory functions that generate configs for us. Similarly, we use the configured() function on our resource and IOManager factory functions to provide specific configurations for each of the jobs we want to define. This separation of concerns between operators, graphs, resource factories, I/O managers, jobs and the configuration that ties them together is the most powerful part of Dagster, because it lets us write code once and only once, then customize it for, say, test vs. production environments.
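The write-once, configure-per-environment idea can be sketched in plain Python as binding fixed configs to a single factory. bind_config() is a hypothetical helper that mirrors the spirit of configured() without claiming its signature, and the hard-coded keys are placeholders; in practice secrets would come from environment variables or a secrets manager.

```python
from typing import Any, Callable

def bind_config(factory: Callable[[dict], Any], config: dict) -> Callable[[], Any]:
    """Hypothetical helper: return a zero-argument builder with `config` baked in."""
    frozen = dict(config)  # copy so later edits to `config` don't leak in

    def build() -> Any:
        return factory(dict(frozen))
    return build

def moonstream_client_factory(config: dict) -> dict:
    # Stand-in resource: a real factory would construct and return a client.
    return {
        "api_key": config["api_key"],
        "max_calls_per_second": config.get("max_calls_per_second", 1.0),
    }

# One factory, two environment-specific configurations (placeholder keys).
RESOURCES_BY_ENV = {
    "test": {
        "moonstream_client": bind_config(moonstream_client_factory, {"api_key": "test-key"}),
    },
    "production": {
        "moonstream_client": bind_config(
            moonstream_client_factory, {"api_key": "prod-key", "max_calls_per_second": 5.0}
        ),
    },
}

def build_resources(env: str) -> dict:
    return {name: build() for name, build in RESOURCES_BY_ENV[env].items()}

print(build_resources("test"))
print(build_resources("production"))
```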
Finally, we take advantage of one last concept in Dagster: the @repository. A repository is a collection of jobs. So here we will create two: a local one with our local job configuration, and a production one with our (partitioned, scheduled) hourly production load job:
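For the partitioned production job, each partition corresponds to a time window. The sketch below computes the UTC hourly partitions for one day together with their epoch bounds; it is plain Python, not Dagster's partitioning API, and the partition key format is our own choice.

```python
from datetime import datetime, timedelta, timezone

def hourly_partitions(day: datetime) -> list[tuple[str, int, int]]:
    """Return (partition_key, start_epoch, end_epoch) for each UTC hour of `day`."""
    midnight = datetime(day.year, day.month, day.day, tzinfo=timezone.utc)
    partitions = []
    for hour in range(24):
        start = midnight + timedelta(hours=hour)
        end = start + timedelta(hours=1)
        key = start.strftime("%Y-%m-%d-%H:%M")  # our own key format
        partitions.append((key, int(start.timestamp()), int(end.timestamp())))
    return partitions

parts = hourly_partitions(datetime(2022, 1, 15))
print(parts[0])   # ('2022-01-15-00:00', 1642204800, 1642208400)
print(parts[-1])  # ('2022-01-15-23:00', 1642287600, 1642291200)
```

Each hourly run then only loads its own window, which also makes it easy to re-run a single failed hour without reloading the whole day.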
Running our job
Dagit is a Web-based monitoring & management tool for Dagster jobs. You can spin up a simple test instance with dagit -f my_job_file.py and then go to the Launchpad tab and click Launch Run to kick off the job. As noted above you can do the same with dagster job execute -f my_job_file.py if you want to use the command line interface instead.
The Moonstream team just released V2 this week, and it should be a huge step forward for anyone looking to extract detailed data from smart contract calls; a lot of the hand-rolled code shown above with Infura and Etherscan can potentially be simplified. Importantly, this upgrade should dramatically improve the performance of our loader, because making those one-at-a-time calls to Infura is very expensive. Once we have bedded in the latest version of the code we may return to this topic to see what it looks like.
Note this sample code makes use of a number of libraries. For reference, the exact versions used are shown here:
Many thanks to Neeraj Khashyap on the Moonstream Discord (zomglings) who provided valuable help & sample code to get the beta Python client working, and to the whole Moonstream team for providing their open source platform for blockchain analytics.