Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Update to dozer 0.2.1 #11

Merged
merged 1 commit into from
Nov 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ user = IngestRequest(
schema_name="users",
typ=0,
old=None,
new=Record(values=[Value(int_value=1), Value(string_value="vivek")]),
new=[Value(int_value=1), Value(string_value="vivek")],
seq_no=1
)
ingestor.ingest(user)
Expand Down
10 changes: 3 additions & 7 deletions protos/common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,8 @@ message CountResponse {

// Request for `OnEvent`.
message OnEventRequest {
// The event type to subscribe to.
dozer.types.EventType type = 1;
// The name of the endpoint to subscribe to.
string endpoint = 2;
// JSON filter string.
optional string filter = 3;
// The endpoints to subscribe to. Key is the endpoint name, value is the filter.
map<string, dozer.types.EventFilter> endpoints = 1;
}

// Request for `getFields`.
Expand All @@ -81,7 +77,7 @@ message QueryResponse {
// The list of field definitions.
repeated dozer.types.FieldDefinition fields = 1;
// The list of record data.
repeated dozer.types.RecordWithId records = 2;
repeated dozer.types.Record records = 2;
}

// Request for `getEndpoints`.
Expand Down
4 changes: 2 additions & 2 deletions protos/ingest.proto
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ message IngestRequest {
// The operation type.
dozer.types.OperationType typ = 2;
// Old record data, only applicable for UPDATE type.
optional dozer.types.Record old = 3;
repeated dozer.types.Value old = 3;
// New record data.
dozer.types.Record new = 4;
repeated dozer.types.Value new = 4;

uint32 seq_no = 5;
}
Expand Down
33 changes: 22 additions & 11 deletions protos/types.proto
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,14 @@ enum EventType {
DELETE_ONLY = 3; // Only DELETE events.
}

// Event filter.
message EventFilter {
// The event type to subscribe to.
dozer.types.EventType type = 1;
// JSON filter string.
optional string filter = 3;
}

// The event types.
enum OperationType {
INSERT = 0; // INSERT operation.
Expand All @@ -28,8 +36,6 @@ message Operation {
optional Record old = 2;
// New record data.
Record new = 3;
// New record id, only applicable for INSERT type.
optional uint64 new_id = 4;
// Name of the endpoint that this event is from.
string endpoint_name = 5;
}
Expand All @@ -38,16 +44,10 @@ message Operation {
message Record {
// The list of field values.
repeated Value values = 1;
// The record id in cache.
uint64 id = 2;
// Records with same primary key will have increasing version.
uint32 version = 2;
}

// A record with its id in cache.
message RecordWithId {
// The record id.
uint64 id = 1;
// The record data.
Record record = 2;
uint32 version = 3;
}

// Supported data types in Dozer.
Expand Down Expand Up @@ -126,3 +126,14 @@ message Value {
google.protobuf.Value json_value = 14; // JSON type.
};
}
message SchemasResponse {
map<string, Schema> schemas = 1;
map<string, string> errors = 2;
}

message Schema {
// The list of indexes of the keys that are used as the primary index.
repeated int32 primary_index = 1;
// The list of field definitions.
repeated FieldDefinition fields = 2;
}
5 changes: 4 additions & 1 deletion pydozer/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from pydozer.common_pb2_grpc import CommonGrpcServiceStub
from pydozer.common_pb2 import QueryRequest, OnEventRequest, QueryResponse, CountResponse
from pydozer.health_pb2 import HealthCheckRequest, HealthCheckResponse
from pydozer.types_pb2 import EventFilter

DOZER_API_URL = os.getenv("DOZER_API_URL", "0.0.0.0:50051")

Expand Down Expand Up @@ -109,7 +110,9 @@ def on_event(self, request={}):
Args:
request (OnEventRequest): Optionally accepts a filter
"""
_req = OnEventRequest(endpoint=self.endpoint)
_req = OnEventRequest(endpoints={
self.endpoint: EventFilter()
})
for key, value in request.items():
setattr(_req, key, value)

Expand Down
34 changes: 19 additions & 15 deletions pydozer/common_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 2 additions & 3 deletions pydozer/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,19 +124,18 @@ def map_value(col, typ) -> Value:
return Value()


def map_record(schema_name: str, row: list, types, idx) -> Record:
def map_record(schema_name: str, row: list, types, idx) -> IngestRequest:
values = []
assert len(row) == len(types), "Row and types must be the same length"
for i in range(len(row)):
val = map_value(row[i], types[i])
values.append(val)

rec = Record(values=values, version=1)
return IngestRequest(
schema_name=schema_name,
typ=0,
old=None,
new=rec,
new=values,
seq_no=idx
)

Expand Down
2 changes: 1 addition & 1 deletion pydozer/ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def ingest_raw(self, request) -> IngestResponse:
schema_name="users",
typ=0,
old=None,
new=Record(values=[Value(int_value=1), Value(string_value="vivek")]),
new=[Value(int_value=1), Value(string_value="vivek")],
seq_no=1
)
ingestor.ingest(user)
Expand Down
28 changes: 14 additions & 14 deletions pydozer/ingest_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading