At the moment, we have a query that retrieves the zip files containing the delta FF data since the last full batch. However, not every zip file contains the lipper ids that we want to target. Therefore, we need to create a new index that lets us query only the files that are relevant to the lipper ids provided.
- Update the lambda function to read the content of the incoming delta files.
The file names have this format: flow_feed_004875.xml
The content looks like this:
<FlowsHistory Id="40301418">
...content
</FlowsHistory>
<FlowsHistory Id="40301419">
...content
</FlowsHistory>
Here the Id values are lipper ids. We are interested in the list of lipper ids in each file.
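The extraction step could be sketched as follows. This is a minimal sketch, not the lambda's actual code: the helper name extract_lipper_ids is hypothetical, and it assumes the real file wraps the FlowsHistory elements in a single root element (the Flows root used in the sample below is made up, since the excerpt above does not show one).

```python
import xml.etree.ElementTree as ET


def extract_lipper_ids(xml_text: str) -> list:
    """Return the Id of every FlowsHistory element, in document order, without duplicates."""
    # Assumption: xml_text is well-formed XML with a single root element.
    root = ET.fromstring(xml_text)
    ids = []
    seen = set()
    for element in root.iter("FlowsHistory"):
        lipper_id = element.get("Id")
        # Skip elements with no Id and ids that were already collected.
        if lipper_id and lipper_id not in seen:
            seen.add(lipper_id)
            ids.append(lipper_id)
    return ids


# Hypothetical sample: the <Flows> root is an assumption for illustration.
sample = (
    "<Flows>"
    '<FlowsHistory Id="40301418"></FlowsHistory>'
    '<FlowsHistory Id="40301419"></FlowsHistory>'
    "</Flows>"
)
print(extract_lipper_ids(sample))
```

If the delta files turn out to have no enclosing root element, or use XML namespaces, the parsing step would need to be adjusted accordingly.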
With the list of lipper ids from each delta file, insert new file records for the delta files with one extra attribute: LGFFDeltaLipperIds, of type List, containing all the lipper ids that the delta file contains.
Here is a sample record from the staging table:
[{'LGFFDeltaLipperIds': {'L': [{'S': '40301418'}, {'S': '40301419'}]},
'FileID': {'S': 'flow_feed_004875'},
'FileDir': {'S': 'test _dir'}}]
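A record in this shape could be built roughly as below. This is a sketch under assumptions: the helper names file_id_from_name and build_delta_file_item are hypothetical, and deriving FileID by dropping the .xml extension is inferred from the sample above rather than confirmed. The put_item call is left as a comment because the table name and client setup are not given here.

```python
import os


def file_id_from_name(file_name: str) -> str:
    """Drop the extension: 'flow_feed_004875.xml' -> 'flow_feed_004875' (assumed convention)."""
    return os.path.splitext(os.path.basename(file_name))[0]


def build_delta_file_item(file_name: str, file_dir: str, lipper_ids) -> dict:
    """Build a record in the low-level DynamoDB attribute-value format."""
    return {
        "FileID": {"S": file_id_from_name(file_name)},
        "FileDir": {"S": file_dir},
        # List attribute whose elements are string values, one per lipper id.
        "LGFFDeltaLipperIds": {"L": [{"S": str(i)} for i in lipper_ids]},
    }


item = build_delta_file_item(
    "flow_feed_004875.xml", "test _dir", ["40301418", "40301419"]
)
print(item)

# With a boto3 DynamoDB client, the record could then be written with:
# client.put_item(TableName=table_name, Item=item)
```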
Update the query used in FF to return only the delta files that contain the lipper ids provided. We should use a FilterExpression to check whether LGFFDeltaLipperIds contains the lipper id we are looking for. Example query:
In [21]: client.query(
...: TableName=table_name,
...: KeyConditionExpression='FileID = :file_id',
...: FilterExpression="contains(LGFFDeltaLipperIds, :lipper_id)",
...: ExpressionAttributeValues={
...: ':file_id': {'S': file_id},
...: ':lipper_id': {'S': '40301418'}
...: }
...: )['Items']
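The example query checks a single lipper id. When several lipper ids are provided, one possible approach is to OR together one contains() condition per id. The helper below is a hypothetical sketch (build_lipper_filter and the :lipper_id_N placeholder names are not part of the existing code) that only builds the expression and its values.

```python
def build_lipper_filter(lipper_ids):
    """Return (FilterExpression, ExpressionAttributeValues) matching any of the given ids."""
    if not lipper_ids:
        raise ValueError("at least one lipper id is required")
    # One placeholder per id, e.g. ':lipper_id_0': {'S': '40301418'}.
    values = {
        f":lipper_id_{n}": {"S": str(lipper_id)}
        for n, lipper_id in enumerate(lipper_ids)
    }
    expression = " OR ".join(
        f"contains(LGFFDeltaLipperIds, {name})" for name in values
    )
    return expression, values


filter_expression, values = build_lipper_filter(["40301418", "40301419"])
values[":file_id"] = {"S": "flow_feed_004875"}
print(filter_expression)

# The result could then be passed to the query, for example:
# client.query(
#     TableName=table_name,
#     KeyConditionExpression="FileID = :file_id",
#     FilterExpression=filter_expression,
#     ExpressionAttributeValues=values,
# )["Items"]
```

Note that DynamoDB applies a FilterExpression after the items matching the key condition have been read, so the filter reduces the data returned rather than the data read, and large result sets still need to be paged with LastEvaluatedKey.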
Update the TargetedLipperFundFlowsDeltaImporter process to use the new filtered query.
Test in staging with a few new records to confirm that we can insert records with the new index column and retrieve them based on the lipper ids provided.