Yoj transactions at ydb topics #80

Open
ansirotenko opened this issue Jul 4, 2024 · 6 comments

@ansirotenko

Transaction support for YDB partitions is expected to be available in the fall. In our project, we use YOJ to work with our tables everywhere. We need to combine YOJ transactions with ydb-java-sdk transactions so that we can write topic messages (and commit offsets on read) in the same transaction as the table changes. Currently it is possible to access the current transaction of the txManager, but it is not possible to create a ydb-java-sdk transaction from it. The two main issues are that statusFuture and sessionId cannot be extracted from the YOJ transaction. It would also be great to have txId generated upfront rather than lazily (if this is OK; I'm not sure).

Overall, the most preferable way would be for the YOJ transaction to provide a method like getYdbTransaction that returns an instance of tech.ydb.common.transaction.YdbTransaction.

@nvamelichev
Collaborator

nvamelichev commented Aug 23, 2024

Currently YOJ does not use YdbTransaction internally, at all. YdbTransaction has been introduced quite recently and is an experimental API (subject to sudden change) in the YDB SDK:

public interface Session extends AutoCloseable {
    // <...>

    @ExperimentalApi("New table transaction interfaces are experimental and may change without notice")
    TableTransaction createNewTransaction(TxMode var1);

    @ExperimentalApi("New table transaction interfaces are experimental and may change without notice")
    CompletableFuture<Result<TableTransaction>> beginTransaction(TxMode var1, BeginTxSettings var2);

    // <...>
}

We can try to implement the YdbTransaction interface in YdbRepositoryTransaction (e.g. YdbRepositoryTransaction.getYdbTransactionWrapper()), but this comes with limitations:

  • getStatusFuture() will throw UnsupportedOperationException. Also, implementing TableTransaction is out of the question: we don't want to give users a way to bypass YOJ when performing queries in a YDB transaction
  • getSessionId() semantics if YDB session is not yet acquired or has been invalidated: it's complicated. Some possibilities:
    • return null
    • throw IllegalStateException
    • try to acquire new session, then return its ID
      • But what do you do if the session has been invalidated (typically by an error, like transaction locks invalidated, that has destroyed the session in YDB)? Likely return an error...
  • getId() will return null transaction ID if YDB transaction is not yet established. This seems to be the way it is implemented in YDB SDK, so this should be fine.
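
The limitations listed above can be sketched as a thin, read-only view over lazily acquired state. This is only an illustration under stated assumptions, not YOJ's or the YDB SDK's actual code: SdkTransactionView is a hypothetical, simplified stand-in for the part of YdbTransaction discussed here (the real status future has a different result type), LazyTxState is a hypothetical holder for the session and transaction IDs, and the sketch arbitrarily picks the "throw IllegalStateException" option for getSessionId().

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical, simplified stand-in for the subset of YdbTransaction discussed in this thread.
interface SdkTransactionView {
    String getId();
    String getSessionId();
    boolean isActive();
    CompletableFuture<Void> getStatusFuture();
}

// Hypothetical holder for state that is filled in lazily.
final class LazyTxState {
    volatile String sessionId; // null until a session has been acquired
    volatile String txId;      // null until the first statement starts the transaction
}

final class LimitedTransactionView implements SdkTransactionView {
    private final LazyTxState state;

    LimitedTransactionView(LazyTxState state) {
        this.state = state;
    }

    @Override
    public String getId() {
        // Returns null while the transaction has not been established yet.
        return state.txId;
    }

    @Override
    public String getSessionId() {
        String sessionId = state.sessionId;
        if (sessionId == null) {
            // One of the options listed above; returning null would be another.
            throw new IllegalStateException("No session is currently attached to this transaction");
        }
        return sessionId;
    }

    @Override
    public boolean isActive() {
        // Active only once a transaction ID exists.
        return state.txId != null;
    }

    @Override
    public CompletableFuture<Void> getStatusFuture() {
        // Deliberately not exposed by the limited wrapper.
        throw new UnsupportedOperationException("Status future is not exposed by this wrapper");
    }
}
```

With this shape, a caller that receives the view before any statement has run sees getId() == null and isActive() == false, which matches the lazy behaviour described in the last bullet.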

@ansirotenko
Author

Any solution that allows reusing YOJ transactions to work with YDB topics would be good enough.

nvamelichev added a commit that referenced this issue Sep 12, 2024
nvamelichev added a commit that referenced this issue Sep 26, 2024
@nvamelichev
Collaborator

nvamelichev commented Sep 26, 2024

@ansirotenko Please try out v2.5.7, it has YdbRepositoryTransaction.toSdkTransaction() which returns a (very limited, very simple) YdbTransaction implementation:
https://github.com/ydb-platform/yoj-project/releases/tag/v2.5.7

If this wrapper is sufficient to reuse YOJ transaction to also work with YDB topics, that's great!
(Otherwise, it will be a large rework of the YOJ core to use YdbTransaction inside instead of earlier YDB SDK methods that take e.g. TxControl.)

@ansirotenko
Author

Unfortunately, it doesn't work. I got the following exception:

java.lang.IllegalArgumentException: Transaction is not active. Can only write topic messages in already running transactions from other services.

My code looks like this.

        MyWriter writer = writers.get(partitionId);

        var transaction = (YdbRepositoryTransaction<?>)Tx.Current.get().getRepositoryTransaction();
        var sdkTransaction = transaction.toSdkTransaction();

        byte[] data = SerializationUtils.serialize(message);
        writer.write(data, sdkTransaction).join();

    public CompletableFuture<WriteAck> write(byte[] data, YdbTransaction transaction) {
        Message message = Message.of(data);

        try {
            return asyncWriter.send(message, SendSettings.newBuilder()
                    .setTransaction(transaction)
                    .build())
                .whenComplete((result, ex) -> {
                    if (ex != null) {
                        log.error("Exception on writing to topic: ", ex);
                    } else {
                        switch (result.getState()) {
                            case WRITTEN:
                                WriteAck.Details details = result.getDetails();
                                StringBuilder str = new StringBuilder("Success");
                                if (details != null) {
                                    str.append(", offset: ").append(details.getOffset());
                                }
                                log.debug(str.toString());
                                break;
                            case ALREADY_WRITTEN:
                                log.warn("Topic message has already been written");
                                break;
                            default:
                                break;
                        }
                    }
                });
        } catch (QueueOverflowException e) {
            throw new RuntimeException(e);
        }
    }

@nvamelichev
Collaborator

Oh. It seems that the YDB transaction has not been initiated by YOJ, probably because no statements (neither reads nor writes) have been executed. (IllegalArgumentException is thrown if YdbTransaction.isActive() returns false, which it does if YdbTransaction.getId() == null; see https://github.com/ydb-platform/ydb-java-sdk/blob/29ff1aded14d81bc7fc291583b82494e4fb702cc/topic/src/main/java/tech/ydb/topic/settings/ReceiveSettings.java#L70.)

YOJ transactions are intentionally lazy (to e.g. execute some data preparation before allocating database resources). Please run any statement in the transaction (do anything with a YOJ Table, for example) before invoking writer.write(data, sdkTransaction).join(). This should help. You can check sdkTransaction.isActive() and sdkTransaction.getId() to see whether a YDB transaction has been allocated by YOJ.
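
The lazy behaviour and the suggested check can be modelled in a few lines. This is a sketch under assumptions, not YOJ code: LazyTx is a hypothetical model of a transaction that receives its ID only when the first statement runs, and TopicWriteGuard.requireActive is a hypothetical helper that fails fast with a clearer message before a topic write is attempted.

```java
import java.util.UUID;

// Hypothetical model of a lazy transaction: no ID until the first statement runs.
final class LazyTx {
    private String id;

    String getId() {
        return id;
    }

    boolean isActive() {
        return id != null;
    }

    // Stands in for "do anything with a table": the first call allocates the transaction.
    void executeStatement() {
        if (id == null) {
            id = UUID.randomUUID().toString();
        }
    }
}

// Hypothetical guard to call right before handing the transaction to a topic writer.
final class TopicWriteGuard {
    private TopicWriteGuard() {
    }

    static String requireActive(LazyTx tx) {
        if (!tx.isActive()) {
            throw new IllegalStateException(
                    "Transaction has not been started yet; run a statement in it before writing to a topic");
        }
        return tx.getId();
    }
}
```

Calling requireActive on a fresh LazyTx throws; after executeStatement() it returns the allocated ID, which is the same order of operations recommended above for the real transaction.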

@ansirotenko
Author

@nvamelichev I have followed your suggestion. Indeed, if I run any statement before writing to the topic in the same transaction, no exception is thrown. But the operation apparently hangs forever. I'm running this experiment in a unit test.

Unit test code

txManager.tx(
    () -> txManager.tx(() -> {
        db.myTable().save(myItem);
        
        MyWriter writer = writers.get(partitionId);

        var transaction = (YdbRepositoryTransaction<?>)Tx.Current.get().getRepositoryTransaction();
        var sdkTransaction = transaction.toSdkTransaction();

        byte[] data = SerializationUtils.serialize(message);
        writer.write(data, sdkTransaction).join();
    })
);

...

 public CompletableFuture<WriteAck> write(byte[] data, YdbTransaction transaction) {
      Message message = Message.of(data);

      try {
          return asyncWriter.send(message, SendSettings.newBuilder()
                  .setTransaction(transaction)
                  .build())
              .whenComplete((result, ex) -> {
                  if (ex != null) {
                      log.error("Exception on writing to topic: ", ex);
                  } else {
                      switch (result.getState()) {
                          case WRITTEN:
                              WriteAck.Details details = result.getDetails();
                              StringBuilder str = new StringBuilder("Success");
                              if (details != null) {
                                  str.append(", offset: ").append(details.getOffset());
                              }
                              log.debug(str.toString());
                              break;
                          case ALREADY_WRITTEN:
                              log.warn("Topic message has already been written");
                              break;
                          default:
                              break;
                      }
                  }
              });
      } catch (QueueOverflowException e) {
          throw new RuntimeException(e);
      }
  }

The YDB Docker image is latest. I removed the image and pulled it again before the experiment. Describing the image gives the following:

        "Id": "sha256:41209b686d7bec78434ccb3dfbb7f1ae949f0fdbdbcf8bd88bda6faca114dbd9",
        "RepoTags": [
            "cr.yandex/yc/yandex-docker-local-ydb:latest"
        ],
        "RepoDigests": [
            "cr.yandex/yc/yandex-docker-local-ydb@sha256:f39237e6bab018c2635af765faa893061fc359892e53f7b51d40ab503b90846f"
        ],
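
While the hang described above is being investigated, the unit test can at least be made to fail instead of blocking forever by bounding the wait on the returned future. This is a generic JDK sketch (Java 9+), not a fix for the underlying problem; BoundedJoin and joinWithin are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

final class BoundedJoin {
    private BoundedJoin() {
    }

    // Waits for the future, but gives up after the timeout.
    // Note: orTimeout completes the given future itself exceptionally with a
    // TimeoutException, so join() then throws a CompletionException wrapping it.
    static <T> T joinWithin(CompletableFuture<T> future, long timeout, TimeUnit unit) {
        return future.orTimeout(timeout, unit).join();
    }
}
```

In the test, writer.write(data, sdkTransaction).join() could then be replaced by BoundedJoin.joinWithin(writer.write(data, sdkTransaction), 30, TimeUnit.SECONDS), where the 30-second limit is an arbitrary choice.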
