Skip to content

Commit

Permalink
feat: Update to dozer 0.2.1
Browse files Browse the repository at this point in the history
  • Loading branch information
chubei committed Nov 3, 2023
1 parent ebceb91 commit cb528c9
Show file tree
Hide file tree
Showing 14 changed files with 98 additions and 76 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ user = IngestRequest(
schema_name="users",
typ=0,
old=None,
new=Record(values=[Value(int_value=1), Value(string_value="vivek")]),
new=[Value(int_value=1), Value(string_value="vivek")],
seq_no=1
)
ingestor.ingest(user)
Expand Down
10 changes: 3 additions & 7 deletions protos/common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,8 @@ message CountResponse {

// Request for `OnEvent`.
message OnEventRequest {
// The event type to subscribe to.
dozer.types.EventType type = 1;
// The name of the endpoint to subscribe to.
string endpoint = 2;
// JSON filter string.
optional string filter = 3;
// The endpoints to subscribe to. Key is the endpoint name, value is the filter.
map<string, dozer.types.EventFilter> endpoints = 1;
}

// Request for `getFields`.
Expand All @@ -81,7 +77,7 @@ message QueryResponse {
// The list of field definitions.
repeated dozer.types.FieldDefinition fields = 1;
// The list of record data.
repeated dozer.types.RecordWithId records = 2;
repeated dozer.types.Record records = 2;
}

// Request for `getEndpoints`.
Expand Down
4 changes: 2 additions & 2 deletions protos/ingest.proto
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ message IngestRequest {
// The operation type.
dozer.types.OperationType typ = 2;
// Old record data, only applicable for UPDATE type.
optional dozer.types.Record old = 3;
repeated dozer.types.Value old = 3;
// New record data.
dozer.types.Record new = 4;
repeated dozer.types.Value new = 4;

uint32 seq_no = 5;
}
Expand Down
33 changes: 22 additions & 11 deletions protos/types.proto
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,14 @@ enum EventType {
DELETE_ONLY = 3; // Only DELETE events.
}

// Event filter.
message EventFilter {
// The event type to subscribe to.
dozer.types.EventType type = 1;
// JSON filter string.
optional string filter = 3;
}

// The event types.
enum OperationType {
INSERT = 0; // INSERT operation.
Expand All @@ -28,8 +36,6 @@ message Operation {
optional Record old = 2;
// New record data.
Record new = 3;
// New record id, only applicable for INSERT type.
optional uint64 new_id = 4;
// Name of the endpoint that this event is from.
string endpoint_name = 5;
}
Expand All @@ -38,16 +44,10 @@ message Operation {
message Record {
// The list of field values.
repeated Value values = 1;
// The record id in cache.
uint64 id = 2;
// Records with same primary key will have increasing version.
uint32 version = 2;
}

// A record with its id in cache.
message RecordWithId {
// The record id.
uint64 id = 1;
// The record data.
Record record = 2;
uint32 version = 3;
}

// Supported data types in Dozer.
Expand Down Expand Up @@ -126,3 +126,14 @@ message Value {
google.protobuf.Value json_value = 14; // JSON type.
};
}
message SchemasResponse {
map<string, Schema> schemas = 1;
map<string, string> errors = 2;
}

message Schema {
// The list of indexes of the keys that are used as the primary index.
repeated int32 primary_index = 1;
// The list of field definitions.
repeated FieldDefinition fields = 2;
}
5 changes: 4 additions & 1 deletion pydozer/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from pydozer.common_pb2_grpc import CommonGrpcServiceStub
from pydozer.common_pb2 import QueryRequest, OnEventRequest, QueryResponse, CountResponse
from pydozer.health_pb2 import HealthCheckRequest, HealthCheckResponse
from pydozer.types_pb2 import EventFilter

DOZER_API_URL = os.getenv("DOZER_API_URL", "0.0.0.0:50051")

Expand Down Expand Up @@ -109,7 +110,9 @@ def on_event(self, request={}):
Args:
request (OnEventRequest): Optionally accepts a filter
"""
_req = OnEventRequest(endpoint=self.endpoint)
_req = OnEventRequest(endpoints={
self.endpoint: EventFilter()
})
for key, value in request.items():
setattr(_req, key, value)

Expand Down
34 changes: 19 additions & 15 deletions pydozer/common_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 2 additions & 3 deletions pydozer/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,19 +124,18 @@ def map_value(col, typ) -> Value:
return Value()


def map_record(schema_name: str, row: list, types, idx) -> Record:
def map_record(schema_name: str, row: list, types, idx) -> IngestRequest:
values = []
assert len(row) == len(types), "Row and types must be the same length"
for i in range(len(row)):
val = map_value(row[i], types[i])
values.append(val)

rec = Record(values=values, version=1)
return IngestRequest(
schema_name=schema_name,
typ=0,
old=None,
new=rec,
new=values,
seq_no=idx
)

Expand Down
2 changes: 1 addition & 1 deletion pydozer/ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def ingest_raw(self, request) -> IngestResponse:
schema_name="users",
typ=0,
old=None,
new=Record(values=[Value(int_value=1), Value(string_value="vivek")]),
new=[Value(int_value=1), Value(string_value="vivek")],
seq_no=1
)
ingestor.ingest(user)
Expand Down
28 changes: 14 additions & 14 deletions pydozer/ingest_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit cb528c9

Please sign in to comment.