
hashing functions/objects defined elsewhere #717

Closed
satra opened this issue Oct 22, 2023 · 43 comments
Labels
question Further information is requested

Comments

@satra
Contributor

satra commented Oct 22, 2023

with current master, after the hashing changes were incorporated, the tests fail:

FAILED pydra_ml/tests/test_classifier.py::test_classifier - pydra.utils.hash.UnhashableError: Cannot hash object {'permute': True, 'model': (Pipeline(steps=[('...
FAILED pydra_ml/tests/test_classifier.py::test_regressor - pydra.utils.hash.UnhashableError: Cannot hash object {'permute': True, 'model': (Pipeline(steps=[('...

in pydra_ml almost everything was marked as type Any, with the old hashing function figuring out things. i saw this as something a general scripter would do. i can go and do stricter type annotation (i have tried this too - see below), but i wouldn't expect a general user to do that when importing a function from an arbitrary library.

what's the minimal thing one can do to fix this on the user side, so that arbitrary functions could be imported in pydra? i suspect this may actually require changes to the hashing. an approach may be to pickle the object and generate the byte stream, which seems like a sensible fallback in a local setting instead of UnhashableError.

in this particular case, the scikit-learn Pipeline object is being used, which is an arbitrarily nested object of objects. it's the input to this function. an equivalent consideration would be someone deciding to pass a pydra workflow as an input to a function.

https://github.com/nipype/pydra-ml/blob/b58ad3d488857716df74c5917d5ad11729c25258/pydra_ml/tasks.py#L129

def get_feature_importance(*,
                           permute: bool,
                           model: tuple[Pipeline, list, list],
                           gen_feature_importance: bool = True):
@satra added the question (Further information is requested) label on Oct 22, 2023
@effigies
Contributor

This shouldn't be tied to type annotations (apart from files), but to actual runtime types. Hashing the cloudpickle as a fallback for types we don't know how to hash seems reasonable.
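A minimal sketch of what such a fallback could look like (using stdlib `pickle` as a stand-in for `cloudpickle`, and hypothetical `fallback_bytes_repr`/`hash_with_fallback` names, not pydra's actual API):

```python
import pickle  # stand-in for cloudpickle; cloudpickle additionally handles lambdas/closures
from hashlib import blake2b

def fallback_bytes_repr(obj):
    """Hypothetical fallback serializer: yield the pickled byte stream
    for objects with no dedicated bytes_repr registered."""
    yield b"pickle:"
    yield pickle.dumps(obj, protocol=4)  # pin the protocol for stability

def hash_with_fallback(obj):
    h = blake2b(digest_size=16)
    for chunk in fallback_bytes_repr(obj):
        h.update(chunk)
    return h.hexdigest()
```

Equal values then hash equally, and different values differently, without raising for unregistered types.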

@effigies
Contributor

Looking at the old function:

def hash_function(obj):
    """Generate hash of object."""
    return sha256(str(obj).encode()).hexdigest()

I see how that was so successful. :-) I do think it would be good to use something more like cloudpickle than str(). str() of many objects will be sensitive to changes in interpreter and insensitive to changes in contents.
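The point about str() can be demonstrated with any class that has no custom __repr__ (the `Model` class here is a made-up example): the default string shows only the memory address, so it varies between runs yet never reflects the object's contents.

```python
class Model:
    """No custom __repr__/__str__, like many user-defined classes."""
    def __init__(self, alpha):
        self.alpha = alpha

a, b = Model(1), Model(2)
before = str(a)          # e.g. '<__main__.Model object at 0x7f...>'
a.alpha = 99             # change the contents...
assert str(a) == before  # ...str() doesn't notice: insensitive to contents
assert str(a) != str(b)  # differs between objects only via the memory address
```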

@effigies
Contributor

Oh, and finally, the fastest way to enable hashing on a currently-unhashable object would be:

import cloudpickle as cp
from sklearn.pipeline import Pipeline
from pydra.utils.hash import register_serializer, Cache

@register_serializer
def bytes_repr_Pipeline(obj: Pipeline, cache: Cache):
    yield cp.dumps(obj)

@satra
Contributor Author

satra commented Oct 22, 2023

thanks @effigies for the register code. that helps.

in this particular instance str(obj) works because a scikit-learn pipeline is reproducible from its repr:

Pipeline(steps=[('std', StandardScaler()),
                ('MLPClassifier', MLPClassifier(alpha=1, max_iter=1000))])

but a general pipeline could indeed use the cloudpickle.
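[The repr-based scheme being discussed can be sketched as follows; `repr_hash` is a hypothetical name, and the plain-list stand-in below only illustrates the "repr captures contents" property that sklearn estimators share.]

```python
from copy import deepcopy
from hashlib import sha256

def repr_hash(obj):
    """The pre-0.23 pydra behaviour: hash the string representation.
    Only safe for types whose repr fully captures their contents,
    as scikit-learn estimator reprs do."""
    return sha256(str(obj).encode()).hexdigest()

# stand-in for Pipeline steps: repr of plain data fully captures contents
steps = [("std", "StandardScaler()"), ("clf", "MLPClassifier(alpha=1)")]
assert repr_hash(steps) == repr_hash(deepcopy(steps))  # stable across copies
```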

should i leave this open for adding a fallback option?

ps. also we still haven't solved function hashing in general across putatively similar environments (say same versions of libraries installed in different operating system environments) - but that's a different challenge.

@satra
Contributor Author

satra commented Oct 22, 2023

actually the register code by itself doesn't work, as the object is in a dict and perhaps there is no recursion there:

{'permute': True, 'model': (Pipeline(steps=[('std', StandardScaler()),
                ('MLPClassifier', MLPClassifier(alpha=1, max_iter=1000))]), [0, 2, 3], [1, 10, 12]), 
'gen_feature_importance': False, 
'_func': b'\x80\x05\x95-\x00\x00\x00\x00\x00\x00\x00\x8c\x0epydra_ml.tasks\x94\x8c\x16get_feature_importance\x94\x93\x94.'}

also i just injected that register in pydra_ml rather than pydra, which i think is the right thing to do.

@effigies
Contributor

There should be recursion. We built it that way.
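[The recursion being referred to can be sketched in miniature; this illustrates the general singledispatch-generator pattern, not pydra's actual implementation — containers recurse into their elements, so a registered serializer is reached even for objects nested inside dicts:]

```python
from functools import singledispatch
from hashlib import blake2b

@singledispatch
def bytes_repr(obj):
    # default: type name plus repr (a real implementation would do better)
    yield f"{type(obj).__name__}:{obj!r}".encode()

@bytes_repr.register
def _(obj: dict):
    yield b"dict:{"
    for key in sorted(obj, key=str):     # stable key order
        yield from bytes_repr(key)       # recurse into keys...
        yield from bytes_repr(obj[key])  # ...and values
    yield b"}"

@bytes_repr.register
def _(obj: list):
    yield b"list:["
    for item in obj:
        yield from bytes_repr(item)
    yield b"]"

def hash_obj(obj):
    h = blake2b(digest_size=16)
    for chunk in bytes_repr(obj):
        h.update(chunk)
    return h.hexdigest()
```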

@satra
Contributor Author

satra commented Oct 22, 2023

hmmm. this whole thing goes to hash_object

(Pdb) u
> /Users/satra/software/nipype/pydra/pydra/utils/hash.py(63)hash_function()
-> return hash_object(obj).hex()
(Pdb) obj
{'permute': True, 'model': (Pipeline(steps=[('std', StandardScaler()),
                ('MLPClassifier', MLPClassifier(alpha=1, max_iter=1000))]), [0, 2, 3, 4, 5, 6, 7, 8, 9, 11, 13, 16, 18, 19, 20, 22, 23, 24, 25, 26, 27, 28, 29, 30, 32, 33, 34, 35, 36, 38, 39, 40, 41, 42, 43, 44, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 65, 67, 68, 69, 70, 72, 73, 74, 76, 77, 79, 80, 81, 82, 83, 84, 86, 87, 88, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 101, 104, 105, 106, 109, 110, 111, 112, 113, 114, 115, 116, 117, 119, 120, 121, 122, 123, 124, 125, 126, 128, 129, 130, 131, 133, 135, 136, 137, 138, 139, 141, 143, 144, 145, 146, 147, 148, 149, 150, 151, 152, 153, 154, 155, 156, 158, 160, 161, 162, 163, 164, 166, 167, 168, 169, 171, 173, 174, 176, 177, 178, 180, 181, 182, 183, 184, 185, 186, 187, 189, 190, 191, 192, 193, 195, 197, 198, 199, 200, 201, 202, 203, 204, 206, 207, 208, 209, 212, 214, 215, 216, 217, 218, 219, 220, 221, 222, 223, 225, 226, 227, 228, 229, 230, 232, 234, 236, 237, 238, 240, 241, 242, 243, 244, 245, 246, 248, 251, 252, 253, 254, 255, 256, 257, 258, 259, 260, 261, 262, 265, 266, 267, 269, 270, 271, 273, 274, 275, 276, 277, 278, 279, 280, 281, 282, 284, 285, 286, 287, 288, 289, 290, 291, 292, 293, 294, 295, 296, 297, 299, 300, 302, 303, 304, 305, 306, 307, 309, 311, 312, 313, 314, 315, 316, 317, 320, 321, 322, 323, 324, 325, 326, 327, 328, 329, 331, 332, 333, 334, 335, 336, 338, 339, 341, 342, 343, 344, 346, 347, 349, 351, 352, 355, 357, 359, 360, 361, 362, 363, 365, 366, 367, 368, 369, 370, 371, 373, 374, 375, 376, 377, 378, 379, 380, 381, 383, 384, 386, 387, 388, 390, 392, 393, 394, 395, 396, 397, 398, 399, 402, 403, 404, 405, 406, 407, 408, 409, 410, 411, 415, 418, 419, 422, 423, 424, 425, 426, 427, 428, 429, 430, 431, 433, 435, 436, 437, 438, 440, 441, 442, 443, 444, 445, 446, 447, 448, 449, 450, 451, 452, 453, 454, 455, 456, 459, 460, 461, 462, 464, 467, 469, 470, 472, 474, 475, 476, 477, 478, 479, 480, 481, 483, 484, 485, 486, 487, 488, 489, 490, 491, 492, 493, 494, 495, 496, 497, 498, 499, 501, 
502, 503, 505, 506, 507, 508, 509, 510, 511, 513, 514, 517, 520, 521, 522, 523, 524, 526, 527, 528, 529, 530, 531, 532, 533, 534, 535, 536, 537, 539, 540, 541, 542, 543, 544, 545, 547, 548, 549, 550, 551, 552, 553, 554, 555, 556, 557, 558, 559, 561, 563, 565, 568], [1, 10, 12, 14, 15, 17, 21, 31, 37, 45, 46, 64, 66, 71, 75, 78, 85, 89, 90, 102, 103, 107, 108, 118, 127, 132, 134, 140, 142, 157, 159, 165, 170, 172, 175, 179, 188, 194, 196, 205, 210, 211, 213, 224, 231, 233, 235, 239, 247, 249, 250, 263, 264, 268, 272, 283, 298, 301, 308, 310, 318, 319, 330, 337, 340, 345, 348, 350, 353, 354, 356, 358, 364, 372, 382, 385, 389, 391, 400, 401, 412, 413, 414, 416, 417, 420, 421, 432, 434, 439, 457, 458, 463, 465, 466, 468, 471, 473, 482, 500, 504, 512, 515, 516, 518, 519, 525, 538, 546, 560, 562, 564, 566, 567]), 'gen_feature_importance': False, '_func': b'\x80\x05\x95-\x00\x00\x00\x00\x00\x00\x00\x8c\x0epydra_ml.tasks\x94\x8c\x16get_feature_importance\x94\x93\x94.'}

@satra
Contributor Author

satra commented Oct 22, 2023

this branch has the changes: nipype/pydra-ml#59

and i'm just running this test to work through the changes:

pytest --pdb pydra_ml/tests/test_classifier.py::test_classifier

@satra
Contributor Author

satra commented Oct 22, 2023

actually nevermind. the env still seemed to have pydra 0.22. checking with 0.23.alpha now.

@satra
Contributor Author

satra commented Oct 22, 2023

the issue persists.

@tclose
Contributor

tclose commented Oct 22, 2023

I think we probably need to provide a better debugging experience when hashing fails. Has the registration worked properly or is there an error inside the registered function, @satra?

@satra
Contributor Author

satra commented Oct 23, 2023

at least in pdb when i try to run the node it raises UnhashableError, and the place where the exception is raised is the hash_object function, which receives the entire dictionary object (see #717 (comment)).

since the Pipeline object is an input in other places, i do think the registration is working. i.e. i can make things not work by inserting something in the registration function.

@effigies
Contributor

The code is:

def hash_object(obj: object) -> Hash:
    """Hash an object

    Constructs a byte string that uniquely identifies the object,
    and returns the hash of that string.

    Base Python types are implemented, including recursive lists and
    dicts. Custom types can be registered with :func:`register_serializer`.
    """
    try:
        return hash_single(obj, Cache({}))
    except Exception as e:
        raise UnhashableError(f"Cannot hash object {obj!r}") from e

With the raise ... from construct, the default behavior should be to see the chain of exceptions. Are we catching this exception somewhere else and re-raising it without context?
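[A toy illustration of the mechanism in question, not pydra's code: `raise ... from e` stores the inner exception on `__cause__`, which is what produces the chained traceback.]

```python
def hash_object(obj):
    # toy stand-in for the wrapper above: the inner failure is chained
    try:
        raise TypeError(f"unhashable type: {type(obj).__name__}")
    except Exception as e:
        raise RuntimeError(f"Cannot hash object {obj!r}") from e

try:
    hash_object(object())
except RuntimeError as err:
    # 'raise ... from e' stores the inner exception on __cause__; the
    # default traceback then prints both, separated by "The above
    # exception was the direct cause of the following exception".
    # Re-raising elsewhere with a bare 'raise NewError(msg)' (no
    # 'from') would drop that context.
    assert isinstance(err.__cause__, TypeError)
```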

@effigies
Contributor

@satra I can't reproduce. I'm getting an entirely different error on your PR:

$ pydra_ml/tests/test_classifier.py F
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> captured stderr >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
100%|██████████| 114/114 [00:00<00:00, 273.40it/s]
100%|██████████| 114/114 [00:00<00:00, 282.79it/s]
100%|██████████| 114/114 [00:00<00:00, 271.81it/s]
100%|██████████| 114/114 [00:00<00:00, 268.36it/s]
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> traceback >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>

tmpdir = local('/tmp/pytest-of-chris/pytest-10/test_classifier0')

    def test_classifier(tmpdir):
        clfs = [
            ("sklearn.neural_network", "MLPClassifier", {"alpha": 1, "max_iter": 1000}),
            [
                ["sklearn.impute", "SimpleImputer"],
                ["sklearn.preprocessing", "StandardScaler"],
                ["sklearn.naive_bayes", "GaussianNB", {}],
            ],
        ]
        csv_file = os.path.join(os.path.dirname(__file__), "data", "breast_cancer.csv")
        inputs = {
            "filename": csv_file,
            "x_indices": range(10),
            "target_vars": ("target",),
            "group_var": None,
            "n_splits": 2,
            "test_size": 0.2,
            "clf_info": clfs,
            "permute": [True, False],
            "gen_feature_importance": False,
            "gen_permutation_importance": False,
            "permutation_importance_n_repeats": 5,
            "permutation_importance_scoring": "accuracy",
            "gen_shap": True,
            "nsamples": 15,
            "l1_reg": "aic",
            "plot_top_n_shap": 16,
            "metrics": ["roc_auc_score", "accuracy_score"],
        }
        wf = gen_workflow(inputs, cache_dir=tmpdir)
>       results = run_workflow(wf, "cf", {"n_procs": 1})

pydra_ml/tests/test_classifier.py:38: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
pydra_ml/classifier.py:175: in run_workflow
    sub(runnable=wf)
../pydra-tmp/pydra/engine/submitter.py:42: in __call__
    self.loop.run_until_complete(self.submit_from_call(runnable, rerun))
../../../mambaforge/envs/pydra-dev/lib/python3.11/asyncio/base_events.py:653: in run_until_complete
    return future.result()
../pydra-tmp/pydra/engine/submitter.py:71: in submit_from_call
    await self.expand_runnable(runnable, wait=True, rerun=rerun)
../pydra-tmp/pydra/engine/submitter.py:128: in expand_runnable
    await asyncio.gather(*futures)
../pydra-tmp/pydra/engine/helpers.py:586: in load_and_run_async
    await task._run(submitter=submitter, rerun=rerun, **kwargs)
../pydra-tmp/pydra/engine/core.py:1237: in _run
    result.output = self._collect_outputs()
../pydra-tmp/pydra/engine/core.py:1365: in _collect_outputs
    val_out = val.get_value(self)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = LazyOutField(name='feature_importance', field='feature_importance', type=pydra.engine.specs.StateArray[typing.List[typing.Any]], splits=frozenset({(('ml_wf.clf_info',), ('ml_wf.permute',))}), cast_from=None)
wf = <pydra.engine.core.Workflow object at 0x7f4873b5efd0>, state_index = None

    def get_value(
        self, wf: "pydra.Workflow", state_index: ty.Optional[int] = None
    ) -> ty.Any:
        """Return the value of a lazy field.
    
        Parameters
        ----------
        wf : Workflow
            the workflow the lazy field references
        state_index : int, optional
            the state index of the field to access
    
        Returns
        -------
        value : Any
            the resolved value of the lazy-field
        """
        from ..utils.typing import TypeParser  # pylint: disable=import-outside-toplevel
    
        node = getattr(wf, self.name)
        result = node.result(state_index=state_index)
        if result is None:
>           raise RuntimeError(
                f"Could not find results of '{node.name}' node in a sub-directory "
                f"named '{node.checksum}' in any of the cache locations:\n"
                + "\n".join(str(p) for p in set(node.cache_locations))
            )
E           RuntimeError: Could not find results of 'feature_importance' node in a sub-directory named 'FunctionTask_553735ecdd5564cc0b0913c68a4fa342' in any of the cache locations:
E           /tmp/pytest-of-chris/pytest-10/test_classifier0

../pydra-tmp/pydra/engine/specs.py:1012: RuntimeError

Can move this comment to nipype/pydra-ml#59 if you'd prefer.

@satra
Contributor Author

satra commented Oct 23, 2023

that's the error, and if you pdb it, move up one slot in the stack, and try to run the node variable with node(), it gives an UnhashableError. the fact that it can't find the results suggests something is off with hashing. we could move the comment there, but that test works with 0.22 and doesn't work even with the changes in that PR on 0.23. i released version 0.6 yesterday pinning to 0.22, so your call where we should discuss it.

@effigies
Contributor

I really can't reproduce this:

>>>>>>>>>>>>>>>>>>>>>>> PDB post_mortem (IO-capturing turned off) >>>>>>>>>>>>>>>>>>>>>>>>
> /home/chris/Projects/nipype/pydra-tmp/pydra/engine/specs.py(1012)get_value()
-> raise RuntimeError(
(Pdb) node.checksum
'FunctionTask_5223246ff6af31c41f6fd9a6373f0f6e'
(Pdb) node.result()
(Pdb) node()
[Result(output=Output(feature_importance=[]), runtime=None, errored=False), Result(output=Output(feature_importance=[]), runtime=None, errored=False)]
(Pdb) node.result()
[Result(output=Output(feature_importance=[]), runtime=None, errored=False), Result(output=Output(feature_importance=[]), runtime=None, errored=False)]

Attempting to look at node() from any other stack level failed. I don't know why it's failing, I don't know why I'm getting something different from you. I've tried with the current master and with 0.23a. I can run hash_function(node.inputs.model):

(Pdb) hash_function(node.inputs.model)
'7fff85b269abee56eed987e50e4d203f'
(Pdb) node.inputs.model
StateArray((Pipeline(steps=[('std', StandardScaler()),
                ('MLPClassifier', MLPClassifier(alpha=1, max_iter=1000))]), [0, 2, 3, 4, 5, 6, 7, 8, 9, 11, 13, 16, 18, 19, 20, 22, 23, 24, 25, 26, 27, 28, 29, 30, 32, 33, 34, 35, 36, 38, 39, 40, 41, 42, 43, 44, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 65, 67, 68, 69, 70, 72, 73, 74, 76, 77, 79, 80, 81, 82, 83, 84, 86, 87, 88, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 101, 104, 105, 106, 109, 110, 111, 112, 113, 114, 115, 116, 117, 119, 120, 121, 122, 123, 124, 125, 126, 128, 129, 130, 131, 133, 135, 136, 137, 138, 139, 141, 143, 144, 145, 146, 147, 148, 149, 150, 151, 152, 153, 154, 155, 156, 158, 160, 161, 162, 163, 164, 166, 167, 168, 169, 171, 173, 174, 176, 177, 178, 180, 181, 182, 183, 184, 185, 186, 187, 189, 190, 191, 192, 193, 195, 197, 198, 199, 200, 201, 202, 203, 204, 206, 207, 208, 209, 212, 214, 215, 216, 217, 218, 219, 220, 221, 222, 223, 225, 226, 227, 228, 229, 230, 232, 234, 236, 237, 238, 240, 241, 242, 243, 244, 245, 246, 248, 251, 252, 253, 254, 255, 256, 257, 258, 259, 260, 261, 262, 265, 266, 267, 269, 270, 271, 273, 274, 275, 276, 277, 278, 279, 280, 281, 282, 284, 285, 286, 287, 288, 289, 290, 291, 292, 293, 294, 295, 296, 297, 299, 300, 302, 303, 304, 305, 306, 307, 309, 311, 312, 313, 314, 315, 316, 317, 320, 321, 322, 323, 324, 325, 326, 327, 328, 329, 331, 332, 333, 334, 335, 336, 338, 339, 341, 342, 343, 344, 346, 347, 349, 351, 352, 355, 357, 359, 360, 361, 362, 363, 365, 366, 367, 368, 369, 370, 371, 373, 374, 375, 376, 377, 378, 379, 380, 381, 383, 384, 386, 387, 388, 390, 392, 393, 394, 395, 396, 397, 398, 399, 402, 403, 404, 405, 406, 407, 408, 409, 410, 411, 415, 418, 419, 422, 423, 424, 425, 426, 427, 428, 429, 430, 431, 433, 435, 436, 437, 438, 440, 441, 442, 443, 444, 445, 446, 447, 448, 449, 450, 451, 452, 453, 454, 455, 456, 459, 460, 461, 462, 464, 467, 469, 470, 472, 474, 475, 476, 477, 478, 479, 480, 481, 483, 484, 485, 486, 487, 488, 489, 490, 491, 492, 493, 494, 495, 496, 497, 498, 499, 501, 
502, 503, 505, 506, 507, 508, 509, 510, 511, 513, 514, 517, 520, 521, 522, 523, 524, 526, 527, 528, 529, 530, 531, 532, 533, 534, 535, 536, 537, 539, 540, 541, 542, 543, 544, 545, 547, 548, 549, 550, 551, 552, 553, 554, 555, 556, 557, 558, 559, 561, 563, 565, 568], [1, 10, 12, 14, 15, 17, 21, 31, 37, 45, 46, 64, 66, 71, 75, 78, 85, 89, 90, 102, 103, 107, 108, 118, 127, 132, 134, 140, 142, 157, 159, 165, 170, 172, 175, 179, 188, 194, 196, 205, 210, 211, 213, 224, 231, 233, 235, 239, 247, 249, 250, 263, 264, 268, 272, 283, 298, 301, 308, 310, 318, 319, 330, 337, 340, 345, 348, 350, 353, 354, 356, 358, 364, 372, 382, 385, 389, 391, 400, 401, 412, 413, 414, 416, 417, 420, 421, 432, 434, 439, 457, 458, 463, 465, 466, 468, 471, 473, 482, 500, 504, 512, 515, 516, 518, 519, 525, 538, 546, 560, 562, 564, 566, 567]), (Pipeline(steps=[('std', StandardScaler()),
                ('MLPClassifier', MLPClassifier(alpha=1, max_iter=1000))]), [0, 2, 3, 5, 6, 7, 8, 9, 10, 11, 12, 13, 15, 16, 17, 19, 20, 21, 22, 24, 25, 26, 28, 29, 30, 33, 35, 37, 38, 39, 40, 42, 43, 45, 46, 47, 48, 49, 50, 51, 54, 55, 58, 59, 60, 61, 62, 65, 68, 71, 72, 73, 74, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 87, 88, 89, 90, 92, 93, 94, 95, 96, 97, 98, 99, 100, 101, 102, 103, 104, 105, 106, 109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119, 120, 121, 122, 123, 125, 126, 127, 128, 129, 130, 131, 132, 133, 134, 135, 136, 138, 139, 140, 141, 142, 143, 144, 145, 146, 147, 148, 149, 150, 151, 152, 153, 154, 155, 156, 157, 158, 159, 161, 162, 163, 164, 165, 166, 167, 169, 170, 171, 172, 173, 174, 175, 177, 178, 179, 180, 181, 182, 183, 184, 185, 187, 188, 189, 190, 191, 193, 194, 195, 196, 197, 198, 199, 200, 203, 204, 205, 206, 207, 208, 209, 210, 211, 212, 213, 214, 215, 216, 217, 218, 219, 220, 221, 222, 223, 224, 225, 226, 228, 229, 230, 231, 233, 234, 235, 236, 237, 238, 239, 241, 242, 243, 246, 247, 248, 249, 250, 252, 253, 254, 255, 257, 258, 260, 261, 262, 264, 265, 266, 267, 268, 269, 270, 271, 272, 273, 274, 275, 276, 278, 279, 281, 283, 284, 285, 286, 288, 289, 290, 291, 292, 293, 294, 295, 296, 297, 299, 301, 302, 304, 306, 307, 308, 309, 310, 312, 313, 314, 316, 317, 318, 319, 320, 321, 322, 323, 325, 326, 327, 328, 329, 330, 331, 332, 333, 337, 338, 339, 340, 341, 343, 344, 345, 346, 348, 349, 350, 352, 353, 354, 355, 356, 357, 358, 361, 362, 363, 364, 365, 367, 368, 369, 370, 371, 372, 373, 375, 377, 378, 379, 380, 381, 382, 383, 385, 386, 387, 388, 389, 390, 391, 392, 393, 394, 395, 396, 397, 398, 400, 401, 403, 404, 405, 406, 407, 408, 409, 410, 411, 412, 413, 414, 416, 418, 419, 421, 424, 425, 429, 430, 431, 432, 433, 435, 437, 438, 439, 441, 442, 446, 447, 449, 450, 451, 453, 454, 455, 459, 462, 463, 464, 465, 466, 467, 470, 471, 472, 473, 474, 475, 477, 479, 481, 482, 483, 484, 485, 486, 487, 488, 489, 490, 491, 492, 493, 494, 496, 497, 
498, 499, 501, 502, 503, 505, 506, 507, 508, 510, 511, 512, 513, 514, 515, 516, 517, 518, 519, 520, 521, 522, 523, 525, 527, 528, 529, 530, 531, 533, 535, 537, 538, 541, 542, 543, 544, 545, 546, 547, 548, 549, 550, 551, 552, 553, 554, 556, 559, 561, 563, 564, 566, 568], [1, 4, 14, 18, 23, 27, 31, 32, 34, 36, 41, 44, 52, 53, 56, 57, 63, 64, 66, 67, 69, 70, 75, 86, 91, 107, 108, 124, 137, 160, 168, 176, 186, 192, 201, 202, 227, 232, 240, 244, 245, 251, 256, 259, 263, 277, 280, 282, 287, 298, 300, 303, 305, 311, 315, 324, 334, 335, 336, 342, 347, 351, 359, 360, 366, 374, 376, 384, 399, 402, 415, 417, 420, 422, 423, 426, 427, 428, 434, 436, 440, 443, 444, 445, 448, 452, 456, 457, 458, 460, 461, 468, 469, 476, 478, 480, 495, 500, 504, 509, 524, 526, 532, 534, 536, 539, 540, 555, 557, 558, 560, 562, 565, 567]))

@satra
Contributor Author

satra commented Oct 23, 2023

thanks @effigies for trying. i also don't know why we are seeing different outcomes. i'll dig in some more tomorrow morning, but you are getting the error that it couldn't find the results, right? i get that too, and then when i try to run the node, it crashes. i also don't know why that error surfaces only with 0.23. if you have 0.22 + 0.6, that error doesn't surface. however, a lot has changed between 0.22 and 0.23.

@effigies
Contributor

> you are getting the error that it couldn't find the results right?

Correct, but running node() in the debugger seems to work. So at a guess, what's happening is that we're getting one hash when the nodes are run (with split state) and a different one after combining. Or possibly we are failing to perform a re-run that aggregates the results of the two sub-tasks before attempting to look up the aggregated result.

@djarecka
Collaborator

djarecka commented Oct 23, 2023

@satra - I can try to reproduce and debug as well, but can you write your python version. and you're running this locally on your osx, right?

@satra
Contributor Author

satra commented Oct 23, 2023

i have tried with python 3.10 and 3.11 (same errors) on osx 14 (m1 chip).

@djarecka
Collaborator

@satra - do you expect wf.fit_clf.lzout.model to be the same with each run?

@satra
Contributor Author

satra commented Oct 26, 2023

i don't think so. it's based on the clf_info (a split) and the split indices (a nested split).

@djarecka
Collaborator

so it has a random component?

@satra
Contributor Author

satra commented Oct 26, 2023 via email

@djarecka
Collaborator

yes, of course, sorry, didn't think long enough about the workflow and your answer... for a moment I got confused why my checksums keep changing...

@effigies
Contributor

Would it help debugging to add a random seed that could be fixed?

@djarecka
Collaborator

djarecka commented Nov 7, 2023

I think it's not using a random seed, and yet the hash is still changing

@djarecka
Collaborator

djarecka commented Nov 7, 2023

@tclose - I've created this branch for testing: https://github.com/djarecka/pydra-ml/tree/newpydra_test

I removed some tasks that are not needed to get the error and also simplified the splitters part. I kept the commented part just so it's easy to see what has been removed from the original code. The error I'm getting is:

E           RuntimeError: Could not find results of 'feature_importance' node in a sub-directory named 'FunctionTask_3384430a0229b2b6770771fae01902aa' in any of the cache locations:

I removed the connections to the following nodes, but I kept ("feature_importance", wf.feature_importance.lzout.feature_importance) in the workflow output to reproduce the error, since the problem appears after the task is run, when the results have to be collected.

You can see that this task not only changes its hash between the time it is run and the time the results have to be collected, but the hash is also different every time the workflow is run. I understand that since random_state is set to 0, this should not be the case.

@djarecka
Collaborator

djarecka commented Nov 21, 2023

@tclose , @satra : it looks like everything is fine if I remove ["sklearn.impute", "SimpleImputer"] from the pipeline, i.e. remove this line.
Any idea why this element of the pipeline can cause the problem?

@satra
Contributor Author

satra commented Nov 21, 2023

it's perhaps a function of how it gets checksummed. a Pipeline object is a composable object, but pydra has no direct control over what that pipeline object looks like; that's completely a user specification. but i don't know why, and removing the imputer wouldn't be a solution for this particular problem. i'm assuming you are testing in new cache locations each time. you could also try sending a deepcopy of the Pipeline object, in case somehow that object is not threadsafe for some reason.

@djarecka
Collaborator

yes, I'm testing in new locations.

but just to be clear, I still keep other things in the pipeline... At the beginning I thought that the problem was just with the Pipeline object, but since it sometimes worked I just started removing some elements...

I'm wondering what should be the best solution for hashing the pipeline?

@tclose
Contributor

tclose commented Feb 22, 2024

Just picking this up now: what do these Pipeline objects look like, @satra? When you say that they are supplied by the user, are they completely arbitrary? Where do the pipelines in the test come from?

If they are arbitrary and have stochastic elements then there isn't much that we can do is there? We need the user to supply a hasher that can pick out the stable attributes and make a hash from them, don't we?

@satra
Contributor Author

satra commented Feb 22, 2024

@tclose - the original hash (sha256(str(obj).encode()).hexdigest()) should work on Pipeline as the string representation of a Pipeline is quite deterministic. see: #717 (comment)

hence the question of how to associate that hashing function with the Pipeline object. even @djarecka's experiments could be subsumed if i could associate that hash, and then changes in imputer objects should not have an effect. but in general it doesn't seem to be calling the registered function whenever a Pipeline object is present.

@satra
Contributor Author

satra commented Feb 22, 2024

more generally, debugging improvements could help here.

@tclose
Contributor

tclose commented Feb 23, 2024

> more generally, debugging improvements could help here.

I have implemented debugging improvements on this issue in #698, which detect that the hash actually changes during the execution of the task. I dug into it with the debugger and can confirm that your Pipeline bytes_repr method is being used @satra.

I have narrowed things down a bit and it is a bit strange. It turns out that the hash of a Pipeline with a SimpleImputer step isn't stable between deepcopies of the Pipeline object, i.e. hash_function(pipeline) != hash_function(deepcopy(pipeline)). This is despite cloudpickle being used to create the hashes. Further to this pipeline != deepcopy(pipeline), which seems a little unfair.

When a task is run a deepcopy of the inputs is stored away to guard against inner states of objects being updated while the task runs, however, this assumes that the hash is stable across deepcopies. What would cause a deepcopy to be different from the original?... (will keep digging)

@tclose
Contributor

tclose commented Feb 23, 2024


Actually, the deepcopy inequality is just because the Imputer classes don't implement an __eq__ method. Not sure why the cloudpickle would be different though; I was thinking there might be a function being set as an attribute somewhere, but couldn't find it in the scikit-learn code anywhere.

To work with these "poorly behaved" types in general, perhaps what we should do is replace the inputs with the deepcopied version for the task run, and then replace them with the original afterwards. We could also perhaps check to see if the deepcopied version doesn't equal the original and raise a warning.
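[The __eq__ point can be reproduced without scikit-learn; `FakeImputer` below is a hypothetical stand-in for an estimator class that defines no __eq__:]

```python
from copy import deepcopy

class FakeImputer:
    """Hypothetical stand-in for an estimator that defines no __eq__."""
    def __init__(self, strategy="mean"):
        self.strategy = strategy

a = FakeImputer()
b = deepcopy(a)
# With no __eq__, Python falls back to identity comparison, so a
# deepcopy never compares equal to its original, even with identical
# state -- the "unfair" pipeline != deepcopy(pipeline) behaviour.
assert a != b
assert a.__dict__ == b.__dict__  # yet the contents match exactly
```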

@tclose
Contributor

tclose commented Feb 23, 2024

In that branch I have changed the way that TaskBase._modify_inputs() is defined so the actual original inputs are returned instead of a deepcopy and that resolves the initial problem you ran into. However, there is another deepcopy in TaskBase.checksum_states() which causes problems with a downstream node, and it is a bit too late at night here to start trying to unpick it. Can we get away with using the original objects instead of a deepcopy of them?

See

pydra/pydra/engine/core.py

Lines 271 to 314 in 1720ba6

def checksum_states(self, state_index=None):
    """
    Calculate a checksum for the specific state or all of the states of the task.
    Replaces lists in the inputs fields with a specific values for states.
    Used to recreate names of the task directories,

    Parameters
    ----------
    state_index :
        TODO
    """
    if is_workflow(self) and self.inputs._graph_checksums is attr.NOTHING:
        self.inputs._graph_checksums = [nd.checksum for nd in self.graph_sorted]
    if state_index is not None:
        inputs_copy = deepcopy(self.inputs)
        for key, ind in self.state.inputs_ind[state_index].items():
            val = self._extract_input_el(
                inputs=inputs_copy, inp_nm=key.split(".")[1], ind=ind
            )
            setattr(inputs_copy, key.split(".")[1], val)
        # setting files_hash again in case it was cleaned by setting specific element
        # that might be important for outer splitter of input variable with big files
        # the file can be changed with every single index even if there are only two files
        input_hash = inputs_copy.hash
        if is_workflow(self):
            con_hash = hash_function(self._connections)
            # TODO: hash list is not used
            hash_list = [input_hash, con_hash]  # noqa: F841
            checksum_ind = create_checksum(
                self.__class__.__name__, self._checksum_wf(input_hash)
            )
        else:
            checksum_ind = create_checksum(self.__class__.__name__, input_hash)
        return checksum_ind
    else:
        checksum_list = []
        if not hasattr(self.state, "inputs_ind"):
            self.state.prepare_states(self.inputs, cont_dim=self.cont_dim)
            self.state.prepare_inputs()
        for ind in range(len(self.state.inputs_ind)):
            checksum_list.append(self.checksum_states(state_index=ind))
        return checksum_list

@tclose
Contributor

tclose commented Feb 23, 2024

Actually, couldn't help myself and had another look and if I'm reading it right I think a straight copy should be sufficient instead of a deepcopy. So I have pushed those changes to my branch and the test seems to get past that point and error somewhere else

@satra
Contributor Author

satra commented Feb 23, 2024

@tclose - thanks so much for digging into this. setting the deepcopy issue aside, if you use the original hasher sha256(str(obj).encode()).hexdigest() instead of the cloudpickle one, does it work? i'm hoping str(deepcopy(obj)) == str(obj) in this case, and this would be a situation where the repr of the Pipeline object is sufficient for a check.

to the question of deepcopies inside pydra: we did not want the functions wrapped by tasks changing inputs, which many functions are prone to doing, especially for mutable things like dictionaries and lists. and since split/combine also operate on the inputs, and could get arbitrary inputs and functions, deepcopies seemed safest to prevent the mutation. i.e. functions can do whatever they want internally, but what we send into a function cannot be mutated from pydra's point of view.
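[The mutation hazard being guarded against can be shown in a few lines; `badly_behaved_task` is a hypothetical example of a wrapped function that mutates its input:]

```python
from copy import deepcopy

def badly_behaved_task(params):
    """Hypothetical wrapped function that mutates its input dict."""
    params["fitted"] = True
    return params

original = {"alpha": 1}
# Calling through a deepcopy shields the stored inputs, so a hash
# computed from `original` before the run still matches afterwards:
result = badly_behaved_task(deepcopy(original))
assert original == {"alpha": 1}                # inputs untouched
assert result == {"alpha": 1, "fitted": True}  # mutation stayed local
```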

@tclose
Contributor

tclose commented Feb 24, 2024

> @tclose - thanks so much for digging into this. setting the deepcopy issue aside, if you use the original hasher sha256(str(obj).encode()).hexdigest() instead of the cloudpickle one, does it work. i'm hoping str(deepcopy(obj)) == str(obj) in this case. and this would be a situation where the repr of the Pipeline object is sufficient for a check.
>
> to the question of deepcopies inside pydra, we did not want the functions wrapped by tasks changing inputs, which many functions are prone to doing, especially for mutable things like dictionaries and lists. and since split/combine also operate on the inputs and could get arbitrary inputs and functions, deepcopies seemed safest to prevent the mutation. i.e. functions can do whatever they want internally, but what we send into a function cannot be mutated in pydra.

Hi @satra, I have done that and created a PR onto your PR: nipype/pydra-ml#61. Both tests pass now. Note that the tests also pass using the cloudpickle bytes_repr if you use my #698 pydra branch, once you add the other change in that PR.

@satra
Contributor Author

satra commented Feb 24, 2024

thanks @tclose for helping debug this.

@satra closed this as completed Feb 24, 2024
@satra
Contributor Author

satra commented Feb 24, 2024

regarding the deepcopy changes, i'll leave it to you and other developers to review and add.

@djarecka
Collaborator

thanks @tclose for finding and fixing the issue! the deepcopy issue is concerning...
