#1622: Fix issues with nested context manager calls (PR #1626)

Open · wants to merge 3 commits into main · Changes from all commits
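This PR applies one idiom throughout the codebase: never re-enter a context manager on a Resource that may already be open; enter an independent copy created with `to_copy()` instead. A minimal sketch of the failure mode and the fix, assuming an illustrative local file "in.csv":

    from frictionless.resources import TableResource

    resource = TableResource(path="in.csv")
    resource.infer()

    with resource:
        # Before this PR, a nested `with resource:` anywhere down the call
        # stack would close the underlying file on exit, breaking the outer
        # context. The fix: enter a copy, which manages its own file handle.
        with resource.to_copy() as copy:
            rows = [row.to_list() for row in copy.row_stream]
        # `resource` is still open here; only `copy` was closed.
        print(len(rows))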
3 changes: 2 additions & 1 deletion frictionless/analyzer/analyzer.py
@@ -34,7 +34,8 @@ def analyze_table_resource(
     # Iterate rows
     columns_data: Dict[str, List[Any]] = {}
     numeric = ["integer", "numeric", "number"]
-    with resource:
+    # Use a copy of the resource to avoid side effects (see #1622)
+    with resource.to_copy() as resource:
         for row in resource.row_stream:
             null_columns = 0
             for field_name in row:
3 changes: 2 additions & 1 deletion frictionless/formats/csv/parser.py
@@ -63,7 +63,8 @@ def write_row_stream(self, source: TableResource):
             "wt", delete=False, encoding=self.resource.encoding, newline=""
         ) as file:
             writer = csv.writer(file, **options)  # type: ignore
-            with source:
+            # Use a copy of the source to avoid side effects (see #1622)
+            with source.to_copy() as source:
                 if self.resource.dialect.header:
                     writer.writerow(source.schema.field_names)
                 for row in source.row_stream:
3 changes: 2 additions & 1 deletion frictionless/formats/excel/parsers/xls.py
@@ -109,7 +109,8 @@ def write_row_stream(self, source: TableResource):
         if isinstance(title, int):
             title = f"Sheet {control.sheet}"
         sheet = book.add_sheet(title)
-        with source:
+        # Write from a copy of the source to avoid side effects (see #1622)
+        with source.to_copy() as source:
             if self.resource.dialect.header:
                 for field_index, name in enumerate(source.schema.field_names):
                     sheet.write(0, field_index, name)
3 changes: 2 additions & 1 deletion frictionless/formats/excel/parsers/xlsx.py
@@ -148,7 +148,8 @@ def write_row_stream(self, source: TableResource):
         if isinstance(title, int):
             title = f"Sheet {control.sheet}"
         sheet = book.create_sheet(title)
-        with source:
+        # Write from a copy of the source to avoid side effects (see #1622)
+        with source.to_copy() as source:
             if self.resource.dialect.header:
                 sheet.append(source.schema.field_names)
             for row in source.row_stream:
3 changes: 2 additions & 1 deletion frictionless/formats/gsheets/parser.py
@@ -53,7 +53,8 @@ def write_row_stream(self, source: TableResource):
         sh = gc.open_by_key(key)
         wks = sh.worksheet_by_id(gid) if gid else sh[0]  # type: ignore
         data: List[Any] = []
-        with source:
+        # Use a copy of the source to avoid side effects (see #1622)
+        with source.to_copy() as source:
             data.append(source.schema.field_names)
             for row in source.row_stream:
                 data.append(row.to_list())
3 changes: 2 additions & 1 deletion frictionless/formats/html/parser.py
@@ -57,7 +57,8 @@ def read_cell_stream_create(self) -> types.ICellStream:
     # It will give us an ability to support HtmlDialect
     def write_row_stream(self, source: TableResource):
         html = "<html><body><table>\n"
-        with source:
+        # Use a copy of the source to avoid side effects (see #1622)
+        with source.to_copy() as source:
             html += "<tr>"
             for name in source.schema.field_names:
                 html += f"<td>{name}</td>"
3 changes: 2 additions & 1 deletion frictionless/formats/inline/parser.py
@@ -91,7 +91,8 @@ def read_cell_stream_create(self):  # type: ignore
     def write_row_stream(self, source: TableResource):
         data: List[Any] = []
         control = InlineControl.from_dialect(self.resource.dialect)
-        with source:
+        # Write from a copy of the source to avoid side effects (see #1622)
+        with source.to_copy() as source:
             if self.resource.dialect.header and not control.keyed:
                 data.append(source.schema.field_names)
             for row in source.row_stream:
3 changes: 2 additions & 1 deletion frictionless/formats/json/parsers/json.py
@@ -54,7 +54,8 @@ def read_cell_stream_create(self) -> types.ICellStream:
     def write_row_stream(self, source: TableResource):
         data: List[Any] = []
         control = JsonControl.from_dialect(self.resource.dialect)
-        with source:
+        # Write from a copy of the source to avoid side effects (see #1622)
+        with source.to_copy() as source:
             if self.resource.dialect.header and not control.keyed:
                 data.append(source.schema.field_names)
             for row in source.row_stream:
3 changes: 2 additions & 1 deletion frictionless/formats/json/parsers/jsonl.py
@@ -46,7 +46,8 @@ def write_row_stream(self, source: TableResource):
         control = JsonControl.from_dialect(self.resource.dialect)
         with tempfile.NamedTemporaryFile(delete=False) as file:
             writer = platform.jsonlines.Writer(file)
-            with source:
+            # Write from a copy of the source to avoid side effects (see #1622)
+            with source.to_copy() as source:
                 if self.resource.dialect.header and not control.keyed:
                     writer.write(source.schema.field_names)
                 for row in source.row_stream:
7 changes: 4 additions & 3 deletions frictionless/formats/ods/parser.py
@@ -82,15 +82,16 @@ def write_row_stream(self, source: TableResource):
         file.close()
         book = platform.ezodf.newdoc(doctype="ods", filename=file.name)
         title = f"Sheet {control.sheet}"
-        # Get size
-        with source:
+        # Get size. Use a copy of the source to avoid side effects (see #1622)
+        with source.to_copy() as source:
             row_size = 1
             col_size = len(source.schema.fields)
             for _ in source.row_stream:
                 row_size += 1
         book.sheets += platform.ezodf.Sheet(title, size=(row_size, col_size))
         sheet = book.sheets[title]
-        with source:
+        # Write from a copy of the source to avoid side effects (see #1622)
+        with source.to_copy() as source:
             if self.resource.dialect.header:
                 for field_index, name in enumerate(source.schema.field_names):
                     sheet[(0, field_index)].set_value(name)
3 changes: 2 additions & 1 deletion frictionless/formats/pandas/parser.py
@@ -128,7 +128,8 @@ def write_row_stream(self, source: TableResource):
         data_rows: List[Tuple[Any]] = []
         index_rows: List[Tuple[Any]] = []
         fixed_types = {}
-        with source:
+        # Use a copy of the source to avoid side effects (see #1622)
+        with source.to_copy() as source:
             for row in source.row_stream:
                 data_values: List[Any] = []
                 index_values: List[Any] = []
3 changes: 2 additions & 1 deletion frictionless/formats/qsv/adapter.py
@@ -27,7 +27,8 @@ def read_schema(self, resource: Resource) -> Schema:
         command = [self.qsv_path, "stats", "--infer-dates", "--dates-whitelist", "all"]
         process = sp.Popen(command, stdout=sp.PIPE, stdin=sp.PIPE)
         # TODO: Use FileResource here (or future resource.stream_bytes())
-        with resource:
+        # Use a copy of the resource to avoid side effects (see #1622)
+        with resource.to_copy() as resource:
             while True:
                 chunk = resource.read_bytes(size=BLOCK_SIZE)
                 if not chunk:
6 changes: 4 additions & 2 deletions frictionless/formats/spss/parser.py
@@ -99,7 +99,8 @@ def write_row_stream(self, source: TableResource):

         # Write rows
         with sav.SavWriter(self.resource.normpath, ioUtf8=True, **spss_schema) as writer:  # type: ignore
-            with source:
+            # Use a copy of the source to avoid side effects (see #1622)
+            with source.to_copy() as source:
                 for row in source.row_stream:  # type: ignore
                     cells: List[Any] = []
                     for field in source.schema.fields:  # type: ignore
@@ -130,7+131,8 @@ def __write_convert_schema(self, source: TableResource):
             "varTypes": {},
             "formats": {},
         }
-        with source:
+        # Use a copy of the source to avoid side effects (see #1622)
+        with source.to_copy() as source:
             # Add fields
             sizes: Dict[str, int] = {}
             mapping = self.__write_convert_type()
3 changes: 2 additions & 1 deletion frictionless/formats/sql/adapter.py
@@ -109,7 +109,8 @@ def write_package(self, package: Package):
         for table in self.metadata.sorted_tables:
             if package.has_table_resource(table.name):
                 resource = package.get_table_resource(table.name)
-                with resource:
+                # Use a copy of the resource to avoid side effects (see #1622)
+                with resource.to_copy() as resource:
                     self.write_row_stream(resource.row_stream, table_name=table.name)
         return models.PublishResult(
             url=self.engine.url.render_as_string(hide_password=True),
3 changes: 2 additions & 1 deletion frictionless/formats/sql/parser.py
@@ -51,6 +51,7 @@ def write_row_stream(self, source: TableResource):
         adapter = SqlAdapter(engine, control=control)
         if not adapter:
             raise FrictionlessException(f"Not supported source: {self.resource.normpath}")
-        with source:
+        # Write from a copy to prevent side effects (see #1622)
+        with source.to_copy() as source:
             adapter.write_schema(source.schema, table_name=control.table)
             adapter.write_row_stream(source.row_stream, table_name=control.table)
3 changes: 2 additions & 1 deletion frictionless/formats/yaml/parser.py
@@ -52,7 +52,8 @@ def read_cell_stream_create(self) -> types.ICellStream:
     def write_row_stream(self, source: TableResource):
         data: List[Any] = []
         control = YamlControl.from_dialect(self.resource.dialect)
-        with source:
+        # Write from a copy of the source to avoid side effects (see #1622)
+        with source.to_copy() as source:
             if self.resource.dialect.header and not control.keyed:
                 data.append(source.schema.field_names)
             for row in source.row_stream:
60 changes: 34 additions & 26 deletions frictionless/indexer/indexer.py
@@ -45,20 +45,24 @@ def __attrs_post_init__(self):

     def index(self) -> Optional[Report]:
         self.prepare_resource()
-        with self.resource:
-            # Index is resouce-based operation not supporting FKs
-            if self.resource.schema.foreign_keys:
-                self.resource.schema.foreign_keys = []
-            self.create_table()
-            while True:
-                try:
-                    return self.populate_table()
-                except Exception:
-                    if self.fast and self.use_fallback:
-                        self.fast = False
-                        continue
-                    self.delete_table()
-                    raise
+        # Infer resource if needed
+        if self.resource.closed:
+            self.resource.infer()
+
+        # Index is a resource-based operation not supporting FKs
+        if self.resource.schema.foreign_keys:
+            self.resource.schema.foreign_keys = []
+        self.create_table()
+        while True:
+            try:
+                return self.populate_table()
+            except Exception:
+                if self.fast and self.use_fallback:
+                    self.fast = False
+                    continue
+                self.delete_table()
+                raise

     def prepare_resource(self):
         if self.qsv_path:
@@ -108,25 +112,29 @@ def populate_table_fast_sqlite(self):
         sql_command = f".import '|cat -' \"{self.table_name}\""
         command = ["sqlite3", "-csv", self.adapter.engine.url.database, sql_command]
         process = subprocess.Popen(command, stdin=PIPE, stdout=PIPE)
-        for line_number, line in enumerate(self.resource.byte_stream, start=1):
-            if line_number > 1:
-                process.stdin.write(line)  # type: ignore
-            self.report_progress(f"{self.resource.stats.bytes} bytes")
+        # Iterate over a copy of the resource to avoid side effects (see #1622)
+        with self.resource.to_copy() as resource:
+            for line_number, line in enumerate(resource.byte_stream, start=1):
+                if line_number > 1:
+                    process.stdin.write(line)  # type: ignore
+                self.report_progress(f"{self.resource.stats.bytes} bytes")
         process.stdin.close()  # type: ignore
        process.wait()

     def populate_table_fast_postgresql(self):
         database_url = self.adapter.engine.url.render_as_string(hide_password=False)
         with platform.psycopg.connect(database_url) as connection:
             with connection.cursor() as cursor:
-                query = 'COPY "%s" FROM STDIN CSV HEADER' % self.table_name
-                with cursor.copy(query) as copy:  # type: ignore
-                    while True:
-                        chunk = self.resource.read_bytes(size=settings.BLOCK_SIZE)
-                        if not chunk:
-                            break
-                        copy.write(chunk)
-                self.report_progress(f"{self.resource.stats.bytes} bytes")
+                # Iterate over a copy of the resource to avoid side effects (see #1622)
+                with self.resource.to_copy() as resource:
+                    query = 'COPY "%s" FROM STDIN CSV HEADER' % self.table_name
+                    with cursor.copy(query) as copy:  # type: ignore
+                        while True:
+                            chunk = resource.read_bytes(size=settings.BLOCK_SIZE)
+                            if not chunk:
+                                break
+                            copy.write(chunk)
+                    self.report_progress(f"{self.resource.stats.bytes} bytes")

     def delete_table(self):
         self.adapter.delete_resource(self.table_name)
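The same copy-first idiom covers byte streams as well as row streams. A short sketch of the pattern used by both fast populate paths, assuming an illustrative "capital.csv" (the block size is arbitrary):

    from frictionless.resources import TableResource

    resource = TableResource(path="capital.csv")
    resource.infer()  # gather metadata up front, mirroring the new index() flow

    with resource.to_copy() as copy:
        while True:
            chunk = copy.read_bytes(size=8192)
            if not chunk:
                break
            # feed chunk to the sqlite3 subprocess or the psycopg COPY pipe

    print(resource.closed)  # True: only the copy was ever opened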
52 changes: 50 additions & 2 deletions frictionless/resource/resource.py
@@ -238,6 +238,7 @@ def __attrs_post_init__(self):
         # Internal
         self.__loader: Optional[Loader] = None
         self.__buffer: Optional[types.IBuffer] = None
+        self.__context_manager_entered: bool = False

         # Detect resource
         system.detect_resource(self)
@@ -257,11 +258,58 @@ def __attrs_post_init__(self):
     # TODO: shall we guarantee here that it's at the beginning for the file?
     # TODO: maybe it's possible to do type narrowing here?
     def __enter__(self):
-        if self.closed:
-            self.open()
+        """Enter a context manager for the resource.
+
+        We need to be careful with contexts because they open and close the
+        Resource (and thus any underlying files), and we don't want to close
+        a file that is still being used somewhere higher up the call stack.
+
+        For example, if nested contexts were allowed, then:
+
+            with Resource("in.csv") as resource:
+                with resource:
+                    ...  # use resource
+                resource.write("out.csv")
+
+        would fail, because the inner context would close the file before the
+        write happened. While the code above is obviously wrong, similar
+        situations arise when composing steps in pipelines, calling petl code,
+        etc., where the various functions may have no knowledge of each other.
+        See #1622 for details.
+
+        So we only allow a single context to be open at a time and raise an
+        exception if a nested context is attempted. For similar reasons, we
+        also raise an exception if a context is attempted on an already open
+        resource.
+
+        The code above can be written correctly as:
+
+            with Resource("in.csv") as resource:
+                with resource.to_copy() as resource2:
+                    ...  # use resource2
+                resource.write("out.csv")
+
+        which keeps resource and resource2 as independent views on the same
+        file.
+
+        Note that if you absolutely need to use a resource in a manner where
+        you don't care whether it is "opened" multiple times and closed once,
+        you can call `open()` and `close()` directly, but you then become
+        responsible for ensuring the file is closed at the correct time.
+        """
+        if self.__context_manager_entered:
+            note = "Resource has previously entered a context manager (`with` statement) and does not support nested contexts. To use it in a nested context, call `to_copy()` and use the copy in the `with`."
+            raise FrictionlessException(note)
+        if not self.closed:
+            note = "Resource is currently open and cannot be used in a `with` statement (which would reopen the file). To use `with` on an open Resource, call `to_copy()` and use the copy in the `with`."
+            raise FrictionlessException(note)
+
+        self.__context_manager_entered = True
+        self.open()
         return self

     def __exit__(self, type, value, traceback):  # type: ignore
+        # Mark the context manager as exited so that sequential contexts are allowed.
+        self.__context_manager_entered = False
         self.close()

     @property
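A sketch of the new guard in action (the exact error note text may differ slightly from the strings above):

    from frictionless import FrictionlessException
    from frictionless.resources import TableResource

    resource = TableResource(path="capital.csv")  # illustrative path

    with resource:
        try:
            with resource:  # nested context: now rejected by __enter__
                pass
        except FrictionlessException:
            print("nested context rejected")

    # Sequential contexts still work because __exit__ resets the flag:
    with resource:
        pass
    with resource:
        pass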
10 changes: 7 additions & 3 deletions frictionless/resources/table.py
@@ -254,7 +254,8 @@ def __open_lookup(self):
                 self.__lookup[source_name][source_key] = set()
             if not source_res:
                 continue
-            with source_res:
+            # Iterate on a copy to avoid side effects (see #1622)
+            with source_res.to_copy() as source_res:
                 for row in source_res.row_stream:  # type: ignore
                     cells = tuple(row.get(field_name) for field_name in source_key)  # type: ignore
                     if set(cells) == {None}:  # type: ignore
@@ -641,12 +642,15 @@ def from_petl(view: Any, **options: Any):

     def to_petl(self, normalize: bool = False):
         """Export resource as a PETL table"""
-        resource = self.to_copy()
+        # Store a copy of self to avoid side effects (see #1622)
+        self_copy = self.to_copy()

         # Define view
         class ResourceView(platform.petl.Table):  # type: ignore
             def __iter__(self):  # type: ignore
-                with resource:
+                # Iterate over a copy of the resource so that each instance of the iterator is independent (see #1622).
+                # If we didn't do this, different iterators on the same table would interfere with each other.
+                with self_copy.to_copy() as resource:
                     if normalize:
                         yield resource.schema.field_names
                         yield from (row.to_list() for row in resource.row_stream)
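Because each `__iter__` call now opens its own copy, two iterators over the same petl view no longer share a single file handle. An illustrative sketch:

    from frictionless.resources import TableResource

    table = TableResource(path="capital.csv").to_petl()

    first = iter(table)   # opens its own copy of the resource
    second = iter(table)  # another, independent copy
    print(next(first))    # header row
    print(next(second))   # previously this could disturb `first`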
5 changes: 3 additions & 2 deletions frictionless/steps/table/table_debug.py
@@ -33,8 +33,9 @@ def transform_resource(self, resource: Resource):

         # Data
         def data():  # type: ignore
-            with current:
-                for row in current.row_stream:  # type: ignore
+            # Use a copy of the source to avoid side effects (see #1622)
+            with current.to_copy() as current_copy:
+                for row in current_copy.row_stream:  # type: ignore
                     self.function(row)  # type: ignore
                     yield row
5 changes: 3 additions & 2 deletions frictionless/steps/table/table_normalize.py
@@ -24,11 +24,12 @@ class table_normalize(Step):
     # Transform

     def transform_resource(self, resource: Resource):
-        current = resource.to_copy()
+        resource_copy = resource.to_copy()

         # Data
         def data():  # type: ignore
-            with current:
+            # Yield from a copy to avoid side effects (see #1622)
+            with resource_copy.to_copy() as current:
                 yield current.header.to_list()  # type: ignore
                 for row in current.row_stream:  # type: ignore
                     yield row.to_list()  # type: ignore
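Because `data()` now opens a fresh copy on each invocation, a normalized resource can be read more than once. A usage sketch (pipeline shape assumed from the public transform API):

    from frictionless import Pipeline, steps
    from frictionless.resources import TableResource

    resource = TableResource(path="capital.csv")
    target = resource.transform(Pipeline(steps=[steps.table_normalize()]))

    print(target.read_rows())
    print(target.read_rows())  # a second read opens a fresh copy of the source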