AMA With Kevin Kho from the Fugue Project

Feb 23, 2023

We regularly invite ML practitioners and industry leaders to share their experiences with our Community. Want to ask our next guest a question? Join the BentoML Community Slack.


We recently invited Kevin Kho, a maintainer of the Fugue project, an abstraction layer for distributed computing. Previously, he was an Open Source Community Engineer at Prefect, a workflow orchestration system. Before working on data tooling, he was a data scientist for 4 years.

Key Takeaways:

•Fugue decouples logic from execution, reducing the need to test on the production cluster. At Lyft, as Fugue adoption grew, cluster compute costs dropped by 50% and 60% more Spark jobs were submitted in the same time.

•The future of MLOps may be shaped by the increasing use of foundation models, which may require different modeling and maintenance approaches. Companies may host models as APIs, but serious ML may still require fine-tuning, which may lead to companies specializing in hosting models.

Can you discuss some of the key features and benefits of using Fugue?

1. Fugue allows data practitioners to focus on their business logic instead of on the framework. We are then responsible for bringing it to Spark, Dask, or Ray. The problem is that data projects start out with tooling. Practitioners first decide whether they’ll use Spark or Dask or, if the data is small enough, Pandas. We want data projects to start with business requirements and logic. By decoupling logic and execution, we also get some other benefits.

2. Testing Spark code requires a lot of spin-up. By using Fugue, you can test the Python/Pandas code locally. When ready, you can run it on a cluster. Big data project development becomes faster and cheaper.

3. I used to be on a payroll team that wrote the business logic twice: once for Pandas and once for Spark. Payroll logic is very heavy on if-else statements, and we were writing the same thing twice so it could be used with multiple frameworks. Fugue allows you to consolidate this kind of code because the Pandas/Python version can also be used on Spark.

The most common interface of Fugue is the transform() function, which distributes a single step. So just by adding a few lines of code, you can scale the execution of Python/Pandas code.
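To make that concrete, here is a minimal sketch of wrapping a Pandas function with transform(). The payroll columns, the 25% withholding rule, and the names add_net_pay and run are invented for illustration; they are not from the AMA. The engine strings assume the matching Fugue backend is installed.

```python
import pandas as pd


def add_net_pay(df: pd.DataFrame) -> pd.DataFrame:
    """Plain Pandas business logic with no Spark, Dask, or Ray imports."""
    # Hypothetical payroll rule: flat 25% withholding.
    return df.assign(net_pay=df["gross_pay"] * 0.75)


def run(df: pd.DataFrame, engine=None):
    """Run add_net_pay through Fugue.

    engine=None runs locally on Pandas; passing "spark", "dask", or "ray"
    (with that backend installed) distributes the same function.
    """
    # Imported lazily so the business logic above has no Fugue dependency.
    from fugue import transform

    # The output schema is explicit: keep all input columns, add net_pay as a double.
    return transform(df, add_net_pay, schema="*,net_pay:double", engine=engine)


sample = pd.DataFrame({"employee": ["a", "b"], "gross_pay": [1000.0, 2500.0]})
# The logic itself can be exercised without Fugue or a cluster.
local_result = add_net_pay(sample)
```

Calling run(sample) would execute the same function through Fugue on the local machine, and run(sample, engine="spark") would hand it to Spark without changing add_net_pay.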

So interesting to hear the backstory of Fugue. SCOPE was very powerful for its time, being able to embed and execute C# code in SQL queries.

Yeah, so we collaborate with some open source projects like WhyLogs (Alessya’s project in the last AMA). They can make their projects run on top of Spark, Dask, and Ray just by adding a FugueBackend. We also did the same for PyCaret and Nixtla so that they can run on the different backends with one implementation.

At what stage do you think a team will start benefiting from using Fugue, as opposed to writing a framework-specific implementation in Spark or Ray?

We’d like to think all teams can benefit. Writing a framework-specific implementation also instantly locks your code in to hardware. Spark code needs to run on a Spark cluster (for the most part). At Lyft, we saw that decoupling logic and execution means that data scientists only need to test on the cluster when they are production-ready. As Fugue adoption grew:

•compute costs of the cluster decreased by 50%

•there were 60% more Spark jobs submitted in the same time

There are no extra optimizations in Fugue. The decrease in cost is because people use the cluster when they are production-ready, as opposed to iterating on the cluster. With big data, mistakes can be very expensive, and Fugue reduces mistakes on the cluster because users test on the local machine instead (with the Pandas backend). The number of Spark jobs speaks to making Spark more accessible for the average data scientist. The most recent Stack Overflow survey showed there are 5x more Pandas users than Spark users. There is still a gap in using Spark because it’s quite hard to use. Fugue also helps eliminate a lot of the boilerplate code needed to work with Spark (schema, UDFs, etc.).

Could you please expand on why iterating on the production cluster would increase the cost of the cluster so significantly?

Yep, let’s say I am developing on the cluster. The output of a function is not what I expect, so I edit the function and just run my script again. Test-driven development is actually hard in Spark because of boilerplate code. A lot of data practitioners don’t even bother with it. Testing with big data systems is tricky because there are tests on logic and tests on execution. These become coupled together on Spark. But anyway, I edit my function and run the script again, but it breaks later. Rinse and repeat, and I may have run the script on the cluster 5x or 6x before it’s working as intended. Fugue lets that happen on the local machine, and you can test your whole pipeline. The cluster can now just be used for fuller integration tests.
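As a rough illustration of testing logic separately from execution, the sketch below unit-tests a Pandas function on a tiny in-memory DataFrame. The overtime rule and the names overtime_pay and test_overtime_pay are hypothetical; the point is only that no Spark session or cluster is involved.

```python
import pandas as pd


def overtime_pay(df: pd.DataFrame) -> pd.DataFrame:
    """Hypothetical rule: hours above 40 are paid at 1.5x the hourly rate."""
    regular = df["hours"].clip(upper=40) * df["rate"]
    overtime = (df["hours"] - 40).clip(lower=0) * df["rate"] * 1.5
    return df.assign(pay=regular + overtime)


def test_overtime_pay():
    # Small in-memory input: no Spark session, no cluster spin-up.
    df = pd.DataFrame({"hours": [40, 50], "rate": [10.0, 10.0]})
    result = overtime_pay(df)
    assert result["pay"].tolist() == [400.0, 550.0]


test_overtime_pay()
```

Each edit-and-rerun cycle here costs a fraction of a second on a laptop; the same function can later be passed to transform() for the run on the cluster.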

Have you found that any of the backends perform significantly better than others? Also, interested to know how you ended up settling on Arrow as opposed to any other serialization format. Is that just a product of Spark using it internally? Were there any other formats (other than pandas) that you considered?

This is a hard question because frameworks are always improving. In general:

1. Spark is the most scalable for data processing and can reach beyond 100GB with no problem.

2. Dask has lower overhead for smaller data and easier infrastructure management, but it requires special handling to scale.

3. Ray has a solution called Ray Dataset, but they call it a “last-mile” DataFrame and suggest you use the other two for preprocessing.

It might not be an either-or thing. We might see Spark used for preprocessing and Ray or Dask for the machine learning piece. But if you are in a company that already uses one, the overhead of introducing other tools and infrastructure might not be worth it.

Note if you are on Dask, there are significant improvements in versions 2022.07 and 2022.11 regarding memory management.

We use Pandas for the local iteration engine. We don’t code the conversion ourselves because Spark does that. You can convert a Pandas DataFrame to Spark and the other way around. We need Arrow because schema operations are explicit in Spark: you need to define the type of the new column you are creating. So if you define your logic in Pandas and use float, we need to bring that to Spark and find the appropriate type.
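The type-mapping idea can be sketched in a few lines. This is not Fugue's implementation (which relies on Arrow types); it is a simplified, hand-rolled mapping from Pandas dtypes to schema type names, and the helper names type_name and schema_expression are hypothetical.

```python
import pandas as pd
from pandas.api import types as pdt


def type_name(dtype) -> str:
    """Map a Pandas dtype to a schema type name (simplified, assumed mapping)."""
    if pdt.is_bool_dtype(dtype):
        return "bool"
    if pdt.is_integer_dtype(dtype):
        return "long"
    if pdt.is_float_dtype(dtype):
        return "double"
    if pdt.is_string_dtype(dtype) or pdt.is_object_dtype(dtype):
        return "str"
    raise ValueError(f"no mapping for dtype {dtype}")


def schema_expression(df: pd.DataFrame) -> str:
    """Build a 'name:type,...' schema string from a DataFrame's dtypes."""
    return ",".join(f"{name}:{type_name(dtype)}" for name, dtype in df.dtypes.items())


df = pd.DataFrame({"id": [1, 2], "score": [0.5, 0.75], "label": ["x", "y"]})
expr = schema_expression(df)
```

A float column in Pandas ends up with an explicit "double" in the schema string, which is the kind of information Spark needs before it can create the new column.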

Quick question on the compute distribution you see among Spark, Dask, and Ray. Do you see more people moving their workload to Ray? Thank you.

I think we find people don’t directly code in Ray, but it’s more common to see it as a backend of something. For example, Prefect has a RayTaskRunner and Nixtla had a Ray backend before Fugue. So the more common thing was that these open-source tools scale out their product by adding a Ray backend.

In that sense, if you strongly use the DataFrame semantics, it seems Spark and Databricks are still very strong. If you use the other distributed computing objects like Futures, then I think the competition is between Ray and Dask.

I see Ray used a lot in POCs also, but rarely for production workloads. I think the latest ChatGPT was trained on Ray though! I think Ray excels at this kind of distributed deep learning training because its architecture spins up a cache (think Redis) that holds state across workers. So you can train one big model across several machines.

At the Fugue level, we don’t touch distributed training. Instead, we normally train one model per partition. For example, we can train one model per US state. Just partition the data by state and perform the grid search for each one. Different models will be better for different states. This is also very convenient because you can treat each training as a “local problem”, and the distributed nature is just orchestrating these “local problems”.
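A minimal sketch of the one-model-per-partition pattern follows. To stay short it fits a single linear trend per state with NumPy instead of running a grid search; the column names and the functions fit_trend and fit_all are assumptions made for illustration.

```python
import numpy as np
import pandas as pd


def fit_trend(df: pd.DataFrame) -> pd.DataFrame:
    """Fit one linear trend on a single state's rows (a purely local problem)."""
    slope, intercept = np.polyfit(df["month"], df["sales"], deg=1)
    return pd.DataFrame(
        {"state": [df["state"].iloc[0]], "slope": [slope], "intercept": [intercept]}
    )


def fit_all(df: pd.DataFrame, engine=None):
    """Fit one model per state; Fugue handles partitioning and distribution."""
    from fugue import transform  # lazy import; requires Fugue to be installed

    return transform(
        df,
        fit_trend,
        schema="state:str,slope:double,intercept:double",
        partition={"by": "state"},  # fit_trend receives one state's rows at a time
        engine=engine,
    )


data = pd.DataFrame(
    {
        "state": ["CA"] * 3 + ["NY"] * 3,
        "month": [1, 2, 3, 1, 2, 3],
        "sales": [10.0, 12.0, 14.0, 5.0, 5.0, 5.0],
    }
)
# Local equivalent of the partitioned run, using only Pandas.
local = pd.concat(
    [fit_trend(group) for _, group in data.groupby("state")], ignore_index=True
)
```

fit_trend never needs to know whether it runs on a laptop or on a cluster worker; a grid search per state would slot into the same function body.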

I am curious about your opinion on the future of MLOps. With the recent excitement about foundation models, do you see any changes in the MLOps workflow that are going to be caused by that?

That is an interesting question. I guess the direct effect is how we approach modeling of problems, right? Just two months ago, I saw a conference presentation about a company that automates customer support chat. They said they don’t use big models. They actually use many, many small models. Depending on responses, they load another model to interpret the new answer.

This made complete sense to me. But after ChatGPT came out, it’s starting to feel like we’ll be dealing with one big model as opposed to that multiple small model approach. I guess in theory, we lessen the maintenance work by dealing with fewer, larger models, but I think it also becomes hard from the data scientist perspective. If one particular use case is bad, how do you tune that big big model?

Another thing to think about is that there will be companies that host these models as APIs. That may abstract a lot of the hosting of these models. But I don’t know, I feel for any serious ML, you can’t just use an off-the-shelf API. You likely need to fine-tune the model, but I guess we still may see companies specializing in hosting models? I don’t know if that’s something BentoML would do.


* The discussion was lightly edited for better readability.