
Support filter over indexes on nested fields #380

Open
andrei-ionescu wants to merge 7 commits into master from nested_fields_filter

Conversation

andrei-ionescu
Contributor

andrei-ionescu commented Mar 11, 2021

What changes were proposed in this pull request?

This PR adds support for filtering over nested fields using indexes created over nested fields.

Given the nestedDataset dataset with schema

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- nested: struct (nullable = true)
 |    |-- field1: string (nullable = true)
 |    |-- nst: struct (nullable = true)
 |    |    |-- field1: string (nullable = true)
 |    |    |-- field2: string (nullable = true)

and the following data

+---+-----+-----------------+
|id |name |nested           |
+---+-----+-----------------+
|2  |name2|[va2, [wa2, wb2]]|
|1  |name1|[va1, [wa1, wb1]]|
+---+-----+-----------------+

And the following search/filter query

df.filter(df("nested.nst.field1") === "wa1").select("id", "name", "nested.nst.field2")

The optimized and Spark plans without the index are:

Project [id#100, name#101, nested#102.nst.field2]
+- Filter (isnotnull(nested#102) && (nested#102.nst.field1 = wa1))
   +- Relation[id#100,name#101,nested#102] parquet
Project [id#100, name#101, nested#102.nst.field2]
+- Filter (isnotnull(nested#102) && (nested#102.nst.field1 = wa1))
   +- FileScan parquet [id#100,name#101,nested#102] Batched: false, Format: Parquet, 
        Location: InMemoryFileIndex[file:/..../tableN2], PartitionFilters: [], 
        PushedFilters: [IsNotNull(nested)], ReadSchema:
        struct<id:int,name:string,nested:struct<field1:string,nst:struct<field1:string,field2:string>>>

The transformed optimized and Spark plans should look like:

Project [id#1, name#2, __hs_nested.nested.nst.field2#3]
+- Filter (isnotnull(__hs_nested.nested.nst.field1#0) && (__hs_nested.nested.nst.field1#0 = wa1))
   +- Relation[__hs_nested.nested.nst.field1#0,id#1,name#2,__hs_nested.nested.nst.field2#3]
        Hyperspace(Type: CI, Name: idx_nested, LogVersion: 1)
Project [id#1, name#2, __hs_nested.nested.nst.field2#3]
+- Filter (isnotnull(__hs_nested.nested.nst.field1#0) && (__hs_nested.nested.nst.field1#0 = wa1))
   +- FileScan parquet [__hs_nested.nested.nst.field1#0,id#1,name#2,__hs_nested.nested.nst.field2#3] 
        Batched: false, Format: Parquet, 
        Location: InMemoryFileIndex[file:/..../spark_warehouse/indexes/idx_nested/v__=0],
        PartitionFilters: [], PushedFilters: [IsNotNull(__hs_nested.nested.nst.field1)], ReadSchema: 
        struct<__hs_nested.nested.nst.field1:string,id:int,name:string,__hs_nested.nested.nst.field2:string>

Complexities

Transforming the plan

Filters inside the plan must be modified to accommodate the index schema rather than the data schema, that is, the flattened schema instead of the nested one. Instead of accessing a field with GetStructField(GetStructField(AttributeReference)), the plan must access it directly through an AttributeReference.
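
As a sketch of the idea (hypothetical helper names, not the PR's exact code, assuming Spark's Catalyst expression API including GetStructField.extractFieldName):

import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, GetStructField}

// Recover the dotted path of a chain of struct accesses, e.g.
// GetStructField(GetStructField(nested, nst), field1) => "nested.nst.field1".
def dottedPath(e: Expression): Option[String] = e match {
  case a: AttributeReference => Some(a.name)
  case g: GetStructField => dottedPath(g.child).map(p => s"$p.${g.extractFieldName}")
  case _ => None
}

// Replace nested accesses covered by the index with direct references to the
// flattened index attributes, keyed by their prefixed names.
def rewriteNestedAccess(
    expr: Expression,
    indexAttrs: Map[String, AttributeReference]): Expression =
  expr.transformUp {
    case g: GetStructField =>
      dottedPath(g).flatMap(p => indexAttrs.get(s"__hs_nested.$p")).getOrElse(g)
  }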

Given the query plan

Project [id#100, name#101, nested#102.nst.field2]
+- Filter (isnotnull(nested#102) && (nested#102.nst.field1 = wa1))
   +- Relation[id#100,name#101,nested#102] parquet

The filter will be modified from

Filter (isnotnull(nested#102) && (nested#102.nst.field1 = wa1))

to

Filter (isnotnull(__hs_nested.nested.nst.field1#0) && (__hs_nested.nested.nst.field1#0 = wa1))

The projection from

Project [id#100, name#101, nested#102.nst.field2]

to

Project [id#1, name#2, __hs_nested.nested.nst.field2#3]
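
The same substitution can be mapped over the project list (continuing the hypothetical rewriteNestedAccess sketch from above):

import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, NamedExpression}
import org.apache.spark.sql.catalyst.plans.logical.Project

// Rewrite each projected expression, re-aliasing when the rewrite yields a
// non-named expression so the output column names are preserved.
def transformProject(
    project: Project,
    indexAttrs: Map[String, AttributeReference]): Project = {
  val newList = project.projectList.map { e =>
    rewriteNestedAccess(e, indexAttrs) match {
      case n: NamedExpression => n
      case other => Alias(other, e.name)()
    }
  }
  project.copy(projectList = newList)
}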

The relation from

Relation[id#100,name#101,nested#102] parquet

to

Relation[__hs_nested.nested.nst.field1#0,id#1,name#2,__hs_nested.nested.nst.field2#3]
  Hyperspace(Type: CI, Name: idx_nested, LogVersion: 1)

Does this PR introduce any user-facing change?

No.

How was this patch tested?

  • Existing unit and integration tests verify that the existing functionality is not broken.
  • Unit and integration tests were added for the new functionality.

@andrei-ionescu
Contributor Author

@sezruby, @imback82 Here is the second PR that adds support for filtering over nested fields with indexing.

andrei-ionescu force-pushed the nested_fields_filter branch 2 times, most recently from d077c61 to 8c10382 on March 29, 2021 13:50
@andrei-ionescu
Contributor Author

andrei-ionescu commented Mar 31, 2021

@sezruby, @imback82: Could you check why the builds are failing with:

[error] [launcher] download failed: org.scala-sbt#main;0.13.18!main.jar
[error] [launcher] download failed: org.scala-sbt#compiler-interface;0.13.18!compiler-interface.jar
[error] [launcher] error during sbt launcher: error retrieving required libraries
  (see /home/vsts/.sbt/boot/update.log for complete log)
[error] [launcher] could not retrieve sbt 0.13.18
##[error]Bash exited with code '1'.
Finishing: Running $sbt clean

Is there a mechanism to trigger the build once more on this PR? Something like commenting a keyword to trigger the build again?

@sezruby
Collaborator

sezruby commented Mar 31, 2021

Is there a mechanism to trigger the build once more on this PR? Something like commenting a keyword to trigger the build again?

You can trigger it via "re-run" in the Checks tab.

@andrei-ionescu
Contributor Author

andrei-ionescu commented Mar 31, 2021

@sezruby I don't see any "re-run" in the Checks tab. I don't have the same rights as you. 😃

@andrei-ionescu
Contributor Author

@sezruby, @imback82 Can you have another look at the PR so that we can advance with it? Thanks!

@sezruby
Collaborator

sezruby commented Apr 2, 2021

Actually I'm waiting for #393 because of the conflict.
@andrei-ionescu Could you review the change again? Since SchemaUtil is removed in that change, I wonder if it's ok or not.

@andrei-ionescu
Contributor Author

@sezruby I didn't know that #393 is a prerequisite for this. If so, let's integrate feedback on that PR and get it merged so that I can advance with this one.

Remember, we agreed to split the nested fields feature into multiple PRs so it could move faster. So, let's move faster 😁.

* @param project Project to check if it's supported.
* @return True if the given project contains nested columns covered by the index.
*/
protected[rules] def hasNestedColumns(project: Project, index: IndexLogEntry): Boolean = {
Contributor

seems to be a good candidate for a unit test

* @param filter Filter to check if it's supported.
* @return True if the given filter contains nested columns covered by the index.
*/
protected[rules] def hasNestedColumns(filter: Filter, index: IndexLogEntry): Boolean = {
Contributor

seems to be a good candidate for a unit test

* @param fieldName The name of the field to search for.
* @return An [[ExprId]] if one can be found in the plan, otherwise [[None]].
*/
private def getExprId(plan: LogicalPlan, fieldName: String): Option[ExprId] = {
Contributor

seems to be a good candidate for a unit test

* @param exp The Spark Catalyst expression from which to extract names.
* @return A set of distinct field names.
*/
def extractNamesFromExpression(exp: Expression): ExtractedNames = {
Contributor

seems to be a good candidate for a unit test

Contributor Author

I will add tests to all methods in here.


import com.microsoft.hyperspace.util.ResolverUtils

object PlanUtils {
Contributor

The name sounds general, but the actual code seems mostly about nested fields?

Contributor Author

The code works for any kind of field. It is needed by the nested fields feature because that feature needs to do more transformations. In my initial implementation, the methods here were used for flat fields too.

}
}

var toRemove = Seq.empty[String]
Contributor

This is to remove the unnecessary isnotnull check, right? For someone without the knowledge that this code is about supporting nested fields, the meaning of this name could be vague.

Contributor Author

Right. Given this plan:

Project [Date#89, nested#94.leaf.cnt AS cnt#336, nested#94.leaf.id AS id#337]
+- Filter ((isnotnull(nested#94) && (nested#94.leaf.cnt > 10)) && (nested#94.leaf.id = leaf_id9))
   +- Relation[Date#89,RGUID#90,Query#91,imprs#92,clicks#93,nested#94] parquet

It will be transformed into:

Project [Date#380 AS Date#381, __hs_nested.nested.leaf.cnt#379 AS cnt#382, __hs_nested.nested.leaf.id#378 AS id#383]
+- Filter ((__hs_nested.nested.leaf.cnt#379 > 10) && (__hs_nested.nested.leaf.id#378 = leaf_id9))
   +- Relation[__hs_nested.nested.leaf.id#378,__hs_nested.nested.leaf.cnt#379,Date#380] Hyperspace(Type: CI, Name: filterNestedIndex, LogVersion: 1)

The isnotnull(nested#94) check is no longer suitable and is removed in the transformFilter method because:

  • The nested field name is not part of the index (the index contains __hs_nested.nested.leaf.cnt and __hs_nested.nested.leaf.id)
  • The isnotnull construct checks that a nested field is not null, because trying to access its leaves when it is null would result in exceptions
  • The values stored in the index are not nested; they are flat
  • In the example above, we cannot just rename the isnotnull attribute reference, as we have two fields instead of one (__hs_nested.nested.leaf.cnt and __hs_nested.nested.leaf.id)

I'll add more info to the method.
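
A minimal sketch of that removal (hypothetical names; the actual PR tracks the names to drop in the toRemove sequence above):

import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, Expression, IsNotNull}

// Drop isnotnull checks whose attribute does not survive flattening into the
// index schema; the enclosing And keeps the rest of the predicate intact.
def pruneStaleNullChecks(cond: Expression, indexColumns: Set[String]): Expression =
  cond.transformUp {
    case And(IsNotNull(a: AttributeReference), rest) if !indexColumns.contains(a.name) => rest
    case And(rest, IsNotNull(a: AttributeReference)) if !indexColumns.contains(a.name) => rest
  }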

Collaborator

Can we add both isnotnull(__hs_nested.nested.leaf.cnt) & isnotnull(__hs_nested.nested.leaf.id)?
Since it seems Spark automatically adds isnotnull for project columns and filter condition columns.
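
In code, the suggestion amounts to something like this (a hypothetical sketch, not code from the PR):

import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, Expression, IsNotNull}

// Instead of dropping isnotnull(nested), rebuild one null check per flattened
// index attribute derived from that nested field (assumes a non-empty list).
def rebuildNullChecks(flattenedAttrs: Seq[AttributeReference]): Expression =
  flattenedAttrs.map(a => IsNotNull(a): Expression).reduce(And(_, _))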

Contributor Author

I think we can. I'll try it.

@clee704
Contributor

clee704 commented Apr 19, 2021

Maybe a dumb question, but why should we rename the fields? Can't we just keep the original nested structures in index data?

* @param useBucketSpec Whether to use BucketSpec for reading index data.
* @return A transformed [[LogicalRelation]].
*/
private[rules] def transformRelation(
Collaborator

Could you add a NOTE-to-reviewers comment, for example describing which part of this file was changed?

* @param relation Relation with which given indexes are compared.
* @return Active indexes built for this plan.
*/
def getCandidateIndexes(
Collaborator

sezruby commented Apr 21, 2021

Though I will refactor this soon with HyperspaceOneRule, could you move this function back to RuleUtils and rename this class to IndexPlanApplyHelper (and IndexPlanWithNestedApplyHelper)?

Also you can use spark from FilterIndexRule or JoinIndexRule as they extend ActiveSparkSession.

Collaborator

sezruby commented May 14, 2021

https://github.com/microsoft/hyperspace/pull/380/files#r617166273
Please address this comment - to reduce the diff.

Frankly speaking, the change became quite big: it's around 3000 lines of diff.
Though most of the changes come from the existing codebase, we cannot check them line by line.
So I wonder if we could split this PR into two PRs: 1) refactoring only, and 2) adding IndexPlanWithNestedApplyHelper.

Contributor Author

@sezruby

  1. I'd rather not put getCandidateIndexes back, as the method requires spark to be passed to it. The current implementation is consistent: the methods that don't require spark are in RuleUtils, while the others are part of the helper instances.
  2. The 3000 lines are due to the extensive tests I added to verify the plan transformation. One way or another, someone should review them.
  3. "Frankly speaking", I already did a lot of extra work complying with your requests to split into multiple PRs. Now, once more, there is a request to split a PR for this same feature. This is very frustrating for me, and it looks like you're using any opportunity to push this back. I would understand splitting into multiple PRs for new features, but I thought we came to an agreement when we decided to split the feature into 4-5 other PRs. And this is only the second PR 😞.

Collaborator

1 =>
It will be refactored out after the rule refactoring. Just put it back to reduce the diff...
You can use spark from FilterIndexRule / JoinIndexRule, as before.
Also, there is no need for the getCandidate function in this class (IndexPlanApplyHelper?); they have different roles.

2/3 =>
I understand your frustration, but the changes modify a wide range of the codebase, including core parts.
This refactoring was not considered at the time we agreed, and that's why this change became huge.
And you know small and explicit changes can be merged quickly, in general, and splitting the PR also helps to trace the history.

Sorry, but I think we should do that. If you are not up for it, I'll push a refactoring PR (no nested-related code) based on your PR by early next week.

Contributor Author

@sezruby I want to correct you: the code does not add lots of changes. If you remember (and to help you remember, here is a previous discussion from a month back: #380 (comment)), we agreed to have separate classes for the already existing code and the nested features. So now we have BaseRuleHelper and NestedRuleHelper. From the old RuleUtils I extracted all the methods that need spark into BaseRuleHelper and kept the methods that don't need spark in RuleUtils as static methods. So, from the old RuleUtils we now have two files: a stripped-down RuleUtils and BaseRuleHelper. The classes have the code unchanged from the old RuleUtils; only the method signatures changed now that spark is a property of the instance.

The new functionality is inside NestedRuleHelper, which extends BaseRuleHelper and specializes some of its methods to accommodate the nested fields support.

This is exactly what we agreed on in that comment.

There is no refactoring here other than what I explained above, and that refactoring does NOT make any sense without the changes that follow it.

I propose we try to merge this and you create the changes that you want on top of it.

@imback82, @rapoth: What is your opinion?


* @param repl The replacement Spark Catalyst [[Expression]].
* @return A new Spark Catalyst [[Expression]].
*/
def replaceInSearchQuery(
Collaborator

How about replaceExpression?

@sezruby
Collaborator

sezruby commented Apr 21, 2021

Maybe a dumb question, but why should we rename the fields? Can't we just keep the original nested structures in index data?

@clee704 That's a good question. One reason could be that we store index data as "indexed columns" ++ "included columns". For example, if the indexed columns are nested.a and b, and the included columns are nested.c and d, then it's not possible to define the column schema in that order because of the shared top-level field name, nested. (But I wonder whether the order of columns in index data really matters.)
Another reason might be that there are several issues with handling nested columns in Spark itself, and this approach minimizes design changes in Hyperspace.
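
To illustrate the ordering problem with hypothetical columns: preserving the indexed-then-included order while keeping the original nesting would require two top-level fields named nested, which makes the schema ambiguous:

import org.apache.spark.sql.types._

// Indexed columns: nested.a, b; included columns: nested.c, d.
val problematic = StructType(Seq(
  StructField("nested", StructType(Seq(StructField("a", StringType)))),
  StructField("b", StringType),
  StructField("nested", StructType(Seq(StructField("c", StringType)))), // duplicate name
  StructField("d", StringType)))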

@andrei-ionescu
Contributor Author

andrei-ionescu commented Apr 23, 2021

@sezruby, @clee704, @imback82: The datasets I create indexes over contain deeply nested fields with many children.

For example, this is half of the schema of one of my datasets; it has about 600 nested fields in one of its structs:

root
 |-- ts: timestamp (nullable = true)
 |-- id: string (nullable = true)
 |-- xs: struct (nullable = true)
 |    |-- an: struct (nullable = true)
 |    |    |-- ev1_100: struct (nullable = true)
 |    |    |    |-- e1: struct (nullable = true)
 |    |    |    |    |-- value: double (nullable = true)
 |    |    |    |-- e2: struct (nullable = true)
 |    |    |    |    |-- value: double (nullable = true)
 |    |    |      ...
 |    |    |    |-- e100: struct (nullable = true)
 |    |    |    |    |-- value: double (nullable = true)
 |    |    |-- ev101_200: struct (nullable = true)
 |    |    |    |-- e101: struct (nullable = true)
 |    |    |    |    |-- value: double (nullable = true)
 |    |    |      ...
 |    |    |    |-- e200: struct (nullable = true)
 |    |    |    |    |-- value: double (nullable = true)
 |    |    |-- ev201_300: struct (nullable = true)
 |    |    |    |-- e201: struct (nullable = true)
 |    |    |    |    |-- value: double (nullable = true)
 |    |    |      ...
 |    |    |    |-- e300: struct (nullable = true)
 |    |    |    |    |-- value: double (nullable = true)
 |    |    |-- ev301_400: struct (nullable = true)
 |    |    |    |-- e301: struct (nullable = true)
 |    |    |    |    |-- value: double (nullable = true)
 |    |    |      ...
 |    |    |    |-- e400: struct (nullable = true)
 |    |    |    |    |-- value: double (nullable = true)
 |    |    |-- dim: struct (nullable = true)
 |    |    |    |-- v: struct (nullable = true)
 |    |    |    |    |-- v1: string (nullable = true)
 |    |    |    |    |-- v2: string (nullable = true)
 |    |    |    |      ...
 |    |    |    |    |-- v99: string (nullable = true)
 |    |    |    |    |-- v100: string (nullable = true)
 |    |    |    |-- p: struct (nullable = true)
 |    |    |    |    |-- p1: string (nullable = true)
 |    |    |    |    |-- p2: string (nullable = true)
 |    |    |    |      ...
 |    |    |    |    |-- p_99: string (nullable = true)
 |    |    |    |    |-- p_100: string (nullable = true)
 |    |    |    |-- hiers: struct (nullable = true)
 |    |    |    |    |-- h1: struct (nullable = true)
 |    |    |    |    |    |-- values: array (nullable = true)
 |    |    |    |    |    |    |-- element: string (containsNull = true)
 |    |    |    |    |    |-- delimiter: string (nullable = true)
 |    |    |    |      ...
 |    |    |    |-- lists: struct (nullable = true)
 |    |    |    |    |-- l1: struct (nullable = true)
 |    |    |    |    |    |-- list: array (nullable = true)
 |    |    |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |    |    |-- value: string (nullable = true)
 |    |    |    |    |-- l2: struct (nullable = true)
 |    |    |    |    |    |-- list: array (nullable = true)
 |    |    |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |    |    |-- value: string (nullable = true)
 |    |    |    |      ...
 |-- sch: struct (nullable = true)
 |    |-- se: string (nullable = true)
 |    |-- keys: string (nullable = true)

I want to index over the xs.an.dim.p.p_99 nested field. Are you proposing to keep the whole xs struct in the index?

If so, I don't think that is efficient at all and I'll give a few reasons:

  1. Storage cost — It will take a lot of space
  2. Performance cost — It will be less usable due to response time when used in queries
  3. Management cost — It will be harder to keep up to date

Performance- and cost-wise, I can say that the current approach — extracting/flattening the nested field — is what is needed in Hyperspace, even though the implementation code gets more complex.

FYI: I'm currently a bit busy with some other work assignment but I'll get back to integrating your feedback soon.

@clee704
Contributor

clee704 commented Apr 23, 2021

@andrei-ionescu Since we store data in a parquet format, which is efficient in storing nested data in a columnar format, does it matter? For a parquet file, a dataset with a single column "a" or a single nested column "a.b.c.d.e" should make no difference, I guess.

@andrei-ionescu
Contributor Author

It does matter! Even if parquet stores data in an efficient columnar way, there are issues with the way predicate pushdown works for nested fields in parquet; or, to say it better, it doesn't work. This is the performance cost. The storage cost is not negligible either, if you are forced to store half of the dataset in an index. Last but not least, the management cost (rebuild index, incremental refresh, etc.) incurs additional time that makes the index harder to update and keep in sync with the data itself; if the index cannot be updated easily (fast and efficiently), it is not usable with these fast-moving big data datasets. These costs are not acceptable for big nested datasets.

@rapoth, @imback82, @sezruby, @apoorvedave1, @clee704: Here is an article I wrote on what Adobe is doing on big data and what's the scale we work with: https://medium.com/adobetech/high-throughput-ingestion-with-iceberg-ccf7877a413f. Or if you prefer video here is a presentation on the same matter: https://www.dremio.com/subsurface/high-frequency-small-files-vs-slow-moving-datasets.

@imback82
Contributor

I don't think we can "bucket by" nested fields either. BTW, predicate pushdown for nested fields should be supported in Spark 3.0: apache/spark@cb0db21

@clee704 Is your concern addressed?

@andrei-ionescu
Contributor Author

That's right @imback82! I totally forgot about the bucketing done in Hyperspace. That is not possible over nested fields.

@clee704
Contributor

clee704 commented Apr 26, 2021

I want to index over the xs.an.dim.p.p_99 nested field. Are you proposing to keep the whole xs struct in the index?

Can't we just store the indexed/included nested fields, ignoring others? Also, it seems the problem is not inherent to the nested fields, but due to a lack of proper support in the framework.

Anyway, it seems Spark lacks support for nested fields and that's a good enough reason that justifies renaming.

@andrei-ionescu
Contributor Author

@sezruby, @clee704, @imback82 I'll get back to this PR very soon. I've been sidetracked by another assignment, but I'll get back to it, integrating the feedback and adding the needed UTs.

andrei-ionescu force-pushed the nested_fields_filter branch 3 times, most recently from 5730140 to a69a8b6 on May 14, 2021 09:52
@andrei-ionescu
Contributor Author

@imback82, @sezruby, @clee704: Sorry for being out of this PR for a few weeks. Now, I got some time and I'm back to it. Here are the changes:

  • Rebased it over Spark 3 support and resolved conflicts
  • Integrated your feedback
  • Added PlanUtilsTest and NestedRuleHelperTest tests to validate changes on the plan

Please have another look.


@andrei-ionescu
Contributor Author

@imback82, @rapoth: How can we get this moving? It's more than 3 months now...

Successfully merging this pull request may close these issues:

[PROPOSAL]: Index nested fields