diff --git a/bigframes/core/array_value.py b/bigframes/core/array_value.py index a6c700a485..4b05781cb7 100644 --- a/bigframes/core/array_value.py +++ b/bigframes/core/array_value.py @@ -403,8 +403,23 @@ def project_window_op( never_skip_nulls: will disable null skipping for operators that would otherwise do so skip_reproject_unsafe: skips the reprojection step, can be used when performing many non-dependent window operations, user responsible for not nesting window expressions, or using outputs as join, filter or aggregation keys before a reprojection """ + + return self.project_window_expr( + ex.UnaryAggregation(op, ex.deref(column_name)), + window_spec, + never_skip_nulls, + skip_reproject_unsafe, + ) + + def project_window_expr( + self, + expression: ex.Aggregation, + window: WindowSpec, + never_skip_nulls=False, + skip_reproject_unsafe: bool = False, + ): # TODO: Support non-deterministic windowing - if window_spec.is_row_bounded or not op.order_independent: + if window.is_row_bounded or not expression.op.order_independent: if self.node.order_ambiguous and not self.session._strictly_ordered: if not self.session._allows_ambiguity: raise ValueError( @@ -415,14 +430,13 @@ def project_window_op( "Window ordering may be ambiguous, this can cause unstable results." 
) warnings.warn(msg, category=bfe.AmbiguousWindowWarning) - output_name = self._gen_namespaced_uid() return ( ArrayValue( nodes.WindowOpNode( child=self.node, - expression=ex.UnaryAggregation(op, ex.deref(column_name)), - window_spec=window_spec, + expression=expression, + window_spec=window, output_name=ids.ColumnId(output_name), never_skip_nulls=never_skip_nulls, skip_reproject_unsafe=skip_reproject_unsafe, diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py index acfa399d75..4607928b78 100644 --- a/bigframes/core/blocks.py +++ b/bigframes/core/blocks.py @@ -1012,16 +1012,34 @@ def apply_window_op( skip_null_groups: bool = False, skip_reproject_unsafe: bool = False, never_skip_nulls: bool = False, + ) -> typing.Tuple[Block, str]: + agg_expr = ex.UnaryAggregation(op, ex.deref(column)) + return self.apply_analytic( + agg_expr, + window_spec, + result_label, + skip_reproject_unsafe=skip_reproject_unsafe, + never_skip_nulls=never_skip_nulls, + skip_null_groups=skip_null_groups, + ) + + def apply_analytic( + self, + agg_expr: ex.Aggregation, + window: windows.WindowSpec, + result_label: Label, + *, + skip_reproject_unsafe: bool = False, + never_skip_nulls: bool = False, + skip_null_groups: bool = False, ) -> typing.Tuple[Block, str]: block = self if skip_null_groups: - for key in window_spec.grouping_keys: - block, not_null_id = block.apply_unary_op(key.id.name, ops.notnull_op) - block = block.filter_by_id(not_null_id).drop_columns([not_null_id]) - expr, result_id = block._expr.project_window_op( - column, - op, - window_spec, + for key in window.grouping_keys: + block = block.filter(ops.notnull_op.as_expr(key.id.name)) + expr, result_id = block._expr.project_window_expr( + agg_expr, + window, skip_reproject_unsafe=skip_reproject_unsafe, never_skip_nulls=never_skip_nulls, ) diff --git a/bigframes/core/groupby/dataframe_group_by.py b/bigframes/core/groupby/dataframe_group_by.py index f234bad126..a2c4cf2867 100644 --- 
a/bigframes/core/groupby/dataframe_group_by.py +++ b/bigframes/core/groupby/dataframe_group_by.py @@ -275,6 +275,27 @@ def count(self) -> df.DataFrame: def nunique(self) -> df.DataFrame: return self._aggregate_all(agg_ops.nunique_op) + @validations.requires_ordering() + def cumcount(self, ascending: bool = True) -> series.Series: + window_spec = ( + window_specs.cumulative_rows(grouping_keys=tuple(self._by_col_ids)) + if ascending + else window_specs.inverse_cumulative_rows( + grouping_keys=tuple(self._by_col_ids) + ) + ) + block, result_id = self._block.apply_analytic( + ex.NullaryAggregation(agg_ops.size_op), + window=window_spec, + result_label=None, + ) + result = series.Series(block.select_column(result_id)) - 1 + if self._dropna and (len(self._by_col_ids) == 1): + result = result.mask( + series.Series(block.select_column(self._by_col_ids[0])).isna() + ) + return result + @validations.requires_ordering() def cumsum(self, *args, numeric_only: bool = False, **kwargs) -> df.DataFrame: if not numeric_only: @@ -546,10 +567,12 @@ def _apply_window_op( ) columns, _ = self._aggregated_columns(numeric_only=numeric_only) block, result_ids = self._block.multi_apply_window_op( - columns, op, window_spec=window_spec + columns, + op, + window_spec=window_spec, ) - block = block.select_columns(result_ids) - return df.DataFrame(block) + result = df.DataFrame(block.select_columns(result_ids)) + return result def _resolve_label(self, label: blocks.Label) -> str: """Resolve label to column id.""" diff --git a/tests/system/small/test_groupby.py b/tests/system/small/test_groupby.py index f1d2bacf08..bc2e9cc385 100644 --- a/tests/system/small/test_groupby.py +++ b/tests/system/small/test_groupby.py @@ -383,14 +383,14 @@ def test_dataframe_groupby_multi_sum( @pytest.mark.parametrize( - ("operator"), + ("operator", "dropna"), [ - (lambda x: x.cumsum(numeric_only=True)), - (lambda x: x.cummax(numeric_only=True)), - (lambda x: x.cummin(numeric_only=True)), + (lambda x: 
x.cumsum(numeric_only=True), True), + (lambda x: x.cummax(numeric_only=True), True), + (lambda x: x.cummin(numeric_only=True), False), # Pre-pandas 2.2 doesn't always produce float. - (lambda x: x.cumprod().astype("Float64")), - (lambda x: x.shift(periods=2)), + (lambda x: x.cumprod().astype("Float64"), False), + (lambda x: x.shift(periods=2), True), ], ids=[ "cumsum", @@ -401,16 +401,44 @@ def test_dataframe_groupby_multi_sum( ], ) def test_dataframe_groupby_analytic( - scalars_df_index, scalars_pandas_df_index, operator + scalars_df_index, + scalars_pandas_df_index, + operator, + dropna, ): col_names = ["float64_col", "int64_col", "bool_col", "string_col"] - bf_result = operator(scalars_df_index[col_names].groupby("string_col")) - pd_result = operator(scalars_pandas_df_index[col_names].groupby("string_col")) + bf_result = operator( + scalars_df_index[col_names].groupby("string_col", dropna=dropna) + ) + pd_result = operator( + scalars_pandas_df_index[col_names].groupby("string_col", dropna=dropna) + ) bf_result_computed = bf_result.to_pandas() pd.testing.assert_frame_equal(pd_result, bf_result_computed, check_dtype=False) +@pytest.mark.parametrize( + ("ascending", "dropna"), + [ + (True, True), + (False, False), + ], +) +def test_dataframe_groupby_cumcount( + scalars_df_index, scalars_pandas_df_index, ascending, dropna +): + bf_result = scalars_df_index.groupby("string_col", dropna=dropna).cumcount( + ascending + ) + pd_result = scalars_pandas_df_index.groupby("string_col", dropna=dropna).cumcount( + ascending + ) + bf_result_computed = bf_result.to_pandas() + + pd.testing.assert_series_equal(pd_result, bf_result_computed, check_dtype=False) + + def test_dataframe_groupby_size_as_index_false( scalars_df_index, scalars_pandas_df_index ): diff --git a/third_party/bigframes_vendored/pandas/core/groupby/__init__.py b/third_party/bigframes_vendored/pandas/core/groupby/__init__.py index 4fb8498932..ebfbfa8830 100644 --- 
a/third_party/bigframes_vendored/pandas/core/groupby/__init__.py +++ b/third_party/bigframes_vendored/pandas/core/groupby/__init__.py @@ -718,7 +718,6 @@ def max( def cumcount(self, ascending: bool = True): """ Number each item in each group from 0 to the length of that group - 1. - (DataFrameGroupBy functionality is not yet available.) **Examples:**